# Module 5: Batch ingestion via SageMaker Processing job (Sklearn)

---
**Note:** Please set the kernel to `Python 3 (Data Science)` and the instance type to `ml.t3.medium`.

## Contents

1. [Setup](#Setup)
1. [Create Sklearn SageMaker Processing script](#Create-Sklearn-SageMaker-Processing-script)
1. [Run batch ingestion processing job](#Run-batch-ingestion-processing-job)
1. [Verify processing job results](#Verify-processing-job-results)

# Setup

#### Imports

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker import get_execution_role
from random import randint
import sagemaker
import logging
import json

In [None]:
# Notebook logger, named after the current module (`__name__`) rather than the
# literal string '__name__'. The stream handler is attached only when none is
# present so that re-running this cell does not duplicate every log line.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())

In [None]:
# Log the version string exposed by the imported `sagemaker` package.
logger.info(f'Using SageMaker version: {sagemaker.__version__}')

#### Essentials

In [None]:
# Execution role; passed to the SKLearnProcessor further down.
role = get_execution_role()
logger.info(f'Role = {role}')
sagemaker_session = sagemaker.Session()
# Region of the session; reused for the runtime client and the processing job env.
region = sagemaker_session.boto_region_name
# Feature Store runtime client; used in the last cell to read back a record.
featurestore_runtime_client = sagemaker_session.boto_session.client('sagemaker-featurestore-runtime', region_name=region)
default_bucket = sagemaker_session.default_bucket()
logger.info(f'Default bucket = {default_bucket}')
# Key prefix under the default bucket; combined into the job's S3 input URI.
prefix = 'sagemaker-feature-store'

# Create Sklearn SageMaker Processing script

In [None]:
%%writefile ./scripts/batch_ingest_sm_sklearn.py
import subprocess
import sys
# Install the SageMaker SDK with the running interpreter's pip before the
# `sagemaker` imports below are executed (presumably the processing container
# image does not ship it -- TODO confirm). check_call raises if pip fails.
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker'])

from sagemaker.feature_store.feature_group import FeatureGroup
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from datetime import datetime, timezone, date
import pandas as pd
import sagemaker
import argparse
import logging
import time
import os


# Module-level session; handed to FeatureGroup inside ingest_data().
sagemaker_session = sagemaker.Session()

# Script logger, named after the running module (`__name__`) rather than the
# literal string '__name__'; INFO and above go to the stream handler.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


# NOTE(review): label_encoder is not referenced anywhere else in the visible script.
label_encoder = LabelEncoder()
# Shared scaler; apply_transforms() fits it on the day-delta column.
min_max_scaler = MinMaxScaler()
# CPU count of the node; used as the default for --num_processes.
n_cores = os.cpu_count()


def get_file_paths(directory):
    """Return the paths of all ``.csv`` files found anywhere below ``directory``.

    The tree is walked recursively with ``os.walk``; only file names ending in
    the lower-case suffix ``.csv`` are kept. Paths are returned in walk order.
    """
    return [
        os.path.join(parent, name)
        for parent, _subdirs, names in os.walk(directory)
        for name in names
        if name.endswith('.csv')
    ]


def get_delta_in_days(date_time) -> int:
    """Return how many whole days lie between today's date and ``date_time``.

    Only the calendar date of ``date_time`` (via ``.date()``) is compared with
    ``date.today()``; the result is negative for dates in the future.
    """
    return (date.today() - date_time.date()).days


def apply_transforms(df: pd.DataFrame) -> pd.DataFrame:
    """Prepare the raw frame for ingestion, modifying ``df`` in place.

    Steps:
      * cast ``is_reordered`` to int;
      * parse ``purchased_on`` (``%Y-%m-%d %H:%M:%S``) and derive
        ``n_days_since_last_purchase`` from it via ``get_delta_in_days``;
      * rescale that column with the module-level ``min_max_scaler``, which is
        re-fitted on the rows passed in;
      * remove ``purchased_on``.

    Returns:
        The same (mutated) DataFrame object.
    """
    df['is_reordered'] = df['is_reordered'].astype(int)
    df['purchased_on'] = pd.to_datetime(df['purchased_on'], format='%Y-%m-%d %H:%M:%S')

    # Age of each purchase in days, then min-max scaled.
    days_since_purchase = df['purchased_on'].apply(get_delta_in_days)
    df['n_days_since_last_purchase'] = days_since_purchase
    df['n_days_since_last_purchase'] = min_max_scaler.fit_transform(df[['n_days_since_last_purchase']])

    # The raw timestamp is no longer needed once the delta has been derived.
    del df['purchased_on']
    return df


def ingest_data(args: argparse.Namespace) -> None:
    """Load every CSV under the processing input directory and ingest it into a feature group.

    All ``.csv`` files below ``/opt/ml/processing/input/`` are concatenated into
    one DataFrame, passed through ``apply_transforms`` and ingested with
    ``FeatureGroup.ingest`` (blocking, ``wait=True``).

    Args:
        args: Parsed command-line arguments; ``feature_group_name``,
            ``num_processes`` and ``num_workers`` are read.

    Raises:
        ValueError: If no ``.csv`` file is found under the input directory.
        RuntimeError: If the ingestion response reports failed rows.
    """
    input_dir = '/opt/ml/processing/input/'
    files = get_file_paths(input_dir)
    logger.info(f'Files: {files}')
    if not files:
        # pd.concat([]) would fail with an opaque "No objects to concatenate"
        # ValueError; fail with the same exception type but a clear message.
        raise ValueError(f'No .csv files found under {input_dir}; nothing to ingest')

    df = pd.concat([pd.read_csv(file) for file in files], ignore_index=True)
    df = apply_transforms(df)
    logger.info(f'Ingesting a total of [{df.shape[0]}] rows from {len(files)} files')
    logger.info(f'Ingesting into feature group [{args.feature_group_name}] using {args.num_processes} processes and {args.num_workers} workers')
    fg = FeatureGroup(name=args.feature_group_name, sagemaker_session=sagemaker_session)
    response = fg.ingest(data_frame=df, max_processes=args.num_processes, max_workers=args.num_workers, wait=True)

    # The ingest call returns an IngestionManagerPandas instance. Zero-based
    # indices of rows that failed to be ingested are captured in failed_rows.
    # Check explicitly instead of using `assert`, which is stripped when Python
    # runs with -O and would then let partial ingestion pass silently.
    n_failed = len(response.failed_rows)
    if n_failed:
        raise RuntimeError(
            f'{n_failed} row(s) failed to ingest into feature group [{args.feature_group_name}]'
        )
 
 
def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the ingestion script.

    Unknown arguments are ignored (``parse_known_args``).

    Returns:
        Namespace with ``num_processes``, ``num_workers`` and
        ``feature_group_name`` (``None`` when not supplied).
    """
    parser = argparse.ArgumentParser()
    # if num_processes is not set, we set it to the number of vCPUs by default
    parser.add_argument('--num_processes', type=int, default=n_cores)
    # if num_workers is not set, we default it to 4 i.e., 4 threads per python process
    parser.add_argument('--num_workers', type=int, default=4)
    parser.add_argument('--feature_group_name', type=str)
    args, _ = parser.parse_known_args()
    return args


# Script entry point: parse the CLI arguments and run the ingestion, logging
# a marker before and after.
if __name__ == '__main__':
    logger.info('BATCH INGESTION - STARTED')
    ingest_data(parse_args())
    logger.info('BATCH INGESTION - COMPLETED')

# Run batch ingestion processing job

In [None]:
%store -r orders_feature_group_name

# S3 prefix under the default bucket; used as the processing job's input source.
s3_uri_prefix = f's3://{default_bucket}/{prefix}/partitions/'
# Reuse the orders feature group name from module 1 (restored by `%store -r` above).
feature_group_name = orders_feature_group_name

In [None]:
%%time

# The region is exported to the job's environment as AWS_DEFAULT_REGION.
processor_env = {'AWS_DEFAULT_REGION': region}

# Processor for the ingestion job: two ml.m5.xlarge instances.
sklearn_processor = SKLearnProcessor(
    framework_version='0.20.0',
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=2,
    base_job_name='fscw-sm-processing-sklearn-fs-ingestion',
    env=processor_env,
)

"""
Note: It is recommended to set the num_processes argument below to the total number of cores (vCPUs) in your 
processing node and set num_workers to 4. num_workers here denotes the number of threads per python process.
In this example, since we are using instance_type=ml.m5.xlarge (set above) for our processing node, we have 
set num_processes=4 (an m5.xlarge instance has 4 cores) and num_workers=4 below.
"""

# Command-line arguments forwarded to the ingestion script.
job_arguments = [
    '--num_processes', '4',
    '--num_workers', '4',
    '--feature_group_name', feature_group_name,
]

# Objects under the S3 prefix are distributed with 'ShardedByS3Key' and
# placed in the directory the script reads from.
job_inputs = [
    ProcessingInput(
        s3_data_type='S3Prefix',
        source=s3_uri_prefix,
        s3_data_distribution_type='ShardedByS3Key',
        destination='/opt/ml/processing/input',
    )
]

sklearn_processor.run(
    code='./scripts/batch_ingest_sm_sklearn.py',
    arguments=job_arguments,
    inputs=job_inputs,
    logs=False,  # set logs=True to enable logging
)

# Verify processing job results

In [None]:
# Pick a random order id of the form 'O<number>' and look it up in the feature group.
order_id = f'O{randint(1, 100000)}'
logger.info(f'order_id={order_id}')

lookup_kwargs = {
    'FeatureGroupName': feature_group_name,
    'RecordIdentifierValueAsString': order_id,
}
feature_record = featurestore_runtime_client.get_record(**lookup_kwargs)

# Pretty-print the raw response.
record_json = json.dumps(feature_record, indent=2)
print(record_json)