# Inference Pipeline with Custom Containers and XGBoost


---

This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook. 

![This us-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-2/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

---

Typically a Machine Learning (ML) process consists of a few steps: gathering data with various ETL jobs, preprocessing the data, featurizing the dataset by incorporating standard techniques or prior knowledge, and finally training an ML model using an algorithm. 
In many cases, when the trained model is used to process real-time or batch prediction requests, it receives data in a format that needs to be preprocessed (e.g. featurized) before it can be passed to the algorithm. In the following notebook, we will demonstrate how you can build an ML pipeline using custom SageMaker algorithm containers together with the built-in SageMaker XGBoost algorithm. After the model is trained, we will deploy the ML pipeline (data preprocessing, the XGBoost classifier, and data postprocessing) as an inference pipeline behind a single SageMaker endpoint for real-time inference. We will also use the preprocessor with Amazon SageMaker Batch Transform to prepare the XGBoost training data.

![Inference Diagram](./Inference_diagram.png)

The toy problem being solved here is matching a set of keywords to a category of questions. We can then match that category against a list of available agents who specialize in answering that category of question. The agents and their availability are stored externally in a DynamoDB table. The data transformations, the model prediction, and the database query are all performed as part of the inference pipeline.

The preprocessing step of the pipeline uses a CountVectorizer to encode a comma-separated list of words into a format that XGBoost understands. It also trains a LabelEncoder, which maps the question categories to integers; having the labels encoded as integers is a requirement of the XGBoost multiclass classifier, whose multiclass objectives expect labels in the range 0 to `num_class` - 1.
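As a minimal local sketch of the label-encoding part of this step, assuming the preprocessor container uses scikit-learn's `LabelEncoder` in the standard way (the container code is not shown in this notebook), the category strings are mapped to consecutive integers in sorted order. The labels below are taken from the sample rows displayed later in the notebook.

```python
from sklearn.preprocessing import LabelEncoder

# A few category labels taken from the sample data shown later in this notebook.
labels = [
    "category_properties",
    "category_medical",
    "category_itemization",
    "category_estate taxes",
    "category_estate taxes",
]

category_encoder = LabelEncoder()
encoded = category_encoder.fit_transform(labels)

# classes_ holds the distinct labels in sorted order; each label's integer is its index.
print(category_encoder.classes_.tolist())
# Integers in [0, number of classes), the range XGBoost's multiclass objectives expect.
print(encoded.tolist())
```

Because the mapping is just "index into the sorted list of classes", the same fitted encoder can later turn a predicted integer back into its category with `inverse_transform`.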

The XGBoost model maps the encoded list of words to an integer, which represents the encoded class of question that best matches those words.

Finally, the postprocessing step of the pipeline uses the LabelEncoder trained in the preprocessing step to map the integer predicted by the classifier back to the category name. It then queries DynamoDB for available agents who match that category.
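The postprocessing logic can be sketched locally as follows. This is a hypothetical illustration, not the postprocessor container's actual code: a plain dict (`AGENTS_BY_CATEGORY`, made up here) stands in for the DynamoDB lookup, predictions are assumed to arrive as one class index per line (XGBoost's `multi:softmax` objective returns the class index as a float), and the `category_` prefix is stripped only to mirror the response format shown at the end of this notebook.

```python
import json

from sklearn.preprocessing import LabelEncoder

# Hypothetical stand-in for the DynamoDB table, keyed by category name. The two
# entries mirror the agents returned by the deployed pipeline at the end of this notebook.
AGENTS_BY_CATEGORY = {
    "properties": {"ID": "2345", "FirstName": "Megan", "LastName": "Duvernoy"},
    "medical": {"ID": "5678", "FirstName": "Mohammad", "LastName": "Asif"},
}

# In the real pipeline this encoder is the one fitted by the preprocessor; here it is
# fitted on the four categories visible in the sample rows.
label_encoder = LabelEncoder().fit(
    ["category_estate taxes", "category_itemization", "category_medical", "category_properties"]
)


def postprocess(predictions):
    """Map newline-separated class indices (e.g. "3.0\\n2.0") to categories and agents."""
    response = []
    for line in predictions.strip().splitlines():
        class_index = int(float(line))  # multi:softmax returns the class index as a float
        label = str(label_encoder.inverse_transform([class_index])[0])
        label_prefix = "category_"
        category = label[len(label_prefix):] if label.startswith(label_prefix) else label
        # .get returns None when no agent is stored for the category.
        response.append({"category": category, "agent": AGENTS_BY_CATEGORY.get(category)})
    return json.dumps({"response": response})


print(postprocess("3.0\n2.0"))
```

Keeping the lookup behind a single dictionary access makes it easy to picture swapping in a real DynamoDB query without changing the decoding logic.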

Let's first generate the synthetic training data and load the agent lookup data into DynamoDB, then create our SageMaker session and role and an S3 prefix to use for the notebook example.

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
!mkdir -p returns_data
!python3 generate-training-data.py --samples 100000 --filename returns_data/samples.csv

In [3]:
!python3 load-ddb-data.py PipelineLookupTable

In [4]:
# S3 prefix

import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

# Get a SageMaker-compatible role used by this Notebook Instance.
role = get_execution_role()

bucket = sagemaker_session.default_bucket()
prefix = "Custom-Pipeline-Inference-Example"

## Upload the data for training <a class="anchor" id="upload_data"></a>

When training large models with huge amounts of data, you'll typically use big data tools, like Amazon Athena, AWS Glue, or Amazon EMR, to prepare your data in S3. For this small example, we can simply use the tools provided by the SageMaker Python SDK to upload the generated CSV file to the default bucket. 

In [5]:
WORK_DIRECTORY = "returns_data"

train_input = sagemaker_session.upload_data(
    path="{}/{}".format(WORK_DIRECTORY, "samples.csv"),
    bucket=bucket,
    key_prefix="{}/{}".format(prefix, "train"),
)

### Set up a loader function

The `load_data` function loads the CSV data into two columns: the first field of each CSV row is mapped to the label, and every subsequent field is collected into a set in the second pandas column.

In [6]:
import pandas as pd
import csv


def load_data(raw, columns, skip_first_row=True):
    # Each CSV row becomes (label, set of words): the first field is the label
    # and every remaining field is collected into a set.
    recs = [(row[0], set(row[1:])) for row in csv.reader(raw)]
    if skip_first_row:
        # Skip the first row (the header).
        recs = recs[1:]
    return pd.DataFrame.from_records(recs, columns=columns)


def load(files, columns, skip_first_row=True):
    raw_data = []
    for file in files:
        # Open each file in a context manager so the handle is closed after reading.
        with open(file, newline="") as f:
            raw_data.append(load_data(f, columns, skip_first_row))

    return pd.concat(raw_data)
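To see the shape of the records `load_data` produces, here is a self-contained copy of it run on a small in-memory CSV. The sample lines below are made up for illustration, and we assume the real file starts with a header row (which is why `skip_first_row` defaults to `True`).

```python
import csv
import io

import pandas as pd


def load_data(raw, columns, skip_first_row=True):
    # Same logic as above: first field -> label, remaining fields -> set of words.
    recs = [(row[0], set(row[1:])) for row in csv.reader(raw)]
    return pd.DataFrame.from_records(recs[1:] if skip_first_row else recs, columns=columns)


# Hypothetical CSV: a header row followed by two records of different lengths.
sample_csv = io.StringIO(
    "label,words\n"
    "category_medical,medical,covid\n"
    "category_estate taxes,estate\n"
)
sample_df = load_data(sample_csv, ["label", "words"])
print(sample_df)
```

Because the words are stored as a set, rows can have any number of keywords, and duplicate or reordered keywords produce the same record.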

In [7]:
df = load(["returns_data/samples.csv"], ["label", "words"])

In [8]:
df.head()

|   | label | words |
|---|-------|-------|
| 0 | category_properties | {rental, properties, investment} |
| 1 | category_medical | {medical, covid} |
| 2 | category_itemization | {donation, itemization} |
| 3 | category_estate taxes | {medical, inheritance, estate} |
| 4 | category_estate taxes | {estate} |


In [9]:
num_labels = len(df["label"].unique())
num_labels

6

### Words in our dataset

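Let's take a look at the set of words being used. We encode the column with a CountVectorizer using `analyzer=set`: because each entry in the words column is already a set of words, the callable analyzer simply returns those words as the tokens, and each distinct word becomes one feature.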

In [10]:
from sklearn.feature_extraction.text import CountVectorizer

vectorizer = CountVectorizer(analyzer=set)
count_res = vectorizer.fit_transform(df["words"])
vectorizer.get_feature_names()

['401k',
 '403b',
 'capital',
 'charitable',
 'covid',
 'deduction',
 'deferment',
 'delay',
 'donation',
 'estate',
 'expense',
 'gains',
 'inheritance',
 'investment',
 'ira',
 'itemization',
 'late',
 'local',
 'losses',
 'medical',
 'mortgage',
 'payment',
 'properties',
 'rental',
 'state',
 'tax']
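A small sketch of why `analyzer=set` works here: when `analyzer` is a callable, scikit-learn's `CountVectorizer` calls it on each raw document and uses the returned words as that document's tokens. The two toy rows below are made up for illustration; each distinct word gets one column, and the columns are in alphabetical order, just like the 26-word list above.

```python
from sklearn.feature_extraction.text import CountVectorizer

# Two toy "documents", each already a set of words (like df["words"]).
docs = [{"rental", "properties"}, {"medical", "covid"}]

toy_vectorizer = CountVectorizer(analyzer=set)  # callable analyzer: set(doc) -> tokens
matrix = toy_vectorizer.fit_transform(docs)

# vocabulary_ maps each word to its column index; sort the words by that index.
columns = sorted(toy_vectorizer.vocabulary_, key=toy_vectorizer.vocabulary_.get)
print(columns)
print(matrix.toarray().tolist())
```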

In [11]:
import boto3

ecr_namespace = "custompipeline/"
prefix = "preprocessor"

ecr_repository_name = ecr_namespace + prefix
role = get_execution_role()
account_id = role.split(":")[4]
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
bucket = sagemaker_session.default_bucket()
container_image_uri = "{0}.dkr.ecr.{1}.amazonaws.com/{2}:latest".format(
    account_id, region, ecr_repository_name
)
print(container_image_uri)

305752278501.dkr.ecr.us-west-2.amazonaws.com/custompipeline/preprocessor:latest
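The image URI above follows the Amazon ECR naming scheme in the standard AWS partition, `<account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>`, and the account ID is the fifth colon-separated field of the execution role's ARN. The same construction is repeated later for the postprocessor, so a small helper can make it explicit. The function names and the example ARN below are hypothetical.

```python
def account_id_from_role_arn(role_arn):
    # IAM role ARNs look like arn:aws:iam::<account-id>:role/<name>,
    # so the account ID is the fifth field when splitting on ":".
    return role_arn.split(":")[4]


def ecr_image_uri(account_id, region, repository, tag="latest"):
    # Private Amazon ECR image URI: <account>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>
    return "{0}.dkr.ecr.{1}.amazonaws.com/{2}:{3}".format(account_id, region, repository, tag)


example_role = "arn:aws:iam::123456789012:role/service-role/ExampleSageMakerRole"
account = account_id_from_role_arn(example_role)
print(ecr_image_uri(account, "us-west-2", "custompipeline/preprocessor"))
```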


In [12]:
import sagemaker

custom_preprocessor = sagemaker.estimator.Estimator(
    container_image_uri,
    role,
    train_instance_count=1,
    # train_instance_type='local', # use local mode
    train_instance_type="ml.m5.4xlarge",
    base_job_name=prefix,
)

train_config = sagemaker.session.s3_input(
    "s3://{0}/{1}/train/".format(bucket, prefix), content_type="text/csv"
)
val_config = sagemaker.session.s3_input(
    "s3://{0}/{1}/val/".format(bucket, prefix), content_type="text/csv"
)

custom_preprocessor.fit({"train": train_input})

Parameter image_name will be renamed to image_uri in SageMaker Python SDK v2.
's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.
's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.
's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.


2020-10-02 00:28:04 Starting - Starting the training job...
2020-10-02 00:28:06 Starting - Launching requested ML instances......
2020-10-02 00:29:07 Starting - Preparing the instances for training...
2020-10-02 00:29:59 Downloading - Downloading input data
2020-10-02 00:29:59 Training - Downloading the training image.....
Starting script
arguments: ['main.py', 'train']
starting training...

Hyperparameters configuration:
{}

Input data configuration:
{'train': {'RecordWrapperType': 'None',
           'S3DistributionType': 'FullyReplicated',
           'TrainingInputMode': 'File'}}

List of files in train channel: 
/opt/ml/input/data/train/samples.csv

Resource configuration:
{'current_host': 'algo-1',
 'hosts': ['algo-1'],
 'network_interface_name': 'eth0'}
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 2 columns):
 

In [13]:
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
endpoint_name = "simple-ep-" + timestamp_prefix
predictor = custom_preprocessor.deploy(
    initial_instance_count=1, instance_type="ml.c5.4xlarge", endpoint_name=endpoint_name
)

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.


-----------!

In [14]:
from sagemaker.predictor import (
    json_serializer,
    csv_serializer,
    json_deserializer,
    RealTimePredictor,
)
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON

payload = "rental,peanut,butter\ncovid"
print("content type csv", CONTENT_TYPE_CSV)

predictor = RealTimePredictor(
    endpoint=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=csv_serializer,
    content_type=CONTENT_TYPE_CSV,
    accept=CONTENT_TYPE_CSV,
)

print(predictor.predict(payload))

content type csv text/csv
b'0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0\n0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0\n'
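The endpoint output can be reproduced locally with a `CountVectorizer` fitted on the 26-word vocabulary listed earlier. This sketch assumes the preprocessor container splits each CSV line into a set of words and applies a vectorizer equivalent to the one above (the container code is not shown here). It also shows that words outside the vocabulary, here "peanut" and "butter", are ignored: "rental" lands in column 23 and "covid" in column 4 of the alphabetical vocabulary.

```python
from sklearn.feature_extraction.text import CountVectorizer

# The vocabulary printed by vectorizer.get_feature_names() above.
VOCABULARY = [
    "401k", "403b", "capital", "charitable", "covid", "deduction", "deferment",
    "delay", "donation", "estate", "expense", "gains", "inheritance", "investment",
    "ira", "itemization", "late", "local", "losses", "medical", "mortgage",
    "payment", "properties", "rental", "state", "tax",
]

local_vectorizer = CountVectorizer(analyzer=set)
local_vectorizer.fit([set(VOCABULARY)])  # a single document containing every word

local_payload = "rental,peanut,butter\ncovid"
# One record per line; each record is a comma-separated set of words.
records = [set(line.split(",")) for line in local_payload.splitlines()]
local_encoded = local_vectorizer.transform(records).toarray()  # unknown words get no column

# Serialize back to CSV, one line per record, like the endpoint's response body.
csv_body = "".join(",".join(str(v) for v in row) + "\n" for row in local_encoded)
print(csv_body)
```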


In [None]:
# The standalone preprocessor endpoint was only needed to test the container, so delete it.
sm_client = sagemaker_session.boto_session.client("sagemaker")
sm_client.delete_endpoint(EndpointName=endpoint_name)

## Batch transform our training data <a class="anchor" id="preprocess_train_data"></a>
Now that our preprocessor is fitted, let's preprocess our training data. We use batch transform to preprocess the raw data directly and store the results back in S3.
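Conceptually, `split_type="Line"` tells Batch Transform to split the input file into records at newlines and send them to the container in mini-batches, and `assemble_with="Line"` joins the responses back together with newlines, so output lines follow the order of the input lines. The sketch below simulates that locally with a hypothetical stand-in transform function; the fixed `records_per_batch` is our own simplification, since the real service sizes mini-batches by payload (see `MaxPayloadInMB` and `BatchStrategy=MULTI_RECORD` in the transform log).

```python
def simulate_line_batch_transform(input_text, transform_fn, records_per_batch=2):
    """Split input by line, transform mini-batches, and reassemble the output by line."""
    records = input_text.splitlines()  # split_type="Line"
    outputs = []
    for start in range(0, len(records), records_per_batch):
        batch = "\n".join(records[start:start + records_per_batch])
        response = transform_fn(batch)  # one request to the container
        outputs.extend(response.splitlines())
    return "\n".join(outputs) + "\n"  # assemble_with="Line"


def count_words(batch):
    # Hypothetical stand-in for the preprocessor: emit the word count of each record.
    return "\n".join(str(len(line.split(","))) for line in batch.splitlines())


transform_output = simulate_line_batch_transform(
    "rental,properties\ncovid\nestate,medical,inheritance", count_words
)
print(transform_output)
```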

In [15]:
# Define a Transformer from the trained custom preprocessor Estimator
transformer = custom_preprocessor.transformer(
    instance_count=1, instance_type="ml.m4.xlarge", assemble_with="Line", accept="text/csv"
)

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Using already existing model: preprocessor-2020-10-02-00-28-04-621


In [16]:
# Preprocess training input
transformer.transform(train_input, content_type="text/csv", split_type="Line")
print("Waiting for transform job: " + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_train = transformer.output_path
print(preprocessed_train)

Waiting for transform job: preprocessor-2020-10-02-00-36-49-266
...........................
2020-10-02T00:41:08.139:[sagemaker logs]: MaxConcurrentTransforms=1, MaxPayloadInMB=6, BatchStrategy=MULTI_RECORD
Starting script
arguments: ['main.py', 'serve']
Starting the inference server with 4 workers.
using port:  8080
nginx.conf: worker_processes 1;
daemon off; #Prevent forking

pid /tmp/nginx.pid;
error_log /var/log/nginx/error.log;

events {
  # defaults
}

http {
  include /etc/nginx/mime.types;
  default_type application/octet-stream;
  acc

## Fit an XGBoost Model with the preprocessed data <a class="anchor" id="training_model"></a>
Let's take the preprocessed training data and fit an XGBoost model. SageMaker provides prebuilt algorithm containers that can be used with the Python SDK.

In [17]:
from sagemaker.amazon.amazon_estimator import get_image_uri

xgboost_container = get_image_uri(region, "xgboost")
xgboost_container

'get_image_uri' method will be deprecated in favor of 'ImageURIProvider' class in SageMaker Python SDK v2.
There is a more up to date SageMaker XGBoost image. To use the newer image, please set 'repo_version'='1.0-1'. For example:
	get_image_uri(region, 'xgboost', '1.0-1').


'433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:1'

In [18]:
xgboost_hyperparameters = {
    "max_depth": "5",
    "eta": "0.2",
    "gamma": "4",
    "min_child_weight": "6",
    "silent": "0",
    "objective": "multi:softmax",
    "num_class": num_labels,
    "num_round": "10",
}

# set an output path where the trained model will be saved
output_path = "s3://{}/{}/{}/output".format(bucket, prefix, "xgboost-pipeline-training")

In [19]:
xgboost_estimator = sagemaker.estimator.Estimator(
    image_name=xgboost_container,
    hyperparameters=xgboost_hyperparameters,
    role=sagemaker.get_execution_role(),
    train_instance_count=1,
    train_instance_type="ml.m5.4xlarge",
    train_volume_size=5,  # 5 GB
    output_path=output_path,
)

Parameter image_name will be renamed to image_uri in SageMaker Python SDK v2.


In [20]:
xgboost_train_data = sagemaker.session.s3_input(
    preprocessed_train,
    distribution="FullyReplicated",
    content_type="text/csv",
    s3_data_type="S3Prefix",
)

's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.


In [21]:
# execute the XGBoost training job
xgboost_estimator.fit({"train": xgboost_train_data})

2020-10-02 00:41:34 Starting - Starting the training job...
2020-10-02 00:41:36 Starting - Launching requested ML instances......
2020-10-02 00:42:37 Starting - Preparing the instances for training...
2020-10-02 00:43:31 Downloading - Downloading input data...
2020-10-02 00:44:04 Training - Training image download completed. Training in progress.
2020-10-02 00:44:04 Uploading - Uploading generated training model
2020-10-02 00:44:04 Completed - Training job completed
Arguments: train
[2020-10-02:00:43:52:INFO] Running standalone xgboost training.
[2020-10-02:00:43:52:INFO] Path /opt/ml/input/data/validation does not exist!
[2020-10-02:00:43:52:INFO] File size need to be processed in the node: 5.15mb. Available memory size in the node: 54839.87mb
[2020-10-02:00:43:52:INFO] Determined delimiter of CSV input is ','
[00:43:52] S3DistributionType set as FullyReplicated
[00:43:52] 100000x26 matrix with 2600000 entries loaded from /opt

# Serial Inference Pipeline with the preprocessor, XGBoost classifier, and postprocessor <a class="anchor" id="serial_inference"></a>


## Set up the inference pipeline <a class="anchor" id="pipeline_setup"></a>
A machine learning pipeline can be set up with a `PipelineModel`, which hosts a sequence of models behind a single endpoint. In this example, we configure our pipeline model with the fitted preprocessor model, the fitted XGBoost model, and the postprocessor (which uses the preprocessor's model data). Deploying the model follows the same ```deploy``` pattern in the SDK.

Notice that we pass the preprocessor's trained model data into the postprocessor. This lets the postprocessor access the LabelEncoder that was trained in the preprocessor so that it can invert that encoding, which allows the inference pipeline to return the actual name of the category (e.g. "medical") instead of its integer label.
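Sharing `model_data` works because SageMaker packages a model artifact as `model.tar.gz` and makes its extracted contents available in `/opt/ml/model` inside each container that uses it. The sketch below imitates that locally: a fitted `LabelEncoder` is pickled into a hypothetical `label_encoder.pkl`, packed into a tarball, and then read back twice, once for each "container". The file name and layout are assumptions; the real containers may store the encoder differently.

```python
import os
import pickle
import tarfile
import tempfile

from sklearn.preprocessing import LabelEncoder

workdir = tempfile.mkdtemp()

# "Training" (preprocessor): fit the encoder and pickle it into the model directory.
artifact_encoder = LabelEncoder().fit(["category_medical", "category_properties"])
encoder_path = os.path.join(workdir, "label_encoder.pkl")  # hypothetical file name
with open(encoder_path, "wb") as f:
    pickle.dump(artifact_encoder, f)

# Package the model directory as model.tar.gz, the form in which model_data is stored.
archive_path = os.path.join(workdir, "model.tar.gz")
with tarfile.open(archive_path, "w:gz") as tar:
    tar.add(encoder_path, arcname="label_encoder.pkl")


def load_encoder_from_artifact(path):
    # Every container that receives this model_data sees the same files.
    with tarfile.open(path, "r:gz") as tar:
        return pickle.load(tar.extractfile("label_encoder.pkl"))


pre_encoder = load_encoder_from_artifact(archive_path)   # preprocessor container
post_encoder = load_encoder_from_artifact(archive_path)  # postprocessor container

class_index = int(pre_encoder.transform(["category_medical"])[0])
print(class_index, post_encoder.inverse_transform([class_index])[0])
```

Because both sides load the same fitted object, the integer produced during preprocessing is guaranteed to decode back to the same category during postprocessing.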

In [22]:
ecr_namespace = "custompipeline/"
prefix = "postprocessor"
ecr_repository_name = ecr_namespace + prefix
postprocessor_container_image_uri = "{0}.dkr.ecr.{1}.amazonaws.com/{2}:latest".format(
    account_id, region, ecr_repository_name
)
postprocessor_container_image_uri

'305752278501.dkr.ecr.us-west-2.amazonaws.com/custompipeline/postprocessor:latest'

In [23]:
from sagemaker.model import Model

# The postprocessor reuses the preprocessor's model artifact (model_data) to load the fitted LabelEncoder.
custom_postprocessor = Model(
    image=postprocessor_container_image_uri,
    model_data=custom_preprocessor.model_data,
    role=role,
    sagemaker_session=sagemaker_session,
)

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.


In [24]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

preprocessor_model = custom_preprocessor.create_model()
classifier_model = xgboost_estimator.create_model()
postprocessor_model = custom_postprocessor

model_name = "inference-pipeline-" + timestamp_prefix
endpoint_name = "inference-pipeline-ep-" + timestamp_prefix
sm_model = PipelineModel(
    name=model_name, role=role, models=[preprocessor_model, classifier_model, postprocessor_model]
)

sm_model.deploy(initial_instance_count=1, instance_type="ml.c4.xlarge", endpoint_name=endpoint_name)

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.


-------------!

## Make a request to our pipeline endpoint <a class="anchor" id="pipeline_inference_request"></a>

Here we send a hand-written CSV payload containing two records, one per line, each a comma-separated list of keywords. As the earlier preprocessor test showed, words that are not in the training vocabulary (here "peanut" and "butter") are ignored. The ```ContentType``` field configures the first container, while the ```Accept``` field configures the last container. You can also specify each container's ```Accept``` and ```ContentType``` values using environment variables.
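A rough local picture of how the serial pipeline passes a request through its containers: the client's payload, described by ```ContentType```, goes to the first container, each container's response becomes the next container's request, and the last container's output, in the format requested by ```Accept```, is returned to the client. The three stage functions below are hypothetical stand-ins, not the real containers.

```python
import json
from functools import reduce


def fake_preprocessor(body):
    # Stand-in for the preprocessor: one placeholder feature row per CSV record.
    return "\n".join("features(" + line + ")" for line in body.splitlines())


def fake_classifier(body):
    # Stand-in for XGBoost: a fixed predicted class index for every row.
    return "\n".join("2.0" for _ in body.splitlines())


def fake_postprocessor(body):
    # Stand-in for the postprocessor: return the predictions as a JSON document.
    return json.dumps({"response": ["class " + line for line in body.splitlines()]})


def invoke_serial_pipeline(body, containers):
    # Each container's response becomes the next container's request.
    return reduce(lambda data, container: container(data), containers, body)


pipeline_output = invoke_serial_pipeline(
    "rental,peanut,butter\ncovid", [fake_preprocessor, fake_classifier, fake_postprocessor]
)
print(pipeline_output)
```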


In [25]:
from sagemaker.predictor import (
    json_serializer,
    csv_serializer,
    json_deserializer,
    RealTimePredictor,
)
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON

payload = "rental,peanut,butter\ncovid"

predictor = RealTimePredictor(
    endpoint=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=csv_serializer,
    content_type=CONTENT_TYPE_CSV,
    accept=CONTENT_TYPE_JSON,
)

print(predictor.predict(payload))

b'{"response": [{"category": "properties", "agent": {"ID": "2345", "FirstName": "Megan", "LastName": "Duvernoy"}}, {"category": "medical", "agent": {"ID": "5678", "FirstName": "Mohammad", "LastName": "Asif"}}]}'
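`predict` returns the raw response bytes. A short sketch of decoding the JSON body and pulling out each category with its matched agent; the bytes below are copied from the output above.

```python
import json

# Raw bytes returned by predictor.predict(payload) in the cell above.
raw_response = (
    b'{"response": [{"category": "properties", "agent": {"ID": "2345", "FirstName": "Megan", '
    b'"LastName": "Duvernoy"}}, {"category": "medical", "agent": {"ID": "5678", '
    b'"FirstName": "Mohammad", "LastName": "Asif"}}]}'
)

matches = json.loads(raw_response.decode("utf-8"))["response"]
for match in matches:
    agent = match["agent"]
    print("{}: {} {} (ID {})".format(match["category"], agent["FirstName"], agent["LastName"], agent["ID"]))
```

One response entry is returned per input line, in the same order as the request records.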


## Delete Endpoint <a class="anchor" id="delete_endpoint"></a>
Once we are finished with the endpoint, we delete it so that it no longer incurs charges.

In [26]:
sm_client = sagemaker_session.boto_session.client("sagemaker")
sm_client.delete_endpoint(EndpointName=endpoint_name)

{'ResponseMetadata': {'RequestId': '9eda73a2-7319-4e8a-ae5e-a0edf85d86d2',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '9eda73a2-7319-4e8a-ae5e-a0edf85d86d2',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Fri, 02 Oct 2020 00:50:50 GMT'},
  'RetryAttempts': 0}}
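`delete_endpoint` removes only the endpoint; the endpoint configuration and the model created by ```deploy``` remain in the account. Below is a sketch of a helper that looks them up and deletes all three using the boto3 SageMaker client's `describe_endpoint`, `describe_endpoint_config`, `delete_endpoint`, `delete_endpoint_config`, and `delete_model` calls. The helper name is our own, and it must be called while the endpoint still exists (i.e. instead of the bare `delete_endpoint` above). Because it takes the client as an argument, the usage below exercises it with a hypothetical fake client that only records calls; the fake's config and model names are made up.

```python
def delete_endpoint_and_resources(sm_client, endpoint_name):
    """Delete an endpoint plus the endpoint config and models it references."""
    config_name = sm_client.describe_endpoint(EndpointName=endpoint_name)["EndpointConfigName"]
    variants = sm_client.describe_endpoint_config(EndpointConfigName=config_name)["ProductionVariants"]
    model_names = [variant["ModelName"] for variant in variants]

    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    for name in model_names:
        sm_client.delete_model(ModelName=name)
    return config_name, model_names


class FakeSageMakerClient:
    """Minimal stand-in that records calls, so the helper can run without AWS."""

    def __init__(self):
        self.calls = []

    def describe_endpoint(self, EndpointName):
        return {"EndpointConfigName": EndpointName + "-config"}

    def describe_endpoint_config(self, EndpointConfigName):
        return {"ProductionVariants": [{"ModelName": "example-pipeline-model"}]}

    def delete_endpoint(self, EndpointName):
        self.calls.append(("delete_endpoint", EndpointName))

    def delete_endpoint_config(self, EndpointConfigName):
        self.calls.append(("delete_endpoint_config", EndpointConfigName))

    def delete_model(self, ModelName):
        self.calls.append(("delete_model", ModelName))


fake = FakeSageMakerClient()
print(delete_endpoint_and_resources(fake, "example-endpoint"))
print(fake.calls)
```

With a real client, the same call would be `delete_endpoint_and_resources(sm_client, endpoint_name)`.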

## Notebook CI Test Results

This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.

![This us-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-1/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This us-east-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-2/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This us-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-1/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This ca-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ca-central-1/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This sa-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/sa-east-1/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This eu-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-1/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This eu-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-2/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This eu-west-3 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-3/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This eu-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-central-1/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This eu-north-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-north-1/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This ap-southeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-1/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This ap-southeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-2/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This ap-northeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-1/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This ap-northeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-2/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)

![This ap-south-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-south-1/contrib|inference_pipeline_custom_containers|inference-pipeline.ipynb)
