# Create a pipeline to perform batch inference utilizing a custom callback step
Amazon SageMaker Pipelines offers machine learning (ML) application developers and operations engineers the ability to orchestrate SageMaker jobs and author reproducible ML pipelines.

This notebook shows how to create a pipeline that reads the latest model registered in a model registry and performs batch transformation on data. <BR>
The architecture that this notebook builds is shown in the diagram below:

![SM-Pipelines-callback-and-batch-transform](images/SM-Pipelines-callback-and-batch-transform.png)


## Notebook Overview
This notebook shows how to:
- Define a CallbackStep that can integrate with almost any AWS service outside SageMaker
- Create an SQS queue and a Lambda function that returns the latest model to be used by the transformation
- Define a CreateModelStep that makes use of the model returned by the Lambda function
- Define a TransformStep that runs a batch transform job using the latest model

As a prerequisite, you need a trained model and a dataset. <BR>
The pretrained model comes with this repository and is added to a new model registry as part of the Setup process. <BR>
A sample dataset is also downloaded and uploaded to Amazon S3 as part of the Setup process. The data used is the [UCI Machine Learning Abalone Dataset](https://archive.ics.uci.edu/ml/datasets/abalone) [1]. The aim of this task is to determine the age of an abalone from its physical measurements. At its core, this is a regression problem. However, we will use a pretrained model, as training the machine learning model is outside the scope of this notebook. If you are interested in how to use SageMaker Pipelines to train a model for this use case, see the example [Orchestrating Jobs with Amazon SageMaker Model Building Pipelines](https://github.com/aws/amazon-sagemaker-examples/blob/master/sagemaker-pipelines/tabular/abalone_build_train_deploy/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.ipynb).


[1] Dua, D. and Graff, C. (2019). UCI Machine Learning Repository. Irvine, CA: University of California, School of Information and Computer Science.

## Pre-Setup - IAM role setup
Before you begin running this notebook, you need to set the right permissions for everything to work. Specifically, in addition to the standard SageMaker permissions, you need:
- Permissions to create an SQS queue and a Lambda function
- A Lambda role capable of reading from the queue and the model registry, and of returning the results to SageMaker

To get set up, head over to the IAM Management Console, locate the AmazonSageMaker-ExecutionRole, and add the following two permissions:

![iam_permissions](images/iam_permissions.png)

*Please note that we are giving full access permissions to the SageMaker Execution Role as we will also be deleting the resources we create. In a real-world scenario we wouldn't need to delete the resources at the end and shouldn't therefore grant these permissions.*


Next, **create a new role** to be used for the Lambda function. It must include the managed policy `AWSLambdaSQSQueueExecutionRole`
as well as the following inline policy:

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:DescribeModelPackage",
                "sagemaker:ListModelPackageGroups",
                "sagemaker:ListModelPackages",
                "sagemaker:SendPipelineExecutionStepSuccess",
                "sagemaker:SendPipelineExecutionStepFailure"
            ],
            "Resource": "*"
        }
    ]
}
```
*(instructions on how to create an IAM role can be found [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html#roles-creatingrole-service-console))*
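If you prefer to script the role creation instead of using the console, the steps above can be sketched roughly as follows. This is a minimal sketch, not part of the original notebook: the helper name `create_lambda_role`, the default role name, and the inline policy name `callbackStepAccess` are hypothetical, and the caller needs IAM permissions (such as `iam:CreateRole`) that the SageMaker execution role configured above does not necessarily have.

```python
import json

# Trust policy that allows the Lambda service to assume the role
LAMBDA_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# Same inline policy as shown above
CALLBACK_INLINE_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:DescribeModelPackage",
                "sagemaker:ListModelPackageGroups",
                "sagemaker:ListModelPackages",
                "sagemaker:SendPipelineExecutionStepSuccess",
                "sagemaker:SendPipelineExecutionStepFailure",
            ],
            "Resource": "*",
        }
    ],
}

MANAGED_POLICY_ARN = "arn:aws:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole"


def create_lambda_role(iam_client, role_name="callbackBatchTransformLambdaRole"):
    """Create the Lambda role, attach both policies, and return the role ARN.

    `iam_client` is expected to be a boto3 IAM client, e.g. boto3.client("iam").
    The role name and inline policy name are illustrative choices only.
    """
    response = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(LAMBDA_TRUST_POLICY),
    )
    iam_client.attach_role_policy(RoleName=role_name, PolicyArn=MANAGED_POLICY_ARN)
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName="callbackStepAccess",
        PolicyDocument=json.dumps(CALLBACK_INLINE_POLICY),
    )
    return response["Role"]["Arn"]
```

With suitable permissions, `create_lambda_role(boto3.client("iam"))` would return an ARN that can be used as the Lambda role ARN in the cells that follow.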


Once you have created this new role, take note of the role ARN and insert it below.

In [None]:
!pip install "sagemaker>=2.45.0" --upgrade

In [None]:
!pip install "boto3>=1.17.97" --upgrade

In [None]:
import iam_utils
lambda_role_arn = "INSERT THE LAMBDA ROLE ARN HERE"

In [None]:
import sagemaker
import boto3
from sagemaker import get_execution_role

sm_client = boto3.client(service_name="sagemaker")
role = get_execution_role()
sagemaker_session = sagemaker.Session()
default_bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name


In [None]:
print("Execution role used by this notebook: ", role)

In [None]:
prefix = "batch-transform-with-callback-pipeline-demo"
model_package_group_name = "callbackBatchTransformPipelineModelPackageGroup"
sqs_queue_name = "callbackBatchTransformPipelineSQS"
lambda_function_name = "callbackBatchTransformPipelineLambda"
pipeline_name = "BatchTransformPipelineWithCallbackStep"

# Setup
There are certain preparatory steps we need to perform. 


First, we assume that some other process or pipeline has trained a model and registered it in the Model Registry, and that someone has approved the model for inference. <BR>
To simulate this, we use a pretrained model and register it in a new Model Registry.


Second, we assume that some other process generates the dataset on which we want to perform predictions and saves it to an S3 location known to us. <BR>
To simulate this, we download a dataset from a public location and upload it to an S3 bucket that we then provide as input to the pipeline.

### Create a model registry and register model

Upload the model to Amazon S3.

In [None]:
base_uri = f"s3://{default_bucket}/{prefix}/trained_model"

model_location = sagemaker.s3.S3Uploader.upload(
    local_path="model.tar.gz",
    desired_s3_uri=base_uri,
)

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost", region=region, version="1.0-1", py_version="py3"
)

Create a model package group and a model package. Notice that we register the model directly with a status of `Approved`.

In [None]:
_ = sm_client.create_model_package_group(
    ModelPackageGroupName=model_package_group_name,
    ModelPackageGroupDescription="Demo model package group",
)

In [None]:
_ = sm_client.create_model_package(
    ModelPackageGroupName=model_package_group_name,
    ModelApprovalStatus="Approved",
    InferenceSpecification={
        "Containers": [
            {
                "Image": image_uri,
                "ModelDataUrl": model_location,
            },
        ],
        "SupportedTransformInstanceTypes": ["ml.m5.large"],
        "SupportedRealtimeInferenceInstanceTypes": ["ml.m5.large"],
        "SupportedContentTypes": [
            "text/csv",
        ],
        "SupportedResponseMIMETypes": [
            "text/csv",
        ],
    },
)

Check that the model was registered successfully. The function `get_approved_package` will also be used by the Lambda function later to find the latest approved model.

In [None]:
from utils import get_approved_package

get_approved_package(model_package_group_name)
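The actual `get_approved_package` lives in `utils.py` and is not reproduced here. As a rough idea of what such a lookup involves, here is a minimal sketch under the assumption that "latest" means the most recently created package with status `Approved`. The function name `latest_approved_package` is hypothetical and the real helper may differ.

```python
def latest_approved_package(sm_client, model_package_group_name):
    """Return the description of the newest approved package in the group.

    `sm_client` is expected to be a boto3 SageMaker client.
    """
    response = sm_client.list_model_packages(
        ModelPackageGroupName=model_package_group_name,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    summaries = response["ModelPackageSummaryList"]
    if not summaries:
        raise ValueError(
            f"No approved model package found in group {model_package_group_name}"
        )
    # The full description carries the container image and model artifact location
    return sm_client.describe_model_package(
        ModelPackageName=summaries[0]["ModelPackageArn"]
    )
```

The image URI and model artifact location can then be read from `InferenceSpecification["Containers"][0]` of the returned description.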

### Collect data and upload to your own Amazon S3 location

In [None]:
local_path = "abalone-dataset-batch"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch", local_path
)

base_uri = f"s3://{default_bucket}/{prefix}/batch_data"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)

# Create SQS Queue
The CallbackStep of the pipeline publishes a message to this SQS queue. The execution of the pipeline then pauses until the consumer of that message reports back to the pipeline. The consumer can be any service configured to read from this queue; in this case, it is a Lambda function.

In [None]:
sqs_client = boto3.client("sqs")
sqs_client.create_queue(QueueName=sqs_queue_name)

queue_url = sqs_client.get_queue_url(QueueName=sqs_queue_name)["QueueUrl"]
queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])[
    "Attributes"
]["QueueArn"]

# Create Lambda (Consumer of the queue)
This Lambda function is configured to read messages from the newly created queue. It retrieves the latest approved model and then returns the model details along with a success message to the pipeline so that it continues execution.


Since the CallbackStep functionality was introduced in boto3 version `1.17.97`, the Lambda function needs at least that version of boto3 to return a success message and output to the pipeline. <BR>
Since the boto3 version bundled with the Lambda runtime may be older (check [this](https://docs.aws.amazon.com/lambda/latest/dg/lambda-python.html)), we need to create a Lambda layer with the latest boto3 version. If you are not familiar with Lambda layers, a layer is essentially an archive containing additional code, such as libraries, dependencies, or even custom runtimes. This [blogpost](https://aws.amazon.com/blogs/compute/using-lambda-layers-to-simplify-your-development-process/#:~:text=A%20Lambda%20layer%20is%20an,dependencies%2C%20or%20even%20custom%20runtimes.&text=By%20moving%20runtime%20dependencies%20from,archive%20uploaded%20during%20a%20deployment.) might help.

### Create lambda layer
To create the layer, we install the latest boto3 version into a local folder and zip that folder into a file, which is then used to publish the layer.

*`%%capture` is added to suppress the very long output of the following commands. Feel free to comment it out to view all output.*


In [None]:
%%capture

!mkdir -p latestboto3
!pip install boto3 -t ./latestboto3

!apt-get update
!apt-get install -y zip

!zip -r latestboto3.zip latestboto3/
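If `apt-get` or the `zip` command is not available in your environment, the same archive can probably be produced from Python alone. The following is a minimal sketch using only the standard library; the helper name `zip_layer_folder` is hypothetical. It keeps the folder name as the top-level entry in the archive, matching what `zip -r latestboto3.zip latestboto3/` produces.

```python
import shutil
from pathlib import Path


def zip_layer_folder(folder="latestboto3"):
    """Zip `folder` into `<folder>.zip` next to it and return the archive path.

    Entries inside the archive are prefixed with the folder name,
    e.g. `latestboto3/boto3/__init__.py`.
    """
    folder_path = Path(folder).resolve()
    return shutil.make_archive(
        base_name=str(folder_path),  # ".zip" is appended automatically
        format="zip",
        root_dir=str(folder_path.parent),
        base_dir=folder_path.name,
    )
```

Calling `zip_layer_folder()` after the `pip install` step would create `latestboto3.zip` in the working directory, ready for the publishing cell below.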

In [None]:
lambda_client = boto3.client("lambda")

with open("latestboto3.zip", "rb") as file_data:
    bytes_content = file_data.read()

layer_name = "myboto3layer"
lambda_layer = lambda_client.publish_layer_version(
    LayerName=layer_name, Content={"ZipFile": bytes_content}
)
layer_version_arn = lambda_layer["LayerVersionArn"]
layer_version_number = lambda_layer["Version"]

### Create the lambda function

Take a look at the core part of the Lambda function below. <BR>
The Lambda function does exactly three things:
1. Retrieves a token from the message. The token is originally inserted by the CallbackStep of the pipeline (we will see that later)
2. Retrieves the latest model from the registry. We are particularly interested in the image URI and the model artifact location
3. Returns the model details along with the token to the pipeline by calling the `send_pipeline_execution_step_success` function
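The three steps above can be sketched as follows. This is not the code in `utils.py` (which the next cell prints); it is a simplified illustration. The function name `handle_callback_event` and the injected `find_latest_model` callable are hypothetical, and the sketch assumes that each SQS record body is a JSON document with a `token` field. The output names match those declared on the CallbackStep later in this notebook.

```python
import json


def handle_callback_event(event, sm_client, find_latest_model):
    """Answer each callback message in an SQS-triggered Lambda event.

    `sm_client` is expected to be a boto3 SageMaker client, and
    `find_latest_model` a callable returning (model_url, image_uri).
    """
    for record in event["Records"]:
        # Step 1: the callback token travels in the message body
        token = json.loads(record["body"])["token"]

        # Step 2: look up the latest approved model
        try:
            model_url, image_uri = find_latest_model()
        except Exception as error:
            # Report the failure so the pipeline does not wait indefinitely
            sm_client.send_pipeline_execution_step_failure(
                CallbackToken=token, FailureReason=str(error)
            )
            continue

        # Step 3: hand the results back to the waiting pipeline step
        sm_client.send_pipeline_execution_step_success(
            CallbackToken=token,
            OutputParameters=[
                {"Name": "ModelUrl", "Value": model_url},
                {"Name": "ImageUri", "Value": image_uri},
            ],
        )
```

Passing the client and the lookup function as arguments keeps the handler logic easy to exercise locally with stand-in objects before deploying it.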

In [None]:
!sed -n '18,56p' < utils.py | pygmentize -l python

In [None]:
lambda_client = boto3.client("lambda")


def lambda_file():
    """Zip utils.py and return the archive contents as bytes."""
    from zipfile import ZipFile

    filename = "lambdacode.zip"

    # The context manager closes the archive even if writing fails
    with ZipFile(filename, "w") as zip_obj:
        zip_obj.write("utils.py")

    with open(filename, "rb") as file_data:
        return file_data.read()


lambda_client.create_function(
    FunctionName=lambda_function_name,
    Runtime="python3.8",
    Role=lambda_role_arn,
    Handler="utils.handler",
    Code={"ZipFile": lambda_file()},
    Publish=True,
    PackageType="Zip",
    Layers=[layer_version_arn],
    Environment={"Variables": {"ModelPackageGroupName": model_package_group_name}},
)

### Create event source mapping
This event source mapping triggers the Lambda function whenever a new message arrives in the SQS queue.

In [None]:
event_source_mapping = lambda_client.create_event_source_mapping(
    EventSourceArn=queue_arn, FunctionName=lambda_function_name, Enabled=True
)

event_mapping_uuid = event_source_mapping["UUID"]
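A newly created event source mapping may take a short while to become active. If you want to be sure it is ready before starting the pipeline, a polling loop along the following lines could help. This is a minimal sketch: the helper name `wait_until_mapping_enabled` and the timeout values are arbitrary choices, and it relies on the `State` field returned by `get_event_source_mapping`.

```python
import time


def wait_until_mapping_enabled(lambda_client, uuid, timeout_seconds=120, poll_seconds=5):
    """Poll the event source mapping until its state is "Enabled".

    `lambda_client` is expected to be a boto3 Lambda client.
    Raises TimeoutError if the mapping is not enabled in time.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = lambda_client.get_event_source_mapping(UUID=uuid)["State"]
        if state == "Enabled":
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Event source mapping {uuid} is still in state {state}")
        time.sleep(poll_seconds)
```

In this notebook it would be called as `wait_until_mapping_enabled(lambda_client, event_mapping_uuid)`.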

# Build pipeline
We are now building the SageMaker Pipeline.


We begin by defining the pipeline parameters. These parameters can change between consecutive pipeline runs;
if they are not set, the default values are used. Any value that you want to be able to change
between executions of the pipeline can be added as a pipeline parameter.

In [None]:
from sagemaker.workflow.parameters import ParameterString

batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)
sqs_queue_to_use = ParameterString(
    name="SQSQueue",
    # CallbackStep expects the queue URL, not the queue name
    default_value=queue_url,
)

The first step we define is the CallbackStep. When executed, this step adds a message to the SQS queue
that is picked up by the Lambda function we defined above. The output of the Lambda function
is mapped to the `CallbackOutput` objects below so that it can be used later on.

In [None]:
from sagemaker.workflow.callback_step import CallbackStep, CallbackOutput, CallbackOutputTypeEnum

step_latest_model_fetch = CallbackStep(
    name="fetchLatestModel",
    sqs_queue_url=sqs_queue_to_use,
    inputs={},
    outputs=[
        CallbackOutput(output_name="ModelUrl", output_type=CallbackOutputTypeEnum.String),
        CallbackOutput(output_name="ImageUri", output_type=CallbackOutputTypeEnum.String),
    ],
)

In the next step, we define the model.

Note that we use the output of the previous step, which is the output of the Lambda function,
as the input for this step.

In [None]:
from sagemaker.model import Model

model = Model(
    image_uri=step_latest_model_fetch.properties.Outputs["ImageUri"],
    model_data=step_latest_model_fetch.properties.Outputs["ModelUrl"],
    sagemaker_session=sagemaker_session,
    role=role,
)

In [None]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep


inputs = CreateModelInput(
    instance_type="ml.m5.large",
)
step_create_model = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs=inputs,
)

The final step in the pipeline is the TransformStep, in which the batch transformation
of the data takes place.

In [None]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.large",
    instance_count=1,
    output_path=f"s3://{default_bucket}/AbaloneTransform",
)

In [None]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="Transform", transformer=transformer, inputs=TransformInput(data=batch_data)
)

To tie together the above steps, we define a Pipeline object as below.

In [None]:
from sagemaker.workflow.pipeline import Pipeline


pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        batch_data,
        sqs_queue_to_use,
    ],
    steps=[step_latest_model_fetch, step_create_model, step_transform],
)

In [None]:
import json


definition = json.loads(pipeline.definition())
# definition
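Rather than printing the whole definition, it can be handy to list just the steps it contains. The following is a minimal sketch that assumes the definition JSON holds a top-level `Steps` list whose entries carry `Name` and `Type` keys; the helper name `summarize_definition` is hypothetical.

```python
def summarize_definition(definition):
    """Return (name, type) pairs for every step in a parsed pipeline definition."""
    return [(step["Name"], step["Type"]) for step in definition.get("Steps", [])]
```

Calling `summarize_definition(definition)` on the dictionary loaded above should list the three steps of this pipeline in the order they were defined.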

# Execute pipeline

In [None]:
pipeline.upsert(role_arn=role)

In [None]:
execution = pipeline.start()
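The pipeline parameters defined earlier can be overridden for a single execution by passing them to `start`. The sketch below wraps that in a small helper that can also block until the execution finishes. The helper name `run_pipeline` is hypothetical; `BatchData` is the parameter name defined earlier in this notebook.

```python
def run_pipeline(pipeline, batch_data_uri=None, wait=True):
    """Start the pipeline, optionally overriding BatchData, and return its status.

    `pipeline` is expected to be a sagemaker.workflow.pipeline.Pipeline.
    """
    # Parameters that are not overridden fall back to their default values
    parameters = {"BatchData": batch_data_uri} if batch_data_uri else None
    execution = pipeline.start(parameters=parameters)
    if wait:
        # Blocks until the execution reaches a terminal state
        execution.wait()
    return execution.describe()["PipelineExecutionStatus"]
```

For example, `run_pipeline(pipeline, "s3://my-bucket/other-batch-data")` would run the same pipeline against a different dataset; the bucket name here is only a placeholder.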

Once you start the execution of the pipeline, open Pipelines under SageMaker resources (shown in the picture below):

![pipelineexecution](images/pipelineexecution.png)

Feel free to double-click on the executing pipeline to get more details on the progress of the execution.

Double-clicking on the pipeline execution (or on the tab named "Graph") opens a page where SageMaker constructs
the DAG that represents the pipeline you defined. If the pipeline execution is opened, the progress of the execution
is represented using a standard colour scheme (green for succeeded, blue for running,
red for failed), as in the image below:

![pipelinegraph](images/pipelinegraph.png)

If you see the following error instead of the graph:
> "An error occurred loading the pipeline execution graph"

restart your SageMaker Studio server so that it is updated to the latest version,
as explained in
[How to Update SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-tasks-update-studio.html).


The pipeline should finish and be marked green in about 4 minutes. You may need to refresh the page to see all
the steps marked green. Lastly, you can click on the "Transform" step to open the details of the step,
including the output location on Amazon S3 where the transformed data has been saved.
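The same output location can also be looked up programmatically. The sketch below assumes the step is named `Transform`, as defined earlier in this notebook, and that the execution has already run the transform job. The helper name `transform_output_location` is hypothetical.

```python
def transform_output_location(execution, sm_client, step_name="Transform"):
    """Return the S3 output path of the transform job run by the given step.

    `execution` is the object returned by pipeline.start(), and
    `sm_client` is expected to be a boto3 SageMaker client.
    """
    for step in execution.list_steps():
        if step["StepName"] == step_name:
            job_arn = step["Metadata"]["TransformJob"]["Arn"]
            # The job name is the last path segment of the ARN
            job_name = job_arn.split("/")[-1]
            job = sm_client.describe_transform_job(TransformJobName=job_name)
            return job["TransformOutput"]["S3OutputPath"]
    raise LookupError(f"No step named {step_name} found in this execution")
```

In this notebook it would be called as `transform_output_location(execution, sm_client)` once the execution has completed.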

# Clean-up

You have successfully run this notebook.

Run the following steps to delete all resources created by this example. <BR>
Please note that these steps do not delete data from your Amazon S3 bucket, including the original trained model, the original data, the transformed data, and any intermediate data created by the pipeline steps.

In [None]:
sm_client.delete_pipeline(PipelineName=pipeline_name)

In [None]:
from utils import get_approved_package

arn_of_model_to_delete = get_approved_package(model_package_group_name)["ModelPackageArn"]
sm_client.delete_model_package(
    ModelPackageName=arn_of_model_to_delete,
)
sm_client.delete_model_package_group(
    ModelPackageGroupName=model_package_group_name,
)

In [None]:
sqs_client.delete_queue(QueueUrl=queue_url)

In [None]:
lambda_client.delete_event_source_mapping(UUID=event_mapping_uuid)

In [None]:
lambda_client.delete_function(FunctionName=lambda_function_name)

In [None]:
lambda_client.delete_layer_version(LayerName=layer_name, VersionNumber=layer_version_number)

In [None]:
!rm -rf latestboto3/

In [None]:
!rm latestboto3.zip

In [None]:
!rm lambdacode.zip
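As noted above, the clean-up cells leave the data in Amazon S3 untouched. If you also want to remove it, something along these lines could be used. This is a minimal sketch: the helper name `delete_s3_prefixes` is hypothetical, and deletion is irreversible, so double-check the prefixes before running it.

```python
def delete_s3_prefixes(bucket, prefixes):
    """Delete every object under each prefix of a boto3 `s3.Bucket` resource."""
    prefixes = list(prefixes)
    # An empty prefix would match the whole bucket, so refuse it up front
    if any(not prefix for prefix in prefixes):
        raise ValueError("Refusing to delete with an empty prefix")
    for prefix in prefixes:
        bucket.objects.filter(Prefix=prefix).delete()


# Example usage in this notebook (commented out on purpose):
# bucket = boto3.resource("s3").Bucket(default_bucket)
# delete_s3_prefixes(bucket, [prefix, "AbaloneTransform"])
```

The two prefixes in the commented example correspond to the uploaded model and batch data (`prefix`) and to the transform output path used by the `Transformer` defined earlier.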