# SageMaker Inference Pipeline with Scikit Learn and Linear Learner
ISO20022 pacs.008 inference pipeline notebook. This notebook uses a training dataset to train a model with SageMaker Linear Learner. The problem is defined as a `binary classification` problem of accepting or rejecting a pacs.008 message.

Amazon SageMaker provides a rich set of [built-in algorithms](https://docs.aws.amazon.com/sagemaker/latest/dg/algorithms-choose.html) for model training and development. This notebook applies the [Amazon SageMaker Linear Learner Algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/linear-learner.html) to the training dataset. The Linear Learner algorithm handles both classification and regression problems. With it, you can simultaneously explore different training objectives and choose the best solution from a validation set, or explore a large number of models and choose the best. The best model optimizes either of the following:
* Continuous objectives, such as mean square error, cross entropy loss, absolute error (regression models).
* Discrete objectives suited for classification, such as F1 measure, precision, recall, or accuracy (classification models).
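
The discrete objectives listed above are all derived from the counts of a binary confusion matrix. The following sketch is illustrative only and is not part of the Linear Learner implementation; the function name `binary_metrics` and the label lists are hypothetical.

```python
def binary_metrics(y_true, y_pred):
    """Compute accuracy, precision, recall and F1 for 0/1 labels."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)

    total = tp + fp + fn + tn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (
        2 * precision * recall / (precision + recall)
        if (precision + recall)
        else 0.0
    )
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


# Hypothetical labels: 1 = Success, 0 = Failure
y_true = [1, 0, 1, 1, 0, 1]
y_pred = [1, 0, 0, 1, 1, 1]
scores = binary_metrics(y_true, y_pred)
print(scores)
```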

ML model development is an iterative process with several tasks that data scientists go through to produce an effective model that can solve a business problem. The process typically involves:
* Data exploration and analysis
* Feature engineering
* Model development
* Model training and tuning
* Model deployment

We provide the accompanying notebook [pacs008_xgboost_local.ipynb](./pacs008_xgboost_local.ipynb) which demonstrates data exploration, analysis and feature engineering, focussing on text feature engineering. This notebook uses the results of analysis in [pacs008_xgboost_local.ipynb](./pacs008_xgboost_local.ipynb) to create a feature engineering pipeline using [SageMaker Inference Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/inference-pipelines.html).

Here we define the ML problem to be a `binary classification` problem: predicting whether a pacs.008 XML message will be processed successfully or lead to exception processing. The model predicts `Success` i.e. 1 or `Failure` i.e. 0.

**Feature Engineering** 

Pre-processing and featurizing a dataset with standard techniques or prior knowledge is a common way to make the dataset meaningful for training. Once data has been pre-processed and transformed, it can be used to train an ML model using an algorithm. However, when the trained model is used for processing real time or batch prediction requests, the model receives data in a format which needs to be pre-processed (e.g. featurized) before it can be passed to the algorithm. In this notebook, we demonstrate how you can build your ML Pipeline leveraging the SageMaker Scikit-learn container and the SageMaker Linear Learner algorithm. After a model is trained, we deploy the Pipeline (data preprocessing and Linear Learner) as an **Inference Pipeline** behind a **single Endpoint** for real time inference and for **batch inferences** using Amazon SageMaker Batch Transform.

We use the pacs.008 XML element `TEXT` to perform feature engineering, i.e. we featurize text into new numeric features that can be used in making predictions.

Since we featurize `InstrForNxtAgt` to numeric representations during training, we have to pre-process incoming text into the same numeric features before using the trained model to make predictions.
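
Why the same transformation must run at inference time can be sketched with a toy bag-of-words featurizer. This is an illustration only: the class name `ToyTextFeaturizer` and the sample strings are hypothetical, and the featurizer script used by this notebook may rely on a different technique.

```python
import re
from collections import Counter


class ToyTextFeaturizer:
    """Toy bag-of-words featurizer: learn a vocabulary, then count tokens."""

    def fit(self, texts):
        # The vocabulary is fixed at training time.
        tokens = sorted({tok for text in texts for tok in self._tokenize(text)})
        self.vocabulary_ = {tok: idx for idx, tok in enumerate(tokens)}
        return self

    def transform(self, texts):
        # Every text becomes a count vector over the training vocabulary;
        # tokens unseen during fit() are ignored.
        rows = []
        for text in texts:
            counts = Counter(self._tokenize(text))
            rows.append([counts.get(tok, 0) for tok in self.vocabulary_])
        return rows

    @staticmethod
    def _tokenize(text):
        return re.findall(r"[a-z0-9]+", text.lower())


# Hypothetical InstrForNxtAgt values
train_texts = ["/SVC/deliver in three days", "/TRSY/Treasury Services"]
featurizer = ToyTextFeaturizer().fit(train_texts)

# At inference time the fitted featurizer is reused, never refitted.
new_row = featurizer.transform(["/SVC/deliver in four days"])[0]
print(list(featurizer.vocabulary_))
print(new_row)
```

Because the model only understands the column positions learned during `fit`, the fitted featurizer has to travel with the model, which is what the inference pipeline provides.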

**Inference Pipeline**

The diagram below shows how Amazon SageMaker Inference Pipeline works. It is used to deploy multi-container endpoints.

![SageMaker Inference Pipeline](../images/inference-pipeline.png)


**Inference Endpoint** 

The diagram below shows the places in the cross-border payment message flow where a call to ML inference endpoint can be injected to get inference from the ML model. The inference result can be used to take additional actions, including corrective actions before sending the message downstream.

![ML Inference Endpoint](../images/iso20022-prototype-real-time-inference.png)


**Further Reading:** 
For information on Amazon SageMaker Linear Learner algorithm and SageMaker Inference Pipeline visit the following references: 

[SageMaker Linear Learner Algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/linear-learner.html) 

[SageMaker Inference Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/inference-pipelines.html)


## Basic Setup

In this step we do the basic setup needed for the rest of the notebook:
* Amazon SageMaker API client using boto3
* Amazon SageMaker session object
* AWS region
* AWS IAM role

In [None]:
import os
import boto3
import sagemaker
from sagemaker import get_execution_role

sm_client = boto3.Session().client('sagemaker')
sm_session = sagemaker.Session()
region = boto3.session.Session().region_name

role = get_execution_role()
print("Notebook is running with assumed role {}".format(role))
print("Working with AWS services in the {} region".format(region))

### Provide S3 Bucket Name

In [None]:
# Working directory for the notebook
WORKDIR = os.getcwd()
BASENAME = os.path.dirname(WORKDIR)
print(f"WORKDIR: {WORKDIR}")
print(f"BASENAME: {BASENAME}")

# Create a directory storing local data
iso20022_data_path = 'iso20022-data'
# Create the directory only if it does not already exist
os.makedirs(iso20022_data_path, exist_ok=True)

# Store all prototype assets in this bucket
s3_bucket_name = 'iso20022-prototype-t3'
s3_bucket_uri = 's3://' + s3_bucket_name

# Prefix for all files in this prototype
prefix = 'iso20022'

pacs008_prefix = prefix + '/pacs008'
raw_data_prefix = pacs008_prefix + '/raw-data'
labeled_data_prefix = pacs008_prefix + '/labeled-data'
training_data_prefix = pacs008_prefix + '/training-data'
training_headers_prefix = pacs008_prefix + '/training-headers'
test_data_prefix = pacs008_prefix + '/test-data'
training_job_output_prefix = pacs008_prefix + '/training-output'

print(f"Training data with headers will be uploaded to {s3_bucket_uri + '/' + training_headers_prefix}")
print(f"Training data will be uploaded to {s3_bucket_uri + '/' + training_data_prefix}")
print(f"Test data will be uploaded to {s3_bucket_uri + '/' + test_data_prefix}")
print(f"Training job output will be stored in {s3_bucket_uri + '/' + training_job_output_prefix}")

In [None]:
labeled_data_location = s3_bucket_uri + '/' + labeled_data_prefix
training_data_w_headers_location = s3_bucket_uri + '/' + training_headers_prefix
training_data_location = s3_bucket_uri + '/' + training_data_prefix
test_data_location = s3_bucket_uri + '/' + test_data_prefix
print(f"Raw labeled data location = {labeled_data_location}")
print(f"Training data with headers location = {training_data_w_headers_location}")
print(f"Training data location = {training_data_location}")
print(f"Test data location = {test_data_location}")
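
The repeated `s3_bucket_uri + '/' + prefix` concatenations above could be wrapped in a small helper. The function `s3_uri` below is a hypothetical convenience, not part of the SageMaker SDK; it only joins strings and strips stray slashes.

```python
def s3_uri(bucket, *parts):
    """Join a bucket name and key parts into an s3:// URI without doubled slashes."""
    key = "/".join(p.strip("/") for p in parts if p and p.strip("/"))
    return f"s3://{bucket}/{key}" if key else f"s3://{bucket}"


example_uri = s3_uri("iso20022-prototype-t3", "iso20022/pacs008", "/training-data/")
print(example_uri)
```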

## Prepare Training Dataset 

1. Select the training features from the raw labeled dataset.
1. Split the labeled dataset into training and test datasets.

In [None]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns
import string
from sklearn.model_selection import train_test_split
from sklearn import ensemble, metrics, model_selection, naive_bayes

color = sns.color_palette()

%matplotlib inline

### Download raw labeled dataset

In [None]:
# Download labeled raw dataset from S3
s3_client = boto3.client('s3')
s3_client.download_file(s3_bucket_name, labeled_data_prefix + '/labeled_data.csv', 'iso20022-data/labeled_data.csv')

# Read the labeled dataset into a DataFrame
labeled_raw_df = pd.read_csv("iso20022-data/labeled_data.csv")

In [None]:
labeled_raw_df.head()

### Select features for training

In [None]:
# Training features
fts=[
 'y_target', 
 'Document_FIToFICstmrCdtTrf_CdtTrfTxInf_Dbtr_PstlAdr_Ctry', 
 'Document_FIToFICstmrCdtTrf_CdtTrfTxInf_Cdtr_PstlAdr_Ctry', 
 'Document_FIToFICstmrCdtTrf_CdtTrfTxInf_RgltryRptg_DbtCdtRptgInd', 
 'Document_FIToFICstmrCdtTrf_CdtTrfTxInf_RgltryRptg_Authrty_Ctry', 
 'Document_FIToFICstmrCdtTrf_CdtTrfTxInf_RgltryRptg_Dtls_Cd',
 'Document_FIToFICstmrCdtTrf_CdtTrfTxInf_InstrForNxtAgt_InstrInf',
]

# New data frame with selected features
selected_df = labeled_raw_df[fts]
 
selected_df.head()

In [None]:
# Rename columns
selected_df = selected_df.rename(columns={
 'Document_FIToFICstmrCdtTrf_CdtTrfTxInf_Dbtr_PstlAdr_Ctry': 'Dbtr_PstlAdr_Ctry',
 'Document_FIToFICstmrCdtTrf_CdtTrfTxInf_Cdtr_PstlAdr_Ctry': 'Cdtr_PstlAdr_Ctry',
 'Document_FIToFICstmrCdtTrf_CdtTrfTxInf_RgltryRptg_DbtCdtRptgInd': 'RgltryRptg_DbtCdtRptgInd',
 'Document_FIToFICstmrCdtTrf_CdtTrfTxInf_RgltryRptg_Authrty_Ctry': 'RgltryRptg_Authrty_Ctry',
 'Document_FIToFICstmrCdtTrf_CdtTrfTxInf_RgltryRptg_Dtls_Cd': 'RgltryRptg_Dtls_Cd',
 'Document_FIToFICstmrCdtTrf_CdtTrfTxInf_InstrForNxtAgt_InstrInf': 'InstrForNxtAgt',
})

selected_df.head()

In [None]:
from sklearn.preprocessing import LabelEncoder

# Assign Pandas data types.
categorical_fts=[
 'Dbtr_PstlAdr_Ctry', 
 'Cdtr_PstlAdr_Ctry',
 'RgltryRptg_DbtCdtRptgInd', 
 'RgltryRptg_Authrty_Ctry', 
 'RgltryRptg_Dtls_Cd'
]

integer_fts=[
 
]

numeric_fts=[
 
]

text_fts=[
# Leave text as object 
# 'InstrForNxtAgt'
]

# Categorical features to categorical data type.
for col in categorical_fts:
 selected_df[col] = selected_df[col].astype(str).astype('category')

# Integer features to int64 data type. 
for col in integer_fts:
 selected_df[col] = selected_df[col].astype(str).astype('int64')
 
# Numeric features to float64 data type. 
for col in numeric_fts:
 selected_df[col] = selected_df[col].astype(str).astype('float64')

# Text features to string data type. 
for col in text_fts:
 selected_df[col] = selected_df[col].astype(str).astype('string')

label_encoder = LabelEncoder()
selected_df['y_target'] = label_encoder.fit_transform(selected_df['y_target'])
 
selected_df.dtypes
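
`LabelEncoder` assigns integer codes following the sorted order of the distinct label values. The stdlib-only sketch below mimics that ordering to show why `Failure` maps to 0 and `Success` to 1. It assumes that `y_target` holds the strings `Success` and `Failure`, which this notebook does not show; the function `encode_labels` is hypothetical.

```python
def encode_labels(labels):
    """Map labels to integers in sorted order of the distinct values."""
    classes = sorted(set(labels))
    mapping = {label: code for code, label in enumerate(classes)}
    return [mapping[label] for label in labels], classes


# Assumed raw label values (not confirmed by the dataset shown here)
encoded, classes = encode_labels(["Success", "Failure", "Success"])
print(classes)
print(encoded)
```

With the real encoder, `label_encoder.classes_` exposes the same ordering, so it can be inspected after fitting to confirm which integer stands for which outcome.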

In [None]:
selected_df.info()

In [None]:
selected_df

In [None]:
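# selected_df is passed as X with y_target still in it, so the label stays
# as the first column of the CSV files written below.
X_train_df, X_test_df, y_train_df, y_test_df = train_test_split(selected_df, selected_df['y_target'], test_size=0.20, random_state=299, shuffle=True)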

print("Number of rows in train dataset : ",X_train_df.shape[0])
print("Number of rows in test dataset : ",X_test_df.shape[0])

In [None]:
X_train_df

In [None]:
X_test_df

In [None]:
## Save training and test datasets to CSV

train_data_w_headers_output_path = 'iso20022-data/train_data_w_headers.csv'
print(f'Saving training data with headers to {train_data_w_headers_output_path}')
X_train_df.to_csv(train_data_w_headers_output_path, index=False)

train_data_output_path = 'iso20022-data/train_data.csv'
print(f'Saving training data without headers to {train_data_output_path}')
X_train_df.to_csv(train_data_output_path, header=False, index=False)

test_data_output_path = 'iso20022-data/test_data.csv'
print(f'Saving test data without headers to {test_data_output_path}')
X_test_df.to_csv(test_data_output_path, header=False, index=False)
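
Before uploading, it can help to sanity-check the headerless CSV layout: the label in the first column and the same number of columns in every row. The following is a minimal sketch under those assumptions; `check_training_csv` is a hypothetical helper and the sample rows are made up.

```python
import csv
import io


def check_training_csv(lines, expected_columns):
    """Validate label values and column count; return the number of rows."""
    row_count = 0
    for row in csv.reader(lines):
        if len(row) != expected_columns:
            raise ValueError(
                f"Row {row_count} has {len(row)} columns, expected {expected_columns}"
            )
        if row[0] not in ("0", "1"):
            raise ValueError(f"Row {row_count} has unexpected label {row[0]!r}")
        row_count += 1
    return row_count


# Made-up rows: label first, then the six selected features
sample = io.StringIO('1,MX,GB,,,,\n0,TH,US,,,,"/SVC/deliver, in four days"\n')
checked_rows = check_training_csv(sample, expected_columns=7)
print(checked_rows)
```

To check the real file, an open file handle for `train_data_output_path` (opened with `newline=""`) can be passed in place of `sample`.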

### Upload training and test datasets to S3 for training

In [None]:
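# Keep the location of the file with headers in its own variable so that it
# is not overwritten by the headerless upload below.
train_w_headers_data_location = sm_session.upload_data(
    path=train_data_w_headers_output_path,
    bucket=s3_bucket_name,
    key_prefix=training_headers_prefix,
)
print(f'Uploaded training data with headers to: {train_w_headers_data_location}')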

train_input_data_location = sm_session.upload_data(
 path=train_data_output_path,
 bucket=s3_bucket_name,
 key_prefix=training_data_prefix,
)
print(f'Uploaded training data without headers to: {train_input_data_location}')

test_input_data_location = sm_session.upload_data(
 path=test_data_output_path,
 bucket=s3_bucket_name,
 key_prefix=test_data_prefix,
)
print(f'Uploaded test data without headers to: {test_input_data_location}')

# Feature Engineering

## Create a Scikit-learn script to train with 
To run Scikit-learn on SageMaker, we use the `SKLearn` Estimator with a script as an entry point. The training script is very similar to a training script you might run outside of SageMaker, but you can access useful properties about the training environment through various environment variables, such as:

* SM_MODEL_DIR: A string representing the path to the directory to write model artifacts to. These artifacts are uploaded to S3 for model hosting.
* SM_OUTPUT_DIR: A string representing the filesystem path to write output artifacts to. Output artifacts may include checkpoints, graphs, and other files to save, not including model artifacts. These artifacts are compressed and uploaded to S3 to the same S3 prefix as the model artifacts.

Supposing two input channels, 'train' and 'test', were used in the call to the SKLearn estimator's fit() method, the following will be set, following the format SM_CHANNEL_[channel_name]:

* SM_CHANNEL_TRAIN: A string representing the path to the directory containing data in the 'train' channel
* SM_CHANNEL_TEST: Same as above, but for the 'test' channel.

A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to model_dir so that it can be hosted later. Hyperparameters are passed to your script as arguments and can be retrieved with an argparse.ArgumentParser instance.
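
The way such a script reads its configuration can be sketched as follows. This is not the featurizer script used in this notebook: the `--max-features` hyperparameter is hypothetical, and the fallback paths are local placeholders for running outside SageMaker.

```python
import argparse
import os


def parse_args(argv=None):
    """Read hyperparameters from the command line and paths from SM_* variables."""
    parser = argparse.ArgumentParser()

    # Hypothetical hyperparameter, passed to the script as a CLI argument
    parser.add_argument("--max-features", type=int, default=1000)

    # Directories provided by SageMaker; the fallbacks are local placeholders
    parser.add_argument("--model-dir", default=os.environ.get("SM_MODEL_DIR", "model"))
    parser.add_argument("--output-dir", default=os.environ.get("SM_OUTPUT_DIR", "output"))
    parser.add_argument("--train", default=os.environ.get("SM_CHANNEL_TRAIN", "data/train"))

    return parser.parse_args(argv)


# In a real script this would be parse_args() with no arguments
args = parse_args(["--max-features", "500"])
print(args.max_features, args.model_dir, args.train)
```

The script would then load the files under `args.train`, fit the transformer, and save it under `args.model_dir` so that it can be hosted later.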

### Create SageMaker Scikit Estimator 

To run our Scikit-learn training script on SageMaker, we construct a `sagemaker.sklearn.estimator.SKLearn` estimator, which accepts several constructor arguments:

* __entry_point__: The path to the Python script SageMaker runs for training and prediction.
* __role__: Role ARN
* __framework_version__: Scikit-learn version you want to use for executing your model training code.
* __instance_type__ *(optional)*: The type of SageMaker instances for training. __Note__: Because Scikit-learn does not natively support GPU training, SageMaker Scikit-learn does not currently support training on GPU instance types.
* __sagemaker_session__ *(optional)*: The session used to train on SageMaker.

In [None]:
from sagemaker.sklearn.estimator import SKLearn

preprocessing_job_name = 'pacs008-preprocessor-ll'
print('data preprocessing job name: ' + preprocessing_job_name)

FRAMEWORK_VERSION = "0.23-1"
source_dir = "../sklearn-transformers"
script_file = "pacs008_sklearn_featurizer.py"

sklearn_preprocessor = SKLearn(
 entry_point=script_file,
 source_dir=source_dir,
 role=role,
 framework_version=FRAMEWORK_VERSION,
 instance_type="ml.c4.xlarge",
 sagemaker_session=sm_session,
 base_job_name=preprocessing_job_name,
)

In [None]:
sklearn_preprocessor.fit({"train": train_input_data_location})

### Batch transform our training data 
Now that our preprocessor is properly fitted, let's go ahead and preprocess our training data. Let's use batch transform to directly preprocess the raw data and store it right back into S3.

In [None]:
# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer = sklearn_preprocessor.transformer(
 instance_count=1,
 instance_type="ml.m5.xlarge",
 assemble_with="Line",
 accept="text/csv",
)

In [None]:
# Preprocess training input
transformer.transform(train_input_data_location, content_type="text/csv")
print("Waiting for transform job: " + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_train = transformer.output_path

# Train a Linear Learner Model

## Fit a LinearLearner Model with the preprocessed data 
Let's take the preprocessed training data and fit a LinearLearner Model. SageMaker provides prebuilt algorithm containers that can be used with the Python SDK. The previous Scikit-learn job preprocessed the labeled raw pacs.008 dataset into usable training data that we can now use to fit a binary classifier Linear Learner model.

For more on Linear Learner see: https://docs.aws.amazon.com/sagemaker/latest/dg/linear-learner.html

In [None]:
from sagemaker.image_uris import retrieve

ll_image = retrieve("linear-learner", boto3.Session().region_name)

In [None]:
# Set job name
training_job_name = 'pacs008-ll-training'
print('Linear Learner training job name: ' + training_job_name)

# S3 bucket for storing model artifacts
training_job_output_location = s3_bucket_uri + '/' + training_job_output_prefix + '/ll_model'

ll_estimator = sagemaker.estimator.Estimator(
 ll_image,
 role,
 instance_count=1,
 instance_type="ml.m4.2xlarge",
 volume_size=20,
 max_run=3600,
 input_mode="File",
 output_path=training_job_output_location,
 sagemaker_session=sm_session,
 base_job_name=training_job_name,
)

# binary_classifier_model_selection_criteria: accuracy is default
# - accuracy | f_beta | precision_at_target_recall |recall_at_target_precision | loss_function
# feature_dim=auto, # auto or actual number, default is auto
# epochs=15, default is 15
# learning_rate=auto or actual number 0.05 or 0.005
# loss=auto | logistic | hinge_loss, default is auto (which selects logistic for binary_classifier)
# mini_batch_size=32, default is 1000
# num_models=auto, or a number
# optimizer=auto or sgd | adam | rmsprop
ll_estimator.set_hyperparameters(
 predictor_type="binary_classifier",
 binary_classifier_model_selection_criteria="accuracy",
 epochs=15,
 mini_batch_size=32)

ll_train_data = sagemaker.inputs.TrainingInput(
 preprocessed_train, # set after preprocessing job completes
 distribution="FullyReplicated",
 content_type="text/csv",
 s3_data_type="S3Prefix",
)

data_channels = {"train": ll_train_data}
ll_estimator.fit(inputs=data_channels, logs=True)

# Serial Inference Pipeline with Scikit preprocessor and Linear Learner 
## Set up the inference pipeline 
Setting up a Machine Learning pipeline can be done with the Pipeline Model. This sets up a list of models in a single endpoint. We configure our pipeline model with the fitted Scikit-learn inference model (data preprocessing/feature engineering model) and the fitted Linear Learner model. Deploying the model follows the standard ```deploy``` pattern in the SageMaker Python SDK.


In [None]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# The two SageMaker Models: one for data preprocessing, and second for inference
scikit_learn_inference_model = sklearn_preprocessor.create_model()
linear_learner_model = ll_estimator.create_model()

model_name = "pacs008-ll-inference-pipeline-" + timestamp_prefix
endpoint_name = "pacs008-ll-inference-pipeline-ep-" + timestamp_prefix
# The order of the list is the order in which the containers are invoked
sm_model = PipelineModel(
    name=model_name, role=role, models=[scikit_learn_inference_model, linear_learner_model]
)

sm_model.deploy(initial_instance_count=1, instance_type="ml.c4.xlarge", endpoint_name=endpoint_name)

### Store Model Name and Endpoint Name in Notebook Magic Store

These notebook magic store values are used in the example batch transform notebook.

In [None]:
%store model_name
%store endpoint_name

## Make a request to our pipeline endpoint 

The diagram below shows the places in the cross-border payment message flow where a call to ML inference endpoint can be injected to get inference from the ML model. The inference result can be used to take additional actions, including corrective actions before sending the message downstream.

![ML Inference Endpoint](../images/iso20022-prototype-real-time-inference.png)

Here we send a few sample payloads that follow the column order of the test data (you'll notice that the inference Python script is very particular about the ordering of the inference request data). The ```ContentType``` field configures the first container, while the ```Accept``` field configures the last container. You can also specify each container's ```Accept``` and ```ContentType``` values using environment variables.

We make our request with the payload in ```'text/csv'``` format, since that is what our script currently supports. If other formats need to be supported, this would have to be added to the ```output_fn()``` method in our entry point. Note that we set the ```Accept``` to ```application/json```, since Linear Learner does not support ```text/csv``` ```Accept```. In this case the model predicts `Success` or `Failure` for an ISO20022 pacs.008 payment message using only the subset of message XML elements on which the model was trained, i.e. the selected features.
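
The roles of ```ContentType``` and ```Accept``` can be made explicit by building the request for the low-level runtime API by hand. The sketch below only assembles the arguments; `build_invoke_args` and the endpoint name are hypothetical, and the commented-out call at the end needs AWS credentials and a live endpoint.

```python
def build_invoke_args(endpoint_name, csv_row):
    """Assemble keyword arguments for the SageMaker runtime invoke_endpoint call."""
    return {
        "EndpointName": endpoint_name,
        "ContentType": "text/csv",     # consumed by the first container (Scikit-learn)
        "Accept": "application/json",  # produced by the last container (Linear Learner)
        "Body": csv_row.encode("utf-8"),
    }


# Hypothetical endpoint name and payload
request_args = build_invoke_args("my-pipeline-endpoint", "MX,GB,,,,")
print(request_args)

# With credentials and a deployed endpoint, the request could be sent like this:
# import boto3
# runtime = boto3.client("sagemaker-runtime")
# response = runtime.invoke_endpoint(**request_args)
# print(response["Body"].read())
```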

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer

# payload_1, expect: Failure
#payload_1 = "US,GB,,,,/SVC/It is to be delivered in three days. Greater than three days penalty add 2bp per day"
payload_1 = "MX,GB,,,,/SVC/It is to be delivered in four days. Greater than four days penalty add 2bp per day"

# payload_2, expect: Success
payload_2 = "MX,GB,,,,"
#payload_2 = "US,IE,,,,/TRSY/Treasury Services Platinum Customer"

# payload_3, expect: Failure
payload_3 = "TH,US,,,,/SVC/It is to be delivered in four days. Greater than four days penalty add 2bp per day"
#payload_3 = "CA,US,,,,/SVC/It is to be delivered in three days. Greater than three days penalty add 2bp per day"

# payload_4, expect: Success
payload_4 = "IN,CA,DEBT,IN,00.P0006,"

# payload_5, expect: Success
payload_5 = "IE,IN,CRED,IN,0,/REG/15.X0003 FDI in Transportation"
# Alternatives for payload_5 that are expected to return Failure:
#payload_5 = "IE,IN,CRED,IN,0,/REG/15.X0009 FDI in Agriculture "
#payload_5 = "IE,IN,CRED,IN,0,/REG/15.X0004 retail"

# payload_6, expect: Failure
payload_6 = "IE,IN,CRED,IN,0,/REG/99.C34698"
#payload_6 = "MX,IE,,,,/TRSY/eweweww"

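# To target a previously deployed endpoint instead of the one created above,
# uncomment the next line and set its name.
# endpoint_name = 'pacs008-ll-inference-pipeline-ep-2021-11-25-00-58-52'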

predictor = Predictor(
 endpoint_name=endpoint_name, sagemaker_session=sm_session, serializer=CSVSerializer()
)

print(f"1. Expect Failure i.e. 0, {predictor.predict(payload_1)}")
print(f"2. Expect Success i.e. 1, {predictor.predict(payload_2)}")
print(f"3. Expect Failure i.e. 0, {predictor.predict(payload_3)}")
print(f"4. Expect Success i.e. 1, {predictor.predict(payload_4)}")
print(f"5. Expect Success i.e. 1, {predictor.predict(payload_5)}")
print(f"6. Expect Failure i.e. 0, {predictor.predict(payload_6)}")
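
The values printed above are raw response bodies. A minimal sketch for turning one into a readable outcome follows. It assumes the response uses the JSON layout documented for Linear Learner binary classifiers (a `predictions` list with `score` and `predicted_label`); `decode_prediction` and the sample response are hypothetical.

```python
import json

LABEL_NAMES = {0: "Failure", 1: "Success"}


def decode_prediction(raw_response):
    """Return (outcome, score) from a Linear Learner JSON response."""
    if isinstance(raw_response, bytes):
        raw_response = raw_response.decode("utf-8")
    prediction = json.loads(raw_response)["predictions"][0]
    # predicted_label may arrive as 0/1 or 0.0/1.0, so normalize to int
    label = int(prediction["predicted_label"])
    return LABEL_NAMES[label], float(prediction["score"])


# Made-up response body for illustration
sample_response = b'{"predictions": [{"score": 0.12, "predicted_label": 0}]}'
outcome, score = decode_prediction(sample_response)
print(outcome, score)
```

With a live endpoint, `decode_prediction(predictor.predict(payload_1))` would apply the same decoding to a real response.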


# Delete Endpoint
Once we are finished with the endpoint, we clean up the resources!

In [None]:
sm_client = sm_session.boto_session.client("sagemaker")
sm_client.delete_endpoint(EndpointName=endpoint_name)