## Module 4: Build End-to-end Machine Learning Pipeline on SageMaker

MLOps (Machine Learning Operations) is important because it enables reliable deployment, monitoring, and management of machine learning models in production. It combines DevOps, DataOps, and ML engineering practices to build and deploy machine learning models at scale.

One important aspect of MLOps is the use of an orchestration pipeline, which helps to automate the process of building, testing, and deploying machine learning models. An orchestration pipeline provides a framework for managing the entire machine learning workflow, from data preparation to model deployment and monitoring.

SageMaker Pipelines is a fully managed continuous integration and continuous delivery (CI/CD) service for building, training, and deploying machine learning models. It helps address some of the operational challenges in MLOps by providing a visual interface (in SageMaker Studio) for creating and managing machine learning workflows, automating common tasks such as data preprocessing and model training, and making model deployment and monitoring easier.

In this module, we convert the manually executed notebook from Module 3 into a fully automated ML pipeline using SageMaker Pipelines.

![Pipeline Image](statics/module_04_pipeline01.png)

**If you DID NOT run the previous modules, please run [0_setup.ipynb notebook](0_setup.ipynb) first before running this notebook**

**This demo is optimized for SageMaker Studio, using a Studio notebook with the Data Science kernel.**

In [None]:
!pip install -Uq pip --quiet

!pip install -Uq awswrangler sagemaker boto3 --quiet

Import the libraries we need.

In [None]:
import json
import boto3
import pandas as pd
import string

from sagemaker.processing import (
 ProcessingInput,
 ProcessingOutput,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.xgboost import XGBoostProcessor

from sagemaker.workflow.steps import (
 ProcessingStep, 
 TrainingStep, 
 CacheConfig, 
 CreateModelStep
)

import sagemaker

from sagemaker.workflow.conditions import (
 ConditionGreaterThanOrEqualTo,
)
from sagemaker.workflow.condition_step import ConditionStep, JsonGet
# from sagemaker.workflow.functions import JsonGet

from sagemaker.xgboost.estimator import XGBoost
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.pipeline_context import PipelineSession

Define global parameters

In [None]:
sagemaker_session = sagemaker.Session()

region = sagemaker_session.boto_region_name
sagemaker_role = sagemaker.get_execution_role()

bucket = sagemaker_session.default_bucket()

s3_client = boto3.client("s3", region_name=region)
sagemaker_client = boto3.client("sagemaker", region_name=region)

%store -r fg_name

%store -r prefix

In [None]:
# ======> variables used for parameterizing the notebook run
flow_instance_count = 1
flow_instance_type = "ml.m5.4xlarge"

dataprep_instance_count = 1
dataprep_instance_type = "ml.m5.xlarge"

train_instance_count = 1
train_instance_type = "ml.m5.xlarge"

eval_instance_count = 1
eval_instance_type = "ml.m5.xlarge"

deploy_instance_count = 1
deploy_instance_type = "ml.m5.xlarge"

cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
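
`expire_after` takes an ISO 8601 duration string, so `"PT1H"` means a step can reuse the results of a previous successful run with the same arguments only if that run is at most one hour old. The hypothetical helper below (not part of the SageMaker SDK) parses a small subset of the format (days, hours, minutes, seconds) into a `timedelta`, which can help sanity-check a value before passing it to `CacheConfig`. It is a sketch under that assumption, not a full ISO 8601 parser: years, months, weeks, and fractional values are not handled.

```python
import re
from datetime import timedelta

# Subset of ISO 8601 durations: optional days, then an optional time part.
# Hypothetical helper for illustration; not part of the SageMaker SDK.
_DURATION_RE = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_simple_duration(value: str) -> timedelta:
    """Convert strings such as 'PT1H' or 'P1DT30M' into a timedelta."""
    match = _DURATION_RE.match(value)
    # Reject non-matching strings and empty forms such as 'P', 'PT' or 'P1DT'.
    if match is None or value.endswith(("P", "T")):
        raise ValueError(f"Unsupported duration: {value!r}")
    parts = {name: int(num) for name, num in match.groupdict().items() if num}
    return timedelta(**parts)


print(parse_simple_duration("PT1H"))  # 1:00:00
print(parse_simple_duration("P1DT30M"))  # 1 day, 0:30:00
```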

## Architecture: Create a SageMaker Pipeline to Automate All the Steps from Data Prep to Model Deployment
----

## Breaking down the process into steps

- [Step 1: Data Wrangler Preprocessing Step](#Step-1:-Data-Wrangler-Preprocessing-Step)
- [Step 2: Create Dataset and Train/Test Split](#Step-2:-Create-Dataset-and-Train/Test-Split)
- [Step 3: Train XGBoost Model](#Step-3:-Train-XGBoost-Model)
- [Step 4: Model Evaluation](#Step-4:-Model-Evaluation)
- [Step 5: Register Model](#Step-5:-Register-Model)
- [Step 6: Deploy Model](#Step-6:-Deploy-Model)
- [Step 7: Combine and Run the Pipeline Steps](#Step-7:-Combine-and-Run-the-Pipeline-Steps)

### Define Pipeline Parameters
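Pipeline parameters are named variables whose values can be set each time the pipeline is executed, so the same pipeline definition can run with different configurations (for example, a different training instance type or model approval status). If no value is supplied at execution time, the parameter's `default_value` is used.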

In [None]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString

train_instance_param = ParameterString(
 name="TrainingInstance",
 default_value="ml.m5.xlarge",
)

model_approval_status = ParameterString(
 name="ModelApprovalStatus", default_value="PendingManualApproval"
)
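
At execution time, any parameter you do not override keeps its `default_value`, and overrides are passed by parameter *name*, for example `pipeline.start(parameters={"ModelApprovalStatus": "Approved"})`. The hypothetical helper below mirrors that merge locally with plain dictionaries and rejects names the pipeline does not declare, which catches typos before an execution is started. It is an illustration only, not how the SageMaker service resolves parameters internally.

```python
from typing import Dict, Union

ParamValue = Union[str, int, float, bool]


def resolve_parameters(
    defaults: Dict[str, ParamValue], overrides: Dict[str, ParamValue]
) -> Dict[str, ParamValue]:
    """Merge execution-time overrides onto the declared parameter defaults."""
    unknown = sorted(set(overrides) - set(defaults))
    if unknown:
        raise KeyError(f"Undeclared pipeline parameters: {unknown}")
    return {**defaults, **overrides}


# Defaults copied from the ParameterString definitions above.
declared = {
    "TrainingInstance": "ml.m5.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
}
print(resolve_parameters(declared, {"ModelApprovalStatus": "Approved"}))
```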

In [None]:
cell5g_s3_key = f"{prefix}/data/raw/5gcell.csv"

s3_client.upload_file(
 Filename="data/5gcell.csv", Bucket=bucket, Key=cell5g_s3_key
)

### Step 1: Data Wrangler Preprocessing Step

#### Update attributes within the `.flow` file
Data Wrangler generates a `.flow` file that references the S3 bucket used during data wrangling. That bucket may differ from the default bucket in this notebook: for example, if someone else did the wrangling, you probably do not have access to their bucket. In that case, point the flow at your own S3 bucket so that you can load the `.flow` file into Data Wrangler and access the data.

After running the cell below, you can open the generated `pipeline/5gcell.flow` file and export the data to S3.

In [None]:
template_file = "pipeline/5gcell_flow_template_2"

with open(template_file, "r") as f:
 template = f.read()
 template = template.replace('${bucket}', bucket)
 template = template.replace('${prefix}', prefix)
 flow = json.loads(template)

flow_file = "pipeline/5gcell.flow"
with open(flow_file, "w") as f:
 json.dump(flow, f)
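
The cell above fills the `${bucket}` and `${prefix}` placeholders with `str.replace`. Python's standard `string.Template` (the notebook already imports `string`) understands the same `${name}` syntax, so an equivalent could look like the sketch below. The tiny inline template is a made-up stand-in for `pipeline/5gcell_flow_template_2`. `substitute` raises `KeyError` if a placeholder is left unfilled, which surfaces mistakes earlier than a silent `replace`; if the real template contains other `$` characters, `safe_substitute` (or the original `replace`) is the safer choice.

```python
import json
import string

# Made-up miniature stand-in for the real .flow template file.
flow_template = """
{"nodes": [{"node_id": "n1",
            "parameters": {"dataset_definition": {
                "name": "5gcell.csv",
                "s3ExecutionContext": {"s3Uri": "s3://${bucket}/${prefix}/data/raw/5gcell.csv"}}}}]}
"""


def render_flow(template_text: str, bucket: str, prefix: str) -> dict:
    """Fill ${bucket}/${prefix} placeholders and parse the result as JSON."""
    rendered = string.Template(template_text).substitute(bucket=bucket, prefix=prefix)
    return json.loads(rendered)


flow_doc = render_flow(flow_template, bucket="my-example-bucket", prefix="demo")
print(flow_doc["nodes"][0]["parameters"]["dataset_definition"]["s3ExecutionContext"]["s3Uri"])
```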

#### Upload flow to S3
The flow file is an input to the first pipeline step, so it must be stored in S3.

In [None]:
flow_key = f"{prefix}/dataprep-notebooks/{flow_file.split('/')[-1]}"

s3_client.upload_file(
 Filename=flow_file, Bucket=bucket, Key=flow_key
)
flow_uri = f"s3://{bucket}/{flow_key}"
print(f"Flow file uploaded: {flow_uri}")
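
The same pattern (take the local file's base name, place it under `{prefix}/<folder>/`, then build an `s3://` URI) repeats for the flow file and for each script uploaded later in this notebook. A small hypothetical helper like the one below keeps the key and URI consistent. It only builds strings; the actual upload still happens with `s3_client.upload_file` as above.

```python
import posixpath
from typing import Tuple


def s3_key_and_uri(bucket: str, prefix: str, folder: str, local_path: str) -> Tuple[str, str]:
    """Return (key, uri) for uploading local_path under {prefix}/{folder}/."""
    # posixpath.basename matches the notebook's local_path.split('/')[-1].
    key = f"{prefix}/{folder}/{posixpath.basename(local_path)}"
    return key, f"s3://{bucket}/{key}"


example_key, example_uri = s3_key_and_uri(
    "my-example-bucket", "demo", "dataprep-notebooks", "pipeline/5gcell.flow"
)
print(example_key)  # demo/dataprep-notebooks/5gcell.flow
print(example_uri)  # s3://my-example-bucket/demo/dataprep-notebooks/5gcell.flow
```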

#### Define the first Data Wrangler step's inputs

In [None]:
with open(flow_file, "r") as f:
    flow = json.load(f)

flow_step_inputs = []

# Base directory for inputs inside the processing container
processing_dir = "/opt/ml/processing"

# The flow file contains the code for each transformation
flow_file_input = sagemaker.processing.ProcessingInput(
    source=flow_uri, destination=f"{processing_dir}/flow", input_name="flow"
)

flow_step_inputs.append(flow_file_input)

# Parse the flow file for the S3 inputs to the Data Wrangler job
for node in flow["nodes"]:
    if "dataset_definition" in node["parameters"]:
        data_def = node["parameters"]["dataset_definition"]
        name = data_def["name"]
        s3_input = sagemaker.processing.ProcessingInput(
            source=data_def["s3ExecutionContext"]["s3Uri"],
            destination=f"{processing_dir}/{name}",
            input_name=name,
        )
        flow_step_inputs.append(s3_input)
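
The loop above can also be written as a pure function that returns the name, S3 source, and container destination for every node carrying a `dataset_definition`. Keeping it separate from the `ProcessingInput` construction makes it easy to test against a small hand-written flow dictionary, like the made-up one below; the node layout assumed here is only the subset of keys the notebook itself reads. Each returned dict uses `ProcessingInput` keyword names, so `ProcessingInput(**item)` would build the same inputs.

```python
from typing import Dict, List


def flow_s3_inputs(flow_doc: Dict, processing_dir: str = "/opt/ml/processing") -> List[Dict[str, str]]:
    """List the S3 dataset inputs declared in a Data Wrangler flow dictionary."""
    inputs = []
    for node in flow_doc["nodes"]:
        data_def = node.get("parameters", {}).get("dataset_definition")
        if data_def is None:
            continue  # nodes without a dataset definition are skipped
        inputs.append(
            {
                "input_name": data_def["name"],
                "source": data_def["s3ExecutionContext"]["s3Uri"],
                "destination": f"{processing_dir}/{data_def['name']}",
            }
        )
    return inputs


# Made-up flow with one source node and one node without a dataset definition.
sample_flow = {
    "nodes": [
        {"node_id": "src", "parameters": {"dataset_definition": {
            "name": "5gcell.csv",
            "s3ExecutionContext": {"s3Uri": "s3://my-example-bucket/demo/data/raw/5gcell.csv"}}}},
        {"node_id": "xform", "parameters": {"operator": "example"}},
    ]
}
print(flow_s3_inputs(sample_flow))
```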

#### Define outputs for the first Data Wrangler step

In [None]:
output_name = (
 f"{flow['nodes'][-1]['node_id']}.{flow['nodes'][-1]['outputs'][0]['name']}"
)

flow_step_outputs = []

flow_output = sagemaker.processing.ProcessingOutput(
 output_name=output_name,
 feature_store_output=sagemaker.processing.FeatureStoreOutput(feature_group_name=fg_name),
 app_managed=True,
)

flow_step_outputs.append(flow_output)

#### Define processor and processing step

In [None]:
# Pulls the latest data-wrangler container tag, i.e. "1.x"
# The latest tested container version was "1.11.0"
image_uri = sagemaker.image_uris.retrieve(framework="data-wrangler", region=region)

print("image_uri: {}".format(image_uri))

flow_processor = sagemaker.processing.Processor(
 role=sagemaker_role,
 image_uri=image_uri,
 instance_count=flow_instance_count,
 instance_type=flow_instance_type,
 max_runtime_in_seconds=86400,
)

# Output configuration used as processing job container arguments
output_config = {output_name: {"content_type": "CSV"}}

flow_step = ProcessingStep(
 name="DataWranglerProcessingStep",
 processor=flow_processor,
 inputs=flow_step_inputs,
 outputs=flow_step_outputs,
 job_arguments=[f"--output-config '{json.dumps(output_config)}'"],
 cache_config=cache_config
)
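
Two string-building details in the cells above are easy to get wrong: the output name is the last node's `node_id` joined to its first output's `name` with a dot (the notebook assumes the last node in the flow is the one to export), and `--output-config` is passed as one job-argument string containing single-quoted JSON. The sketch below reproduces both on a made-up flow dictionary so you can see the exact strings the notebook builds; the helper names are hypothetical.

```python
import json
from typing import Dict


def dw_output_name(flow_doc: Dict) -> str:
    """Build '<node_id>.<output name>' from the last node, as the notebook does."""
    last_node = flow_doc["nodes"][-1]
    return f"{last_node['node_id']}.{last_node['outputs'][0]['name']}"


def dw_output_config_arg(output_name: str, content_type: str = "CSV") -> str:
    """Build the --output-config job argument used by the Data Wrangler step."""
    config = {output_name: {"content_type": content_type}}
    return f"--output-config '{json.dumps(config)}'"


example_flow = {"nodes": [{"node_id": "abc123", "outputs": [{"name": "default"}]}]}
example_output_name = dw_output_name(example_flow)
print(example_output_name)  # abc123.default
print(dw_output_config_arg(example_output_name))
# --output-config '{"abc123.default": {"content_type": "CSV"}}'
```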

### Step 2: Create Dataset and Train/Test Split

In [None]:
create_dataset_script = "pipeline/create_dataset.py"
create_dataset_key = f"{prefix}/code/{create_dataset_script.split('/')[-1]}"

s3_client.upload_file(
 Filename=create_dataset_script, Bucket=bucket, Key=create_dataset_key
)

create_dataset_script_uri = f"s3://{bucket}/{create_dataset_key}"

create_dataset_processor = SKLearnProcessor(
 framework_version="0.23-1",
 role=sagemaker_role,
 instance_type=dataprep_instance_type,
 instance_count=dataprep_instance_count,
 base_job_name=f"{prefix}-dataprep",
 sagemaker_session=sagemaker_session,
)

create_dataset_step = ProcessingStep(
 name="CreateDataset",
 processor=create_dataset_processor,
 outputs=[
 sagemaker.processing.ProcessingOutput(
 output_name="train_data", source="/opt/ml/processing/output/train"
 ),
 sagemaker.processing.ProcessingOutput(
 output_name="test_data", source="/opt/ml/processing/output/test"
 ),
 ],
 job_arguments=[
 "--feature-group-name",
 fg_name,
 "--bucket-name",
 bucket,
 "--bucket-prefix",
 prefix,
 "--region",
 region,
 ],
 code=create_dataset_script_uri,
 depends_on=[flow_step.name],
 cache_config=cache_config
)
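
The body of `pipeline/create_dataset.py` is not shown in this notebook. On the receiving side, a processing script typically reads `job_arguments` like the ones above with `argparse` and writes its splits under the `source` directories declared in the outputs, which SageMaker then uploads to S3. The sketch below is a hypothetical outline of that contract only: it parses the four flags and does a seeded random train/test split on plain Python rows. The 80/20 ratio and the `split_rows` helper are assumptions, and querying the feature group is omitted.

```python
import argparse
import random
from typing import List, Sequence, Tuple

# Container paths from the ProcessingOutput definitions above; a real script
# would write its train/test files here.
TRAIN_DIR = "/opt/ml/processing/output/train"
TEST_DIR = "/opt/ml/processing/output/test"


def parse_args(argv: Sequence[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument("--feature-group-name", required=True)
    parser.add_argument("--bucket-name", required=True)
    parser.add_argument("--bucket-prefix", required=True)
    parser.add_argument("--region", required=True)
    return parser.parse_args(argv)


def split_rows(rows: List, test_fraction: float = 0.2, seed: int = 42) -> Tuple[List, List]:
    """Shuffle a copy of rows and split it into (train, test)."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_test = int(len(shuffled) * test_fraction)
    return shuffled[n_test:], shuffled[:n_test]


args = parse_args(["--feature-group-name", "fg-demo", "--bucket-name", "my-example-bucket",
                   "--bucket-prefix", "demo", "--region", "us-east-1"])
train_rows, test_rows = split_rows(list(range(10)))
print(args.feature_group_name, len(train_rows), len(test_rows))
```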

### Step 3: Train XGBoost Model
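In this step we train an XGBoost model with the `pipeline/code/xgboost_starter_script.py` entry point on the training split produced in Step 2. Note that the estimator below uses the fixed `train_instance_type` value: the `train_instance_param` ParameterString defined at the beginning of the pipeline is not referenced. To choose the training instance type per execution, pass `instance_type=train_instance_param` to the estimator and add `train_instance_param` to the pipeline's `parameters` list.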

In [None]:
hyperparameters = {
 "max_depth": "3",
 "eta": "0.2",
 "objective": "binary:logistic",
 "num_round": "100",
 "region":region
}

training_job_output_path = f"s3://{bucket}/{prefix}/training_jobs"

xgb_estimator = XGBoost(
 entry_point="xgboost_starter_script.py",
 source_dir="pipeline/code",
 output_path=training_job_output_path,
 code_location=training_job_output_path,
 hyperparameters=hyperparameters,
 role=sagemaker_role,
 instance_count=train_instance_count,
 instance_type=train_instance_type, 
 framework_version="1.5-1",
)

train_step = TrainingStep(
 name="XgboostTrain",
 estimator=xgb_estimator,
 inputs={
 "train": sagemaker.inputs.TrainingInput(
 s3_data=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
 "train_data"
 ].S3Output.S3Uri
 )
 },
 cache_config=cache_config
)
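
`xgboost_starter_script.py` itself is not shown here. In script mode, SageMaker passes the `hyperparameters` dictionary to the entry point as command-line arguments (for example `--max_depth 3`) and exposes the input channels and model directory through environment variables such as `SM_CHANNEL_TRAIN` and `SM_MODEL_DIR`; the `"train"` key in `inputs` above is what produces `SM_CHANNEL_TRAIN`. The hypothetical sketch below shows only the argument-parsing side; the fallback paths are the usual container locations, used here so it also runs outside a training container.

```python
import argparse
import os
from typing import Sequence


def parse_hyperparameters(argv: Sequence[str]) -> argparse.Namespace:
    """Parse hyperparameters the way a script-mode entry point receives them."""
    parser = argparse.ArgumentParser()
    # Command-line values are strings, so convert types here.
    parser.add_argument("--max_depth", type=int, default=3)
    parser.add_argument("--eta", type=float, default=0.2)
    parser.add_argument("--objective", type=str, default="binary:logistic")
    parser.add_argument("--num_round", type=int, default=100)
    parser.add_argument("--region", type=str, default=None)
    # Channel and model directories come from environment variables in the container.
    parser.add_argument(
        "--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train")
    )
    parser.add_argument(
        "--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model")
    )
    return parser.parse_args(argv)


hp = parse_hyperparameters(["--max_depth", "3", "--eta", "0.2", "--objective", "binary:logistic",
                            "--num_round", "100", "--region", "us-east-1"])
print(hp.max_depth, hp.eta, hp.num_round)
```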

#### Create SageMaker Model

In [None]:
model = sagemaker.model.Model(
 name="XgboostAnomalyModel",
 image_uri=train_step.properties.AlgorithmSpecification.TrainingImage,
 model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
 sagemaker_session=sagemaker_session,
 role=sagemaker_role,
)

inputs = sagemaker.inputs.CreateModelInput(instance_type=deploy_instance_type)

create_model_step = CreateModelStep(name="XgboostModel", model=model, inputs=inputs)

### Step 4: Model Evaluation

In [None]:
model_eval_script = "pipeline/model_eval.py"
model_eval_key = f"{prefix}/code/{model_eval_script.split('/')[-1]}"

s3_client.upload_file(
 Filename=model_eval_script, Bucket=bucket, Key=model_eval_key
)

model_eval_script_uri = f"s3://{bucket}/{model_eval_key}"

eval_processor = XGBoostProcessor(
 framework_version='1.5-1',
 role=sagemaker_role,
 instance_type=eval_instance_type,
 instance_count=eval_instance_count,
 base_job_name=f'{prefix}-eval',
 sagemaker_session=PipelineSession(),
)

step_args = eval_processor.run(
 code="pipeline/model_eval.py",
 inputs=[ProcessingInput(source=train_step.properties.ModelArtifacts.S3ModelArtifacts, 
 destination="/opt/ml/processing/model"),
 ProcessingInput(source=create_dataset_step.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri, 
 destination="/opt/ml/processing/input/test")
 ],
 outputs=[
 ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
 ],
 arguments=["--model-file", "model.tar.gz", "--cutoff-threshold", "0.4"],
)


evaluation_report = PropertyFile(
 name="EvaluationReport",
 output_name="evaluation",
 path="evaluation.json",
)

eval_step = ProcessingStep(
 name="ModelEval",
 step_args=step_args,
 property_files=[evaluation_report],
 cache_config=cache_config
)
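
The condition step reads `multiclass_classification_metrics.accuracy.value` from `evaluation.json`, so `pipeline/model_eval.py` must write a report with that nesting into `/opt/ml/processing/evaluation`. The script is not shown in this notebook; the sketch below is a hypothetical illustration of the core logic, assuming the model outputs positive-class probabilities (as with `binary:logistic`) that are turned into labels with the `--cutoff-threshold` value (0.4 above). Treating a probability equal to the cutoff as positive is also an assumption.

```python
import json
import os
from typing import Dict, Sequence


def accuracy_report(y_true: Sequence[int], y_prob: Sequence[float], cutoff: float = 0.4) -> Dict:
    """Label probabilities >= cutoff as 1 and report accuracy in the nesting the pipeline reads."""
    if len(y_true) != len(y_prob) or not y_true:
        raise ValueError("y_true and y_prob must be non-empty and the same length")
    y_pred = [1 if p >= cutoff else 0 for p in y_prob]
    correct = sum(int(t == p) for t, p in zip(y_true, y_pred))
    return {"multiclass_classification_metrics": {"accuracy": {"value": correct / len(y_true)}}}


def write_report(report: Dict, output_dir: str) -> str:
    """Write evaluation.json, the file name the PropertyFile above points to."""
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, "evaluation.json")
    with open(path, "w") as f:
        json.dump(report, f)
    return path


sample_report = accuracy_report([1, 0, 1, 0, 1], [0.9, 0.3, 0.45, 0.6, 0.2])
print(sample_report)
```

In the real script, `write_report(sample_report, "/opt/ml/processing/evaluation")` would place the file where the `evaluation` output picks it up.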

### Step 5: Register Model
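In this step we register the trained model in the `{prefix}-model` model package group, using the ParameterString `model_approval_status` defined at the beginning of the pipeline as its approval status. Registration runs only if the accuracy condition defined in the following cell is met.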

In [None]:
model_metrics = ModelMetrics(
 model_statistics=MetricsSource(
 s3_uri="{}/evaluation.json".format(
 eval_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
 "S3Uri"
 ]
 ),
 content_type="application/json",
 )
)

register_step = RegisterModel(
 name="Xgboost",
 estimator=xgb_estimator,
 model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
 content_types=["text/csv"],
 response_types=["text/csv"],
 inference_instances=["ml.m5.xlarge"],
 transform_instances=["ml.m5.xlarge"],
 model_package_group_name=f"{prefix}-model",
 approval_status=model_approval_status,
 model_metrics=model_metrics,
)

In [None]:
# Condition step for evaluating model quality and branching execution:
# register the model only if accuracy >= threshold, otherwise do nothing
cond_gte = ConditionGreaterThanOrEqualTo( # You can change the condition here
 left=JsonGet(
 step=eval_step,
 property_file=evaluation_report,
 json_path="multiclass_classification_metrics.accuracy.value", # Must match the structure of the report dict written by pipeline/model_eval.py
 ),
 right=0.8, # You can change the threshold here
)
cond_step = ConditionStep(
 name="AccuracyCond",
 conditions=[cond_gte],
 if_steps=[register_step],
 else_steps=[],
)
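
Conceptually, the condition step resolves the JSON path against the evaluation report of the current execution and runs `if_steps` only when the comparison holds. The pure-Python sketch below mimics that for a simple dotted path; the real `JsonGet` is evaluated by the SageMaker service using JsonPath, and the `json_get` and `should_register` helpers here are hypothetical.

```python
from typing import Any, Dict


def json_get(document: Dict[str, Any], dotted_path: str) -> Any:
    """Follow a dotted path such as 'a.b.c' through nested dictionaries."""
    value: Any = document
    for key in dotted_path.split("."):
        value = value[key]  # raises KeyError if the report layout differs
    return value


def should_register(report: Dict[str, Any], threshold: float = 0.8) -> bool:
    """Mirror ConditionGreaterThanOrEqualTo(left=accuracy, right=threshold)."""
    accuracy = json_get(report, "multiclass_classification_metrics.accuracy.value")
    return accuracy >= threshold


example_report = {"multiclass_classification_metrics": {"accuracy": {"value": 0.85}}}
print(should_register(example_report))  # True
```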

### Step 6: Deploy Model

In [None]:
model_deploy_script = "pipeline/deploy_model.py"
model_deploy_key = f"{prefix}/code/{model_deploy_script.split('/')[-1]}"

s3_client.upload_file(
 Filename=model_deploy_script, Bucket=bucket, Key=model_deploy_key
)

model_deploy_script_uri = f"s3://{bucket}/{model_deploy_key}"

deploy_model_processor = SKLearnProcessor(
 framework_version="0.23-1",
 role=sagemaker_role,
 instance_type=deploy_instance_type,
 instance_count=deploy_instance_count,
 base_job_name=f"{prefix}-deploy",
 sagemaker_session=sagemaker_session,
)

deploy_step = ProcessingStep(
 name="DeployModel",
 processor=deploy_model_processor,
 job_arguments=[
 "--model-name",
 create_model_step.properties.ModelName,
 "--region",
 region,
 "--endpoint-instance-type",
 deploy_instance_type,
 "--endpoint-name",
 f"{prefix}-endpoint",
 ],
 code=model_deploy_script_uri,
)
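
`pipeline/deploy_model.py` is not shown either. One way such a script could use its four arguments is to create an endpoint configuration and then an endpoint with the boto3 SageMaker client (`create_endpoint_config` and `create_endpoint`). The hypothetical sketch below only builds the request dictionaries so it runs without AWS access. The variant name `AllTraffic`, the instance count of 1, and the caller-supplied config name are assumptions, and a real script would also need to handle an endpoint that already exists (for example with `update_endpoint`).

```python
import argparse
from typing import Dict, Sequence, Tuple


def parse_deploy_args(argv: Sequence[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-name", required=True)
    parser.add_argument("--region", required=True)
    parser.add_argument("--endpoint-instance-type", required=True)
    parser.add_argument("--endpoint-name", required=True)
    return parser.parse_args(argv)


def endpoint_requests(args: argparse.Namespace, config_name: str) -> Tuple[Dict, Dict]:
    """Build kwargs for create_endpoint_config(...) and create_endpoint(...)."""
    config_request = {
        "EndpointConfigName": config_name,
        "ProductionVariants": [
            {
                "VariantName": "AllTraffic",
                "ModelName": args.model_name,
                "InitialInstanceCount": 1,
                "InstanceType": args.endpoint_instance_type,
                "InitialVariantWeight": 1.0,
            }
        ],
    }
    endpoint_request = {"EndpointName": args.endpoint_name, "EndpointConfigName": config_name}
    return config_request, endpoint_request


deploy_args = parse_deploy_args(["--model-name", "example-model", "--region", "us-east-1",
                                 "--endpoint-instance-type", "ml.m5.xlarge",
                                 "--endpoint-name", "demo-endpoint"])
config_req, endpoint_req = endpoint_requests(deploy_args, config_name="demo-endpoint-config-001")
# In a real script: sm = boto3.client("sagemaker", region_name=deploy_args.region)
# then sm.create_endpoint_config(**config_req) and sm.create_endpoint(**endpoint_req)
print(config_req["ProductionVariants"][0]["ModelName"], endpoint_req["EndpointName"])
```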

### Step 7: Combine and Run the Pipeline Steps

In [None]:
pipeline = Pipeline(
 name=prefix,
 parameters=[model_approval_status],
 steps=[
 flow_step,
 create_dataset_step,
 train_step,
 create_model_step,
 eval_step,
 cond_step,
 deploy_step
 ],
)
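
The order of the `steps` list does not determine execution order: SageMaker Pipelines builds a directed acyclic graph from the data dependencies between steps (for example, `train_step` consuming `create_dataset_step`'s output) plus any explicit `depends_on`. The sketch below uses Python's standard `graphlib` (Python 3.9+) to group this notebook's step names into "waves" of steps whose dependencies are satisfied; the edges are written out by hand from the code above, and the sketch illustrates the idea rather than how the service schedules steps.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Step name -> names of the steps it depends on (read off the definitions above).
step_dependencies: Dict[str, Set[str]] = {
    "DataWranglerProcessingStep": set(),
    "CreateDataset": {"DataWranglerProcessingStep"},
    "XgboostTrain": {"CreateDataset"},
    "XgboostModel": {"XgboostTrain"},
    "ModelEval": {"XgboostTrain", "CreateDataset"},
    "AccuracyCond": {"ModelEval"},
    "DeployModel": {"XgboostModel"},
}


def execution_waves(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group steps into waves; steps in the same wave could run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


for wave in execution_waves(step_dependencies):
    print(wave)
```

Note that in this graph `DeployModel` depends only on `XgboostModel`, not on `AccuracyCond`, so, as written, deployment is not gated by the accuracy check.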

### Submit the pipeline definition to SageMaker
Note: `upsert` creates the pipeline, or, if a pipeline with the same name already exists, overwrites its definition.

In [None]:
pipeline.upsert(role_arn=sagemaker_role)

In [None]:
start_response = pipeline.start()
# start_response.wait(delay=60, max_attempts=500)
start_response.describe()
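
`start()` returns as soon as the execution is launched; `describe()` returns the execution's current state, including `PipelineExecutionStatus` (`Executing`, `Stopping`, `Stopped`, `Failed`, or `Succeeded`), and the commented-out `wait(delay=60, max_attempts=500)` blocks until the run finishes. If you prefer to poll yourself, a hypothetical loop like the one below works with any zero-argument describe function; it is shown with fake responses so it runs without AWS, and in the notebook you could pass `start_response.describe`.

```python
import time
from typing import Callable, Dict

TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def poll_until_done(
    describe: Callable[[], Dict],
    delay: float = 60,
    max_attempts: int = 500,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call describe() until PipelineExecutionStatus is terminal; return that status."""
    for _ in range(max_attempts):
        status = describe()["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        sleep(delay)
    raise TimeoutError(f"Execution still running after {max_attempts} checks")


# Fake describe() responses standing in for start_response.describe().
fake_statuses = iter(["Executing", "Executing", "Succeeded"])
result = poll_until_done(lambda: {"PipelineExecutionStatus": next(fake_statuses)}, sleep=lambda s: None)
print(result)  # Succeeded
```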

## Clean Up
----
After running the demo, remove the resources that were created. Passing the keyword argument `delete_s3_objects=True`, as in the cell below, also deletes all the objects under the project's S3 prefix.

In [None]:
from delete_pipeline import delete_pipeline_resources

In [None]:
delete_pipeline_resources(
 sagemaker_boto_client=sagemaker_client,
 pipeline_name=prefix,
 mpg_name=f"{prefix}-model",
 prefix=prefix,
 delete_s3_objects=True,
 bucket_name=bucket,
)