import argparse

import pandas as pd

# Import pyspark and build Spark session
from pyspark.sql import SparkSession

from feature_store_manager import FeatureStoreManager


def write_to_s3(df, output_path):
    # Write processed data after transformations to S3...
    processed_features_output_path = output_path.rstrip('/') + '/processed_features.csv'
    print("Saving processed features to {}".format(processed_features_output_path))
    df.write.csv(processed_features_output_path)


def ingest_to_feature_store(df, feature_group):
    # Ingest data into the Feature Group using the Feature Store Manager...
    print(f"Ingesting processed features into Feature Group {feature_group}...")
    feature_store_manager = FeatureStoreManager()
    feature_store_manager.ingest_data(
        input_data_frame=df,
        feature_group_arn=feature_group,
        target_stores=['OfflineStore', 'OnlineStore']
    )


def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--region", type=str, help="AWS region")
    parser.add_argument("--table", type=str, help="Delta table URL")
    parser.add_argument("--feature-group", type=str, help="ARN of the Feature Group")
    parser.add_argument("--output-path", type=str, help="S3 prefix for storing the resulting dataset")
    args = parser.parse_args()

    # Instantiate Spark via the builder
    # Note: we use the `ContainerCredentialsProvider` to give us access to the underlying IAM role permissions
    spark = (SparkSession
             .builder
             .appName("PySparkApp")
             .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
             .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
             .config("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.ContainerCredentialsProvider")
             .getOrCreate())

    sc = spark.sparkContext
    print('Spark version: ' + str(sc.version))

    s3a_delta_table_uri = args.table
    print(s3a_delta_table_uri)

    # Create the SQL command, inserting the S3 path of the Delta table
    sql_cmd = f'SELECT * FROM delta.`{s3a_delta_table_uri}` ORDER BY timestamp'
    print(f'SQL command: {sql_cmd}')

    # Execute the SQL command, which returns a Spark DataFrame
    sql_results = spark.sql(sql_cmd)
    print(type(sql_results))

    # ----------------
    # Transformations - pandas code generated by sagemaker_datawrangler:
    processed_features = sql_results.toPandas().copy(deep=True)

    # Replace disguised missing values in column userID with a generic value
    generic_value = 'Other'
    processed_features['userID'] = processed_features['userID'].replace('na', generic_value, regex=False)
    processed_features['userID'] = processed_features['userID'].replace('nA', generic_value, regex=False)

    # Drop column ratingID to resolve warning: ID column
    processed_features = processed_features.drop(columns=['ratingID'])

    # Complete with the EventTime feature required by Feature Store
    processed_features['EventTime'] = pd.Timestamp.now(tz='UTC').strftime('%Y-%m-%dT%H:%M:%SZ')
    print(processed_features.head())

    # Capture the resulting data frame in Spark:
    columns = ['rowID', 'timestamp', 'userID', 'placeID', 'rating_overall',
               'rating_food', 'rating_service', 'EventTime']
    df = spark.createDataFrame(processed_features).toDF(*columns)
    # ----------------

    # Write processed data after transformations to S3...
    write_to_s3(df, args.output_path)

    # Ingest the resulting data into our Feature Group...
    ingest_to_feature_store(df, args.feature_group)
    print("All done.")


if __name__ == "__main__":
    main()
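# ---------------------------------------------------------------------------
# Example launcher (illustrative sketch, not part of this job script): the
# commented snippet below assumes this file is saved as process.py and
# submitted as a SageMaker Processing job via the SageMaker Python SDK's
# PySparkProcessor. The role ARN, bucket names, Delta jar path, Feature Group
# ARN, and instance settings are placeholders, not values from this script.
#
#   from sagemaker.spark.processing import PySparkProcessor
#
#   processor = PySparkProcessor(
#       base_job_name="delta-to-feature-store",
#       framework_version="3.1",
#       role="<execution-role-arn>",
#       instance_count=2,
#       instance_type="ml.m5.xlarge",
#   )
#   processor.run(
#       submit_app="process.py",
#       submit_jars=["<path-to-delta-core-jar>"],
#       arguments=[
#           "--region", "us-east-1",
#           "--table", "s3a://<bucket>/delta-table/",
#           "--feature-group", "<feature-group-arn>",
#           "--output-path", "s3a://<bucket>/processed/",
#       ],
#   )
# ---------------------------------------------------------------------------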