# Module 5: Offline Batch ingestion via SageMaker Processing job using Feature Store Spark Connector

---

**Note:** Please set the kernel to `Python 3 (Data Science)` and the instance type to `ml.t3.medium`.

Before you start, run the notebook [m5_nb0_partition_data](https://github.com/aws-samples/amazon-sagemaker-feature-store-end-to-end-workshop/blob/main/05-module-scalable-batch-ingestion/m5_nb0_partition_data.ipynb) from this section of the workshop to set up the required data.

## Contents

1. [Setup](#Setup)
1. [Create PySpark SageMaker Processing script](#Create-PySpark-SageMaker-Processing-script)
1. [Run batch ingestion job](#Run-batch-ingestion-job)
1. [Verify processing job results](#Verify-processing-job-results)



In this example, we explore an alternative route for batch ingestion: using PySpark Processing containers to ingest data directly into the Offline Store. We will use the `.ingest_data()` API instead of the `.put_record()` API.

This notebook shows how to use the [SageMaker Feature Store Manager](https://pypi.org/project/sagemaker-feature-store-pyspark/), which is provided by the [SageMaker Feature Store PySpark](https://pypi.org/project/sagemaker-feature-store-pyspark/) package. If you want to run Spark in a different environment, please see the [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-ingestion-spark-connector-setup.html) for further guidance.


# Setup

#### Imports 

In [None]:
from sagemaker.spark.processing import PySparkProcessor
from sagemaker import get_execution_role
from random import randint
import sagemaker
import logging
import boto3
import json

In [None]:
logging.basicConfig(format='SMFS BATCH INGEST PYSPARK - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)



In [None]:
logger.info(f'Using SageMaker version: {sagemaker.__version__}')

### Essentials

In [None]:
try:
    role = get_execution_role()
except Exception:
    # for local dev, please set your SageMaker role ARN here
    role = 'arn:aws:iam::XXXXXXXX:role/service-role/role-name'
logger.info(f'Role = {role}')
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
featurestore_runtime_client = sagemaker_session.boto_session.client('sagemaker-featurestore-runtime', region_name=region)
default_bucket = sagemaker_session.default_bucket()
logger.info(f'Default bucket = {default_bucket}')
prefix = 'sagemaker-feature-store'

spark_version = '3.1'

### Getting the packages needed
To use the Feature Store Manager directly, we need only two things:
- the right `jar`, which we can get from [maven](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk) or by installing the `sagemaker-feature-store-pyspark` package.
- the right Python modules, which we can get from [PyPI](https://pypi.org/project/sagemaker-feature-store-pyspark/). From PyPI, you can download the latest package and copy `feature_store_manager.py` and `wrapper.py` to your local file system. This has already been done for you in the folder `feature_store_pyspark`. We will need to provide both files to the PySpark processor.


You can either download the jars directly from [maven](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk), or simply install the package via pip, which gives us the jars without the hassle of copying them.

In [None]:
# Install a pip package in the current Jupyter kernel
import sys
!{sys.executable} -m pip install sagemaker-feature-store-pyspark-{spark_version}
!{sys.executable} -m pip install pyathena

In [None]:
jar_path = !feature-store-pyspark-dependency-jars
jar_path = jar_path[0]
jar_path

Now that we have all the needed packages, we can create the PySpark processing script.
Besides ingesting into the feature store directly, the script can combine ingestion with data cleaning and feature engineering.

# Create PySpark SageMaker Processing script
The following script:
- Creates a data schema for the spark dataframe
- Ordinally encodes a column
- Scales the data 
- Transforms the column `purchased_on` to the ML ready feature `n_days_since_last_purchase`
- Repartitions the data 
- Ingests into the offline feature store

If you are only interested in the batch ingestion, please take a look at the `batch_ingest_to_feature_store` method.
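
Before looking at the script itself, it may help to see what the transformation steps compute. The following plain-Python sketch mirrors them on a few made-up rows, without Spark. The sample values and the helper names `min_max_scale`, `ordinal_encode` and `days_since` are assumptions for illustration only; the script itself relies on `MinMaxScaler`, `StringIndexer` and `datediff` from PySpark.

```python
from collections import Counter
from datetime import date


def min_max_scale(values):
    """Rescale values linearly to [0, 1], rounded to 2 decimals like the script's UDF."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Spark's MinMaxScaler documents 0.5 * (max + min) of the output range
        # for a constant column, which is 0.5 for the default range [0, 1].
        return [0.5 for _ in values]
    return [round((v - lo) / (hi - lo), 2) for v in values]


def ordinal_encode(values):
    """Map each distinct value to an index, most frequent value first.

    This imitates StringIndexer's default frequency-descending ordering;
    ties are broken by the string form of the value.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], str(v)))
    index = {value: position for position, value in enumerate(ordered)}
    return [index[v] for v in values]


def days_since(purchased_on, today):
    """Number of days between each ISO date string and `today`."""
    return [(today - date.fromisoformat(d)).days for d in purchased_on]


# Made-up sample data, for illustration only
print(min_max_scale([10.0, 25.0, 40.0]))   # [0.0, 0.5, 1.0]
print(ordinal_encode([1, 0, 1, 1]))         # [0, 1, 0, 0]
print(days_since(['2021-01-01', '2021-01-11'], date(2021, 1, 21)))  # [20, 10]
```

Note that frequency-based indexing assigns index 0 to the most frequent value, so the encoded column does not necessarily keep the original 0/1 meaning of a flag such as `is_reordered`.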


In [None]:
%%writefile ./scripts/batch_ingest_sm_pyspark.py
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler
from feature_store_manager import FeatureStoreManager
from pyspark.sql.functions import udf, datediff, to_date, lit, col,isnan, when, count
from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField, StringType, FloatType
from pyspark.sql import SparkSession, DataFrame
from argparse import Namespace, ArgumentParser
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline
from datetime import datetime
import argparse
import ast
import logging
import boto3
import time
import os


logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--num_processes', type=int, default=1)
    parser.add_argument('--num_workers', type=int, default=1)
    parser.add_argument('--feature_group_name', type=str)
    parser.add_argument('--feature_group_arn', type=str)
    parser.add_argument('--target_feature_store_list', type=str)
    parser.add_argument('--s3_uri_prefix', type=str)

    # unknown arguments are ignored instead of raising an error
    args, _ = parser.parse_known_args()
    return args

def transform_row(row) -> list:
    columns = list(row.asDict())
    record = []
    for column in columns:
        feature = {'FeatureName': column, 'ValueAsString': str(row[column])}
        record.append(feature)
    return record

def batch_ingest_to_feature_store(args: argparse.Namespace, df: DataFrame) -> None:
    feature_group_name = args.feature_group_name
    logger.info(f'Feature Group name supplied is: {feature_group_name}')
    session = boto3.session.Session()

    logger.info('Instantiating FeatureStoreManager!')
    feature_store_manager = FeatureStoreManager()

    logger.info('Trying to load datatypes directly from DataFrame')

    # Load the feature definitions from input schema. The feature definitions can be used to create a feature group
    feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df)
    logger.info('Feature definitions loaded successfully!')
    print(feature_definitions)
    feature_group_arn = args.feature_group_arn
    logger.info(f'Feature Group ARN supplied is: {feature_group_arn}')

    # The target stores arrive as a string such as "['OfflineStore']" and are parsed into a list here.
    # If only OfflineStore is selected, the connector will batch write the data to offline store directly
    args.target_feature_store_list = ast.literal_eval(args.target_feature_store_list)
    logger.info(f'Ingesting into the following stores: {args.target_feature_store_list}')

    feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=args.target_feature_store_list)
    logger.info('Feature ingestion successful!')

def scale_col(df: DataFrame, col_name: str) -> DataFrame:
    # scale the column `col_name` with a min-max scaler and replace the original column
    # the UDF extracts the single scaled value from the vector and rounds it to 2 decimals
    unlist = udf(lambda x: round(float(list(x)[0]), 2), DoubleType())
    assembler = VectorAssembler(inputCols=[col_name], outputCol=f'{col_name}_vec')

    scaler = MinMaxScaler(inputCol=f'{col_name}_vec', outputCol=f'{col_name}_scaled')
    pipeline = Pipeline(stages=[assembler, scaler])
    df = pipeline.fit(df).transform(df).withColumn(f'{col_name}_scaled', unlist(f'{col_name}_scaled')) \
        .drop(f'{col_name}_vec')
    df = df.drop(col_name)
    df = df.withColumnRenamed(f'{col_name}_scaled', col_name)
    return df

def ordinal_encode_col(df: DataFrame, col_name: str) -> DataFrame:
    indexer = StringIndexer(inputCol=col_name, outputCol=f'{col_name}_new')
    df = indexer.fit(df).transform(df)
    df = df.drop(col_name)
    df = df.withColumnRenamed(f'{col_name}_new', col_name)
    return df

def run_spark_job():

    args = parse_args()

    spark = SparkSession.builder.getOrCreate()

    # set the time parser policy to LEGACY to allow for parsing of dates in the format dd/MM/yyyy HH:mm:ss, which solves backwards compatibility issues with Spark 2.4
    spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

    logger.info(f'Using Spark-Version:{spark.version}')

    # get the total number of cores in the Spark cluster; if developing locally, there might be no executor
    try:
        spark_context = spark.sparkContext
        total_cores = int(spark_context._conf.get('spark.executor.instances')) * int(spark_context._conf.get('spark.executor.cores'))
        logger.info(f'Total available cores in the Spark cluster = {total_cores}')
    except Exception as e:
        total_cores = 1
        logger.error(f'Could not retrieve number of total cores. Setting total cores to 1. Error message: {str(e)}')

    logger.info(f'Reading input file from S3. S3 uri is {args.s3_uri_prefix}')

    # define the schema of the input data
    csvSchema = StructType([
        StructField("order_id", StringType(), True),
        StructField("customer_id", StringType(), False),
        StructField("product_id", StringType(), False),
        StructField("purchase_amount", FloatType(), False),
        StructField("is_reordered", IntegerType(), False),
        StructField("purchased_on", StringType(), False),
        StructField("event_time", StringType(), False)])

    # read the pyspark dataframe with a schema
    df = spark.read.option("header", "true").schema(csvSchema).csv(args.s3_uri_prefix)

    # transform 1 - ordinal encode `is_reordered` and cast the result to int
    df = ordinal_encode_col(df, 'is_reordered')
    df = df.withColumn('is_reordered', df['is_reordered'].cast(IntegerType()))

    # transform 2 - min max scale `purchase_amount`
    df = df.withColumn('purchase_amount', df['purchase_amount'].cast(DoubleType()))
    df = scale_col(df, 'purchase_amount')

    # transform 3 - derive `n_days_since_last_purchase` column using the `purchased_on` col
    current_date = datetime.today().strftime('%Y-%m-%d')
    df = df.withColumn('n_days_since_last_purchase', datediff(to_date(lit(current_date)), to_date('purchased_on', 'yyyy-MM-dd')))
    df = df.drop('purchased_on')
    df = scale_col(df, 'n_days_since_last_purchase')

    logger.info(f'Number of partitions = {df.rdd.getNumPartitions()}')
    # Rule of thumb heuristic - rely on the product of #executors by #executor.cores, and then multiply that by 3 or 4
    df = df.repartition(total_cores * 3)
    logger.info(f'Number of partitions after re-partitioning = {df.rdd.getNumPartitions()}')
    logger.info(f'Feature Store ingestion start: {datetime.now().strftime("%m/%d/%Y, %H:%M:%S")}')
    batch_ingest_to_feature_store(args, df)
    logger.info(f'Feature Store ingestion complete: {datetime.now().strftime("%m/%d/%Y, %H:%M:%S")}')

if __name__ == '__main__':
    logger.info('BATCH INGESTION - STARTED')
    run_spark_job()
    logger.info('BATCH INGESTION - COMPLETED')
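
The repartitioning rule of thumb used in the script (executors times cores per executor, multiplied by 3) can be sketched in isolation as follows. This is a minimal illustration, not part of the processing script: the helper name `target_partitions` and the plain dictionary standing in for the Spark configuration are assumptions made so that the example runs without a Spark cluster.

```python
def target_partitions(conf: dict, factor: int = 3) -> int:
    """Return executors * cores per executor * factor.

    Falls back to a single core when the settings are missing or not numeric,
    as can happen when developing locally without executors.
    """
    try:
        total_cores = int(conf['spark.executor.instances']) * int(conf['spark.executor.cores'])
    except (KeyError, TypeError, ValueError):
        total_cores = 1
    return total_cores * factor


# Hypothetical cluster settings, for illustration only
cluster_conf = {'spark.executor.instances': '2', 'spark.executor.cores': '4'}
print(target_partitions(cluster_conf))  # 24
print(target_partitions({}))            # 3
```

A factor of 3 or 4 gives each core several smaller tasks, which tends to even out the load when some partitions take longer than others.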


Next, check that our feature group names, feature definition and our arguments are in line with our expectation.

In [None]:
%store -r orders_feature_group_name
s3_uri_prefix = f's3://{default_bucket}/{prefix}/partitions/*'

# Reuse the orders feature group name from module 1
try:
    feature_group_name = orders_feature_group_name
    logger.info(f"{feature_group_name} is used for the feature group name.")
except NameError:
    logger.info("Feature group name could not be retrieved, please specify manually.")

In [None]:
# Specify the feature group name manually.
# Run this cell only if the name could not be retrieved above; it overrides the retrieved value.
feature_group_name = "fscw-orders-01-19-17-06"

In [None]:
sm_client=boto3.client('sagemaker')

feature_group_description = sm_client.describe_feature_group(FeatureGroupName=feature_group_name)
feature_group_arn = feature_group_description['FeatureGroupArn']

# please specify which target stores you want to ingest into -> job arguments must be strings, so the list is passed as a string
target_feature_store_list = "['OfflineStore']"  # "['OfflineStore', 'OnlineStore']" for both

feature_group_name, feature_group_arn, target_feature_store_list, s3_uri_prefix
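
Because the target stores travel to the job as a single string and are turned back into a list with `ast.literal_eval`, the round trip can be checked locally before the job is submitted. The following is a minimal sketch; the helper `parse_target_stores` and the `ALLOWED_STORES` set are hypothetical additions for illustration and are not part of the processing script.

```python
import ast

# Store names as used in this notebook
ALLOWED_STORES = {'OfflineStore', 'OnlineStore'}


def parse_target_stores(argument: str) -> list:
    """Parse a string such as "['OfflineStore']" into a list of store names."""
    # literal_eval only accepts Python literals, so arbitrary expressions are rejected
    stores = ast.literal_eval(argument)
    if not isinstance(stores, list) or not all(
        isinstance(store, str) and store in ALLOWED_STORES for store in stores
    ):
        raise ValueError(f'Unexpected target stores: {argument}')
    return stores


# repr() of a list of strings gives the string form that is passed as a job argument
as_argument = repr(['OfflineStore'])
print(as_argument)                       # ['OfflineStore']
print(parse_target_stores(as_argument))  # ['OfflineStore']
```

Validating the parsed value early surfaces typos in the store names before the Spark job has started.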

In [None]:
feature_group_description["FeatureDefinitions"]

# Run batch ingestion job
You can look up the right image URI for the PySpark Processor in your region in the [sagemaker-spark-container releases](https://github.com/aws/sagemaker-spark-container/releases) on GitHub.

If you want to use a specific image, please pass its URI via the `image_uri` parameter when instantiating the `PySparkProcessor`.

Alternatively, you can specify the Spark version that you would like to use via the `framework_version` parameter.


In [None]:
from sagemaker.spark.processing import PySparkProcessor

pyspark_processor = PySparkProcessor(
 base_job_name="spark-preprocessor",
 # image_uri = image_uri,
 framework_version=spark_version,
 role=role,
 instance_count=1,
 instance_type="ml.m5.xlarge",
 max_runtime_in_seconds=1200,
)

pyspark_processor.run(submit_app='./scripts/batch_ingest_sm_pyspark.py',
                      arguments=['--feature_group_name', feature_group_name,
                                 '--s3_uri_prefix', s3_uri_prefix,
                                 '--feature_group_arn', feature_group_arn,
                                 '--target_feature_store_list', target_feature_store_list],
                      submit_jars=[jar_path],
                      submit_py_files=[
                          './feature_store_pyspark/feature_store_manager.py',
                          './feature_store_pyspark/wrapper.py'
                      ],
                      spark_event_logs_s3_uri=f's3://{default_bucket}/spark-logs',
                      logs=False,  # set logs=True to stream the job logs into the notebook
                      wait=True)

# Verify processing job results

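To verify the processing results, we fetch an ingested order by its `order_id`. Note that `get_record` reads from the Online Store, so this check only reflects the batch ingestion if `OnlineStore` was included in the target stores. The same approach can be used to get any record from the Online Store.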

In [None]:
order_id = f'O{randint(1, 100000)}'
logger.info(f'order_id={order_id}') 
print(feature_group_name)
feature_record = featurestore_runtime_client.get_record(FeatureGroupName=feature_group_name, 
 RecordIdentifierValueAsString=order_id)
print(json.dumps(feature_record, indent=2))
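
The `Record` entry of a `get_record` response is a list of `FeatureName` / `ValueAsString` pairs, which is easier to inspect as a flat dictionary. The sketch below shows one way to convert it. The helper name `record_to_dict` and the sample response are made up for illustration, and the sketch assumes that a response for a missing record simply contains no `Record` key.

```python
def record_to_dict(response: dict) -> dict:
    """Flatten a get_record style response into {feature_name: value_as_string}."""
    # Assumption: when no record was found, the 'Record' key is absent
    record = response.get('Record', [])
    return {feature['FeatureName']: feature['ValueAsString'] for feature in record}


# Made-up response with the same shape as a get_record result
sample_response = {
    'Record': [
        {'FeatureName': 'order_id', 'ValueAsString': 'O123'},
        {'FeatureName': 'purchase_amount', 'ValueAsString': '0.42'},
    ]
}

print(record_to_dict(sample_response))  # {'order_id': 'O123', 'purchase_amount': '0.42'}
print(record_to_dict({}))               # {}
```

An empty dictionary then indicates that no record was returned for the requested identifier.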

Alternatively, we can use pyathena to query the underlying Offline Store table for the most recently written orders.

In [None]:
import pyathena as pa
import pandas as pd

# getting the latest feature group description
feature_group_description = sm_client.describe_feature_group(FeatureGroupName=feature_group_name)

# Opening a connection to Athena
conn = pa.connect(s3_staging_dir=f's3://{default_bucket}/athena-staging',
 region_name=region)

# Getting the table name from the feature group description
table_name = feature_group_description['OfflineStoreConfig']['DataCatalogConfig']['TableName']

# Querying the table
query = f"""SELECT * FROM \"sagemaker_featurestore\".\"{table_name}\" 
 ORDER BY "write_time" DESC
 LIMIT 1000;"""

df = pd.read_sql(query, conn)
df