# Create a pipeline to perform batch inference using a Lambda step
Amazon SageMaker Pipelines offers machine learning (ML) application developers and operations engineers the ability to orchestrate SageMaker jobs and author reproducible ML pipelines.

This notebook shows how to create a pipeline that reads the latest model registered in a model registry and performs batch transformation on data. <BR>


## Notebook Overview
This notebook shows how to:
- Define a LambdaStep that returns the latest model to be used by the transformation
- Define a CreateModelStep that uses the model returned by the Lambda function
- Define a TransformStep that runs a batch transform job using the latest model

As a prerequisite, you need a trained model and a dataset. <BR>
The pretrained model comes with this repository and is added to a new model registry as part of the Setup process. <BR>
A sample dataset is also downloaded and uploaded to Amazon S3 as part of the Setup process. The data used is the [UCI Machine Learning Abalone Dataset](https://archive.ics.uci.edu/ml/datasets/abalone) [1]. The aim of this task is to determine the age of an abalone from its physical measurements. At its core, this is a regression problem. However, we use a pretrained model, because training the machine learning model is outside the scope of this notebook. If you are interested in seeing how to use SageMaker Pipelines to train a model for this use case, the example [Orchestrating Jobs with Amazon SageMaker Model Building Pipelines](https://github.com/aws/amazon-sagemaker-examples/blob/master/sagemaker-pipelines/tabular/abalone_build_train_deploy/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.ipynb) shows exactly how to do this.


[1] Dua, D. and Graff, C. (2019). UCI Machine Learning Repository. Irvine, CA: University of California, School of Information and Computer Science.

## Pre-Setup - IAM role setup
Before you run this notebook, you need to set the right permissions for everything to work. Specifically, in addition to the standard SageMaker permissions, you need:
- Permissions to create a Lambda function
- A trust relationship that allows Lambda to assume a role capable of loading data from the model registry

To set this up, go to the IAM Management Console, locate the AmazonSagemaker-ExecutionRole, and add the following permissions by clicking *Attach Policy*:

![iam_permissions](images/iam_permissions_lambdastep.png)

Next, click *Trust relationships* and then *Edit trust relationship*. Edit the `Service` parameter so that it looks like this:

`"Service": ["sagemaker.amazonaws.com", "lambda.amazonaws.com"]`
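If you prefer to script the trust-relationship change instead of editing it in the console, the full trust policy is a small JSON document. The following is a minimal sketch that builds it in Python, assuming the role should trust exactly these two services; `build_trust_policy` is a hypothetical helper name, not part of any SDK.

```python
import json


def build_trust_policy(services):
    """Return an IAM trust policy that lets each listed AWS service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": list(services)},
                "Action": "sts:AssumeRole",
            }
        ],
    }


# Same two service principals as the "Service" list above
trust_policy = build_trust_policy(["sagemaker.amazonaws.com", "lambda.amazonaws.com"])
policy_document = json.dumps(trust_policy, indent=2)
print(policy_document)
```

The resulting string could then be applied with `boto3.client("iam").update_assume_role_policy(RoleName="<your-execution-role-name>", PolicyDocument=policy_document)`, where the role name is a placeholder. Note that this call replaces the role's entire trust policy, so any other principals the role already trusts must be included in the list as well.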



*Please note that we are giving full access permissions to the SageMaker Execution Role because we will also be deleting the resources we create. In a real-world scenario, we wouldn't need to delete the resources at the end and therefore shouldn't grant these permissions.*

*Also note that in a production system you may use different roles for executing the pipeline and executing the Lambda function. This example only demonstrates SageMaker functionality and does not suggest how to set up a production system.*


In [None]:
!pip install "sagemaker>=2.45.0" --upgrade

In [None]:
!pip install "boto3>=1.17.97" --upgrade

In [None]:
import sagemaker
import boto3
from sagemaker import get_execution_role

sm_client = boto3.client(service_name="sagemaker")
role = get_execution_role()
sagemaker_session = sagemaker.Session()
default_bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name


In [None]:
print("Execution role used by this notebook: ", role)

In [None]:
prefix = "batch-transform-with-lambda-step-pipeline-demo"
model_package_group_name = "lambdaBatchTransformPipelineModelPackageGroup"
lambda_function_name = "lambdaBatchTransformPipelineLambda"
pipeline_name = "BatchTransformPipelineWithLambdaStep"

# Setup
There are certain preparatory steps we need to perform. 


First, we assume that some other process or pipeline has trained a model, registered it in the Model Registry, and that someone has approved the model for inference. <BR>
For this, we use a pretrained model, create a new model package group in the Model Registry, and register the model in it.


Second, we assume that some other process generates the dataset on which we want to perform predictions and saves it to an Amazon S3 location known to us. <BR>
For this, we download a dataset from a public location and upload it to an S3 bucket, which we then provide as input to the pipeline.

### Create a model registry and register model

Upload the model to Amazon S3.

In [None]:
base_uri = f"s3://{default_bucket}/{prefix}/trained_model"

model_location = sagemaker.s3.S3Uploader.upload(
    local_path="model.tar.gz",
    desired_s3_uri=base_uri,
)

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost", region=region, version="1.0-1", py_version="py3"
)

Create a model package group and a model package. Notice that we register the model directly with an approval status of `Approved`.

In [None]:
_ = sm_client.create_model_package_group(
    ModelPackageGroupName=model_package_group_name,
    ModelPackageGroupDescription="Demo model package group",
)

In [None]:
_ = sm_client.create_model_package(
    ModelPackageGroupName=model_package_group_name,
    ModelApprovalStatus="Approved",
    InferenceSpecification={
        "Containers": [
            {
                "Image": image_uri,
                "ModelDataUrl": model_location,
            },
        ],
        "SupportedTransformInstanceTypes": ["ml.m5.large"],
        "SupportedRealtimeInferenceInstanceTypes": ["ml.m5.large"],
        "SupportedContentTypes": [
            "text/csv",
        ],
        "SupportedResponseMIMETypes": [
            "text/csv",
        ],
    },
)

Check that the model was registered successfully. The function `get_approved_package` will also be used by the Lambda function later to find the latest approved model.

In [None]:
from utils import get_approved_package

get_approved_package(model_package_group_name)

### Collect data and upload it to your own Amazon S3 location

In [None]:
local_path = "abalone-dataset-batch"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch", local_path
)

base_uri = f"s3://{default_bucket}/{prefix}/batch_data"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)

# Build pipeline
We are now building the SageMaker Pipeline.


We begin by defining the Pipeline parameters. Parameter values can change between pipeline executions; if a value is not set, its default is used. You can add any value that you want to be able to change
between executions of the pipeline as a Pipeline Parameter.
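To override a parameter for a single execution, the SageMaker Python SDK's `pipeline.start` accepts a `parameters` dict keyed by parameter name. The following is a minimal sketch of building that dict for `BatchData`; `batch_data_override` is a hypothetical helper, and the `s3://` check is an illustrative safeguard rather than something the SDK requires.

```python
def batch_data_override(s3_uri):
    """Build the parameters dict that overrides the BatchData pipeline parameter."""
    if not s3_uri.startswith("s3://"):
        raise ValueError(f"BatchData must be an s3:// URI, got {s3_uri!r}")
    # The key must match the name passed to ParameterString(name="BatchData", ...)
    return {"BatchData": s3_uri}


print(batch_data_override("s3://example-bucket/new-batch/abalone-dataset-batch"))
```

Once the pipeline is defined, a run on different data could then be started with `pipeline.start(parameters=batch_data_override("s3://<your-bucket>/<new-batch-file>"))`, where the URI is a placeholder; parameters that are not passed keep their default values.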

In [None]:
from sagemaker.workflow.parameters import ParameterString

batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)


Then, we define the Lambda step, which is responsible for fetching the latest approved model from the model registry.

In [None]:
from sagemaker.lambda_helper import Lambda

# Lambda helper class can be used to create the Lambda function
func = Lambda(
    function_name=lambda_function_name,
    execution_role_arn=role,
    script="lambda_step_code.py",
    handler="lambda_step_code.handler",
    timeout=600,
    memory_size=128,
)

In [None]:
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)

step_latest_model_fetch = LambdaStep(
    name="fetchLatestModel",
    lambda_func=func,
    inputs={
        "model_package_group_name": model_package_group_name,
    },
    outputs=[
        LambdaOutput(output_name="ModelUrl", output_type=LambdaOutputTypeEnum.String), 
        LambdaOutput(output_name="ImageUri", output_type=LambdaOutputTypeEnum.String), 
    ],
)

Next, we define the model.

Note that we use the outputs of the previous step, which are the values returned by the Lambda function,
as the inputs for this step.

In [None]:
from sagemaker.model import Model

model = Model(
    image_uri=step_latest_model_fetch.properties.Outputs["ImageUri"],
    model_data=step_latest_model_fetch.properties.Outputs["ModelUrl"],
    sagemaker_session=sagemaker_session,
    role=role,
)

In [None]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep


inputs = CreateModelInput(
    instance_type="ml.m5.large",
)
step_create_model = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs=inputs,
)

The final step in the pipeline is the TransformStep, in which the batch transformation
of the data takes place.

In [None]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.large",
    instance_count=1,
    output_path=f"s3://{default_bucket}/AbaloneTransform",
)

In [None]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="Transform", transformer=transformer, inputs=TransformInput(data=batch_data)
)

To tie together the above steps, we define a Pipeline object as below.

In [None]:
from sagemaker.workflow.pipeline import Pipeline


pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        batch_data,
    ],
    steps=[step_latest_model_fetch, step_create_model, step_transform],
)

In [None]:
import json


# Parse the pipeline definition JSON; uncomment the next line to display it
definition = json.loads(pipeline.definition())
# definition

# Execute pipeline

In [None]:
pipeline.upsert(role_arn=role)

In [None]:
execution = pipeline.start()

In [None]:
execution.wait()

After you start the pipeline execution, go to Pipelines under SageMaker Resources (shown in the image below).

![pipelineexecution](images/pipelineexecution.png)

You can double-click the running pipeline to see more details about the progress of the execution.

Double-clicking the pipeline execution (or the tab named "Graph") opens a page where SageMaker constructs
the DAG that represents the pipeline you defined. If the pipeline execution is open, the progress of the execution
is shown using a standard colour scheme (green for succeeded, blue for running,
red for failed), as in the image below:

![pipelinegraph](images/pipelinegraph.png)
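The same progress can be checked from the notebook. In the SageMaker Python SDK, `execution.list_steps()` returns one dict per step with `StepName` and `StepStatus` keys (status values include `Executing`, `Succeeded`, and `Failed`). The following is a minimal sketch that condenses that list; `summarize_steps` and `all_succeeded` are hypothetical helpers.

```python
def summarize_steps(step_list):
    """Map each step name to its status, e.g. from execution.list_steps()."""
    return {step["StepName"]: step["StepStatus"] for step in step_list}


def all_succeeded(step_list):
    """True only if there is at least one step and every step has Succeeded."""
    statuses = summarize_steps(step_list)
    return bool(statuses) and all(s == "Succeeded" for s in statuses.values())


example = [
    {"StepName": "fetchLatestModel", "StepStatus": "Succeeded"},
    {"StepName": "CreateModel", "StepStatus": "Succeeded"},
    {"StepName": "Transform", "StepStatus": "Executing"},
]
print(summarize_steps(example), all_succeeded(example))
```

In the notebook, `summarize_steps(execution.list_steps())` would give a text version of the coloured graph, and `execution.describe()["PipelineExecutionStatus"]` reports the status of the execution as a whole.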


The pipeline finishes, with every step marked green, in about 4 minutes. You may need to refresh the page to see all
the steps marked green. Finally, you can click the "Transform" step to open its details,
including the Amazon S3 location where the transformed data has been saved.
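Batch transform names each output object after its input object with `.out` appended. The following is a minimal sketch that predicts the result URI, assuming a single input object (as here, where `BatchData` points at one file) and the `output_path` given to the `Transformer`; `expected_transform_output_uri` is a hypothetical helper.

```python
import posixpath
from urllib.parse import urlparse


def expected_transform_output_uri(output_path, input_uri):
    """Predict where batch transform writes the result for a single input object."""
    file_name = posixpath.basename(urlparse(input_uri).path)
    # Output object = output_path + "/" + input file name + ".out"
    return f"{output_path.rstrip('/')}/{file_name}.out"


print(
    expected_transform_output_uri(
        "s3://example-bucket/AbaloneTransform",
        "s3://example-bucket/demo/batch_data/abalone-dataset-batch",
    )
)
```

If the prediction matches the location shown in the step details, the predictions could be read with `sagemaker.s3.S3Downloader.read_file(uri, sagemaker_session=sagemaker_session)`.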

# Clean-up

You have successfully run this notebook.

Run the following cells to delete the pipeline, the model package group, and the Lambda function created by this example. <BR>
Please note that these cells do not delete data from your Amazon S3 bucket, including the original trained model, the original data, the transformed data, and any intermediate data created by the pipeline steps.
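If you also want to remove that S3 data, the following is a minimal sketch, assuming you pass it a boto3 `s3.Bucket` resource and use the collection batch action `bucket.objects.filter(Prefix=...).delete()`. `delete_prefix` is a hypothetical helper, and it refuses an empty prefix because that would match every object in the bucket.

```python
def delete_prefix(bucket, prefix):
    """Delete every object under prefix in a boto3 s3.Bucket resource."""
    if not prefix.strip("/"):
        raise ValueError("Refusing to delete with an empty prefix (would empty the bucket)")
    # The objects collection batch action issues DeleteObjects calls for all matches
    return bucket.objects.filter(Prefix=prefix).delete()
```

For this example, that could be `bucket = boto3.resource("s3").Bucket(default_bucket)` followed by `delete_prefix(bucket, f"{prefix}/")` and `delete_prefix(bucket, "AbaloneTransform/")`, because the transform output path sits outside `prefix`. Check the prefixes carefully before running this, since deleted objects cannot be recovered unless bucket versioning is enabled.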

In [None]:
sm_client.delete_pipeline(PipelineName=pipeline_name)

In [None]:
from utils import get_approved_package

arn_of_model_to_delete = get_approved_package(model_package_group_name)["ModelPackageArn"]
sm_client.delete_model_package(
    ModelPackageName=arn_of_model_to_delete,
)
sm_client.delete_model_package_group(
    ModelPackageGroupName=model_package_group_name,
)

In [None]:
lambda_client = boto3.client("lambda")
lambda_client.delete_function(FunctionName=lambda_function_name)