# Module 5: Online ingestion via SageMaker Processing job (PySpark)

---

**Note:** Set the kernel to `Python 3 (Data Science)` and select the `ml.t3.medium` instance type.

## Contents

1. [Setup](#Setup)
1. [Create PySpark SageMaker Processing script](#Create-PySpark-SageMaker-Processing-script)
1. [Run batch ingestion job](#Run-batch-ingestion-job)
1. [Verify processing job results](#Verify-processing-job-results)

This notebook uses a SageMaker Processing job to perform scalable, repeatable ingestion into the feature store. Unlike [notebook 4 - m5_nb4_sm_processing_pyspark_offline_batch](./m5_nb4_sm_processing_pyspark_offline_batch.ipynb), which uses the SageMaker Feature Store Manager to perform a Spark batch ingestion, this notebook calls the feature store's `.put_record()` API for each row. The SageMaker Feature Store Manager requires extra packages; the method presented in this section avoids them.

# Setup

#### Imports 

In [None]:
from sagemaker.spark.processing import PySparkProcessor
from sagemaker import get_execution_role
from random import randint
import sagemaker
import logging
import boto3
import json

In [None]:
# Notebook-wide logger. Pass the module's `__name__` (not the literal string
# '__name__') so the logger is named after the running module.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Guard against re-running this cell: each run would otherwise attach another
# StreamHandler and every message would be printed once per run.
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())

In [None]:
# Record which SageMaker SDK version this notebook is running against.
logger.info('Using SageMaker version: %s', sagemaker.__version__)

#### Essentials

In [None]:
# S3 key prefix used for this module's data.
prefix = 'sagemaker-feature-store'

# Execution role and SageMaker session shared by the rest of the notebook.
role = get_execution_role()
logger.info('Role = %s', role)
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name

# Runtime client (GetRecord/PutRecord) bound to the session's region.
featurestore_runtime_client = sagemaker_session.boto_session.client(
    'sagemaker-featurestore-runtime',
    region_name=region,
)

default_bucket = sagemaker_session.default_bucket()
logger.info('Default bucket = %s', default_bucket)

# Create PySpark SageMaker Processing script

In [None]:
%%writefile ./scripts/batch_ingest_sm_pyspark.py
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler
from pyspark.sql.functions import udf, datediff, to_date, lit
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql import SparkSession, DataFrame
from argparse import Namespace, ArgumentParser
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline
from datetime import datetime
import argparse
import logging
import boto3
import time
import os


# Script logger. Pass the module's `__name__` (not the literal string '__name__')
# so the logger is named after the running module.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Attach a single console handler even if this module is executed more than once.
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())


def transform_row(row) -> list:
    """Convert a Spark row into a Feature Store ``Record`` (list of FeatureValue dicts).

    Each non-null column becomes ``{'FeatureName': <column>, 'ValueAsString': str(value)}``,
    in the row's column order. Null columns are omitted rather than sent as the string
    ``'None'``. Feature Store treats a feature missing from the record as having no value,
    whereas ``str(None)`` would store or validate the literal text "None".

    Args:
        row: A Spark ``Row`` (anything exposing ``asDict()``).

    Returns:
        The list of FeatureValue dicts to pass as ``Record`` to ``put_record``.
    """
    return [
        {'FeatureName': column, 'ValueAsString': str(value)}
        for column, value in row.asDict().items()
        if value is not None
    ]


def ingest_to_feature_store(args: argparse.Namespace, rows) -> None:
    """Write every row of one Spark partition to the feature group via PutRecord.

    Invoked on the executors through ``DataFrame.foreachPartition``, so it builds its
    own boto3 client instead of reusing one from the driver.

    Args:
        args: Parsed job arguments; only ``args.feature_group_name`` is read.
        rows: Iterable of Spark rows belonging to a single partition.

    Raises:
        RuntimeError: If PutRecord answers with a non-200 HTTP status.
    """
    feature_group_name = args.feature_group_name
    session = boto3.session.Session()
    # No explicit region: boto3 resolves it from the environment (presumably the
    # AWS_DEFAULT_REGION passed in the processing job's env — confirm).
    featurestore_runtime_client = session.client(service_name='sagemaker-featurestore-runtime')
    rows = list(rows)  # materialized so the partition size can be logged up front
    logger.info(f'Ingesting {len(rows)} rows into feature group: {feature_group_name}')
    for row in rows:
        record = transform_row(row)
        response = featurestore_runtime_client.put_record(FeatureGroupName=feature_group_name, Record=record)
        status_code = response['ResponseMetadata']['HTTPStatusCode']
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if status_code != 200:
            raise RuntimeError(
                f'put_record to feature group {feature_group_name} returned HTTP status {status_code}'
            )


def parse_args() -> argparse.Namespace:
    """Parse the job's command-line arguments, ignoring any unknown ones.

    ``--feature_group_name`` and ``--s3_uri_prefix`` are required: without them the job
    cannot read its input or write its output, so fail fast at startup with a usage
    error instead of failing later inside Spark.

    Returns:
        Namespace with ``num_processes``, ``num_workers``, ``feature_group_name`` and
        ``s3_uri_prefix``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--num_processes', type=int, default=1)
    parser.add_argument('--num_workers', type=int, default=1)
    parser.add_argument('--feature_group_name', type=str, required=True)
    parser.add_argument("--s3_uri_prefix", type=str, required=True)
    # parse_known_args: tolerate extra arguments appended by the launcher.
    args, _ = parser.parse_known_args()
    return args


def scale_col(df: DataFrame, col_name: str) -> DataFrame:
    """Min-max scale a numeric column in place (same column name), rounded to 2 decimals.

    The column is wrapped in a one-element vector (MinMaxScaler works on vectors),
    scaled, unwrapped back to a double, and swapped in for the original column.
    """
    vec_col = f'{col_name}_vec'
    scaled_col = f'{col_name}_scaled'
    # Pull the single scaled value back out of its vector and round it.
    first_rounded = udf(lambda vec: round(float(list(vec)[0]), 2), DoubleType())
    fitted = Pipeline(stages=[
        VectorAssembler(inputCols=[col_name], outputCol=vec_col),
        MinMaxScaler(inputCol=vec_col, outputCol=scaled_col),
    ]).fit(df)
    return (
        fitted.transform(df)
        .withColumn(scaled_col, first_rounded(scaled_col))
        .drop(vec_col)
        .drop(col_name)
        .withColumnRenamed(scaled_col, col_name)
    )


def ordinal_encode_col(df: DataFrame, col_name: str) -> DataFrame:
    """Replace ``col_name`` with its StringIndexer label index, keeping the column name.

    NOTE(review): StringIndexer's default ``stringOrderType`` is 'frequencyDesc', so the
    index given to each label depends on label frequencies in ``df`` — confirm that is
    the intended encoding.
    """
    encoded_col = f'{col_name}_new'
    fitted = StringIndexer(inputCol=col_name, outputCol=encoded_col).fit(df)
    return fitted.transform(df).drop(col_name).withColumnRenamed(encoded_col, col_name)


def _total_executor_cores(spark_context) -> int:
    """Return executor instances x cores per executor, or Spark's default parallelism when
    either setting is missing from the Spark configuration (int(None) would crash)."""
    conf = spark_context.getConf()
    instances = conf.get('spark.executor.instances')
    cores = conf.get('spark.executor.cores')
    if instances is None or cores is None:
        return spark_context.defaultParallelism
    return int(instances) * int(cores)


def _transform_features(df: DataFrame) -> DataFrame:
    """Apply the raw-feature transforms: encode `is_reordered`, scale `purchase_amount`,
    and replace `purchased_on` with a scaled `n_days_since_last_purchase`."""
    # transform 1 - encode boolean to int
    # NOTE(review): ordinal_encode_col relies on StringIndexer's default ordering, which is
    # by label frequency, so which label maps to 0/1 depends on the data — confirm.
    df = ordinal_encode_col(df, 'is_reordered')
    df = df.withColumn('is_reordered', df['is_reordered'].cast(IntegerType()))

    # transform 2 - min max scale `purchase_amount`
    df = df.withColumn('purchase_amount', df['purchase_amount'].cast(DoubleType()))
    df = scale_col(df, 'purchase_amount')

    # transform 3 - derive `n_days_since_last_purchase` column using the `purchased_on` col
    current_date = datetime.today().strftime('%Y-%m-%d')
    df = df.withColumn('n_days_since_last_purchase',
                       datediff(to_date(lit(current_date)), to_date('purchased_on', 'yyyy-MM-dd')))
    df = df.drop('purchased_on')
    return scale_col(df, 'n_days_since_last_purchase')


def run_spark_job():
    """Read the input CSVs from S3, transform the features, and PutRecord every row into
    the feature group, one boto3 client per partition."""
    args = parse_args()
    spark_session = SparkSession.builder.appName('PySparkJob').getOrCreate()
    spark_context = spark_session.sparkContext
    total_cores = _total_executor_cores(spark_context)
    logger.info(f'Total available cores in the Spark cluster = {total_cores}')
    logger.info('Reading input file from S3')
    df = spark_session.read.options(Header=True).csv(args.s3_uri_prefix)

    df = _transform_features(df)
    df.show(5)

    logger.info(f'Number of partitions = {df.rdd.getNumPartitions()}')
    # Rule of thumb heuristic - rely on the product of #executors by #executor.cores, and then multiply that by 3 or 4
    df = df.repartition(max(total_cores, 1) * 3)
    logger.info(f'Number of partitions after re-partitioning = {df.rdd.getNumPartitions()}')
    logger.info(f'Feature Store ingestion start: {datetime.now().strftime("%m/%d/%Y, %H:%M:%S")}')
    df.foreachPartition(lambda rows: ingest_to_feature_store(args, rows))
    logger.info(f'Feature Store ingestion complete: {datetime.now().strftime("%m/%d/%Y, %H:%M:%S")}')


if __name__ == '__main__':
 # Script entry point: the STARTED/COMPLETED markers bracket the whole Spark job
 # in the logs; COMPLETED is only logged if run_spark_job() returns normally.
 logger.info('BATCH INGESTION - STARTED')
 run_spark_job()
 logger.info('BATCH INGESTION - COMPLETED')

# Run batch ingestion job

In [None]:
%store -r orders_feature_group_name

# REUSE orders feature group name from module 1 (restored by the %store -r line)
feature_group_name = orders_feature_group_name
# Glob matching every object directly under the partitions/ prefix
s3_uri_prefix = f's3://{default_bucket}/{prefix}/partitions/*'

In [None]:
# Spark 2.4 processing cluster: 2 x ml.m5.xlarge, capped at one hour of runtime.
# The ingestion script creates its boto3 client without an explicit region, so hand it
# the same `region` this notebook's SageMaker session (and verification client) uses,
# rather than resolving it again through a fresh boto3 session.
# NOTE(review): the purpose of the 'mode' env variable is not visible here — confirm.
pyspark_processor = PySparkProcessor(framework_version='2.4',  # Spark version
                                     role=role,
                                     instance_type='ml.m5.xlarge',
                                     instance_count=2,
                                     base_job_name='sm-processing-pyspark-fs-ingestion',
                                     env={'AWS_DEFAULT_REGION': region,
                                          'mode': 'python'},
                                     max_runtime_in_seconds=3600)

In [None]:
%%time

# Launch the ingestion script on the processing cluster; Spark event logs go to S3.
pyspark_processor.run(
    submit_app='./scripts/batch_ingest_sm_pyspark.py',
    arguments=[
        '--feature_group_name', feature_group_name,
        '--s3_uri_prefix', s3_uri_prefix,
    ],
    spark_event_logs_s3_uri=f's3://{default_bucket}/spark-logs',
    logs=False,  # set logs=True to enable logging
)

# Verify processing job results

In [None]:
# Spot-check the ingestion by reading one random order back from the online store.
# Assumes order ids look like 'O<n>' with n in [1, 100000] — confirm against the data.
order_id = f'O{randint(1, 100000)}'
logger.info(f'order_id={order_id}')

feature_record = featurestore_runtime_client.get_record(FeatureGroupName=feature_group_name,
                                                        RecordIdentifierValueAsString=order_id)
# GetRecord does not fail for an unknown identifier; the response simply lacks 'Record'.
# Say so explicitly, so an id that was never ingested is not mistaken for a successful read.
if 'Record' not in feature_record:
    logger.warning(f'No record found for order_id={order_id}; it may not exist in the ingested data')
print(json.dumps(feature_record, indent=2))