## Introduction

This is our fourth notebook, which dives deep into automating machine learning workflows to create a more repeatable path to production.

Here, we put on the hat of an `ML Engineer`: we automate the tasks within our machine learning workflow and orchestrate its steps. To do this, we build pipeline steps that combine the components from all of the previous notebooks into a single entity. This pipeline gives us a repeatable ML workflow with some reliability built in through minimal quality gates.

For this task we use Amazon SageMaker Pipelines to build an end-to-end machine learning pipeline.

Let's get started!

In [None]:
!pip install -U sagemaker

In [None]:
!pip show sagemaker

In [None]:
%store -r

In [None]:
# Processing imports
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor

# SageMaker Pipeline imports
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep, TransformStep, TuningStep
from sagemaker.workflow.model_step import ModelStep

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# Other imports
import json
import time
from time import gmtime, strftime
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.model import Model
from sagemaker.inputs import TrainingInput
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)

# To test the endpoint once it's deployed
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer, CSVDeserializer
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.tuner import IntegerParameter, ContinuousParameter, HyperparameterTuner

import sagemaker
import boto3
import boto3
from sagemaker.model_metrics import ModelMetrics, MetricsSource
import pandas as pd
from sagemaker.feature_store.feature_group import FeatureGroup
from helper_library import *

from sagemaker.workflow.steps import CacheConfig

**Session variables**

In [None]:
# Useful SageMaker variables
session = PipelineSession()
bucket = session.default_bucket()
role_arn = sagemaker.get_execution_role()
region = session.boto_region_name
sagemaker_client = boto3.client('sagemaker')
aws_account_id = boto3.client('sts').get_caller_identity().get('Account')
#lambda_role = create_lambda_iam_role('LambdaSageMakerExecutionRole')
# Data paths in S3
s3_prefix = 'aws-sm-ray-pipeline-workshop'
bucket_prefix = f'{s3_prefix}/data/feature-store'
model_prefix = f'{s3_prefix}/models'
output_path = f's3://{bucket}/{s3_prefix}/data/sm_processed'
fs_s3_path = f's3://{bucket}/{s3_prefix}/data/feature-store'

In [None]:
processing_instance_type='ml.m5.2xlarge'
num_actors = 10
train_instance_type = 'ml.c5.xlarge'
train_instance_count = 3

## Model Build pipeline with SageMaker Pipelines

[Amazon SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) provides the ability to create a directed acyclic graph (DAG) containing the pipeline steps needed to build and/or deploy machine learning models. Each pipeline, created through the SageMaker Python SDK, is a series of interconnected steps. The same pipeline can also be exported as a JSON pipeline definition.

The structure of a pipeline's DAG is determined by the data dependencies between steps. These data dependencies are created when the properties of a step's output are passed as the input to another step. The following image is a pipeline DAG that we'll be creating for our training pipeline:

![](images/sagemaker-pipelines-dag.png)
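To make the idea of data dependencies concrete, here is a minimal, SDK-independent sketch. It models this notebook's top-level steps as a plain dictionary that maps each step name to the steps it depends on (through a property reference or an explicit `add_depends_on`) and uses Python's standard `graphlib.TopologicalSorter` (Python 3.9+) to derive a valid execution order. `RegisterTrainedModel` and `RMSEFail` are branches inside `CheckEvaluation`, so they are left out. This only illustrates how a DAG constrains ordering; it is not how SageMaker Pipelines schedules steps internally.

```python
from graphlib import TopologicalSorter

# Step name -> set of steps it depends on, mirroring the pipeline built in this notebook.
step_dependencies = {
    "FeatureStoreIngestion": set(),
    "PreprocessData": {"FeatureStoreIngestion"},      # explicit add_depends_on()
    "HPTuning": {"PreprocessData"},                   # reads the train/validation outputs
    "EvaluateModel": {"HPTuning", "PreprocessData"},  # top model + test split
    "CheckEvaluation": {"EvaluateModel"},             # reads evaluation.json via JsonGet
}


def execution_order(deps):
    """Return one order in which every step runs after all of its dependencies."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(step_dependencies)
print(order)
# ['FeatureStoreIngestion', 'PreprocessData', 'HPTuning', 'EvaluateModel', 'CheckEvaluation']
```

Because every step here depends on the one before it, there is exactly one valid order; steps without a path between them could run in parallel.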

#### Pipeline Parameters

SageMaker Pipelines supports [pipeline parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html) that let you provide runtime values for each run of your pipeline. This allows you to change key inputs, such as the raw input data, for each run without changing your pipeline code.

Here, we identify the parameters and set their default values. Because these defaults can be overridden when you start the pipeline later in the notebook, the same pipeline definition can be reused across runs.

In [None]:
# Upload raw data to S3
local_data_path_ray = "data/raw/ray/house_pricing.csv"

raw_data_s3_prefix = '{}/data/raw'.format(s3_prefix)
raw_s3 = session.upload_data(path=local_data_path_ray, key_prefix=raw_data_s3_prefix)

In [None]:
# Optional step
# Delete all files under the S3 prefix before beginning preprocessing.
# This prevents duplicated data when running this workshop multiple times.

s3 = boto3.resource('s3')
print(bucket)
bucket_obj = s3.Bucket(bucket)
print(f"{s3_prefix}/data/sm_processed/")
files = bucket_obj.objects.filter(Prefix=f"{s3_prefix}/data/sm_processed/")
files.delete()

In [None]:
# Optional step
# Delete all Feature Groups whose names contain "fs-ray-".
# This prevents duplicate feature stores when running this workshop multiple times.

# Use the notebook's region instead of a hard-coded one
sm_client = boto3.client('sagemaker', region_name=region)
sagemaker_session = sagemaker.Session(boto3.Session(region_name=region))
response = sm_client.list_feature_groups(
    NameContains='fs-ray-'
)

for feature in response["FeatureGroupSummaries"]:
    print(f'deleting {feature["FeatureGroupName"]}')
    resp = sm_client.delete_feature_group(
        FeatureGroupName=feature["FeatureGroupName"]
    )

In [None]:
processing_instance_count = ParameterInteger(
 name='ProcessingInstanceCount',
 default_value=1
)

feature_group_name = ParameterString(
 name='FeatureGroupName',
 default_value='fs-ray-synthetic-housing-data'
)

bucket_prefix = ParameterString(
 name='Bucket_Prefix',
 default_value='aws-ray-mlops-workshop/feature-store'
)

rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0)

train_size = ParameterString(
 name='TrainSize',
 default_value="0.6"
)

val_size = ParameterString(
 name='ValidationSize',
 default_value="0.2"
)

test_size = ParameterString(
 name='TestSize',
 default_value="0.2"
)


#### Set Up Step Caching Configuration

Caching can be enabled on individual pipeline steps. When it is, SageMaker Pipelines checks whether a previous successful run of the same step with the same argument values exists. If one is found, Pipelines propagates that run's results to the next step instead of re-running the step, saving both time and compute cost. The `expire_after` value is an ISO 8601 duration (`PT12H` means 12 hours) that limits how old a reusable run can be.

In [None]:
cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")
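As a rough mental model of step caching, the sketch below treats a cache hit as a previous successful run of the same step, with identical arguments, that finished within the `expire_after` window. `parse_iso8601_duration`, `CachedRun`, and `find_cache_hit` are hypothetical helpers written for illustration; the duration parser only handles day/hour/minute/second forms such as `PT12H`, and SageMaker's real cache lookup is internal to the service and may consider more than this.

```python
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Hypothetical parser: supports only PnDTnHnMnS-style durations (no weeks, months, years).
_DURATION_RE = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_iso8601_duration(value):
    match = _DURATION_RE.match(value)
    if not match or not any(match.groupdict().values()):
        raise ValueError(f"Unsupported duration: {value!r}")
    parts = {name: int(num) for name, num in match.groupdict().items() if num}
    return timedelta(**parts)


@dataclass
class CachedRun:
    step_name: str
    arguments: dict
    succeeded: bool
    finished_at: datetime


def find_cache_hit(step_name, arguments, previous_runs, expire_after, now):
    """Return the most recent reusable run, or None if the step must run again."""
    max_age = parse_iso8601_duration(expire_after)
    candidates = [
        run for run in previous_runs
        if run.step_name == step_name
        and run.arguments == arguments
        and run.succeeded
        and now - run.finished_at <= max_age
    ]
    return max(candidates, key=lambda run: run.finished_at, default=None)


# Illustrative history for the PreprocessData step
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
args = {"--train_size": "0.6", "--val_size": "0.2", "--test_size": "0.2"}
runs = [
    CachedRun("PreprocessData", args, True, now - timedelta(hours=3)),    # recent, same args
    CachedRun("PreprocessData", args, True, now - timedelta(hours=20)),   # older than PT12H
    CachedRun("PreprocessData", {**args, "--train_size": "0.7"}, True,
              now - timedelta(hours=1)),                                  # different args
]
hit = find_cache_hit("PreprocessData", args, runs, "PT12H", now)
print(hit.finished_at)  # 2024-01-02 09:00:00+00:00
```

Changing any argument (for example a different `TrainSize` parameter value) produces a cache miss, which is why overriding parameters forces affected steps to run again.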

#### SageMaker Processing step

This should look very similar to the SageMaker Processing Job you configured in notebook 2. The only new code is the `ProcessingStep` at the bottom of the cell below, which takes the Processing Job configuration and includes it as a pipeline step. Because the processor uses a `PipelineSession`, calling `run()` does not start a job; it returns the step arguments that `ProcessingStep` consumes.

In [None]:
from sagemaker.workflow.functions import Join

feature_store_ingestion = SKLearnProcessor(
    framework_version='1.0-1',
    role=role_arn,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name='feature-store-ingestion',
    sagemaker_session=session
)

fs_processor_args = feature_store_ingestion.run(
    code="./pipeline_scripts/feature-store/script-fs.py",
    inputs=[
        ProcessingInput(
            source=raw_s3,
            destination='/opt/ml/processing/input',
            s3_data_distribution_type='ShardedByS3Key'
        )
    ],
    arguments=[
        "--feature_group_name", feature_group_name,
        "--num_actors", str(num_actors),
        "--bucket_prefix", bucket_prefix,
        "--role_arn", role_arn,
        "--region", region,
    ]
)

feature_store_ingestion_step = ProcessingStep(
    name='FeatureStoreIngestion',
    step_args=fs_processor_args,
    cache_config=cache_config
)

In [None]:
f'{output_path}/test'

In [None]:
preprocess_data_processor = SKLearnProcessor(
    framework_version='1.0-1',
    role=role_arn,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name='preprocess-data',
    sagemaker_session=session,
)

processor_args = preprocess_data_processor.run(
    code="./pipeline_scripts/preprocessing/script.py",
    outputs=[
        ProcessingOutput(
            output_name='train',
            destination=f'{output_path}/train',
            source='/opt/ml/processing/output/train'
        ),
        ProcessingOutput(
            output_name='validation',
            destination=f'{output_path}/validation',
            source='/opt/ml/processing/output/validation'
        ),
        ProcessingOutput(
            output_name='test',
            destination=f'{output_path}/test',
            source='/opt/ml/processing/output/test'
        )
    ],
    arguments=[
        "--feature_group_name", feature_group_name,
        "--train_size", train_size,
        "--val_size", val_size,
        "--test_size", test_size,
        "--role_arn", role_arn,
        "--region", region
    ]
)

preprocess_dataset_step = ProcessingStep(
    name='PreprocessData',
    step_args=processor_args,
    cache_config=cache_config
)
# This step reads from the Feature Store rather than from another step's outputs,
# so there is no data dependency; declare the ordering explicitly.
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])

## Hyperparameter Tuning Step
Amazon SageMaker automatic model tuning, also known as hyperparameter tuning, finds the best version of a model by running many training jobs on your dataset using the algorithm and ranges of hyperparameters that you specify. It then chooses the hyperparameter values that result in a model that performs the best, as measured by a metric that you choose.

This configuration should also look very similar to the SageMaker Training job you ran in notebook 2. The only new code is the `TuningStep` at the bottom of the cells below, which lets us run the tuning job as a step in our pipeline.

You can learn more about [Hyperparameter Tuning](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning-how-it-works.html) in the SageMaker docs.
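Conceptually, once its training jobs finish, the tuner ranks them by the objective metric. With `objective_type='Minimize'`, the job with the lowest `validation:rmse` is best, and `tuning_step.get_top_model_s3_uri(top_k=0, ...)`, used later in this notebook, points at that job's model artifact (index 0 is the best job). The sketch below reproduces only that ranking logic on made-up results; the job names and metric values are hypothetical.

```python
def rank_training_jobs(jobs, objective_type="Minimize"):
    """Sort (job_name, objective_value) pairs best-first, the way a tuner ranks them."""
    # Jobs that never reported the metric (e.g. failed jobs) cannot be ranked
    completed = [job for job in jobs if job[1] is not None]
    return sorted(completed, key=lambda job: job[1], reverse=(objective_type == "Maximize"))


# Hypothetical validation:rmse values for four tuning jobs (max_jobs=4)
jobs = [
    ("tuning-job-001", 21450.7),
    ("tuning-job-002", 14890.2),
    ("tuning-job-003", None),  # e.g. a failed job
    ("tuning-job-004", 16020.9),
]

ranked = rank_training_jobs(jobs, objective_type="Minimize")
top_k = 0  # index 0 is the best job, as with get_top_model_s3_uri(top_k=0, ...)
print(ranked[top_k])  # ('tuning-job-002', 14890.2)
```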

In [None]:
role_arn

In [None]:
from sagemaker.xgboost.estimator import XGBoost

hyperparams = {
    # Tuned hyperparameters
    #"max_depth": "4",
    #"eta": "0.810249",
    #"min_child_weight": "79",
    #"subsample": "0.984023",
    #"objective": "reg:squarederror",
    # Training job params
    # "feature_group_name": feature_group_name.to_string(),
    #"validation_feature_group_name": validation_feature_group_name,
    "role_arn": role_arn,
    "region": region,
}

estimator_parameters = {
    'source_dir': './pipeline_scripts/train/',
    'entry_point': 'script.py',
    'framework_version': '1.7-1',
    'instance_type': train_instance_type,
    'instance_count': train_instance_count,
    'hyperparameters': hyperparams,
    'role': role_arn,
    'base_job_name': 'XGBoost-model',
    'output_path': f's3://{bucket}/{s3_prefix}/',
    'image_scope': 'training',
}

estimator = XGBoost(**estimator_parameters)


In [None]:
hyperparameter_ranges = {
    "max_depth": IntegerParameter(1, 8),
    "eta": ContinuousParameter(0.2, 1),
    "min_child_weight": IntegerParameter(0, 120),
    "subsample": ContinuousParameter(0.2, 1),
}

objective_metric_name = 'validation:rmse'
objective_type = 'Minimize'
tuner_parameters = {
    'estimator': estimator,
    'objective_metric_name': objective_metric_name,
    'hyperparameter_ranges': hyperparameter_ranges,
    # 'metric_definitions': metric_definitions,
    'max_jobs': 4,
    'max_parallel_jobs': 4,
    'objective_type': objective_type
}

tuner = HyperparameterTuner(**tuner_parameters)

tuning_step = TuningStep(
    name="HPTuning",
    tuner=tuner,
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
    #step_args=hpo_args,
    cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])

## Creating and Registering the best models

After the Hyperparameter Tuning step completes successfully, you can either create SageMaker models from the model artifacts produced by its training jobs or register those models in the Model Registry.

In this example we implement a minimal quality gate that compares the RMSE of the best model from the tuning job, measured on the held-out test set, against a threshold (the `rmse_threshold` pipeline parameter).

To do that, we create a SageMaker Processing step that runs evaluation code (`pipeline_scripts/evaluate/script.py`). The script loads the top model from the tuning job, scores it on the test split, and writes the resulting regression metrics (MAE and RMSE) to `evaluation.json`. A `ConditionLessThanOrEqualTo` condition then reads the RMSE from that file, and the model is registered in the Model Registry only if the RMSE is at or below the threshold.

[More information about regression metrics](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html)
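The metrics the evaluation script reports can be computed by hand. The stdlib-only sketch below builds the same `regression_metrics` structure that the script writes to `evaluation.json`, on a few hypothetical prices. MAE averages the absolute residuals, while RMSE squares them before averaging, so it penalizes large errors more heavily; the standard deviation is the population standard deviation of the residuals, matching `np.std`'s default (`ddof=0`).

```python
import json
import math
from statistics import pstdev


def regression_report(y_true, y_pred):
    """Build the report structure written to evaluation.json by the evaluation script."""
    residuals = [t - p for t, p in zip(y_true, y_pred)]
    mae = sum(abs(r) for r in residuals) / len(residuals)
    rmse = math.sqrt(sum(r * r for r in residuals) / len(residuals))
    std = pstdev(residuals)  # population std, same as np.std with ddof=0
    return {
        "regression_metrics": {
            "mae": {"value": mae, "standard_deviation": std},
            "rmse": {"value": rmse, "standard_deviation": std},
        }
    }


# Hypothetical prices and predictions
y_true = [100.0, 200.0, 300.0]
y_pred = [110.0, 190.0, 330.0]
report = regression_report(y_true, y_pred)
print(json.dumps(report, indent=2))
```

Here the residuals are -10, 10, and -30, so MAE is 50/3 (about 16.7) while RMSE is sqrt(1100/3) (about 19.1): the single 30-unit miss pulls RMSE above MAE.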

In [None]:
%%writefile ./pipeline_scripts/evaluate/script.py
import subprocess
import sys
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker', 'ray', 'modin[ray]', 'pydantic==1.10.10', 'xgboost_ray'])
import os
import time
import tarfile
import argparse
import json
import logging
import boto3
import sagemaker
import glob

import pathlib
import numpy as np
from math import sqrt
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Sagemaker specific imports
from sagemaker.session import Session
import pandas as pd
import xgboost as xgb

# Ray specific imports
# import ray
# from ray.air.checkpoint import Checkpoint
# from ray.train.xgboost import XGBoostCheckpoint, XGBoostPredictor
# import ray.cloudpickle as cloudpickle

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

if __name__ == "__main__":
    logger.debug('Starting evaluation.')

    # The top model from the tuning step is mounted here as model.tar.gz
    model_dir = '/opt/ml/processing/model'
    for file in os.listdir(model_dir):
        logger.info(file)

    model_path = os.path.join(model_dir, 'model.tar.gz')
    with tarfile.open(model_path) as tar:
        tar.extractall(path=model_dir)

    for file in os.listdir(model_dir):
        logger.info(file)

    logger.debug('Loading XGBoost model.')
    model = xgb.Booster()
    model.load_model(os.path.join(model_dir, 'model.xgb'))

    logger.debug('Reading test data.')

    test_path = "/opt/ml/processing/test/"
    # Get list of all csv files in folder
    csv_files = glob.glob(f'{test_path}*.csv')

    # Lazily read each CSV file into a DataFrame (generator expression)
    df_list = (pd.read_csv(file, header=0) for file in csv_files)

    # Concatenate all DataFrames
    df = pd.concat(df_list, ignore_index=True)
    df.reset_index(drop=True, inplace=True)
    print(df.head(5))
    # Separate the PRICE label from the features
    y_test = df["PRICE"].to_numpy()
    df.drop(columns=["PRICE"], axis=1, inplace=True)

    X_test = xgb.DMatrix(df.values)

    logger.info('Performing predictions against test data.')
    predictions = model.predict(X_test)

    # See the regression metrics
    # see: https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
    logger.debug('Calculating metrics.')
    mae = mean_absolute_error(y_test, predictions)
    mse = mean_squared_error(y_test, predictions)
    rmse = sqrt(mse)
    std = np.std(y_test - predictions)
    report_dict = {
        'regression_metrics': {
            'mae': {
                'value': mae,
                "standard_deviation": std
            },
            'rmse': {
                'value': rmse,
                "standard_deviation": std
            },
        },
    }

    output_dir = '/opt/ml/processing/evaluation'
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    logger.info('Writing out evaluation report with rmse: %f', rmse)
    evaluation_path = f'{output_dir}/evaluation.json'
    with open(evaluation_path, 'w') as f:
        f.write(json.dumps(report_dict))

In [None]:
evaluation_processor = SKLearnProcessor(
 framework_version='0.23-1',
 role=role_arn,
 instance_type='ml.m5.xlarge',
 instance_count=processing_instance_count,
 base_job_name='evaluation',
 sagemaker_session=session,
)

In [None]:
# Specify where we'll store the model evaluation results so
# that other steps can access those results
evaluation_report = PropertyFile(
    name='EvaluationReport',
    output_name='evaluation',
    path='evaluation.json',
)

# A ProcessingStep is used to evaluate the performance of a selected model from the HPO step.
# In this case, the top performing model is evaluated.
evaluation_step = ProcessingStep(
    name='EvaluateModel',
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(
            source=tuning_step.get_top_model_s3_uri(
                top_k=0, s3_bucket=bucket, prefix=s3_prefix
            ),
            destination='/opt/ml/processing/model',
        ),
        ProcessingInput(
            source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
            destination='/opt/ml/processing/test',
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='evaluation', source='/opt/ml/processing/evaluation'
        ),
    ],
    code='./pipeline_scripts/evaluate/script.py',
    property_files=[evaluation_report],
)

In [None]:
from sagemaker.xgboost.model import XGBoostModel
from sagemaker.utils import unique_name_from_base

model_package_group_name = unique_name_from_base('synthetic-housing-models-ray-')

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri='{}/evaluation.json'.format(
            evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output'][
                'S3Uri'
            ]
        ),
        content_type='application/json',
    )
)

# Based on the results of the evaluation, the model is registered into the Model Registry using a ConditionStep.
best_model = XGBoostModel(
    #image_uri=estimator.training_image_uri(),
    model_data=tuning_step.get_top_model_s3_uri(top_k=0, s3_bucket=bucket, prefix=s3_prefix),
    # source_dir=estimator.source_dir,
    #entry_point=estimator.entry_point,
    entry_point="./pipeline_scripts/inference/script.py",
    role=role_arn,
    sagemaker_session=session,
    framework_version="1.7-1"
)

model_registry_args = best_model.register(
    content_types=['text/csv'],
    response_types=['application/json'],
    inference_instances=['ml.t2.medium', 'ml.m5.xlarge'],
    transform_instances=['ml.m5.xlarge'],
    model_package_group_name=model_package_group_name,
    approval_status='PendingManualApproval',
    description='XGBoost model to predict synthetic housing prices',
    model_metrics=model_metrics
)

register_step = ModelStep(
    name='RegisterTrainedModel',
    step_args=model_registry_args
)

In [None]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

metrics_fail_step = FailStep(
    name="RMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]),
)

# Condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,
        property_file=evaluation_report,
        json_path='regression_metrics.rmse.value',
    ),
    right=rmse_threshold,
)
condition_step = ConditionStep(
    name='CheckEvaluation',
    conditions=[cond_lte],
    if_steps=[register_step],
    else_steps=[metrics_fail_step],
)
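At run time, `JsonGet` reads `regression_metrics.rmse.value` from the `evaluation.json` written by `EvaluateModel`, and `ConditionStep` runs its `if_steps` when the condition holds and its `else_steps` otherwise. The sketch below mimics that branching locally. `json_get` and `choose_branch` are hypothetical helpers; `json_get` only supports dot-separated keys, which is all this notebook's path needs, whereas the real `JsonGet` is evaluated by SageMaker.

```python
import json


def json_get(document, json_path):
    """Hypothetical stand-in for JsonGet: resolve a dot-separated path such as 'a.b.c'."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


def choose_branch(evaluation_json, rmse_threshold):
    """Return the step the ConditionStep would run next for this evaluation report."""
    rmse = json_get(json.loads(evaluation_json), "regression_metrics.rmse.value")
    # Same comparison as ConditionLessThanOrEqualTo(left=rmse, right=rmse_threshold)
    return "RegisterTrainedModel" if rmse <= rmse_threshold else "RMSEFail"


# Hypothetical evaluation report
report = json.dumps({"regression_metrics": {"rmse": {"value": 14250.0, "standard_deviation": 900.0}}})
print(choose_branch(report, rmse_threshold=15000.0))  # RegisterTrainedModel
print(choose_branch(report, rmse_threshold=10000.0))  # RMSEFail
```

Note that the comparison is inclusive: an RMSE exactly equal to the threshold still passes the gate.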

In [None]:
# pipeline_name = 'synthetic-housing-training-pipeline-{}'.format(strftime('%d-%H-%M-%S', gmtime()))
pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [
    feature_store_ingestion_step,
    preprocess_dataset_step,
    tuning_step,
    evaluation_step,
    condition_step
]

training_pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        feature_group_name,
        train_size,
        val_size,
        test_size,
        bucket_prefix,
        rmse_threshold
    ],
    steps=step_list
)

# Note: If an existing pipeline has the same name, it will be overwritten.
training_pipeline.upsert(role_arn=role_arn)

# Viewing the pipeline definition with all the string variables interpolated may help debug pipeline issues.
# It is commented out here due to its length; the next cell prints it.
#json.loads(training_pipeline.definition())

In [None]:
json.loads(training_pipeline.definition())
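The full definition is long. As a hedged sketch, assuming the definition JSON has top-level `Parameters` and `Steps` lists, where each parameter has `Name` and `DefaultValue` and each step has `Name`, `Type`, and an optional `DependsOn` list (check this against your own SDK version's output), a helper like the hypothetical `summarize_definition` below prints a compact overview instead. The sample definition is trimmed and hypothetical, not this pipeline's real output.

```python
import json


def summarize_definition(definition_json):
    """Return (parameter defaults, list of (step name, step type, explicit dependencies))."""
    definition = json.loads(definition_json)
    parameters = {p["Name"]: p.get("DefaultValue") for p in definition.get("Parameters", [])}
    steps = [
        (step["Name"], step["Type"], step.get("DependsOn", []))
        for step in definition.get("Steps", [])
    ]
    return parameters, steps


# Trimmed, hypothetical definition; in the notebook you would pass training_pipeline.definition()
sample = json.dumps({
    "Parameters": [
        {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1},
        {"Name": "RMSEThreshold", "Type": "Float", "DefaultValue": 15000.0},
    ],
    "Steps": [
        {"Name": "FeatureStoreIngestion", "Type": "Processing", "Arguments": {}},
        {"Name": "PreprocessData", "Type": "Processing", "Arguments": {},
         "DependsOn": ["FeatureStoreIngestion"]},
        {"Name": "HPTuning", "Type": "Tuning", "Arguments": {}},
    ],
})

params, steps = summarize_definition(sample)
print(params)
for name, step_type, depends_on in steps:
    print(f"{name:<24}{step_type:<12}{depends_on}")
```

Only explicit `add_depends_on` dependencies appear in `DependsOn`; dependencies created by property references live inside each step's `Arguments`.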

In [None]:
# This is where you could optionally override parameter defaults 
execution = training_pipeline.start()
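`Pipeline.start()` accepts a `parameters` dictionary keyed by the parameter *names* declared above (for example `RMSEThreshold`), not by the Python variable names (`rmse_threshold`). Because a misspelled key is an easy mistake, a small hypothetical helper like `build_overrides` below can catch it locally before any execution is started. The set of names is copied from the `Parameter*` declarations in this notebook.

```python
# Parameter names as declared with ParameterInteger/ParameterString/ParameterFloat above
DECLARED_PARAMETERS = {
    "ProcessingInstanceCount", "FeatureGroupName", "Bucket_Prefix",
    "RMSEThreshold", "TrainSize", "ValidationSize", "TestSize",
}


def build_overrides(**overrides):
    """Hypothetical helper: reject unknown parameter names before calling Pipeline.start()."""
    unknown = set(overrides) - DECLARED_PARAMETERS
    if unknown:
        raise KeyError(f"Unknown pipeline parameter(s): {sorted(unknown)}")
    return dict(overrides)


overrides = build_overrides(RMSEThreshold=20000.0, TrainSize="0.7",
                            ValidationSize="0.15", TestSize="0.15")
# In the notebook: execution = training_pipeline.start(parameters=overrides)
print(overrides)
```

The split sizes stay strings because they are declared as `ParameterString` and passed straight through as script arguments.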

In [None]:
execution.describe()

In [None]:
execution.wait()

List the steps in the execution. These are the steps in the pipeline that have been resolved by the step executor service.

In [None]:
execution.list_steps()
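`execution.list_steps()` returns a list of dictionaries, one per step. As a sketch, assuming the `StepName`, `StepStatus`, and optional `FailureReason` and `CacheHitResult` keys from the `ListPipelineExecutionSteps` API response, the hypothetical helper below condenses that output, shows which steps were served from the cache, and surfaces failures such as `RMSEFail` when the quality gate is not met. The sample data is hypothetical.

```python
def summarize_steps(steps):
    """Return (name, status, served_from_cache) tuples plus a dict of failure reasons."""
    summary = [
        (s["StepName"], s["StepStatus"], "CacheHitResult" in s)
        for s in steps
    ]
    failures = {
        s["StepName"]: s.get("FailureReason", "")
        for s in steps
        if s["StepStatus"] == "Failed"
    }
    return summary, failures


# Hypothetical list_steps() output
sample_steps = [
    {"StepName": "RMSEFail", "StepStatus": "Failed",
     "FailureReason": "Execution failed due to RMSE > 15000.0"},
    {"StepName": "CheckEvaluation", "StepStatus": "Succeeded"},
    {"StepName": "EvaluateModel", "StepStatus": "Succeeded"},
    {"StepName": "PreprocessData", "StepStatus": "Succeeded",
     "CacheHitResult": {"SourcePipelineExecutionArn": "<previous-execution-arn>"}},
]

summary, failures = summarize_steps(sample_steps)
for name, status, cached in summary:
    print(f"{name:<20}{status:<12}{'cached' if cached else ''}")
print(failures)
# In the notebook: summarize_steps(execution.list_steps())
```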

### Examining the Evaluation

Examine the resulting model evaluation after the pipeline completes. Download the resulting `evaluation.json` file from S3 and print the report.

In [None]:
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file(
 "{}/evaluation.json".format(
 evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
 )
)
pprint(json.loads(evaluation_json))
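The cell above rebuilds the output location from `evaluation_step.arguments`. An alternative is to ask the processing job that actually ran where it wrote its `evaluation` output. This sketch assumes that the `EvaluateModel` entry returned by `execution.list_steps()` carries `Metadata.ProcessingJob.Arn` (as in the `ListPipelineExecutionSteps` API) and uses the `ProcessingOutputConfig.Outputs` list from boto3's `describe_processing_job` response. The two helpers only do dictionary lookups, so they are exercised here on hypothetical responses; the commented lines show where the real calls would go.

```python
def processing_job_name(steps, step_name):
    """Find the processing job name behind a pipeline step in list_steps() output."""
    for step in steps:
        if step["StepName"] == step_name:
            arn = step["Metadata"]["ProcessingJob"]["Arn"]
            # ARN format: arn:aws:sagemaker:<region>:<account>:processing-job/<name>
            return arn.split("/")[-1]
    raise KeyError(f"No step named {step_name!r}")


def output_s3_uri(describe_response, output_name):
    """Return the S3 URI of a named output from a describe_processing_job response."""
    for output in describe_response["ProcessingOutputConfig"]["Outputs"]:
        if output["OutputName"] == output_name:
            return output["S3Output"]["S3Uri"]
    raise KeyError(f"No output named {output_name!r}")


# In the notebook (requires AWS credentials):
# job = processing_job_name(execution.list_steps(), "EvaluateModel")
# desc = boto3.client("sagemaker").describe_processing_job(ProcessingJobName=job)
# uri = output_s3_uri(desc, "evaluation") + "/evaluation.json"

# Hypothetical responses for illustration
steps = [{"StepName": "EvaluateModel",
          "Metadata": {"ProcessingJob": {
              "Arn": "arn:aws:sagemaker:us-east-1:123456789012:processing-job/example-evaluatemodel-job"}}}]
desc = {"ProcessingOutputConfig": {"Outputs": [
    {"OutputName": "evaluation", "S3Output": {"S3Uri": "s3://example-bucket/eval-output"}}]}}
print(processing_job_name(steps, "EvaluateModel"))
print(output_s3_uri(desc, "evaluation") + "/evaluation.json")
```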

### Lineage

Review the lineage of the artifacts generated by the pipeline.

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

## Cleaning up resources

Users are responsible for cleaning up the resources created when running this notebook. Specify the ModelName, ModelPackageName, and ModelPackageGroupName that need to be deleted. Resource names generated while the pipeline runs are only available in the pipeline context, so navigate to the Model Registry and the SageMaker console to find the models and model packages to delete. The model package group name created by this notebook is stored in the `model_package_group_name` variable.

In [None]:
# # Create a SageMaker client
# sm_client = boto3.client("sagemaker")

# # Delete SageMaker Models
# sm_client.delete_model(ModelName="...")

# # Delete Model Packages
# sm_client.delete_model_package(ModelPackageName="...")

# # Delete the Model Package Group
# sm_client.delete_model_package_group(ModelPackageGroupName="...")

# # Delete the Pipeline
# sm_client.delete_pipeline(PipelineName=pipeline_name)

# # Delete created dataset
# !rm -rf ./data/processed/*
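A model package group generally has to be empty before it can be deleted, so an ordered cleanup removes its model packages first. The sketch below assumes the boto3 SageMaker client methods `list_model_packages` (paginated with `NextToken`), `delete_model_package`, `delete_model_package_group`, and `delete_pipeline`. It collects all package ARNs before deleting anything, so deletions cannot disturb pagination, and takes the client as an argument so it can be dry-run against a stub. `cleanup` and `FakeSageMaker` are hypothetical names for illustration only.

```python
def list_package_arns(sm_client, group_name):
    """Collect every model package ARN in a group, following NextToken pagination."""
    arns, kwargs = [], {"ModelPackageGroupName": group_name}
    while True:
        page = sm_client.list_model_packages(**kwargs)
        arns.extend(s["ModelPackageArn"] for s in page["ModelPackageSummaryList"])
        token = page.get("NextToken")
        if not token:
            return arns
        kwargs["NextToken"] = token


def cleanup(sm_client, group_name, pipeline_name):
    """Delete the group's model packages, then the group, then the pipeline."""
    arns = list_package_arns(sm_client, group_name)
    for arn in arns:
        sm_client.delete_model_package(ModelPackageName=arn)
    sm_client.delete_model_package_group(ModelPackageGroupName=group_name)
    sm_client.delete_pipeline(PipelineName=pipeline_name)
    return arns


class FakeSageMaker:
    """Illustrative stand-in that records calls instead of calling AWS (page size 2)."""

    def __init__(self, arns):
        self.arns, self.calls = list(arns), []

    def list_model_packages(self, ModelPackageGroupName, NextToken=None):
        start = int(NextToken or 0)
        page = {"ModelPackageSummaryList": [{"ModelPackageArn": a} for a in self.arns[start:start + 2]]}
        if start + 2 < len(self.arns):
            page["NextToken"] = str(start + 2)
        return page

    def delete_model_package(self, ModelPackageName):
        self.calls.append(("delete_model_package", ModelPackageName))

    def delete_model_package_group(self, ModelPackageGroupName):
        self.calls.append(("delete_model_package_group", ModelPackageGroupName))

    def delete_pipeline(self, PipelineName):
        self.calls.append(("delete_pipeline", PipelineName))


fake = FakeSageMaker(["pkg/1", "pkg/2", "pkg/3"])
deleted = cleanup(fake, "example-model-package-group", "synthetic-housing-training-sm-pipeline-ray")
print(fake.calls)
# With real resources: cleanup(boto3.client("sagemaker"), model_package_group_name, pipeline_name)
```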