## Use SageMaker Pipelines to track and manage multiple trials

In this notebook, we show how to organize the manual steps of an ML lifecycle into an automated SageMaker Pipeline. If you haven't already, review `01-Experiments.ipynb` first to understand the context of this exercise.

[Amazon SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines/) is a purpose-built, fully managed machine learning workflow orchestration service that helps customers build, automate, and manage their entire machine learning lifecycle. For our use case, SageMaker Pipelines provides a faster path from a manual, notebook-based experimentation phase to a controlled, automated ML workflow. It can be accessed from [Amazon SageMaker Studio](https://aws.amazon.com/sagemaker/studio/) for visual workflow tracking and monitoring.

[Amazon SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines/) is integrated with the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/), which lets you create your workflows programmatically. A pipeline consists of a series of interconnected steps. Each step defines an action that the pipeline takes, and the dependencies between steps form a directed acyclic graph (DAG) that is expressed as a JSON pipeline definition.
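To make the DAG idea concrete, here is a minimal plain-Python sketch (not the SageMaker implementation) that models the three steps defined later in this notebook as a dependency graph and derives a valid execution order with the standard library's `graphlib` (Python 3.9+). The dependency map is written by hand here as an assumption for illustration; SageMaker infers the edges from the step properties each step consumes.

```python
from graphlib import TopologicalSorter

# Hand-written dependency map (illustrative): step name -> set of upstream steps.
# In SageMaker Pipelines these edges are inferred from data dependencies.
step_dependencies = {
    "ca-housing-preprocessing": set(),
    "HPTuning": {"ca-housing-preprocessing"},
    "CreateTopModel": {"HPTuning"},
}

# static_order() yields each step only after all of its upstream steps;
# it raises graphlib.CycleError if the graph contains a cycle.
execution_order = list(TopologicalSorter(step_dependencies).static_order())
print(execution_order)
```

Because the three steps form a simple chain, there is exactly one valid order here; with independent branches, any order that respects the edges would be valid.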


At the time of writing, this notebook was tested with `sagemaker` SDK version `2.73.0` and `sagemaker-experiments` SDK version `0.1.35`.

In [None]:
import sys
import subprocess
subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-Uq', 'pip'])
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker==2.73.0', '-Uq'])
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker-experiments==0.1.35', '-Uq'])

### Set up

The next cell sets up the following:

* The S3 bucket and prefix that you want to use for training and model data. The bucket should be in the same region as the notebook instance, training, and hosting.
* The IAM role ARN that grants access to your data and training jobs.
* The experiment name, which serves as the logical entity that keeps our trials grouped and organized.

In [None]:
import json
import boto3
import time
from time import strftime

import sagemaker

# Set up execution role, client, session, and default bucket

role = sagemaker.get_execution_role()

region = sagemaker.Session().boto_region_name
boto_session = boto3.Session(region_name=region)
# Create the SageMaker client from the same session so it targets the same region
sm_client = boto_session.client("sagemaker")

sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session, sagemaker_client=sm_client
)

bucket = sagemaker_session.default_bucket()
experiment_name = 'DEMO-sagemaker-experiments-pipelines'
prefix = experiment_name

print(f"bucket: {bucket}")
print(f"region: {region}")
print(f"role: {role}")

### Define the pipeline
Here we define configuration values such as instance count and instance type as pipeline parameters, so that they can be set dynamically at runtime. Each parameter has a default value, which is used when the pipeline is started at the end of the notebook unless a different value is passed at start time.
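Conceptually, each `ParameterString` or `ParameterInteger` is a named placeholder with a default value. When you start an execution, any values you pass (for example `pipeline.start(parameters={"TrainingInstanceType": "ml.m5.2xlarge"})`) replace the defaults, and the remaining parameters fall back to them. The plain-Python sketch below models only that resolution rule; `resolve_parameters` is a hypothetical helper, not part of the SageMaker SDK, and rejecting unknown names is a choice made here for illustration.

```python
def resolve_parameters(defaults, overrides=None):
    """Return the effective parameter values for one pipeline execution.

    Hypothetical helper: models default-then-override resolution only.
    """
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        # Illustrative choice: fail fast on names the pipeline does not declare
        raise KeyError(f"Unknown pipeline parameters: {sorted(unknown)}")
    return {**defaults, **overrides}


# Defaults mirroring the parameters declared in the next cell
defaults = {
    "ProcessingInstanceCount": 1,
    "TrainingInstanceCount": 1,
    "ProcessingInstanceType": "ml.m5.xlarge",
    "TrainingInstanceType": "ml.m5.xlarge",
}

print(resolve_parameters(defaults, {"TrainingInstanceType": "ml.m5.2xlarge"}))
```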

In [None]:
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.parameters import ParameterString, ParameterInteger
from sagemaker.workflow.steps import CacheConfig

base_job_prefix = "pipeline-experiment-sample"

processing_instance_count = ParameterInteger(
 name="ProcessingInstanceCount", default_value=1
)

training_instance_count = ParameterInteger(
 name="TrainingInstanceCount", default_value=1
)

processing_instance_type = ParameterString(
 name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
 name="TrainingInstanceType", default_value="ml.m5.xlarge"
)

# Cache pipeline steps to reduce execution time on subsequent executions.
# expire_after takes an ISO 8601 duration string; "P30D" means 30 days.
cache_config = CacheConfig(enable_caching=True, expire_after="P30D")
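Roughly speaking, step caching lets SageMaker reuse the outputs of a previous run of a step whose arguments match the current run, as long as that run falls within the `expire_after` window (an ISO 8601 duration such as `P30D`). The sketch below models that idea in plain Python. It is an assumption-laden simplification: the real cache lookup is based on specific step attributes chosen by SageMaker, not on a SHA-256 hash of a dict, and `parse_day_duration` only handles days-only durations. All helper names are hypothetical.

```python
import hashlib
import json
import re
from datetime import datetime, timedelta, timezone


def parse_day_duration(duration):
    """Parse a days-only ISO 8601 duration such as 'P30D' (simplified)."""
    match = re.fullmatch(r"P(\d+)D", duration.upper())
    if match is None:
        raise ValueError(f"Unsupported duration: {duration!r}")
    return timedelta(days=int(match.group(1)))


def cache_key(step_arguments):
    """Hash the step arguments; identical arguments produce identical keys."""
    canonical = json.dumps(step_arguments, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def is_cache_hit(step_arguments, previous_runs, expire_after, now):
    """Return True if a previous run with the same key is still within the window.

    previous_runs maps cache key -> completion time (timezone-aware datetime).
    """
    finished_at = previous_runs.get(cache_key(step_arguments))
    if finished_at is None:
        return False
    return now - finished_at <= parse_day_duration(expire_after)


# Illustrative values: a matching run that finished 21 days earlier
now = datetime(2022, 1, 31, tzinfo=timezone.utc)
args = {"TrainingInstanceType": "ml.m5.xlarge", "lambda_range": [0.01, 10]}
history = {cache_key(args): datetime(2022, 1, 10, tzinfo=timezone.utc)}
print(is_cache_hit(args, history, "P30D", now))  # True: 21 days <= 30 days
```

Changing any argument, or shortening the window below 21 days, turns the same lookup into a cache miss.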

We use the `SKLearnProcessor` class to run our preprocessing logic as a SageMaker Processing job. The preprocessing step runs the `california-housing-preprocessing.py` script, which splits the input dataset into training, validation, and test datasets using a predefined ratio.

In [None]:
!pygmentize 'california-housing-preprocessing.py'
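The actual split logic lives in `california-housing-preprocessing.py`, printed above. As an illustration only, a random train/validation/test split over row indices can be sketched in plain Python as below. The 70/15/15 fractions, the seed, and the `split_indices` helper are assumptions for this sketch, not the ratio or code the script uses.

```python
import random


def split_indices(n_rows, train_frac=0.7, validation_frac=0.15, seed=42):
    """Shuffle row indices and cut them into train/validation/test lists.

    The default fractions and seed are illustrative assumptions; whatever is
    left after the train and validation slices becomes the test split.
    """
    if not 0 < train_frac + validation_frac < 1:
        raise ValueError("train_frac + validation_frac must be in (0, 1)")
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)  # deterministic for a given seed
    n_train = int(n_rows * train_frac)
    n_validation = int(n_rows * validation_frac)
    train = indices[:n_train]
    validation = indices[n_train:n_train + n_validation]
    test = indices[n_train + n_validation:]
    return train, validation, test


train_idx, validation_idx, test_idx = split_indices(1000)
print(len(train_idx), len(validation_idx), len(test_idx))  # 700 150 150
```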

Note that we include the pipeline execution ID in the S3 output path of each dataset, which makes it easy to trace every experiment run back to the data it used.

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.functions import Join

framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-ca-housing",
    role=role,
)

process_step = ProcessingStep(
    name="ca-housing-preprocessing",
    processor=sklearn_processor,
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "train",
                ],
            ),
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "validation",
                ],
            ),
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=Join(
                on="/",
                values=[
                    "s3://{}".format(bucket),
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "test",
                ],
            ),
        ),
    ],
    code="california-housing-preprocessing.py",
)
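At run time, each `Join` in the cell above resolves to an ordinary S3 URI, with the execution ID as one path segment. The plain-Python sketch below mimics that string concatenation so the resulting layout is visible. The bucket name and execution ID are made-up placeholders, and `output_uri` is a hypothetical helper, not an SDK function. The variables use an `example_` prefix so they do not overwrite the notebook's real `bucket` and `prefix`.

```python
def output_uri(bucket_name, key_prefix, execution_id, channel):
    """Mimic Join(on="/", values=["s3://<bucket>", prefix, execution_id, channel])."""
    return "/".join([f"s3://{bucket_name}", key_prefix, execution_id, channel])


# Placeholder values for illustration only
example_bucket = "example-bucket"
example_prefix = "DEMO-sagemaker-experiments-pipelines"
example_execution_id = "abc123exampleid"

for channel in ("train", "validation", "test"):
    print(output_uri(example_bucket, example_prefix, example_execution_id, channel))
```

Every execution therefore writes to its own `<prefix>/<execution ID>/` folder, so datasets from different runs never overwrite each other.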

For training, we use the [SageMaker XGBoost](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) built-in algorithm, referring to its container image as retrieved below. After configuring the training instance type, instance count, and output path, we set static values for the hyperparameters that we found to be optimal during our manual experiment runs. The hyperparameter tuning step then uses the training and validation datasets to find the best model over several training jobs.

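The goal of tuning is to minimize RMSE, so we set `validation:rmse` as the objective metric and `Minimize` as the objective type. Only `lambda` (the L2 regularization term) is tuned, searched between 0.01 and 10 on a logarithmic scale. We also define the output path for the model artifacts from the hyperparameter tuning job.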
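For reference, the objective metric described above is the root mean squared error on the validation channel, RMSE = sqrt(mean((y - y_pred)^2)); lower is better, which is why the objective type is `Minimize`. XGBoost computes and reports `validation:rmse` itself, so the minimal plain-Python version below only makes the metric concrete; the toy values are invented.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error: sqrt(mean((y_true - y_pred) ** 2))."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and the same length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Toy values: errors of 1, -1, 1, -1 give an RMSE of exactly 1.0
print(rmse([2.0, 3.0, 4.0, 5.0], [1.0, 4.0, 3.0, 6.0]))
```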

In [None]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

from sagemaker.tuner import ContinuousParameter, HyperparameterTuner
from sagemaker.workflow.steps import TuningStep

model_path = f"s3://{bucket}/{prefix}/{base_job_prefix}/ca-housing-experiment-pipeline"

image_uri = sagemaker.image_uris.retrieve(
 framework="xgboost",
 region=region,
 version="1.2-2",
 py_version="py3",
 instance_type=training_instance_type,
)

xgb_train = Estimator(
 image_uri=image_uri,
 instance_type=training_instance_type,
 instance_count=training_instance_count,
 output_path=model_path,
 base_job_name=f"{base_job_prefix}/ca-housing-train",
 sagemaker_session=sagemaker_session,
 role=role,
)

xgb_train.set_hyperparameters(
    eval_metric="rmse",
    objective="reg:squarederror",  # Learning objective: regression with squared error loss
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    verbosity=1,
)

objective_metric_name = "validation:rmse"

hyperparameter_ranges = {
 "lambda": ContinuousParameter(0.01, 10, scaling_type="Logarithmic")
}

tuner_log = HyperparameterTuner(
    xgb_train,
    objective_metric_name,
    hyperparameter_ranges,
    max_jobs=10,
    max_parallel_jobs=3,
    strategy="Bayesian",
    objective_type="Minimize",
)

tune_step = TuningStep(
    name="HPTuning",
    tuner=tuner_log,
    inputs={
        "train": TrainingInput(
            s3_data=process_step.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=process_step.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
    cache_config=cache_config,
)
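With `scaling_type="Logarithmic"` in the cell above, the tuner treats the `lambda` range on a log scale, so each order of magnitude between 0.01 and 10 gets comparable attention instead of most of the range being spent on values above 1. The Bayesian strategy picks values adaptively, so the sketch below illustrates only the scale itself, not how the tuner chooses points: it maps a unit-interval value `u` to `low * (high / low) ** u`. The helper name `log_scale` is hypothetical.

```python
import math


def log_scale(u, low=0.01, high=10.0):
    """Map u in [0, 1] onto [low, high] so equal steps in u multiply the value by equal ratios."""
    if not 0.0 <= u <= 1.0:
        raise ValueError("u must be in [0, 1]")
    return math.exp(math.log(low) + u * (math.log(high) - math.log(low)))


# Evenly spaced points in u land close to 0.01, 0.1, 1, and 10
for u in (0.0, 1 / 3, 2 / 3, 1.0):
    print(round(log_scale(u), 6))
```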

After tuning completes, the model artifacts from each training job are stored in Amazon S3. This step creates a model from the artifact of the best training job found by hyperparameter tuning, that is, the job with the lowest validation RMSE.

In [None]:
from sagemaker.model import Model
from sagemaker.xgboost import XGBoostPredictor
from sagemaker.workflow.steps import CreateModelStep

# Create a model from the best training job of the tuning step (top_k=0 is the best job).
# model_bucket_key is model_path without the "s3://" scheme.
model_bucket_key = f"{bucket}/{prefix}/{base_job_prefix}/ca-housing-experiment-pipeline"
model_candidate = Model(
    image_uri=image_uri,
    model_data=tune_step.get_top_model_s3_uri(top_k=0, s3_bucket=model_bucket_key),
    sagemaker_session=sagemaker_session,
    role=role,
    predictor_cls=XGBoostPredictor,
)

create_model_step = CreateModelStep(
    name="CreateTopModel",
    model=model_candidate,
    inputs=sagemaker.inputs.CreateModelInput(instance_type="ml.m5.large"),
)
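`get_top_model_s3_uri` builds the model artifact path from the name of one of the training jobs launched by the tuner; the SDK documents index 0 as the best-performing job, which is why the cell above uses `top_k=0`. The plain-Python sketch below mimics that ranking for a minimized objective and the standard `<output path>/<job name>/output/model.tar.gz` artifact layout. The job names and RMSE values are invented, and `top_model_uri` is a hypothetical helper, not the SDK method.

```python
def top_model_uri(job_rmse, bucket_key, top_k=0):
    """Rank jobs by ascending RMSE and return the artifact URI of the k-th best.

    job_rmse maps training job name -> final validation RMSE (lower is better).
    """
    ranked = sorted(job_rmse, key=job_rmse.get)
    best_job = ranked[top_k]
    # Joining "s3:/" and bucket_key with "/" yields the "s3://" scheme
    return "/".join(["s3:/", bucket_key, best_job, "output/model.tar.gz"])


# Invented job names and scores, for illustration only
example_jobs = {
    "tuning-job-001-aaaa": 0.61,
    "tuning-job-002-bbbb": 0.54,
    "tuning-job-003-cccc": 0.58,
}
print(top_model_uri(example_jobs, "example-bucket/demo-prefix"))
```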

Now we integrate the pipeline with SageMaker Experiments by associating it with an experiment name. If the experiment already exists, we load it and add the new trial runs to it; if not, we create a new experiment, as shown below.

In [None]:
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent

# Load the experiment if it exists; otherwise create it
try:
    demo_experiment = Experiment.load(experiment_name=experiment_name)
    print("existing experiment loaded")
except Exception as ex:
    if "ResourceNotFound" in str(ex):
        demo_experiment = Experiment.create(
            experiment_name=experiment_name,
            description="Demo experiment",
            tags=[{"Key": "demo-experiments", "Value": "demo1"}],
        )
        print("new experiment created")
    else:
        print(f"Unexpected {ex}, {type(ex)}")
        print("Don't go forward!")
        raise

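`PipelineExperimentConfig` links each pipeline execution to an experiment and a trial. Here we use the experiment above and name the trial after the pipeline execution ID, so every execution is recorded as its own trial.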

In [None]:
from sagemaker.workflow.pipeline import PipelineExperimentConfig

pipeline_name = "CAHousingExperimentsPipeline"

# Pipeline experiment config: (experiment name, trial name).
# The trial name resolves at run time to "pipeline-execution-<execution ID>".
ca_housing_experiment_config = PipelineExperimentConfig(
    experiment_name,
    Join(
        on="-",
        values=[
            "pipeline-execution",
            ExecutionVariables.PIPELINE_EXECUTION_ID,
        ],
    ),
)
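With this config, each execution is recorded as a trial named `pipeline-execution-<execution ID>` inside the experiment. The execution ID is the last segment of the pipeline execution ARN, which is also how the analysis cell later in this notebook extracts it. The sketch below shows that mapping in plain Python; the ARN is a made-up example of the `arn:aws:sagemaker:<region>:<account>:pipeline/<name>/execution/<id>` shape, and `trial_name_for` is a hypothetical helper.

```python
def trial_name_for(execution_arn):
    """Mirror Join(on="-", values=["pipeline-execution", <execution id>])."""
    execution_id = execution_arn.split("/")[-1]  # last ARN segment is the execution ID
    return "-".join(["pipeline-execution", execution_id])


# Made-up ARN for illustration only
example_arn = (
    "arn:aws:sagemaker:us-east-1:111122223333:"
    "pipeline/cahousingexperimentspipeline/execution/abc123exampleid"
)
print(trial_name_for(example_arn))  # pipeline-execution-abc123exampleid
```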

With the experiment config and the pipeline steps defined, we create the pipeline from those steps and register the pipeline parameters.

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    pipeline_experiment_config=ca_housing_experiment_config,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_count,
        training_instance_type,
    ],
    steps=[process_step, tune_step, create_model_step],
)

### Run the pipeline to create a model

In [None]:
# Create the pipeline (or update it if it already exists), then start an
# execution that uses the default parameter values
pipeline.upsert(role_arn=role)
execution = pipeline.start()

We wait for the pipeline execution to complete, which takes about 20 minutes.

In [None]:
%%time 

execution.wait()
execution.describe()
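`execution.wait()` blocks until the execution reaches a terminal state; the SDK method also accepts `delay` and `max_attempts` arguments if you need a different polling interval or timeout. The generic sketch below shows the same polling pattern in plain Python against a stubbed status function, so the terminal states (`Succeeded`, `Failed`, `Stopped`) and the timeout behavior are explicit. `poll_until_done` is hypothetical, not an SDK function.

```python
import time

TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def poll_until_done(get_status, delay_seconds=30.0, max_attempts=60, sleep=time.sleep):
    """Call get_status() until it returns a terminal status or attempts run out."""
    for _ in range(max_attempts):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        sleep(delay_seconds)
    raise TimeoutError(f"Execution not finished after {max_attempts} attempts")


# Stubbed status sequence standing in for repeated describe() calls
statuses = iter(["Executing", "Executing", "Succeeded"])
print(poll_until_done(lambda: next(statuses), delay_seconds=0))  # Succeeded
```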

### Explore the results of hyperparameter tuning
First, retrieve the hyperparameters and metrics of each trial, then plot the metrics against the hyperparameters.

In [None]:
from sagemaker.analytics import ExperimentAnalytics
from smexperiments.search_expression import Filter, Operator, SearchExpression

# SageMaker Pipelines includes the execution ID in the names of the trial components it creates
execution_id = execution.describe()['PipelineExecutionArn'].split('/')[-1]
source_arn_filter = Filter(
    name="TrialComponentName", operator=Operator.CONTAINS, value=execution_id
)

# Keep only trial components that come from training jobs
source_type_filter = Filter(
    name="Source.SourceType", operator=Operator.EQUALS, value="SageMakerTrainingJob"
)

search_expression = SearchExpression(
 filters=[source_arn_filter, source_type_filter]
)

trial_component_analytics = ExperimentAnalytics(
 sagemaker_session=sagemaker_session,
 experiment_name=experiment_name,
 search_expression=search_expression.to_boto()
)
analytic_table = trial_component_analytics.dataframe()
analytic_table.head()
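The two `Filter` objects in the cell above are combined with AND, the default boolean operator of a search expression, so only trial components that belong to this execution and come from training jobs are returned. The plain-Python sketch below applies the same two conditions to in-memory records; the records and their names are invented and contain only the fields the filters look at, and `matches` is a hypothetical helper.

```python
def matches(component, execution_id):
    """Both conditions must hold, mirroring the two filters combined with AND."""
    return (
        execution_id in component["TrialComponentName"]  # like Operator.CONTAINS
        and component["Source"]["SourceType"] == "SageMakerTrainingJob"  # like Operator.EQUALS
    )


# Invented trial component records, for illustration only
example_components = [
    {"TrialComponentName": "exec-abc123-job-001-aws-training-job",
     "Source": {"SourceType": "SageMakerTrainingJob"}},
    {"TrialComponentName": "exec-abc123-preprocess-aws-processing-job",
     "Source": {"SourceType": "SageMakerProcessingJob"}},
    {"TrialComponentName": "exec-zzz999-job-001-aws-training-job",
     "Source": {"SourceType": "SageMakerTrainingJob"}},
]

selected = [c["TrialComponentName"] for c in example_components if matches(c, "abc123")]
print(selected)  # ['exec-abc123-job-001-aws-training-job']
```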

Plot the final validation RMSE of each training job against the `lambda` value it used, labeling each point with its trial component ID.

In [None]:
ax = analytic_table.plot.scatter("lambda", "validation:rmse - Last", grid=True)
# Label each point with the training job index parsed from the trial component name
analytic_table["TrialComponentID"] = [str(int(x.split('-')[3])) for x in analytic_table["TrialComponentName"]]
for _, row in analytic_table.iterrows():
    ax.annotate(row["TrialComponentID"], (row["lambda"], row["validation:rmse - Last"]))
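The point labels come from the trial component names: the cell takes the fourth dash-separated field of each name and strips its zero padding, which only works if every name follows that layout. A slightly more defensive variant, sketched below under the same assumption about the name layout, returns `None` instead of raising when a name does not fit. The example names are invented and `trial_index` is a hypothetical helper.

```python
def trial_index(trial_component_name, position=3):
    """Return the dash-separated field at `position` without zero padding, or None."""
    parts = trial_component_name.split("-")
    if len(parts) <= position or not parts[position].isdigit():
        return None
    return str(int(parts[position]))


# Invented names, for illustration only
print(trial_index("abcd-hptuning-efgh-007-1a2b3c4d-aws-training-job"))  # 7
print(trial_index("unexpected-name"))  # None
```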

### Clean up

Uncomment the code in the cell below to delete the experiment together with its associated trials and trial components.

In [None]:
#demo_experiment.delete_all(action="--force")

### Next steps

As a next step, consider combining these three Amazon SageMaker features in your next ML project: Amazon SageMaker Studio, Amazon SageMaker Experiments, and Amazon SageMaker Pipelines.