# Amazon SageMaker Batch Transform: Associate prediction results with their corresponding input records
_**Use a pre-trained XGBoost model in SageMaker to predict the fare of each trip in a batch file**_

_**It also shows in detail how to use the input/output joining and filtering features of Batch Transform**_

---

## Setup

Let's start by specifying:

* The SageMaker role ARN used to give training and batch transform access to your data. The snippet below uses the same role as your SageMaker notebook instance. If you are not running on a notebook instance, specify the full ARN of a role with the AmazonSageMakerFullAccess policy attached.
* The S3 bucket and prefix that you want to use for training and storing model objects.

In [None]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from time import gmtime, strftime

# IAM role that SageMaker uses to access S3 and create models and jobs
role = get_execution_role()

region = boto3.Session().region_name

sagemaker_session = sagemaker.Session()

# Reuse the session created above instead of opening a second one
bucket = sagemaker_session.default_bucket()
prefix = 'sagemaker/DEMO-xgboost-tripfare'

In [None]:
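# List the variables saved with %store by the previous lab, then restore them.
# model_url and test_path, used below, come from this restore step.
%store
%store -r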

## XGBoost Bring Your Own Model

Amazon SageMaker provides a hosted notebook environment, distributed serverless training, and real-time hosting. We think these services work best when all three are used together, but each can also be used independently. Some use cases only require hosting: for example, the model may have been trained before Amazon SageMaker existed, or in a different service.

This section shows how to use a pre-existing trained XGBoost model with the Amazon SageMaker XGBoost Algorithm container to quickly create a hosted endpoint for that model. 
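A pre-existing model has to be uploaded to S3 as a gzipped tar archive before SageMaker can host it. This notebook expects that S3 URI in model_url, restored by %store -r, so the packaging step is not repeated here. For reference, the sketch below shows it with Python's tarfile module. It assumes the container loads a file named xgboost-model at the root of the archive, as in AWS's XGBoost bring-your-own-model example; package_model and the placeholder file are hypothetical.

```python
import os
import tarfile
import tempfile

# ASSUMPTION: the SageMaker XGBoost container loads the model file found at the
# root of model.tar.gz; the name "xgboost-model" follows AWS's BYOM example.
MODEL_FILE_NAME = "xgboost-model"


def package_model(model_path, output_dir):
    """Wrap a saved XGBoost model file in model.tar.gz (hypothetical helper)."""
    tar_path = os.path.join(output_dir, "model.tar.gz")
    with tarfile.open(tar_path, "w:gz") as tar:
        # arcname stores the file at the archive root under the expected name
        tar.add(model_path, arcname=MODEL_FILE_NAME)
    return tar_path


# Demo with a placeholder file standing in for a real trained model
with tempfile.TemporaryDirectory() as workdir:
    fake_model = os.path.join(workdir, "my_model.bin")
    with open(fake_model, "wb") as f:
        f.write(b"placeholder")
    archive = package_model(fake_model, workdir)
    with tarfile.open(archive, "r:gz") as tar:
        members = tar.getnames()

print(members)  # ['xgboost-model']
```

A real archive could then be uploaded with sagemaker_session.upload_data(path, bucket=bucket, key_prefix=prefix), whose returned S3 URI plays the role of model_url.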

In [None]:
container = sagemaker.image_uris.retrieve(framework='xgboost', region=region, version='1.2-2')

In [None]:
%%time

model_file_name = "DEMO-byo-xgboost-model"
model_name = model_file_name + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
# model_url (S3 URI of the trained model.tar.gz) was restored by %store -r
model_data = model_url
print(model_data)
sm_client = boto3.client("sagemaker")

primary_container = {
    "Image": container,
    "ModelDataUrl": model_data,
}

create_model_response = sm_client.create_model(
    ModelName=model_name, ExecutionRoleArn=role, PrimaryContainer=primary_container
)

print(create_model_response["ModelArn"])

In [None]:
endpoint_config_name = "DEMO-XGBoostEndpointConfig-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_config_name)
create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "InstanceType": "ml.m4.xlarge",
            "InitialInstanceCount": 1,
            "InitialVariantWeight": 1,
            "ModelName": model_name,
            "VariantName": "AllTraffic",
        }
    ],
)

print("Endpoint Config Arn: " + create_endpoint_config_response["EndpointConfigArn"])

In [None]:
%%time
import time

endpoint_name = "BYOM-XGBoostEndpoint-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)
create_endpoint_response = sm_client.create_endpoint(
    EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
)
print(create_endpoint_response["EndpointArn"])

resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
status = resp["EndpointStatus"]
print("Status: " + status)

# Poll every 60 seconds until the endpoint leaves the "Creating" state
while status == "Creating":
    time.sleep(60)
    resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
    status = resp["EndpointStatus"]
    print("Status: " + status)

print("Arn: " + resp["EndpointArn"])
print("Status: " + status)
if status != "InService":
    raise RuntimeError(f"Endpoint creation ended with status {status}: {resp.get('FailureReason', '')}")


Before you run the next cell, wait until the endpoint has been created. The cell above checks the endpoint status every 60 seconds; continue only after the status is "InService".
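The polling loop above can be factored into a reusable helper. The sketch below uses a hypothetical wait_for_status function that adds a timeout and takes the describe and sleep functions as arguments, so it can be tried without AWS. boto3 also provides a built-in waiter, sm_client.get_waiter("endpoint_in_service"), that serves the same purpose.

```python
import time


def wait_for_status(describe, name, pending=("Creating", "Updating"),
                    poll_seconds=60, timeout_seconds=3600, sleep=time.sleep):
    """Poll describe(name) until the status leaves `pending` (hypothetical helper).

    Raises TimeoutError if still pending after timeout_seconds, and
    RuntimeError if the final status is anything other than "InService".
    """
    waited = 0
    status = describe(name)
    while status in pending:
        if waited >= timeout_seconds:
            raise TimeoutError(f"{name} still {status} after {waited} s")
        sleep(poll_seconds)
        waited += poll_seconds
        status = describe(name)
    if status != "InService":
        raise RuntimeError(f"{name} ended in status {status}")
    return status


# Demo with a fake describe function that reports "Creating" twice
fake_statuses = iter(["Creating", "Creating", "InService"])
print(wait_for_status(lambda name: next(fake_statuses), "demo-endpoint", poll_seconds=0))
```

In the notebook, the describe function would be lambda n: sm_client.describe_endpoint(EndpointName=n)["EndpointStatus"].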

In [None]:
import awswrangler as wr

# test_path was restored by %store -r; read a small sample (nrows=5) of the test data
test_df = wr.s3.read_csv(
    path=test_path, dataset=True, nrows=5, header=None
)

In [None]:
import io
import csv

runtime_client = boto3.client("runtime.sagemaker")

# Drop column 0 (not a model feature) so that only the features are sent
data = test_df.iloc[:, 1:].to_numpy()

# Serialize the rows as header-less CSV, one record per line
csv_buffer = io.StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=",", lineterminator="\n")
for record in data:
    csv_writer.writerow(record)

response = runtime_client.invoke_endpoint(
    EndpointName=endpoint_name, ContentType="text/csv", Body=csv_buffer.getvalue()
)
print("Predicted trip fares: {}.".format(response["Body"].read().decode("utf-8")))
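The request body above is plain header-less CSV, and the response body is CSV text as well. The sketch below wraps both directions in two hypothetical helpers. It assumes the container separates predictions with commas and/or newlines; the exact separator can differ between XGBoost container versions.

```python
import csv
import io


def rows_to_csv_payload(rows):
    """Serialize feature rows to a header-less CSV string for a text/csv request."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


def parse_csv_predictions(body):
    """Parse a text/csv response body into a list of floats.

    ASSUMPTION: predictions are separated by commas and/or newlines.
    """
    tokens = body.replace("\n", ",").split(",")
    return [float(t) for t in tokens if t.strip()]


payload = rows_to_csv_payload([[1.5, 2, 3], [4, 5.25, 6]])
print(repr(payload))                         # '1.5,2,3\n4,5.25,6\n'
print(parse_csv_predictions("12.3,7.8\n"))   # [12.3, 7.8]
```

With the endpoint above, parse_csv_predictions(response["Body"].read().decode("utf-8")) would turn the response into one float per input row. Note that the response body is a stream and can only be read once.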


## Batch Transform


SageMaker Batch Transform provides three attributes for associating prediction results with input records: __input_filter__, __join_source__ and __output_filter__. In the cells below, we use the [SageMaker Python SDK](https://github.com/aws/sagemaker-python-sdk) to set up a model and run a Batch Transform job that uses these attributes. See [this page](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform-data-processing.html) to learn more about how to use them.


#### 1. Create a SageMaker Model object
First, wrap the trained model artifact (model_url) in a SageMaker Python SDK Model, so that we can create a Batch Transform job from it.

In [None]:
from sagemaker.model import Model

# Wrap the stored model artifact so that the SDK can create a transformer from it
xgb_model = Model(
    image_uri=container,
    model_data=model_url,
    role=role,
    name=model_file_name + strftime("%Y-%m-%d-%H-%M-%S", gmtime()),
    sagemaker_session=sagemaker_session,
)

#### 2. Join the input and the prediction results
Now, let's associate the prediction results with their corresponding input records. We can also use __input_filter__ to exclude the ID column, so there is no need to keep a separate feature-only file in S3.

* Set __input_filter__ to "$[1:]": excludes column 0 (the 'ID') before running inference and keeps everything from column 1 to the last column (all the features or predictors).

* Set __join_source__ to "Input": joins the original input record with its inference result.

* Leave __output_filter__ at its default ('$'), so that the joined input and inference results are saved as output.
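To make these three settings concrete, the following local sketch mimics what they do to a single CSV record. It illustrates the documented behavior and is not SageMaker's implementation; simulate_join and the stand-in predictor are hypothetical, and only the "$" and "$[-1]" output filters are modeled.

```python
import csv
import io


def simulate_join(record, predict, output_filter="$"):
    """Mimic input_filter="$[1:]" and join_source="Input" on one CSV line.

    Local illustration only; not SageMaker's implementation.
    """
    fields = next(csv.reader([record]))
    model_input = fields[1:]            # input_filter "$[1:]": drop column 0
    prediction = predict(model_input)   # what the model container would return
    joined = fields + [prediction]      # join_source "Input": original record + result
    if output_filter == "$":
        kept = joined                   # keep the whole joined record
    elif output_filter == "$[-1]":
        kept = joined[-1:]              # keep only the prediction
    else:
        raise ValueError(f"output filter not modeled in this sketch: {output_filter}")
    out = io.StringIO()
    csv.writer(out, lineterminator="").writerow(kept)
    return out.getvalue()


def fake_predict(features):
    """Hypothetical stand-in model: returns the sum of the features."""
    return str(sum(float(x) for x in features))


print(simulate_join("id-7,1.0,2.5", fake_predict))           # id-7,1.0,2.5,3.5
print(simulate_join("id-7,1.0,2.5", fake_predict, "$[-1]"))  # 3.5
```

Note that the join uses the original record, so column 0 reappears in the output even though the model never saw it.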

In [None]:
# content_type/accept and split_type/assemble_with must be set to use the IO joining feature
xgb_transformer = xgb_model.transformer(
    instance_count=2,
    instance_type="ml.m4.xlarge",
    assemble_with="Line",
    accept="text/csv",
)

# Start a transform job that drops column 0 for inference and joins the result to the input
xgb_transformer.transform(
    test_path,
    content_type="text/csv",
    split_type="Line",
    input_filter="$[1:]",
    join_source="Input",
)
xgb_transformer.wait()

Wait until the batch transform job finishes before running the following code.

Let's inspect the output of the Batch Transform job in S3. Each output line should contain the original input record (including column 0) followed by the predicted trip fare.

In [None]:
from urllib.parse import urlparse

def get_csv_output_from_s3(s3uri):
    """Return the contents of the first object under s3uri as a string."""
    parsed_url = urlparse(s3uri)
    bucket_name = parsed_url.netloc
    key_prefix = parsed_url.path.lstrip("/")
    s3 = boto3.client("s3")
    # Batch Transform writes one .out file per input file; read the first one
    obj_key = s3.list_objects_v2(Bucket=bucket_name, Prefix=key_prefix)["Contents"][0]["Key"]
    return s3.get_object(Bucket=bucket_name, Key=obj_key)["Body"].read().decode("utf-8")

In [None]:
!aws s3 ls $xgb_transformer.output_path/

In [None]:
output_csv = get_csv_output_from_s3(xgb_transformer.output_path)
# Show the first joined record: the input columns followed by the prediction
output_csv.split("\n")[0]
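Because every joined record ends with the prediction, the output can be split back into the input fields and the predicted fare. The sketch below assumes exactly one prediction is appended to each line (join_source="Input" with the default output filter); split_joined_output and the sample rows are hypothetical.

```python
import csv
import io


def split_joined_output(csv_text):
    """Split joined Batch Transform output into (input_fields, prediction) pairs.

    ASSUMPTION: each line is the original CSV record with exactly one
    prediction appended as the last column.
    """
    pairs = []
    for fields in csv.reader(io.StringIO(csv_text)):
        if not fields:
            continue  # skip blank lines
        pairs.append((fields[:-1], float(fields[-1])))
    return pairs


sample = "id-1,3.0,4.0,11.5\nid-2,1.0,0.5,6.25\n"
for features, fare in split_joined_output(sample):
    print(features, fare)
```

In the notebook, pass it the string returned by get_csv_output_from_s3(xgb_transformer.output_path).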

## (Optional) Cleanup

In [None]:
sagemaker_session.delete_endpoint(endpoint_name)
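Deleting the endpoint removes only the endpoint itself; the endpoint configuration and the models created in this lab remain. A minimal sketch of deleting those as well with the boto3 SageMaker client calls delete_endpoint_config and delete_model; clean_up_leftovers is a hypothetical helper, and the recording client exists only to show which calls it makes.

```python
def clean_up_leftovers(sm_client, endpoint_config_name, model_names):
    """Delete an endpoint config and SageMaker models via a boto3 SageMaker client."""
    sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    for name in model_names:
        sm_client.delete_model(ModelName=name)


class RecordingClient:
    """Stand-in for boto3.client("sagemaker") that records calls instead of making them."""

    def __init__(self):
        self.calls = []

    def delete_endpoint_config(self, **kwargs):
        self.calls.append(("delete_endpoint_config", kwargs))

    def delete_model(self, **kwargs):
        self.calls.append(("delete_model", kwargs))


stub = RecordingClient()
clean_up_leftovers(stub, "demo-config", ["model-a", "model-b"])
for call in stub.calls:
    print(call)
```

With the real client, the call would be clean_up_leftovers(sm_client, endpoint_config_name, [model_name, xgb_model.name]), since both the boto3 cell and the SDK transformer created a model.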

---

## End of Lab 3