# MLOps: Repeatable - Automate machine learning workflows

## Contents

- [Introduction](#Introduction)
- [Model Build pipeline with SageMaker Pipelines](#Model-Build-pipeline-with-SageMaker-Pipelines)
 - [Pipeline Parameters](#Pipeline-Parameters)
 - [SageMaker Processing step](#SageMaker-Processing-step)
 - [SageMaker Training step](#SageMaker-Training-step)
 - [Model evaluation step](#Model-evaluation-step)
 - [Register model in Model Registry step](#Register-model-in-Model-Registry-step)
 - [Define & create the training pipeline](#Define-&-create-the-training-pipeline)
 - [Execute the training pipeline](#Execute-the-training-pipeline)
- [Deployment pipeline (real-time endpoint) with SageMaker Pipelines](#Deployment-pipeline-with-SageMaker-Pipelines)
 - [Define & create the deployment pipeline](#Define-&-create-the-deployment-pipeline)
 - [Execute the deployment pipeline](#Execute-the-deployment-pipeline)
 - [Test the SageMaker endpoint](#Test-the-SageMaker-endpoint)

## Introduction

This is our fourth notebook (Lab 2), which dives deep into automating machine learning workflows to create a more repeatable path to production.

Here, we will put on the hat of an `ML Engineer` and automate the tasks within our machine learning workflows as well as orchestrate the steps. For this, we'll build pipeline steps that combine the components from all the previous notebooks into a single entity. This pipeline provides a repeatable ML workflow with some reliability built in through minimal quality gates.

For this task we will be using Amazon SageMaker Pipelines capabilities to build out an end-to-end machine learning pipeline. 

![Notebook4](images/Notebook-4.png)

Keep in mind, CI/CD practices are typically more aligned with the *Reliable* stage, so you'll notice we have not yet considered a more robust set of pipelines that accounts for the lifecycle of each stage (build vs. deploy), source/version control, automated triggers, or additional quality gates.

Let's get started!

**Imports**

In [None]:
!pip install -U sagemaker

In [None]:
%store -r

In [None]:
# Processing imports
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor

# SageMaker Pipeline imports
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep, TransformStep
from sagemaker.workflow.model_step import ModelStep

from sagemaker.workflow.parameters import (
 ParameterInteger,
 ParameterString,
)

# Other imports
import json
import time
from time import gmtime, strftime
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.model import Model
from sagemaker.tuner import IntegerParameter, HyperparameterTuner
from sagemaker.inputs import TrainingInput
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
 LambdaStep,
 LambdaOutput,
 LambdaOutputTypeEnum,
)

# To test the endpoint once it's deployed
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer, CSVDeserializer
from sagemaker.workflow.pipeline_context import PipelineSession
import sagemaker
import json
import boto3
from sagemaker.model_metrics import ModelMetrics, MetricsSource
import pandas as pd
from sagemaker.feature_store.feature_group import FeatureGroup
from helper_library import *

from sagemaker.workflow.steps import CacheConfig

**Session variables**

In [None]:
# Useful SageMaker variables
session = PipelineSession()
bucket = session.default_bucket()
role_arn= sagemaker.get_execution_role()
region = session.boto_region_name
sagemaker_client = boto3.client('sagemaker')
aws_account_id = boto3.client('sts').get_caller_identity().get('Account')
lambda_role = create_lambda_iam_role('LambdaSageMakerExecutionRole')

## Model Build pipeline with SageMaker Pipelines

[Amazon SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) provides the ability to create a directed acyclic graph (DAG) containing the pipeline steps needed to build and/or deploy machine learning models. Each pipeline, created through the provided Python SDK, is a series of interconnected steps. This same pipeline can also be exported as a JSON pipeline definition.

The structure of a pipeline's DAG is determined by the data dependencies between steps. These data dependencies are created when the properties of a step's output are passed as the input to another step. The following image is a pipeline DAG that we'll be creating for our training pipeline:

![](images/sagemaker-pipelines-dag.png)
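
To make the idea of dependency-driven ordering concrete, here is a minimal sketch in plain Python. It is not how SageMaker Pipelines is implemented internally; it only illustrates how an execution order can be derived from data dependencies. The `dependencies` map is written by hand for illustration and uses the step names that appear later in this notebook. It relies on the standard library's `graphlib` module (Python 3.9+).

```python
from graphlib import TopologicalSorter

# Hand-written (hypothetical) dependency map: each step name maps to the
# steps whose outputs it consumes. SageMaker derives this from step properties.
dependencies = {
    "PreprocessData": set(),
    "TrainModel": {"PreprocessData"},
    "EvaluateModel": {"TrainModel", "PreprocessData"},
    "CheckEvaluation": {"EvaluateModel"},
    "RegisterModel": {"CheckEvaluation", "TrainModel"},
}

# A topological sort yields an order in which every step runs after the
# steps it depends on, regardless of the order they were declared in.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because each step here depends on the one before it, only one valid order exists; in a pipeline with independent branches, several orders would be valid and those branches could run in parallel.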

#### Pipeline Parameters

SageMaker Pipelines supports [pipeline parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html), which let you provide runtime values for each run of your pipeline. This allows you to change key inputs for each run without changing your pipeline code (e.g., the raw input data).

Here, we'll identify the parameters and set their default values. Parameters also make the pipeline reusable: you'll be able to override these defaults when executing the pipeline later in the notebook.

In [None]:
processing_instance_count = ParameterInteger(
 name='ProcessingInstanceCount',
 default_value=1
)

#### Setup Step Caching Configuration

This configuration can be enabled on pipeline steps so that SageMaker Pipelines automatically checks whether a previous successful run of a step used the same values for specific parameters. If one is found, Pipelines propagates the results of that step to the next step without re-running it, saving both time and compute costs. The `expire_after` value is an ISO 8601 duration; `PT12H` means a cached result is considered valid for 12 hours.

In [None]:
cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")
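
The caching behavior described above can be pictured with a small toy model. This is only a conceptual sketch, not SageMaker's actual implementation: the `StepCache` class, its methods, and the example S3 path are all hypothetical, and the duration parser handles only the narrow `PT<n>H` form used in this notebook.

```python
import hashlib
import json
import re
from datetime import datetime, timedelta, timezone


def parse_hours_duration(duration):
    """Parse the narrow ISO 8601 form 'PT<n>H' (for example 'PT12H')."""
    match = re.fullmatch(r"PT(\d+)H", duration)
    if match is None:
        raise ValueError(f"unsupported duration: {duration}")
    return timedelta(hours=int(match.group(1)))


class StepCache:
    """Toy cache keyed by a hash of the step name and its arguments."""

    def __init__(self, expire_after):
        self.ttl = parse_hours_duration(expire_after)
        self.entries = {}

    @staticmethod
    def _key(step_name, arguments):
        payload = json.dumps([step_name, arguments], sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def lookup(self, step_name, arguments, now):
        """Return the stored result, or None on a miss or an expired entry."""
        entry = self.entries.get(self._key(step_name, arguments))
        if entry is None:
            return None
        finished_at, result = entry
        if now - finished_at > self.ttl:
            return None
        return result

    def store(self, step_name, arguments, result, now):
        self.entries[self._key(step_name, arguments)] = (now, result)


cache = StepCache(expire_after="PT12H")
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
args = {"instance_type": "ml.m5.xlarge", "instance_count": 1}
cache.store("PreprocessData", args, "s3://example-bucket/train", now=start)

# Same arguments within 12 hours: cache hit.
hit = cache.lookup("PreprocessData", args, now=start + timedelta(hours=1))
# Same arguments after 13 hours: the entry has expired.
expired = cache.lookup("PreprocessData", args, now=start + timedelta(hours=13))
# Different arguments: cache miss, so the step would run again.
changed = cache.lookup("PreprocessData", {**args, "instance_count": 2}, now=start)
```

The point to take away is that a cache hit depends on both the step's arguments being unchanged and the earlier result still being within the expiry window.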

#### SageMaker Processing step

This should look very similar to the SageMaker Processing Job you configured in notebook 2. The only new line of code is the `ProcessingStep` line at the bottom of the cell below which allows us to take the Processing Job configuration and include it as a pipeline step. 

In [None]:
preprocess_data_processor = SKLearnProcessor(
 framework_version='0.23-1',
 role=role_arn,
 instance_type='ml.m5.xlarge',
 instance_count=processing_instance_count,
 base_job_name='preprocess-data',
 sagemaker_session=session,
)

preprocess_dataset_step = ProcessingStep(
 name='PreprocessData',
 code='./pipeline_scripts/preprocessing.py',
 processor=preprocess_data_processor,
 inputs=[
 ProcessingInput(
 source=raw_s3,
 destination='/opt/ml/processing/input',
 s3_data_distribution_type='ShardedByS3Key'
 )
 ],
 outputs=[
 ProcessingOutput(
 output_name='train',
 destination=f'{output_path}/train',
 source='/opt/ml/processing/output/train'
 ),
 ProcessingOutput(
 output_name='validation',
 destination=f'{output_path}/validation',
 source='/opt/ml/processing/output/validation'
 ),
 ProcessingOutput(
 output_name='test',
 destination=f'{output_path}/test',
 source='/opt/ml/processing/output/test'
 )
 ],
 cache_config=cache_config
)

#### SageMaker Training step

This configuration should also look very similar to the SageMaker Training job you did in notebook 2. The only new line of code is the `TrainingStep` line at the bottom of the cell below to allow us to run the training job as a step in our pipeline.

In [None]:
 # Tuned hyperparameters
hyperparameters = {
 'max_depth': 8, 
 'n_jobs': 4, 
 'n_estimators': 80}

train_instance_type = 'ml.c5.xlarge'

# Metrics to be captured from logs.
metric_definitions = [{'Name': 'r_squared',
 'Regex': 'r-squared: ([0-9\\.]+)'},
 {'Name': 'mse',
 'Regex': 'MSE: ([0-9\\.]+)'}]

estimator_parameters = {
 'entry_point': './pipeline_scripts/train_deploy_scikitlearn_randomforestregressor.py',
 'framework_version': '0.23-1',
 'py_version': 'py3',
 'instance_type': train_instance_type,
 'instance_count': 1,
 'hyperparameters': hyperparameters,
 'role': role_arn,
 'metric_definitions': metric_definitions,
 'base_job_name': 'randomforestregressor-model',
 'output_path': f's3://{bucket}/{s3_prefix}/'
}

estimator = SKLearn(**estimator_parameters)

training_step = TrainingStep(
 name='TrainModel',
 estimator=estimator,
 inputs={
 'train': TrainingInput(
 s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
 'train'
 ].S3Output.S3Uri
 ),
 'validation': TrainingInput(
 s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
 'validation'
 ].S3Output.S3Uri
 )
 },
 cache_config=cache_config
)
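
The `metric_definitions` above are regular expressions that SageMaker applies to the training job's log output. The following sketch applies the same two patterns to sample log text using Python's `re` module. The log lines are made up for illustration; the real ones depend on what the training script prints.

```python
import re

# Same names and patterns as in the training step configuration above.
metric_definitions = [
    {"Name": "r_squared", "Regex": "r-squared: ([0-9\\.]+)"},
    {"Name": "mse", "Regex": "MSE: ([0-9\\.]+)"},
]

# Hypothetical log output from the training script.
sample_log = "r-squared: 0.9312\nMSE: 298450112.75"

captured = {}
for definition in metric_definitions:
    match = re.search(definition["Regex"], sample_log)
    if match:
        # The first capture group holds the metric value.
        captured[definition["Name"]] = float(match.group(1))

print(captured)
```

Note that the character class `[0-9\.]` matches only digits and dots, so a negative value or one printed in scientific notation (for example `1.2e+08`) would not be captured in full by these patterns.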

#### Model evaluation step

After the training step in our pipeline, we'll want to evaluate our model's performance. To do that, we can create a SageMaker Processing step that uses evaluation code we specify (`evaluation.py`) to evaluate the model on the hold-out test dataset produced by the preprocessing step configured above.

In [None]:
evaluation_processor = SKLearnProcessor(
 framework_version='0.23-1',
 role=role_arn,
 instance_type='ml.m5.xlarge',
 instance_count=processing_instance_count,
 base_job_name='evaluation',
 sagemaker_session=session,
)

In [None]:
# Specify where we'll store the model evaluation results so
# that other steps can access those results
evaluation_report = PropertyFile(
 name='EvaluationReport',
 output_name='evaluation',
 path='evaluation.json',
)

evaluation_step = ProcessingStep(
 name='EvaluateModel',
 processor=evaluation_processor,
 inputs=[
 ProcessingInput(
 source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
 destination='/opt/ml/processing/model',
 ),
 ProcessingInput(
 source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
 destination='/opt/ml/processing/test',
 ),
 ],
 outputs=[
 ProcessingOutput(
 output_name='evaluation', source='/opt/ml/processing/evaluation'
 ),
 ],
 code='./pipeline_scripts/evaluation.py',
 property_files=[evaluation_report],
)

#### Register model in Model Registry step

Once we've evaluated the model's performance, we'll want to register the model version in SageMaker Model Registry if the model performs well according to the conditional criteria we've identified. For this, key metadata for each pipeline step will be captured in the model registry.

The conditional step is set up after this one: registration depends on the model meeting the defined criteria, so the registration step must be defined before we can reference it in the conditional step.

In [None]:
model_metrics = ModelMetrics(
 model_statistics=MetricsSource(
 s3_uri='{}/evaluation.json'.format(
 evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output'][
 'S3Uri'
 ]
 ),
 content_type='application/json',
 )
)

model = Model(
 image_uri=estimator.training_image_uri(),
 model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
 entry_point=estimator.entry_point,
 role=role_arn,
 sagemaker_session=session
)

model_registry_args = model.register(
 content_types=['text/csv'],
 response_types=['application/json'],
 inference_instances=['ml.t2.medium', 'ml.m5.xlarge'],
 transform_instances=['ml.m5.xlarge'],
 model_package_group_name=model_package_group_name,
 approval_status='PendingManualApproval',
 model_metrics=model_metrics
)

register_step = ModelStep(
 name='RegisterModel',
 step_args=model_registry_args
)

Because we only want to register the model if its performance meets a predefined threshold, we now need to create a Condition Step that says: if our model's MSE is less than or equal to 320,000,000, then we'll register the model.

In [None]:
# Condition step for evaluating model quality and branching execution

cond_lte = ConditionLessThanOrEqualTo(
 left=JsonGet(
 step_name=evaluation_step.name,
 property_file=evaluation_report,
 json_path='regression_metrics.mse.value',
 ),
 right=320000000.0,
)
condition_step = ConditionStep(
 name='CheckEvaluation',
 conditions=[cond_lte],
 if_steps=[register_step],
 else_steps=[],
)
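
To see what the condition amounts to, the sketch below mimics it locally in plain Python. It assumes the evaluation script writes an `evaluation.json` whose structure matches the `json_path` used above (`regression_metrics.mse.value`); the metric value and the `json_get` helper are hypothetical stand-ins, not the SDK's `JsonGet`.

```python
import json
import tempfile
from pathlib import Path


def json_get(document, json_path):
    """Walk a dot-separated path such as 'regression_metrics.mse.value'."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


# Hypothetical report contents; the real values come from evaluation.py.
report = {"regression_metrics": {"mse": {"value": 298450112.75}}}

# Write and re-read the report, as the pipeline would via the property file.
with tempfile.TemporaryDirectory() as output_dir:
    report_path = Path(output_dir) / "evaluation.json"
    report_path.write_text(json.dumps(report))
    loaded = json.loads(report_path.read_text())

mse = json_get(loaded, "regression_metrics.mse.value")

# Same comparison as ConditionLessThanOrEqualTo with right=320000000.0.
should_register = mse <= 320000000.0
print(f"MSE={mse}, register model: {should_register}")
```

If the report were missing one of the keys on the path, the lookup would fail, which is a useful thing to check first when a condition step errors out.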

#### Define & create the training pipeline

Next, we need to define the pipeline, `Pipeline()`, and then create the pipeline, `upsert()`. 

When defining the pipeline, the steps do not need to be listed in the order in which they will execute. SageMaker will automatically infer the order based on dependencies between the steps.

In [None]:
# pipeline_name = 'synthetic-housing-training-pipeline-{}'.format(strftime('%d-%H-%M-%S', gmtime()))
pipeline_name = 'synthetic-housing-training-pipeline'
step_list = [preprocess_dataset_step,
 training_step,
 evaluation_step,
 condition_step]

training_pipeline = Pipeline(
 name=pipeline_name,
 parameters=[
 processing_instance_count
 ],
 steps=step_list
)

# Note: If an existing pipeline has the same name it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

# Viewing the pipeline definition with all the string variables interpolated may help debug pipeline bugs. It is commented out here due to length.
#json.loads(training_pipeline.definition())

#### Execute the training pipeline

In [None]:
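# This is where you could optionally override parameter defaults, for example:
# execution = training_pipeline.start(parameters={'ProcessingInstanceCount': 2})
execution = training_pipeline.start()

Programmatically check on the status of the pipeline using `describe()`.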

In [None]:
execution.describe()

In [None]:
execution.list_steps()
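
If you would rather block until the run finishes than call `describe()` repeatedly by hand, a small polling helper is one option. The sketch below is an assumption-laden illustration: `wait_for_execution` is a hypothetical helper, and it relies only on the `PipelineExecutionStatus` field returned by `describe()`. The `sleep` argument is injectable so the helper can be exercised without real delays.

```python
import time

# Statuses after which the execution will not change any further.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_execution(execution, poll_seconds=30, max_polls=120, sleep=time.sleep):
    """Poll execution.describe() until the pipeline reaches a terminal status."""
    for _ in range(max_polls):
        status = execution.describe()["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        sleep(poll_seconds)
    raise TimeoutError("pipeline execution did not finish in time")
```

With the `execution` object from the cell above, this would be called as `wait_for_execution(execution)`. The SDK's execution object also exposes a `wait()` method that blocks in a similar way.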

To visualize your pipeline steps as well as monitor and debug pipeline runs, you can use Studio's Pipeline interface by selecting **Home** from the left-hand menu, then choosing **Pipelines** and navigating to the pipeline execution in process. You'll see a DAG similar to the one below, where steps that have completed are green, steps in process are blue, and steps that have not yet run are grey. You can also click on a step to see its inputs, outputs, logs (for debugging), and step metadata.


![Pipelines](images/pipelines-view.png)


(Optional) If you have time, feel free to start the pipeline again. Because we enabled step caching, you'll notice that SageMaker Pipelines is able to identify that the previous steps ran with the same input as the prior execution, and you'll see cache hits identified by Pipelines. This lets you avoid unnecessarily re-running pipeline steps, saving not only time but also the cost of recomputing them.

![Pipelines](images/cache-hit.png)



## Deployment pipeline with SageMaker Pipelines

Now let's create a separate pipeline that will take the model that was registered in Model Registry and deploy it as a SageMaker hosted endpoint. This is included to show how you could use SageMaker Pipelines to deploy a real-time endpoint, but you should also consider the advantages of deploying through a CD pipeline that uses IaC/CaC for more advanced deployment strategies and rollback capabilities.

For a real-time endpoint, you'll need to utilize a Lambda step (which is a great option for custom logic); however, for batch use cases there is a native SageMaker Pipeline step for a SageMaker Batch Transform to orchestrate your batch inference pipelines. 

First we'll specify the input parameters to our deployment pipeline so that we can reuse it for other use cases.

In [None]:
model_name = ParameterString(
 name='ModelName',
 default_value='my-awesome-model'
)

Next, we'll create a Lambda function that will pull the specified model (or latest approved model) from the Model Registry and deploy it as a SageMaker endpoint.

In [None]:
lambda_name = 'sagemaker-pipelines-deploy-model'

lambda_function = Lambda(
 function_name=lambda_name,
 execution_role_arn=lambda_role,
 script='./pipeline_scripts/lambda_deploy.py',
 handler='lambda_deploy.lambda_handler',
 timeout=600,
 memory_size=3000,
)

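try:
    lambda_function_response = lambda_function.create()
    lambda_function_arn = lambda_function_response['FunctionArn']
    print(f'Lambda function arn: {lambda_function_arn}')
except Exception as error:
    # create() fails when the function already exists, but other errors
    # (for example, missing IAM permissions) end up here too, so print the cause
    print(f'Lambda function was not created (it may already exist): {error}')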

Now we'll create a Lambda step for our pipeline and associate it with the new Lambda function we just created.

In [None]:
# The dictionary returned by the Lambda function is captured by LambdaOutput; each key in the dictionary corresponds to a
# LambdaOutput

output_param_1 = LambdaOutput(output_name='statusCode', output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name='body', output_type=LambdaOutputTypeEnum.String)

deploy_lambda_step = LambdaStep(
 name='LambdaStepDeploy',
 lambda_func=lambda_function,
 inputs={
 'region': region,
 'aws_account_id': aws_account_id,
 'model_package_group_name': model_package_group_name,
 'model_name': model_name,
 'instance_count': 1,
 'role_arn': role_arn
 },
 outputs=[
 output_param_1, 
 output_param_2
 ],
)
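
The `inputs` dictionary above arrives in the Lambda function as its `event`, and the keys of the dictionary it returns map to the `LambdaOutput`s. The contents of `lambda_deploy.py` are not shown in this notebook, so the following is only a sketch of what such a handler might look like, not the actual script. Assumptions: it always deploys the latest approved model package, it uses an `ml.m5.xlarge` instance and a `-config` suffix for the endpoint configuration, and it accepts an optional `sm_client` argument so it can be tried with a stub instead of a real boto3 client. The `-endpoint` suffix matches the endpoint name queried at the end of this notebook.

```python
def lambda_handler(event, context, sm_client=None):
    """Deploy the latest approved model package as a real-time endpoint."""
    if sm_client is None:
        # Imported lazily so the handler can be exercised with a stub client.
        import boto3

        sm_client = boto3.client("sagemaker", region_name=event["region"])

    model_name = event["model_name"]

    # Assumption: pick the most recently approved version in the group.
    packages = sm_client.list_model_packages(
        ModelPackageGroupName=event["model_package_group_name"],
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )["ModelPackageSummaryList"]
    if not packages:
        return {"statusCode": "404", "body": "No approved model package found"}

    # Create a SageMaker model that points at the registered model package.
    sm_client.create_model(
        ModelName=model_name,
        ExecutionRoleArn=event["role_arn"],
        Containers=[{"ModelPackageName": packages[0]["ModelPackageArn"]}],
    )
    sm_client.create_endpoint_config(
        EndpointConfigName=f"{model_name}-config",
        ProductionVariants=[
            {
                "VariantName": "AllTraffic",
                "ModelName": model_name,
                "InstanceType": "ml.m5.xlarge",  # assumed; not passed in by the step
                "InitialInstanceCount": int(event["instance_count"]),
            }
        ],
    )
    # Endpoint creation is asynchronous; the endpoint becomes InService later.
    sm_client.create_endpoint(
        EndpointName=f"{model_name}-endpoint",
        EndpointConfigName=f"{model_name}-config",
    )
    return {"statusCode": "200", "body": f"Creating endpoint {model_name}-endpoint"}
```

Keep in mind that the training pipeline registers models with `approval_status='PendingManualApproval'`, so a handler written this way would find a model only after a version has been approved in the Model Registry.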

Excellent, now we just need to assemble the pipeline.

#### Define & create the deployment pipeline

In [None]:
# pipeline_name = 'synthetic-housing-deployment-pipeline-{}'.format(strftime('%d-%H-%M-%S', gmtime()))
pipeline_name = 'synthetic-housing-deployment-pipeline'
step_list = [deploy_lambda_step]

deployment_pipeline = Pipeline(
 name=pipeline_name,
 parameters=[
 model_name
 ],
 steps=step_list
)

# Note: If an existing pipeline has the same name it will be overwritten.
deployment_pipeline.upsert(role_arn=role_arn)

# Viewing the pipeline definition with all the string variables interpolated may help debug pipeline bugs.
json.loads(deployment_pipeline.definition())

#### Execute the deployment pipeline

In [None]:
deployed_model_name = 'my-xgboost-model'
execution = deployment_pipeline.start(
 parameters = {
 'ModelName': deployed_model_name
 }
)

Check on the status of the pipeline programmatically (shown below) or via the Studio Pipelines interface.

In [None]:
execution.describe()

In [None]:
execution.list_steps()

#### Test the SageMaker endpoint

Let's now send some data to the endpoint and test that it is working properly.

You could optionally do this as part of the pipeline, in a Lambda step, which would be recommended as you mature your MLOps practices.

For this, we first load our test data from Feature Store:

In [None]:
# Read in test set that was used for batch transform
fs_group = FeatureGroup(name=test_feature_group_name, sagemaker_session=session) 
query = fs_group.athena_query()
table = query.table_name
query_string = f'SELECT {features_to_select} FROM "sagemaker_featurestore"."{table}" ORDER BY record_id'
query_results= 'sagemaker-featurestore'
output_location = f's3://{bucket}/{query_results}/query_results/'
query.run(query_string=query_string, output_location=output_location)
query.wait()
df = query.as_dataframe()
df.head()

Then we query the endpoint once it is available:

In [None]:
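# Poll until the endpoint is in service; stop early if the deployment fails
endpoint_name = deployed_model_name + '-endpoint'
response_status = 'None'
while response_status != 'InService':
    if response_status == 'Failed':
        raise RuntimeError(f'Endpoint {endpoint_name} failed to deploy')
    if response_status != 'None':
        print(f'Waiting for the endpoint deployment to finish. Current endpoint status: {response_status}')
    # Wait before each check; the endpoint may not exist yet right after the pipeline starts
    time.sleep(120)
    response = sagemaker_client.describe_endpoint(
        EndpointName=endpoint_name
    )
    response_status = response['EndpointStatus']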
# Attach to the SageMaker endpoint
predictor = Predictor(endpoint_name=deployed_model_name+'-endpoint',
 sagemaker_session=session,
 serializer=CSVSerializer(),
 deserializer=CSVDeserializer())

# Get a real-time prediction
predictor.predict(df.drop(columns=["price"]).to_csv(index=False, header=False))[0]