# Amazon SageMaker PreProcessing, Inferencing & PostProcessing with Your Custom Machine Learning Code

This Notebook demonstrates how to rapidly migrate your local custom Machine Learning code to AWS Cloud. 
With this example we illustrate how custom `preprocess.py`, `predict.py`, and `postprocess.py` scripts can be migrated to AWS without any changes to your code base. With this approach you can rapidly embed your ML code into the AWS ecosystem and take advantage of the agility and scale offered by Amazon SageMaker.

*Please choose Python 3 (Data Science) kernel to proceed.*

## Setup
To build Docker images inside SageMaker Studio, we need the *sm-docker* CLI, which is part of the **sagemaker_studio_image_build** package. Let's install that package into our Notebook environment.

In [None]:
#Install sagemaker_studio_image_build utility
import sys

!{sys.executable} -m pip install sagemaker_studio_image_build

## Review Dockerfile
Docker is a program that performs operating system-level virtualization for installing, distributing, and managing software. It packages applications and their dependencies into virtual containers that provide isolation, portability, and security. With Docker, you can ship code faster, standardize application operations, seamlessly move code, and economize by improving resource utilization.

For this example **Dockerfile**, we use a python-slim base image. We add our inference code to the container and set up its runtime environment from **requirements.txt**.

In [None]:
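# Note: `!cd` runs in a subshell and does not change the notebook's working
# directory, so the Dockerfile is read from the current directory.
!cat Dockerfile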

## Perform Docker image build using **sm-docker** abstraction layer.

The **sm-docker** command workflow contains the following steps:

1. The CLI automatically zips the directory containing your Dockerfile, generates the buildspec for AWS CodeBuild, and adds the buildspec to the final .zip package. By default, the final .zip package is put in the Amazon SageMaker default session S3 bucket. Alternatively, you can specify a custom bucket using the `--bucket` argument.
2. After packaging your files for build, the CLI creates an ECR repository if one doesn’t exist. By default, the ECR repository created has the naming convention of sagemaker-studio-. The final step performed by the CLI is to create a temporary build project in CodeBuild and start the build, which builds your container image, tags it, and pushes it to the ECR repository.
 
The great part about the CLI is you no longer have to set any of this up or worry about the underlying activities to easily build your container images from Amazon SageMaker Studio.

In [None]:
%%sh

sm-docker build . --repository legacycode:latest
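
Once the build finishes, the image is addressed by its ECR URI. As a minimal sketch, the helper below (the name `ecr_image_uri` is ours, not part of sm-docker) composes that URI using the same pattern this notebook uses later for the inference job. It assumes the standard `amazonaws.com` domain; the account ID and region shown are placeholders.

```python
def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Compose an ECR image URI from its parts (hypothetical helper)."""
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


# Placeholder account ID and region, for illustration only.
print(ecr_image_uri("123456789012", "us-west-2", "legacycode"))
```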

## Initialize parameters used for SageMaker API calls

Here we capture the parameters that are used later by the different SageMaker API calls.

In [None]:
import sagemaker
import boto3

session = sagemaker.Session()

# Set a default S3 bucket
default_bucket = session.default_bucket()

# Get the region
region = boto3.Session().region_name

# Get the account
account = session.boto_session.client('sts').get_caller_identity()['Account']

# Get the SageMaker Execution Role
role = sagemaker.get_execution_role()

# Upload the input data and scripts into S3 bucket
S3_prefix= "legacycode2"

preprocessing_scripts_file= "../preprocessing/preprocess.py"
scripts_location = session.upload_data(preprocessing_scripts_file, 
 key_prefix=S3_prefix+"/scripts")

postprocessing_scripts_file= "../postprocessing/postprocess.py"
scripts_location = session.upload_data(postprocessing_scripts_file, 
 key_prefix=S3_prefix+"/scripts")

data_directory= "../data"
input_location = session.upload_data(data_directory, 
 key_prefix=S3_prefix+"/data/preproc/input")
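
The uploads above establish the S3 layout that the three jobs share: each step writes to the prefix the next step reads from. The sketch below only spells out that layout; `stage_uris` is a hypothetical helper and the bucket name is a placeholder.

```python
def stage_uris(bucket: str, prefix: str) -> dict:
    """Return the S3 URIs used by each pipeline stage (hypothetical helper)."""
    base = f"s3://{bucket}/{prefix}"
    return {
        "scripts": f"{base}/scripts",                         # preprocess.py and postprocess.py
        "preprocess_input": f"{base}/data/preproc/input",     # raw data uploaded from ../data
        "predict_input": f"{base}/data/predict/input",        # written by the pre-processing job
        "postprocess_input": f"{base}/data/postproc/input",   # written by the inference job
        "postprocess_output": f"{base}/data/postproc/output", # final results
    }


# Placeholder bucket name, for illustration only.
for stage, uri in stage_uris("my-example-bucket", "legacycode2").items():
    print(f"{stage}: {uri}")
```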

## Pull the Amazon SageMaker *sklearn* pre-built docker image for processing job

In [None]:
from sagemaker import image_uris
sklearn_image_uri = image_uris.retrieve(
    framework='sklearn', region=region, version='0.23-1', image_scope='training'
)
print(sklearn_image_uri)

## Configure SageMaker Processing Job for Data Pre-Processing Step

Here is a sample configuration of a Processing job. We set up the arguments required to trigger a SageMaker Processing Job. You will need to change the input location to point to your own input dataset. Notice that we use `preprocess.py` as our container entrypoint. This Python script file contains the data preprocessing logic.

In [None]:
## Create a pre-processing job in script mode using the pre-built scikit-learn container
import os
import json
import boto3
import time

sm = boto3.client('sagemaker')

# Define parameters
instance_type = "ml.m5.xlarge"
volume_size = 20
max_runtime = 3600 # Default: 1h


timestamp = time.strftime('%Y%m%d-%H%M%S')
job_name = f'SM-Pre-Processing-Job-{timestamp}' 


# Input data URI pattern: s3://<default_bucket>/<S3_prefix>/data/preproc/input

# Define inputs/outputs

create_preprocessing_params = {
 "ProcessingInputs": [
 {
 'InputName': 'input_data',
 'S3Input': {
 'S3Uri': "s3://{}/{}/data/preproc/input".format(default_bucket, S3_prefix),
 'LocalPath': '/opt/ml/processing/input/data/',
 'S3DataType': 'S3Prefix',
 'S3InputMode': 'File'
 }
 },
 {
 'InputName': 'scripts',
 'S3Input': {
 'S3Uri': "s3://{}/{}/scripts".format(default_bucket,S3_prefix),
 'LocalPath': '/opt/ml/processing/input/scripts/',
 'S3DataType': 'S3Prefix',
 'S3InputMode': 'File'
 }
 }
 ],
 "ProcessingOutputConfig": {
 'Outputs': [
 {
 'OutputName': 'output_data',
 'S3Output': {
 'S3Uri': "s3://{}/{}/data/predict/input".format(default_bucket,S3_prefix),
 'LocalPath': '/opt/ml/processing/output',
 'S3UploadMode': 'EndOfJob'
 }
 }
 ]
 },
 "ProcessingJobName": job_name,
 "ProcessingResources": {
 'ClusterConfig': {
 'InstanceCount': 1,
 'InstanceType': instance_type,
 'VolumeSizeInGB': volume_size
 }
 },
 "StoppingCondition": {
 'MaxRuntimeInSeconds': max_runtime
 },
 "AppSpecification": {
 'ImageUri': sklearn_image_uri,
 'ContainerEntrypoint': ['python',"/opt/ml/processing/input/scripts/preprocess.py"]
 },
 "RoleArn": role
}
# Create processing job and return job ARN
sm.create_processing_job(**create_preprocessing_params)
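
The request dictionaries for the three jobs in this notebook differ only in a handful of fields (job name, image, entrypoint, inputs, and output location). As a rough sketch, a small builder function could assemble them; `build_processing_params` and the placeholder values are hypothetical, and the defaults simply mirror the values used in this notebook.

```python
def build_processing_params(job_name, image_uri, role_arn, entrypoint, inputs, output_s3_uri,
                            instance_type="ml.m5.xlarge", volume_size_gb=20,
                            max_runtime_seconds=3600):
    """Build a create_processing_job request (hypothetical helper).

    `inputs` maps an input name to a (s3_uri, local_path) pair.
    """
    return {
        "ProcessingJobName": job_name,
        "RoleArn": role_arn,
        "AppSpecification": {"ImageUri": image_uri, "ContainerEntrypoint": list(entrypoint)},
        "ProcessingInputs": [
            {
                "InputName": name,
                "S3Input": {
                    "S3Uri": s3_uri,
                    "LocalPath": local_path,
                    "S3DataType": "S3Prefix",
                    "S3InputMode": "File",
                },
            }
            for name, (s3_uri, local_path) in inputs.items()
        ],
        "ProcessingOutputConfig": {
            "Outputs": [
                {
                    "OutputName": "output_data",
                    "S3Output": {
                        "S3Uri": output_s3_uri,
                        "LocalPath": "/opt/ml/processing/output",
                        "S3UploadMode": "EndOfJob",
                    },
                }
            ]
        },
        "ProcessingResources": {
            "ClusterConfig": {
                "InstanceCount": 1,
                "InstanceType": instance_type,
                "VolumeSizeInGB": volume_size_gb,
            }
        },
        "StoppingCondition": {"MaxRuntimeInSeconds": max_runtime_seconds},
    }


# Placeholder values for illustration; in the notebook these come from earlier cells.
params = build_processing_params(
    job_name="SM-Pre-Processing-Job-example",
    image_uri="<sklearn-image-uri>",
    role_arn="<execution-role-arn>",
    entrypoint=["python", "/opt/ml/processing/input/scripts/preprocess.py"],
    inputs={
        "input_data": ("s3://<bucket>/<prefix>/data/preproc/input", "/opt/ml/processing/input/data/"),
        "scripts": ("s3://<bucket>/<prefix>/scripts", "/opt/ml/processing/input/scripts/"),
    },
    output_s3_uri="s3://<bucket>/<prefix>/data/predict/input",
)
print([item["InputName"] for item in params["ProcessingInputs"]])
```

With real values filled in, the resulting dictionary could be passed as `sm.create_processing_job(**params)`.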

## Let's wait and monitor the processing Job to complete before proceeding to next steps
The three steps (PreProcess, Inference & PostProcess) in this Notebook depend on one another, so let's wait until each step finishes before starting the next.

In [None]:
# Confirm that the processing job has started, then wait before proceeding to the next cells
status = sm.describe_processing_job(ProcessingJobName=job_name)["ProcessingJobStatus"]
print("Processing Job {} current status: {}".format(job_name, status))

try:
    # Wait for the job to finish and report the ending status
    sm.get_waiter("processing_job_completed_or_stopped").wait(ProcessingJobName=job_name)
    processing_info = sm.describe_processing_job(ProcessingJobName=job_name)
    status = processing_info["ProcessingJobStatus"]
    print("Processing Job {} ended with status: {}".format(job_name, status))
except Exception:
    # The waiter raises if the job fails or the wait times out
    print("Processing Job {} did not complete successfully".format(job_name))
    # FailureReason is only present for failed jobs, so fall back to a default message
    message = sm.describe_processing_job(ProcessingJobName=job_name).get(
        "FailureReason", "no failure reason reported"
    )
    print("Processing failed with the following error: {}".format(message))
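
Because every step in this notebook waits on a job in the same way, the wait logic could be factored into one function. The sketch below polls `describe_processing_job` directly instead of using the boto3 waiter; `wait_for_processing_job` is a hypothetical helper, and the `sleep` parameter exists only so the loop can be exercised without real delays.

```python
import time

# Statuses after which a processing job will not change any further.
TERMINAL_STATES = {"Completed", "Failed", "Stopped"}


def wait_for_processing_job(sm_client, job_name, poll_seconds=30, sleep=time.sleep):
    """Poll until the job reaches a terminal state; return (status, failure_reason)."""
    while True:
        info = sm_client.describe_processing_job(ProcessingJobName=job_name)
        status = info["ProcessingJobStatus"]
        if status in TERMINAL_STATES:
            # FailureReason is absent unless the job failed.
            return status, info.get("FailureReason")
        sleep(poll_seconds)
```

In the notebook this could be called as `status, reason = wait_for_processing_job(sm, job_name)`, stopping the pipeline if `status` is not `"Completed"`.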

## Configure SageMaker Processing Job for Inference Step using the custom image

Here is a sample configuration of a Processing job used for inference. We set up the arguments required to trigger a SageMaker Processing Job. Here we use the custom image that we built at the start of this notebook. You'll need to change the input dataset location to point to your batch input payload. The entrypoint script `predict.py` is the inference script that reads your input payload and runs inference against your trained model.

In [None]:
## Create a processing job using the custom container built in the cell above
import os
import json
import boto3
import time
from sagemaker import get_execution_role

sm = boto3.client('sagemaker')


# Get parameters
image_uri = '{}.dkr.ecr.{}.amazonaws.com/legacycode:latest'.format(account, region)
instance_type = "ml.m5.xlarge"
volume_size = 20
max_runtime = 3600 # Default: 1h
entrypoint = "/opt/ml/code/predict.py"

timestamp = time.strftime('%Y%m%d-%H%M%S')
job_name = f'SM-Inference-Process-Job-{timestamp}' 

# Define inputs/outputs

create_processing_params = {
 "ProcessingInputs": [
 {
 'InputName': 'input_data',
 'S3Input': {
 'S3Uri': "s3://{}/{}/data/predict/input".format(default_bucket, S3_prefix),
 'LocalPath': '/opt/ml/processing/input',
 'S3DataType': 'S3Prefix',
 'S3InputMode': 'File'
 }
 }
 ],
 "ProcessingOutputConfig": {
 'Outputs': [
 {
 'OutputName': 'output_data',
 'S3Output': {
 'S3Uri': "s3://{}/{}/data/postproc/input".format(default_bucket, S3_prefix),
 'LocalPath': '/opt/ml/processing/output',
 'S3UploadMode': 'EndOfJob'
 }
 }
 ]
 },
 "ProcessingJobName": job_name,
 "ProcessingResources": {
 'ClusterConfig': {
 'InstanceCount': 1,
 'InstanceType': instance_type,
 'VolumeSizeInGB': volume_size
 }
 },
 "StoppingCondition": {
 'MaxRuntimeInSeconds': max_runtime
 },
 "AppSpecification": {
 'ImageUri': image_uri,
 'ContainerEntrypoint': ['python', entrypoint]
 },
 "RoleArn": role
}
# Create processing job and return job ARN
sm.create_processing_job(**create_processing_params)
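
The contents of `predict.py` are not shown in this notebook. Purely as an illustration of the contract a Processing job imposes, a script of this kind reads files from the input `LocalPath` and writes results to the output `LocalPath`, which SageMaker uploads to S3 at the end of the job. In the sketch below, `run_inference`, `predict_row`, the headerless numeric CSV format, and the output file naming are all assumptions, and the "model" is a stand-in.

```python
import csv
from pathlib import Path


def predict_row(row):
    """Stand-in for a real model call: sums the numeric fields of one row."""
    return sum(float(value) for value in row)


def run_inference(input_dir="/opt/ml/processing/input", output_dir="/opt/ml/processing/output"):
    """Score every CSV file in input_dir and write one predictions file per input."""
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    written = []
    for src in sorted(Path(input_dir).glob("*.csv")):
        dst = out / f"{src.stem}_predictions.csv"
        with src.open(newline="") as fin, dst.open("w", newline="") as fout:
            writer = csv.writer(fout)
            for row in csv.reader(fin):
                # Append the prediction as an extra column.
                writer.writerow(row + [predict_row(row)])
        written.append(dst)
    return written
```

Inside the container the script would call `run_inference()` with its defaults; locally, any pair of directories can be passed instead.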

## Let's wait and monitor the Inference Job to complete before proceeding to next steps


In [None]:
# Confirm that the processing job has started, then wait before proceeding to the next cells
status = sm.describe_processing_job(ProcessingJobName=job_name)["ProcessingJobStatus"]
print("Processing Job {} current status: {}".format(job_name, status))

try:
    # Wait for the job to finish and report the ending status
    sm.get_waiter("processing_job_completed_or_stopped").wait(ProcessingJobName=job_name)
    processing_info = sm.describe_processing_job(ProcessingJobName=job_name)
    status = processing_info["ProcessingJobStatus"]
    print("Processing Job {} ended with status: {}".format(job_name, status))
except Exception:
    # The waiter raises if the job fails or the wait times out
    print("Processing Job {} did not complete successfully".format(job_name))
    # FailureReason is only present for failed jobs, so fall back to a default message
    message = sm.describe_processing_job(ProcessingJobName=job_name).get(
        "FailureReason", "no failure reason reported"
    )
    print("Processing failed with the following error: {}".format(message))

## Configure SageMaker Processing Job for post Processing Step

Here is a sample configuration of the post-processing job. We set up the arguments required to trigger a SageMaker Processing Job. You will need to change the input location to point to your own input dataset. Notice that here we pass `python` as the container entrypoint and the path to `postprocess.py` as its argument. This Python script file contains the data post-processing logic.

In [None]:
## Create a post-processing job in script mode using the pre-built scikit-learn container
import os
import json
import boto3
import time
from sagemaker import get_execution_role

sm = boto3.client('sagemaker')

# Define parameters
instance_type = "ml.m5.xlarge"
volume_size = 20
max_runtime = 3600 # Default: 1h

timestamp = time.strftime('%Y%m%d-%H%M%S')
job_name = f'SM-Post-Processing-Job-{timestamp}' 

# Define inputs/outputs

create_postprocessing_params = {
 "ProcessingInputs": [
 {
 'InputName': 'input_data',
 'S3Input': {
 'S3Uri': "s3://{}/{}/data/postproc/input".format(default_bucket, S3_prefix),
 'LocalPath': '/opt/ml/processing/input/data/',
 'S3DataType': 'S3Prefix',
 'S3InputMode': 'File'
 }
 },
 {
 'InputName': 'scripts',
 'S3Input': {
 'S3Uri': "s3://{}/{}/scripts".format(default_bucket,S3_prefix),
 'LocalPath': '/opt/ml/processing/input/scripts/',
 'S3DataType': 'S3Prefix',
 'S3InputMode': 'File'
 }
 }
 ],
 "ProcessingOutputConfig": {
 'Outputs': [
 {
 'OutputName': 'output_data',
 'S3Output': {
 'S3Uri': "s3://{}/{}/data/postproc/output".format(default_bucket, S3_prefix),
 'LocalPath': '/opt/ml/processing/output',
 'S3UploadMode': 'EndOfJob'
 }
 }
 ]
 },
 "ProcessingJobName": job_name,
 "ProcessingResources": {
 'ClusterConfig': {
 'InstanceCount': 1,
 'InstanceType': instance_type,
 'VolumeSizeInGB': volume_size
 }
 },
 "StoppingCondition": {
 'MaxRuntimeInSeconds': max_runtime
 },
 "AppSpecification": {
 'ImageUri': sklearn_image_uri,
 'ContainerEntrypoint': ['python'],
 "ContainerArguments": [
 "/opt/ml/processing/input/scripts/postprocess.py"
 ]
 },
 "RoleArn": role
}
# Create processing job and return job ARN
sm.create_processing_job(**create_postprocessing_params)
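
The post-processing job splits the command into `ContainerEntrypoint` and `ContainerArguments`, whereas the pre-processing job puts both `python` and the script path in `ContainerEntrypoint`. As we understand it, the arguments are appended after the entrypoint (mirroring Docker's ENTRYPOINT and CMD), so both styles yield the same kind of command line. The helper `effective_command` below is hypothetical and only illustrates that concatenation.

```python
def effective_command(app_spec: dict) -> list:
    """Return the entrypoint followed by its arguments (illustrative only)."""
    return list(app_spec.get("ContainerEntrypoint", [])) + list(app_spec.get("ContainerArguments", []))


# Style used by the pre-processing job: everything in the entrypoint.
entrypoint_only = {
    "ContainerEntrypoint": ["python", "/opt/ml/processing/input/scripts/postprocess.py"],
}

# Style used by the post-processing job: interpreter as entrypoint, script as argument.
entrypoint_plus_args = {
    "ContainerEntrypoint": ["python"],
    "ContainerArguments": ["/opt/ml/processing/input/scripts/postprocess.py"],
}

print(effective_command(entrypoint_only) == effective_command(entrypoint_plus_args))
```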

## Let's wait and monitor the PostProcessing Job to complete before proceeding to next steps


In [None]:
# Confirm that the processing job has started, then wait before proceeding to the next cells
status = sm.describe_processing_job(ProcessingJobName=job_name)["ProcessingJobStatus"]
print("Processing Job {} current status: {}".format(job_name, status))

try:
    # Wait for the job to finish and report the ending status
    sm.get_waiter("processing_job_completed_or_stopped").wait(ProcessingJobName=job_name)
    processing_info = sm.describe_processing_job(ProcessingJobName=job_name)
    status = processing_info["ProcessingJobStatus"]
    print("Processing Job {} ended with status: {}".format(job_name, status))
except Exception:
    # The waiter raises if the job fails or the wait times out
    print("Processing Job {} did not complete successfully".format(job_name))
    # FailureReason is only present for failed jobs, so fall back to a default message
    message = sm.describe_processing_job(ProcessingJobName=job_name).get(
        "FailureReason", "no failure reason reported"
    )
    print("Processing failed with the following error: {}".format(message))

## Here we show how to test your custom script locally on this Jupyter Notebook (Optional)

First let's get the required packages needed to run our custom ML Script.

In [None]:
#Install requirements locally
!{sys.executable} -m pip install -r src/requirements.txt

Once all the libraries are installed in your Notebook environment, it's as easy as calling `python` to run your script with the `local` argument.

In [None]:
#Test preprocess script locally
!python ../preprocessing/preprocess.py local ../data/
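
The notebook does not show how `preprocess.py` interprets the `local` argument. One plausible pattern, sketched here under that assumption, is for the script to read from the directory given on the command line when run locally and to fall back to the Processing job's input path otherwise. The function `resolve_input_dir` is hypothetical.

```python
# Input path configured for the pre-processing job in this notebook.
CONTAINER_INPUT_DIR = "/opt/ml/processing/input/data/"


def resolve_input_dir(argv):
    """Pick the input directory for a local or in-container run (hypothetical)."""
    # Local run: `python preprocess.py local <data_dir>`
    if len(argv) >= 3 and argv[1] == "local":
        return argv[2]
    # Container run: no extra arguments, use the Processing job path.
    return CONTAINER_INPUT_DIR


print(resolve_input_dir(["preprocess.py", "local", "../data/"]))
print(resolve_input_dir(["preprocess.py"]))
```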