## Use SageMaker Pipelines to Orchestrate an End-to-End Cross-Validation Model Training Workflow

Amazon SageMaker Pipelines simplifies ML workflow orchestration across each step of the ML process, from exploratory data analysis and preprocessing to model training and deployment.
With SageMaker Pipelines, you can develop consistent, reusable workflows that integrate with CI/CD pipelines for improved quality and fewer errors throughout the development lifecycle.

## SageMaker Pipelines
An ML workflow built using SageMaker Pipelines is made up of a series of steps defined as a directed acyclic graph (DAG). The pipeline is expressed as a JSON definition that captures the relationships between the steps of your pipeline. Here is the terminology SageMaker Pipelines uses for defining an ML workflow.

* Pipelines - top-level definition of a pipeline. It encapsulates a name, parameters, and steps. A pipeline is scoped to an account and region.
* Parameters - defined in the pipeline definition, parameters introduce variables whose values can be provided at execution time. Parameters support string, float, and integer types.
* Pipeline Steps - define the actions that the pipeline takes and the relationships between steps using properties. SageMaker Pipelines supports the following step types: <b>Processing, Training, Transform, CreateModel, RegisterModel, Condition, Callback</b>.

## Notebook Overview
This notebook implements a complete cross-validation ML model workflow using a custom-built Docker image, HyperparameterTuner for automatic hyperparameter optimization, and the SKLearn framework for k-fold splits and model training.
The workflow is defined and orchestrated using SageMaker Pipelines.
Here are the main steps involved in the end-to-end workflow:
    
<ol>
<li>Defines a list of parameters, with default values, to be used throughout the pipeline</li>
<li>Defines a ProcessingStep with an SKLearn processor to perform the k-fold cross-validation splits</li>
<li>Defines a ProcessingStep that orchestrates cross-validation model training with HyperparameterTuner integration</li>
<li>Defines a ConditionStep that validates the model performance against a baseline</li>
<li>Defines a TrainingStep that trains the final model on the entire dataset, using the hyperparameters suggested by HyperparameterTuner</li>
<li>Creates a model package and defines a RegisterModel step to register the model trained in the previous step with SageMaker Model Registry</li>
</ol>

## Dataset

The Iris flower data set is a multivariate data set introduced by the British statistician, eugenicist, and biologist Ronald Fisher in his 1936 [paper](https://onlinelibrary.wiley.com/doi/abs/10.1111/j.1469-1809.1936.tb02137.x). The data set consists of 50 samples from each of 3 species of Iris:
* Iris setosa 
* Iris virginica  
* Iris versicolor

There are 4 features available in each sample: the length and the width of the sepals and petals measured in centimeters. 

Based on the combination of these four features, we are going to train a Support Vector Machine (SVM) multiclass classification model to distinguish the species from each other.

In [None]:
import boto3
import sagemaker

region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()

## Define Pipeline Parameters
With Pipeline Parameters, you can introduce variables to the pipeline whose values are specific to a pipeline run.
The supported parameter types include:

* ParameterString - represents a str Python type
* ParameterInteger - represents an int Python type
* ParameterFloat - represents a float Python type

Additionally, parameters support default values, which is useful when only a subset of the defined parameters needs to change. For example, when training a model with k-fold cross-validation, you could provide the desired value of k at pipeline execution time.
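To make the default-versus-override behavior concrete, here is a minimal plain-Python sketch, not SageMaker code. `ParameterSpec` and `resolve_parameters` are hypothetical names. They model how an execution could merge the values passed at start time with the defaults declared in the definition, rejecting unknown names and mismatched types.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Union

Value = Union[str, int, float]


@dataclass
class ParameterSpec:
    """Simplified, hypothetical stand-in for a pipeline parameter declaration."""
    name: str
    type_: type  # str, int, or float
    default: Optional[Value] = None


def resolve_parameters(specs: List[ParameterSpec], overrides: Dict[str, Value]) -> Dict[str, Value]:
    """Merge execution-time overrides with declared defaults, checking names and types."""
    unknown = set(overrides) - {spec.name for spec in specs}
    if unknown:
        raise ValueError(f"unknown parameters: {sorted(unknown)}")

    resolved: Dict[str, Value] = {}
    for spec in specs:
        value = overrides.get(spec.name, spec.default)
        if value is None:
            raise ValueError(f"{spec.name} has no default and was not provided")
        if isinstance(value, bool):
            raise TypeError(f"{spec.name} does not accept booleans")
        # Accept an int where a float is declared, e.g. BaselineModelObjectiveValue=1.
        if spec.type_ is float and isinstance(value, int):
            value = float(value)
        if not isinstance(value, spec.type_):
            raise TypeError(f"{spec.name} expects {spec.type_.__name__}, got {type(value).__name__}")
        resolved[spec.name] = value
    return resolved


specs = [
    ParameterSpec("KFold", int, 3),
    ParameterSpec("BaselineModelObjectiveValue", float, 0.6),
    ParameterSpec("ImageURI", str),  # no default, like ImageURI in this notebook
]

# Only KFold and ImageURI are overridden; BaselineModelObjectiveValue keeps its default.
print(resolve_parameters(specs, {"KFold": 5, "ImageURI": "my-image:latest"}))
```

The sketch only approximates the idea. The actual resolution and validation happen inside the SageMaker Pipelines service.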

Here are the parameters for the workflow used in this notebook:

* ProcessingInstanceCount - number of instances for the SageMaker Processing job in the preprocessing step.
* ProcessingInstanceType - instance type used for the SageMaker Processing job in the preprocessing step.
* TrainingInstanceType - instance type used for SageMaker Training jobs.
* TrainingInstanceCount - number of instances for a SageMaker Training job.
* InferenceInstanceType - instance type for hosting the deployed SageMaker model.
* HPOTunerScriptInstanceType - instance type for the script processor that launches the hyperparameter tuning job.
* ModelApprovalStatus - the initial approval status for the trained model in SageMaker Model Registry.
* ExecutionRole - IAM role used throughout the pipeline execution.
* DefaultS3Bucket - default S3 bucket used as object storage for the pipeline execution.
* BaselineModelObjectiveValue - the minimum objective metric value (accuracy) the model must reach during model evaluation.
* S3BucketPrefix - S3 key prefix for the pipeline execution's artifacts.
* ImageURI - Docker image URI (ECR) used for cross-validation model training with HyperparameterTuner.
* KFold - the value of k used in k-fold cross-validation.
* MaxTrainingJobs - maximum number of training jobs to launch in a single hyperparameter tuning job.
* MaxParallelTrainingJobs - maximum number of training jobs to run in parallel in a single hyperparameter tuning job.
* MinimumC, MaximumC - hyperparameter range for the SVM 'C' parameter.
* MinimumGamma, MaximumGamma - hyperparameter range for the SVM 'gamma' parameter.
* GammaScalingType - scaling type applied to the gamma hyperparameter range (default: Logarithmic).

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)
inference_instance_type = ParameterString(name="InferenceInstanceType", default_value="ml.m5.large")
hpo_tuner_instance_type = ParameterString(name="HPOTunerScriptInstanceType", default_value="ml.t3.medium")
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
role = ParameterString(name='ExecutionRole', default_value=sagemaker.get_execution_role())
default_bucket = ParameterString(name="DefaultS3Bucket", default_value=sagemaker_session.default_bucket())
baseline_model_objective_value = ParameterFloat(name='BaselineModelObjectiveValue', default_value=0.6)
bucket_prefix = ParameterString(name="S3BucketPrefix", default_value="cross_validation_iris_classification")
image_uri = ParameterString(name="ImageURI")
k = ParameterInteger(name="KFold", default_value=3)
max_jobs = ParameterInteger(name="MaxTrainingJobs", default_value=3)
max_parallel_jobs = ParameterInteger(name="MaxParallelTrainingJobs", default_value=1)
min_c = ParameterInteger(name="MinimumC", default_value=0)
max_c = ParameterInteger(name="MaximumC", default_value=1)
min_gamma = ParameterFloat(name="MinimumGamma", default_value=0.0001)
max_gamma = ParameterFloat(name="MaximumGamma", default_value=0.001)
gamma_scaling_type = ParameterString(name="GammaScalingType", default_value="Logarithmic")

In [None]:
# Variables / Constants used throughout the pipeline
model_package_group_name="IrisClassificationCrossValidatedModel"
framework_version = "0.23-1"
s3_bucket_base_path=f"s3://{default_bucket}/{bucket_prefix}"
s3_bucket_base_path_train = f"{s3_bucket_base_path}/train"
s3_bucket_base_path_test = f"{s3_bucket_base_path}/test"
s3_bucket_base_path_evaluation = f"{s3_bucket_base_path}/evaluation"
s3_bucket_base_path_jobinfo = f"{s3_bucket_base_path}/jobinfo"
s3_bucket_base_path_output = f"{s3_bucket_base_path}/output"

## Preprocessing Step
The first step in the k-fold cross-validation workflow is to randomly split the training dataset into k folds.
We are going to use a SageMaker SKLearnProcessor with a preprocessing script to perform the dataset splits and upload the results to the specified S3 bucket for the model training step.
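The contents of code/preprocessing.py are not shown here, so the following is only an assumed illustration of k-fold splitting in plain Python. It shuffles the row indices once with a fixed seed and cuts them into k folds whose sizes differ by at most one. Each held-out fold is then paired with the remaining k-1 folds as training data. The function name `k_fold_splits` is hypothetical.

```python
import random
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def k_fold_splits(rows: Sequence[T], k: int, seed: int = 42) -> List[Tuple[List[T], List[T]]]:
    """Shuffle rows once, cut them into k folds, and return k (train, test) pairs."""
    if not 2 <= k <= len(rows):
        raise ValueError("k must be between 2 and the number of rows")
    indices = list(range(len(rows)))
    random.Random(seed).shuffle(indices)

    # Spread the remainder over the first folds so sizes differ by at most one row.
    fold_sizes = [len(rows) // k + (1 if i < len(rows) % k else 0) for i in range(k)]
    folds, start = [], 0
    for size in fold_sizes:
        folds.append(indices[start:start + size])
        start += size

    splits = []
    for i, test_idx in enumerate(folds):
        # Training data for fold i is every row that is not in fold i.
        train_idx = [j for f, fold in enumerate(folds) if f != i for j in fold]
        splits.append(([rows[j] for j in train_idx], [rows[j] for j in test_idx]))
    return splits


# 150 placeholder rows, the same count as the Iris dataset.
rows = list(range(150))
for fold, (train, test) in enumerate(k_fold_splits(rows, k=3)):
    print(f"fold {fold}: {len(train)} train rows, {len(test)} test rows")
```

This version does not stratify by class. With a small dataset such as Iris, a stratified split (for example scikit-learn's `StratifiedKFold`) keeps the class balance similar across folds.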

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="kfold-crossvalidation-split",
    role=role
)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

step_process = ProcessingStep(
    name="PreprocessStep",
    processor=sklearn_processor,
    outputs=[
        ProcessingOutput(output_name="train", 
                         source="/opt/ml/processing/train", 
                         destination=s3_bucket_base_path_train),
        ProcessingOutput(output_name="test", 
                         source="/opt/ml/processing/test", 
                         destination=s3_bucket_base_path_test),
    ],
    code="code/preprocessing.py"
)

## Cross Validation Model Training Step 
In the cross-validation model training workflow, a script processor orchestrates k training jobs in parallel, and each of the k jobs trains a model on its assigned data split. The script processor also uses SageMaker HyperparameterTuner to optimize the hyperparameters and passes the candidate values to the k training jobs. It monitors all training jobs and, once they complete, captures key metrics, including the training accuracy and the hyperparameters of the best training job. It then uploads the results to the specified S3 location, where the model evaluation and model selection steps use them.
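One common way to turn k fold results into a single tuning objective is to average the per-fold accuracies for each hyperparameter candidate and keep the candidate with the best mean. The sketch below assumes that approach. `best_candidate` is a hypothetical helper, and the accuracy values are placeholders, not results produced by this pipeline.

```python
from statistics import mean
from typing import Dict, List, Tuple

Candidate = Tuple[float, float]  # (C, gamma)


def best_candidate(fold_scores: Dict[Candidate, List[float]], k: int) -> Tuple[Candidate, float]:
    """Return the candidate with the highest mean accuracy across its k folds."""
    for candidate, scores in fold_scores.items():
        if len(scores) != k:
            raise ValueError(f"{candidate} has {len(scores)} fold scores, expected {k}")
    averaged = {candidate: mean(scores) for candidate, scores in fold_scores.items()}
    winner = max(averaged, key=averaged.get)
    return winner, averaged[winner]


# Placeholder per-fold accuracies for two hypothetical (C, gamma) candidates.
fold_scores = {
    (0.5, 0.0001): [0.90, 0.88, 0.92],
    (0.9, 0.0005): [0.96, 0.94, 0.95],
}
(c, gamma), accuracy = best_candidate(fold_scores, k=3)
print(f"best C={c}, gamma={gamma}, mean accuracy={accuracy:.3f}")
```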

The components involved in orchestrating cross-validation model training, hyperparameter optimization, and key metric capture:

* PropertyFile EvaluationReport - contains the performance metrics from the HyperparameterTuner job, in JSON format.
* PropertyFile JobInfo - contains information about the best training job and the hyperparameters used to train it, in JSON format.
* ScriptProcessor - a processor that runs a Python script (code/cross_validation_with_hpo.py) to orchestrate a hyperparameter tuning job for cross-validation model training.
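The following is an assumed sketch of what the tuning script writes. The helper creates `evaluation.json` and `jobinfo.json` in the `evaluation` and `jobinfo` subdirectories that the HyperParameterTuningStep's ProcessingOutput sources below point to. The evaluation report's nesting follows the `multiclass_classification_metrics.accuracy.value` path that the condition step reads later. The `jobinfo.json` keys (`name`, `hyperparameters`, `c`, `gamma`) and the helper name `write_reports` are hypothetical.

```python
import json
import os
import tempfile
from typing import Dict


def write_reports(output_root: str, accuracy: float, best_job_name: str,
                  hyperparameters: Dict[str, str]) -> Dict[str, str]:
    """Write evaluation/evaluation.json and jobinfo/jobinfo.json under output_root."""
    # Nesting matches the json_path queried by the condition step.
    evaluation = {"multiclass_classification_metrics": {"accuracy": {"value": accuracy}}}
    # Hypothetical layout for the best training job's details.
    jobinfo = {"name": best_job_name, "hyperparameters": hyperparameters}

    paths = {}
    for subdir, filename, payload in [
        ("evaluation", "evaluation.json", evaluation),
        ("jobinfo", "jobinfo.json", jobinfo),
    ]:
        directory = os.path.join(output_root, subdir)
        os.makedirs(directory, exist_ok=True)
        path = os.path.join(directory, filename)
        with open(path, "w") as f:
            json.dump(payload, f)
        paths[subdir] = path
    return paths


# In the processing container, output_root would be /opt/ml/processing;
# a temporary directory keeps this example runnable anywhere.
with tempfile.TemporaryDirectory() as root:
    written = write_reports(root, 0.95, "example-tuning-job-001", {"c": "0.9", "gamma": "0.0005"})
    with open(written["evaluation"]) as f:
        print(json.load(f))
```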

## Custom Docker Image
To run k-fold cross-validation training jobs through SageMaker Automatic Model Tuning, we need a custom Docker image that includes both the Python script that manages the k-fold cross-validation training jobs and the training script that each of the k training jobs executes. For details about adapting custom Docker containers to work with SageMaker, please follow this [link](https://docs.aws.amazon.com/sagemaker/latest/dg/docker-containers-adapt-your-own.html). The Docker image used in the pipeline was built using the [Dockerfile](code/Dockerfile) included in this project.

The following steps cover authenticating with [ECR](https://aws.amazon.com/ecr/), building the image, and pushing it to an ECR registry for SageMaker training. For official AWS guidance on working with ECR, follow this [link](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html).

<b>Prerequisites</b>
* [docker](https://docs.docker.com/get-docker/) 
* [git client](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) 
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-install.html) 

<b>Note:</b>
If you use [AWS Cloud9](https://aws.amazon.com/cloud9/) as the CLI terminal, the prerequisites described above are met by default, so there is no need to install any additional tools.

<b>Steps</b>
* Open a new terminal.
* Clone this project with git.
* Change to the code directory.
* Run `./build-and-push-docker.sh [aws_acct_id] [aws_region]`.
* After a successful run, capture the ECR image URI from the script's output. You'll need to provide it as the ImageURI parameter at pipeline execution time. Here's an example of the format: ############.dkr.ecr.region.amazonaws.com/sagemaker-cross-validation-pipeline:latest
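If you prefer to assemble the ImageURI value yourself, the small helper below builds it in the format shown in the last step. `ecr_image_uri` is a hypothetical helper. It only covers standard commercial regions, where the registry domain is `amazonaws.com`.

```python
import re


def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Build <account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>."""
    if not re.fullmatch(r"\d{12}", account_id):
        raise ValueError("AWS account IDs are 12 digits")
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


# Placeholder account ID; substitute your own.
print(ecr_image_uri("123456789012", "us-east-1", "sagemaker-cross-validation-pipeline"))
```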

In [None]:
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

jobinfo = PropertyFile(
    name="JobInfo", output_name="jobinfo", path="jobinfo.json"
)

script_tuner = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=hpo_tuner_instance_type,
    instance_count=1,
    base_job_name="KFoldCrossValidationHyperParameterTuner",
    role=role
)

step_cv_train_hpo = ProcessingStep(
    name="HyperParameterTuningStep",
    processor=script_tuner,
    code="code/cross_validation_with_hpo.py",
    outputs=[
        ProcessingOutput(output_name="evaluation", 
                         source="/opt/ml/processing/evaluation", 
                         destination=s3_bucket_base_path_evaluation),
        ProcessingOutput(output_name="jobinfo", 
                         source="/opt/ml/processing/jobinfo", 
                         destination=s3_bucket_base_path_jobinfo)
    ],
    job_arguments=["-k", str(k),
                   "--image-uri", image_uri, 
                   "--train", s3_bucket_base_path_train, 
                   "--test", s3_bucket_base_path_test,
                   "--instance-type", training_instance_type,
                   "--instance-count", str(training_instance_count),
                   "--output-path", s3_bucket_base_path_output,
                   "--max-jobs", str(max_jobs),
                   "--max-parallel-jobs" , str(max_parallel_jobs),
                   "--min-c", str(min_c),
                   "--max-c", str(max_c),
                   "--min-gamma", str(min_gamma), 
                   "--max-gamma", str(max_gamma),
                   "--gamma-scaling-type", str(gamma_scaling_type),
                   "--region", str(region)],
    property_files=[evaluation_report, jobinfo],
    depends_on=['PreprocessStep'])

## Model Selection Step
Model selection is the final step in the cross-validation model training workflow. Based on the metrics and hyperparameters acquired from the cross-validation steps orchestrated through the ScriptProcessor, 
a TrainingStep is defined to train a model with the same algorithm used in cross-validation training, on all available training data. The model artifacts created by this training process are used 
for model registration, deployment, and inference.

Components involved in the model selection step:
    
* SKLearn Estimator - a SageMaker Estimator used to train the final model.
* TrainingStep - workflow step that runs the model selection (final training) job.
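This notebook does not show how the final training script consumes the `jobinfo` channel. As an assumption, the sketch below reads `jobinfo.json` from the channel directory. SageMaker mounts each training input channel under `/opt/ml/input/data/<channel_name>` and exposes that path in the `SM_CHANNEL_<CHANNEL_NAME>` environment variable. The function name and the JSON keys are hypothetical.

```python
import json
import os
import tempfile
from typing import Dict, Optional


def load_tuned_hyperparameters(channel_dir: Optional[str] = None) -> Dict[str, float]:
    """Read jobinfo.json from the 'jobinfo' training channel and return SVM hyperparameters."""
    channel_dir = channel_dir or os.environ.get("SM_CHANNEL_JOBINFO", "/opt/ml/input/data/jobinfo")
    with open(os.path.join(channel_dir, "jobinfo.json")) as f:
        jobinfo = json.load(f)
    params = jobinfo["hyperparameters"]  # hypothetical key layout
    # SageMaker passes hyperparameter values as strings, so cast them explicitly.
    return {"C": float(params["c"]), "gamma": float(params["gamma"])}


# Local usage with a temporary directory standing in for the mounted channel.
with tempfile.TemporaryDirectory() as channel:
    with open(os.path.join(channel, "jobinfo.json"), "w") as f:
        json.dump({"name": "example-job", "hyperparameters": {"c": "0.9", "gamma": "0.0005"}}, f)
    print(load_tuned_hyperparameters(channel))
```

The returned values could then configure the final SVM before it is fit on the full training data.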


In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.sklearn.estimator import SKLearn

sklearn_estimator = SKLearn("scikit_learn_iris.py", 
                           framework_version=framework_version, 
                           instance_type=training_instance_type,
                           instance_count=1,  # scikit-learn does not distribute training across instances
                           py_version='py3', 
                           source_dir="code",
                           output_path=s3_bucket_base_path_output,
                           role=role)

step_model_selection = TrainingStep(
    name="ModelSelectionStep",
    estimator=sklearn_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=f'{step_process.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]}/all',
            content_type="text/csv"
        ),
        "jobinfo": TrainingInput(
            s3_data=f"{s3_bucket_base_path_jobinfo}",
            content_type="application/json"
        )
    }
)

## Register Model With Model Registry
Once the model selection step is complete, the trained model artifact can be registered with SageMaker Model Registry.
Model Registry catalogs trained models and captures model versions, performance metrics, and approval status. Models versioned in Model Registry can also be deployed through CI/CD. For more information, see the [Model Registry documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry.html).

Components involved in registering a trained model with Model Registry:
* Model - Model object that contains metadata for the trained model. 
* CreateModelInput - An object that encapsulates the parameters used to create a Sagemaker Model.
* CreateModelStep - Workflow Step that creates a Sagemaker Model
* ModelMetrics - captures metadata for the trained model, including model statistics, data constraints, bias, and explainability.
* RegisterModel - workflow step that registers the model with Model Registry.

In [None]:
from sagemaker.model import Model

model = Model(
    image_uri=sklearn_estimator.image_uri,
    model_data=step_model_selection.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_cv_train_hpo.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

step_register_model = RegisterModel(
    name="RegisterModelStep",
    estimator=sklearn_estimator,
    model_data=step_model_selection.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

## Condition Step
SageMaker Pipelines supports condition steps, which evaluate conditions on step properties to determine the next action.
In the cross-validation model workflow, a condition step evaluates the model metrics captured in the cross-validation training step to determine whether 
the model selection step should run. It applies a ConditionGreaterThanOrEqualTo condition against a given baseline model objective value to decide the next steps.

Components involved in defining a Condition Step:

* ConditionGreaterThanOrEqualTo - a condition that compares the model performance metric captured in the evaluation report with the given baseline model objective value. It returns True if the metric is greater than or equal to the baseline, and False otherwise.
* ConditionStep - workflow step that performs the evaluation based on the criteria defined in ConditionGreaterThanOrEqualTo, running its if_steps when the condition is True and its else_steps otherwise.
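The following plain-Python approximation shows what the condition evaluates. It follows the dotted `json_path` into the evaluation report and compares the value against the baseline. This is a conceptual sketch, not SageMaker's implementation. The real `JsonGet` accepts JsonPath expressions, while the hypothetical `json_get` here only handles dotted keys. The 0.93 accuracy is a placeholder.

```python
import json
from typing import Any


def json_get(document: Any, json_path: str) -> Any:
    """Follow a dotted path such as 'a.b.c' through nested dictionaries."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


def passes_baseline(report_json: str, baseline: float,
                    json_path: str = "multiclass_classification_metrics.accuracy.value") -> bool:
    """True means the if_steps would run; False means the else_steps would run."""
    accuracy = json_get(json.loads(report_json), json_path)
    return accuracy >= baseline


# Placeholder report with the same nesting as evaluation.json.
report = '{"multiclass_classification_metrics": {"accuracy": {"value": 0.93}}}'
print(passes_baseline(report, baseline=0.8))   # True
print(passes_baseline(report, baseline=0.95))  # False
```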

In [None]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)

cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=step_cv_train_hpo,
        property_file=evaluation_report,
        json_path="multiclass_classification_metrics.accuracy.value",
    ),
    right=baseline_model_objective_value,
)

step_cond = ConditionStep(
    name="ModelEvaluationStep",
    conditions=[cond_gte],
    if_steps=[step_model_selection, step_register_model],
    else_steps=[],
)

## Define A Pipeline
With the pipeline components defined, we can create a SageMaker Pipeline by associating the parameters, steps, and conditions created in this notebook.
The pipeline definition encodes the pipeline as a directed acyclic graph (DAG) of the relationships between its steps.
The structure of a pipeline's DAG is determined either by data dependencies between steps or by custom dependencies defined on the steps.
In this cross-validation training pipeline, the tuning step declares its dependency on the preprocessing step through the depends_on attribute. The condition step depends on the tuning step through the JsonGet reference to its evaluation report.

A pipeline instance is composed of a <b>name, parameters, and steps </b>.

In [None]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.execution_variables import ExecutionVariables

pipeline_name = "CrossValidationTrainingPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        training_instance_count,
        inference_instance_type,
        hpo_tuner_instance_type,
        model_approval_status,
        role,
        default_bucket,
        baseline_model_objective_value,
        bucket_prefix,
        image_uri,
        k,
        max_jobs,
        max_parallel_jobs,
        min_c,
        max_c,
        min_gamma,
        max_gamma,
        gamma_scaling_type
    ],    
    pipeline_experiment_config=PipelineExperimentConfig(
      ExecutionVariables.PIPELINE_NAME,
      ExecutionVariables.PIPELINE_EXECUTION_ID),
    steps=[step_process, step_cv_train_hpo, step_cond],
)


## Examine Pipeline Definition
Before triggering a pipeline run, it's a good practice to examine the JSON pipeline definition to ensure that it's well-formed.

In [None]:
import json
json.loads(pipeline.definition())
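A quick way to sanity-check the output of `json.loads(pipeline.definition())` is to list the declared parameters with their defaults, along with every step name. This includes steps nested in a condition's `IfSteps` and `ElseSteps`. The sample below is a trimmed, hand-written fragment whose shape is assumed to match the generated definition. `summarize` and `step_names` are hypothetical helpers.

```python
import json
from typing import Any, Dict, List


def step_names(steps: List[Dict[str, Any]]) -> List[str]:
    """Collect step names, descending into condition steps' IfSteps/ElseSteps."""
    names: List[str] = []
    for step in steps:
        names.append(step["Name"])
        if step.get("Type") == "Condition":
            args = step.get("Arguments", {})
            names += step_names(args.get("IfSteps", []) + args.get("ElseSteps", []))
    return names


def summarize(definition: Dict[str, Any]) -> Dict[str, Any]:
    """Map parameter names to their defaults (None if absent) and list all step names."""
    parameters = {p["Name"]: p.get("DefaultValue") for p in definition.get("Parameters", [])}
    return {"parameters": parameters, "steps": step_names(definition.get("Steps", []))}


# Trimmed, hand-written fragment with an assumed shape; not generated by the SDK.
sample = json.loads("""
{
  "Parameters": [
    {"Name": "KFold", "Type": "Integer", "DefaultValue": 3},
    {"Name": "ImageURI", "Type": "String"}
  ],
  "Steps": [
    {"Name": "PreprocessStep", "Type": "Processing", "Arguments": {}},
    {"Name": "ModelEvaluationStep", "Type": "Condition",
     "Arguments": {"IfSteps": [{"Name": "ModelSelectionStep", "Type": "Training", "Arguments": {}}],
                   "ElseSteps": []}}
  ]
}
""")
print(summarize(sample))
```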

## Pipeline Creation
Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist, or update the pipeline if it does. The role passed in is used by SageMaker Pipelines to create all of the jobs defined in the steps.

In [None]:
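# upsert() needs a concrete IAM role ARN string, not the ExecutionRole pipeline parameter
pipeline.upsert(role_arn=sagemaker.get_execution_role())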

## Trigger Pipeline Execution
After creating the pipeline, you can start an execution, optionally providing parameter values specific to the run.

In [None]:
# Before triggering the pipeline, make sure to override the ImageURI parameter value with
# the image URI you built and pushed in the Custom Docker Image section.
execution = pipeline.start(
    parameters=dict(
        BaselineModelObjectiveValue=0.8,
        MinimumC=0,
        MaximumC=1,
        MaxTrainingJobs=3,
        ImageURI="041158455166.dkr.ecr.us-east-1.amazonaws.com/sagemaker-cross-validation-pipeline:latest"
    ))

## Examine a Pipeline Execution
Examine the pipeline execution at runtime using the SageMaker Python SDK.

In [None]:
execution.describe()

## Wait For The Pipeline Execution To Complete 
Wait for the pipeline execution to complete synchronously by calling `wait()` on the execution.

In [None]:
execution.wait()
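`execution.wait()` blocks until the execution reaches a terminal state. The hypothetical helper below sketches that idea as a polling loop over a `describe` callable, which stands in for `execution.describe()`. It reads the `PipelineExecutionStatus` field from the response and treats `Succeeded`, `Failed`, and `Stopped` as terminal.

```python
import time
from typing import Callable, Dict

# Terminal values of PipelineExecutionStatus; "Executing" and "Stopping" mean keep waiting.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_status(describe: Callable[[], Dict], delay: float = 30.0, max_attempts: int = 60) -> str:
    """Poll describe() until the execution reaches a terminal status."""
    for _ in range(max_attempts):
        status = describe()["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(delay)
    raise TimeoutError("pipeline execution did not reach a terminal status in time")


# Fake describe() responses so the sketch runs without AWS access.
responses = iter(["Executing", "Executing", "Succeeded"])
print(wait_for_status(lambda: {"PipelineExecutionStatus": next(responses)}, delay=0))
```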