# Amazon SageMaker Workshop
### _**Pipelines**_

---
In this part of the workshop we will bring together all our previous work from the labs and automate the whole ML workflow. This makes the process more robust, and any updates to data preparation, modeling, evaluation, inference and monitoring can be put into production faster and more reliably.

---

## Contents

a. [Background](#background) - Getting the work from previous labs.

b. [Create the training pipeline](#Create_pipeline) - featuring [SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html)
1. [Creating data preparation step](#dataprep_step)
2. [Creating training step](#train_step)
3. [Creating evaluation step](#eval_step)
4. [Creating approve and register model steps](#appr_model_reg_step)
5. [Finish the pipeline](#end_creation_pipe)

c. [Create the end-to-end solution automatically](#SM_Projects) - Create end-to-end ML solutions with CI/CD (featuring [SageMaker Projects](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects.html))
1. Customize the project with our pipeline and code
2. Trigger the training pipeline
3. Trigger the deployment pipeline

---


## Background

In the previous labs we created multiple resources to prepare the data (_2-DataPrep_), train the model (_3-Modeling_), evaluate model performance (_4-Evaluation_), deploy and customize inference logic (_4-Deployment/RealTime_) and monitor the deployed model (_5-Monitoring_).

Now it's time to **bring everything together**!

We will create a pipeline with 5 steps:

1. Data preparation
2. Training
3. Evaluation
4. Approve model
5. Save to model registry step

We will build our pipeline iteratively, one step at a time.

---

### - If you _skipped_ some or all of the previous labs, follow these instructions:

 - **run this [notebook](./config/pre_setup.ipynb)**

---
Load all variables (and modules) for this lab:

In [None]:
%store -r bucket
%store -r prefix
%store -r region
%store -r docker_image_name

In [None]:
bucket, prefix, region, docker_image_name

In [None]:
# Suppress default INFO logging
import logging
logger = logging.getLogger()
logger.setLevel(logging.ERROR)

In [None]:
import sagemaker 
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.session.Session()

---

# Create the training pipeline with SageMaker Pipelines



## 1. Create data preparation step

Get the raw data location and the S3 URI where our code for data preparation was stored:

In [None]:
%store -r s3uri_raw
%store -r s3_dataprep_code_uri
s3uri_raw, s3_dataprep_code_uri

In [None]:
from sagemaker.workflow.steps import (
 ProcessingStep,
 TrainingStep,
)
from sagemaker.processing import (
 ProcessingInput,
 ProcessingOutput,
 ScriptProcessor,
)

This first step will receive some inputs:

In [None]:
from sagemaker.workflow.parameters import (
 ParameterInteger,
 ParameterString,
)

In [None]:
# Parameters for data preparation step
input_data = ParameterString(
 name="InputDataUrl",
 default_value=s3uri_raw # S3 URI where we stored the raw data
)
processing_instance_count = ParameterInteger(
 name="ProcessingInstanceCount", default_value=1
)
processing_instance_type = ParameterString(
 name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)

In [None]:
from my_labs_solutions.dataprep_solution import get_dataprep_processor
sklearn_processor = get_dataprep_processor(processing_instance_type, processing_instance_count, role)
sklearn_processor

In [None]:
# Processing step for feature engineering
step_process = ProcessingStep(
 name="CustomerChurnProcess", # choose any name
 processor=sklearn_processor,
 outputs=[
 ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
 ProcessingOutput(
 output_name="validation", source="/opt/ml/processing/validation"
 ),
 ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
 ],
 code=s3_dataprep_code_uri,
 job_arguments=["--input-data", input_data],
)
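Inside the processing container, `job_arguments` arrive as ordinary command-line arguments, with the `InputDataUrl` pipeline parameter resolved to a plain string at execution time. The data prep script itself is not shown in this lab, so the following is only a minimal sketch. It assumes the script reads `--input-data` with `argparse` and writes each split under the `source` directories declared in the `ProcessingOutput`s above. The helper names and the example S3 URI are hypothetical.

```python
import argparse
import os

# Local directories that SageMaker uploads to S3 when the job ends; they match
# the `source` paths of the ProcessingOutputs declared in the step above.
OUTPUT_DIRS = {
    "train": "/opt/ml/processing/train",
    "validation": "/opt/ml/processing/validation",
    "test": "/opt/ml/processing/test",
}


def parse_dataprep_args(argv=None):
    """Parse the arguments passed through `job_arguments` (hypothetical helper)."""
    parser = argparse.ArgumentParser(description="Customer churn data prep (sketch)")
    # The InputDataUrl pipeline parameter arrives here as a plain string.
    parser.add_argument("--input-data", type=str, required=True)
    return parser.parse_args(argv)


def output_path(split, filename):
    """Build the path where a dataset split should be written."""
    return os.path.join(OUTPUT_DIRS[split], filename)


if __name__ == "__main__":
    args = parse_dataprep_args(["--input-data", "s3://my-bucket/raw/churn.csv"])
    print(args.input_data)                    # s3://my-bucket/raw/churn.csv
    print(output_path("train", "train.csv"))  # /opt/ml/processing/train/train.csv
```

Anything the script writes under these directories becomes the `train`, `validation` and `test` outputs that later steps reference.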

## Create the first iteration of the Pipeline

We will create a simple pipeline that receives some input parameters and has just 1 step (data preparation):

In [None]:
from time import strftime, gmtime
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.step_collections import RegisterModel

You can associate SageMaker Experiments with Pipelines to help track multiple moving pieces (ML hyperparameters, data, artifacts, plots, metrics, etc. - a.k.a. [ML lineage tracking](https://docs.aws.amazon.com/sagemaker/latest/dg/lineage-tracking.html)) 

In [None]:
# Experiment configs
create_date = lambda: strftime("%Y-%m-%d-%H-%M-%S", gmtime())

experiment_name=f"pipeline-customer-churn-prediction-xgboost-{create_date()}"
trial_name=f"pipeline-framework-trial-{create_date()}"
pipeline_name = f"ChurnMLPipeline"

In [None]:
pipeline_experiment_config = PipelineExperimentConfig(
 experiment_name = experiment_name,
 trial_name = trial_name
)

In [None]:
# Pipeline with just input parameters and 1 step for data prep
pipeline = Pipeline(
 name=pipeline_name,
 parameters=[
 input_data,
 processing_instance_type,
 processing_instance_count,
 ],
 steps=[step_process],
 sagemaker_session=sagemaker_session,
 )

In [None]:
# Validate that pipeline was configured correctly and load its definition
import json
json.loads(pipeline.definition())
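The parsed definition is a plain dictionary, so it can be inspected with ordinary Python. The following minimal sketch assumes the definition has top-level `Parameters` (entries with `Name`, `Type`, `DefaultValue`) and `Steps` (entries with `Name`, `Type`, `Arguments`) lists. The sample dictionary is hand-written and abbreviated, not real SDK output, and `summarize_definition` is a hypothetical helper.

```python
import json


def summarize_definition(definition):
    """Return ({parameter: default}, [(step name, step type)]) from a parsed definition.

    Assumes top-level "Parameters" and "Steps" lists; missing keys yield empty results.
    """
    params = {p["Name"]: p.get("DefaultValue") for p in definition.get("Parameters", [])}
    steps = [(s["Name"], s["Type"]) for s in definition.get("Steps", [])]
    return params, steps


# Hand-written, abbreviated stand-in for json.loads(pipeline.definition()).
sample = json.loads("""
{
  "Parameters": [
    {"Name": "InputDataUrl", "Type": "String", "DefaultValue": "s3://my-bucket/raw"},
    {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1},
    {"Name": "ProcessingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}
  ],
  "Steps": [
    {"Name": "CustomerChurnProcess", "Type": "Processing", "Arguments": {}}
  ]
}
""")

params, steps = summarize_definition(sample)
print(params)
print(steps)  # [('CustomerChurnProcess', 'Processing')]
```

In the notebook you would pass `json.loads(pipeline.definition())` instead of `sample`.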

#### Ok, looks good. Let's create the pipeline:

In [None]:
pipeline.upsert(role_arn=role)

1. Go to the pipeline and see its DAG:



2. Right-click the ChurnMLPipeline -> `Open pipeline details`.

Check its DAG (with just the data prep step):



3. Click on `Parameters` to see the default parameter inputs for an execution:




> Remember that we set the inputs as:
```python
# Parameters for data preparation step
input_data = ParameterString(
 name="InputDataUrl",
 default_value=s3uri_raw # S3 URI where we stored the raw data
)
processing_instance_count = ParameterInteger(
 name="ProcessingInstanceCount", default_value=1
)
processing_instance_type = ParameterString(
 name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)
```

**Let's programmatically execute the pipeline with the defaults:**


In [None]:
execution = pipeline.start()

In [None]:
execution.describe()

In [None]:
execution.list_steps()

In [None]:
# If we wanted to wait for execution to end:
# execution.wait()
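`execution.wait()` blocks until the execution ends. If you would rather control the polling interval and timeout yourself, here is a minimal sketch of a polling loop around `execution.describe()`. It relies only on the `PipelineExecutionStatus` field of the describe response. The helper name and default timings are hypothetical.

```python
import time

# Terminal values of PipelineExecutionStatus (the others are Executing and Stopping).
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_execution(describe, poll_seconds=30, timeout_seconds=3600, sleep=time.sleep):
    """Poll `describe()` until the execution reaches a terminal status.

    `describe` is any zero-argument callable returning a dict with a
    "PipelineExecutionStatus" key, for example `execution.describe`.
    """
    waited = 0
    while True:
        status = describe()["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"Execution still '{status}' after {waited}s")
        sleep(poll_seconds)
        waited += poll_seconds
```

Usage in the notebook: `wait_for_execution(execution.describe, poll_seconds=60)`. Passing `sleep` as a parameter keeps the loop testable without actually waiting.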


4. Select the `Executions` tab:



5. Select the only execution (its status should be "Executing") and double-click it:



6. Wait for a few minutes (for the data preparation step and the SageMaker Processing Job under the hood to finish):



7. If you go to the `Experiments and trials` tab, you will see that SageMaker Pipelines created an experiment called `churnmlpipeline`.

Also, if we select our data prep Processing job, we can see that it correctly created 3 datasets as outputs: `train`, `validation` and `test`:




---



# 2. Create modeling step

In [None]:
%store -r s3_modeling_code_uri
%store -r train_script_name
s3_modeling_code_uri, train_script_name

In [None]:
from my_labs_solutions.modeling_solution import get_modeling_estimator

xgb_train = get_modeling_estimator(bucket,
 prefix,
 s3_modeling_code_uri, 
 docker_image_name,
 role,
 entry_point_script = train_script_name)
xgb_train

In [None]:
from sagemaker.inputs import TrainingInput

In [None]:
step_train = TrainingStep(
 name="CustomerChurnTrain",
 estimator=xgb_train,
 inputs={
 "train": TrainingInput(
 s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
 "train"
 ].S3Output.S3Uri,
 content_type="text/csv"
 ),
 "validation": TrainingInput(
 s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
 "validation"
 ].S3Output.S3Uri,
 content_type="text/csv"
 )
 }
)

Notice that we can link one step's output to another step's input by accessing the step's properties:
```python
# Get output from processing step with key `train`
step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
```
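These property references are what define the DAG: a step that reads another step's properties has to run after it. The following is a conceptual sketch, not the SDK's implementation. It uses a hand-written dependency table for this lab's steps and computes a valid execution order with Python's `graphlib` (Python 3.9+).

```python
from graphlib import TopologicalSorter

# Hand-written dependency table: each step maps to the steps whose properties
# it references (e.g. the training step reads the processing step's outputs).
DEPENDS_ON = {
    "CustomerChurnProcess": set(),
    "CustomerChurnTrain": {"CustomerChurnProcess"},
    "CustomerChurnEval": {"CustomerChurnTrain", "CustomerChurnProcess"},
    "CustomerChurnAccuracyCond": {"CustomerChurnEval"},
}


def execution_order(depends_on):
    """Return one valid run order; raises graphlib.CycleError if there is a cycle."""
    return list(TopologicalSorter(depends_on).static_order())


print(execution_order(DEPENDS_ON))
# ['CustomerChurnProcess', 'CustomerChurnTrain', 'CustomerChurnEval', 'CustomerChurnAccuracyCond']
```

Because these steps form a single chain, only one order is valid. Steps with no dependency between them could run in parallel.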

## Create the second iteration of the Pipeline (updating the definition)

We will update the pipeline by adding an input parameter for the training step, as well as the training step itself, resulting in a pipeline with 2 steps:

In [None]:
# Add an input parameter to define the training instance type
training_instance_type = ParameterString(
 name="TrainingInstanceType", default_value="ml.m5.xlarge"
)

In [None]:
pipeline = Pipeline(
 name=pipeline_name,
 parameters=[
 input_data,
 processing_instance_type,
 processing_instance_count,
 training_instance_type,
 ],
 steps=[step_process, step_train],
 sagemaker_session=sagemaker_session,
)

In [None]:
# Update the pipeline
pipeline.upsert(role_arn=role)

1. If we go to the pipeline and click on the refresh button, we now see its 2 steps and the new input parameter:





2. Now, let's execute the new pipeline in the Studio UI. Click on `Start an execution`:



3. The default input configurations should appear in the Studio UI. Click on `Start`:



4. Refreshing the executions we should see:



5. Click on `View details` or select the execution in the list (status "Executing") and double click:



6. Wait a few minutes for the data prep Processing job and the training job to finish. You should see this:



If you click on the training step and select `Outputs` you will also be able to see the final training and validation log losses.

---



# 3. Create evaluation step

In [None]:
from my_labs_solutions.evaluation_solution import get_evaluation_processor
script_eval = get_evaluation_processor(docker_image_name, role)
script_eval

In [None]:
from sagemaker.workflow.properties import PropertyFile

In [None]:
evaluation_report = PropertyFile(
 name="EvaluationReport",
 output_name="evaluation",
 path="evaluation.json",
)

In [None]:
%store -r s3_evaluation_code_uri
s3_evaluation_code_uri

In [None]:
# Processing step for evaluation
step_eval = ProcessingStep(
 name="CustomerChurnEval",
 processor=script_eval,
 inputs=[
 ProcessingInput(
 source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
 destination="/opt/ml/processing/model",
 ),
 ProcessingInput(
 source=step_process.properties.ProcessingOutputConfig.Outputs[
 "test"
 ].S3Output.S3Uri,
 destination="/opt/ml/processing/test",
 ),
 ],
 outputs=[
 ProcessingOutput(
 output_name="evaluation", source="/opt/ml/processing/evaluation"
 ),
 ],
 code=s3_evaluation_code_uri,
 property_files=[evaluation_report],
 )

Now, notice that we get the model from the training step and also the test dataset from the data preparation step:
```python
# Get output model artifact from training step
step_train.properties.ModelArtifacts.S3ModelArtifacts

# Get the test dataset - the output of data preparation step with key `test`
step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri
```
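The `PropertyFile` only works if the evaluation script writes `evaluation.json` into the directory mapped to the `evaluation` output (`/opt/ml/processing/evaluation`). Because the evaluation script is not shown here, the following is a minimal sketch. It assumes a report whose nesting matches the `json_path` used in the condition step (`binary_classification_metrics.accuracy.value`). The function name and the metric values are illustrative, not results.

```python
import json
import os
import tempfile


def write_evaluation_report(accuracy, auc, output_dir="/opt/ml/processing/evaluation"):
    """Write evaluation.json in the layout that the PropertyFile/JsonGet expect."""
    report = {
        "binary_classification_metrics": {
            "accuracy": {"value": accuracy},
            "auc": {"value": auc},
        }
    }
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, "evaluation.json")  # must match PropertyFile path
    with open(path, "w") as f:
        json.dump(report, f)
    return path


# Local demo: write to a temporary directory instead of the container path.
with tempfile.TemporaryDirectory() as tmp:
    report_path = write_evaluation_report(0.9, 0.85, output_dir=tmp)
    with open(report_path) as f:
        print(json.load(f)["binary_classification_metrics"]["accuracy"]["value"])  # 0.9
```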

## Create the third iteration of the Pipeline (updating the definition)

We will update the pipeline by adding the evaluation step, resulting in a pipeline with 3 steps: data prep, training and evaluation.

In [None]:
pipeline = Pipeline(
 name=pipeline_name,
 parameters=[
 input_data,
 processing_instance_type,
 processing_instance_count,
 training_instance_type,
 ],
 steps=[step_process, step_train, step_eval],
 sagemaker_session=sagemaker_session,
)

In [None]:
# Update the pipeline
pipeline.upsert(role_arn=role)

1. If we go to the pipeline and click on the refresh button again:



2. Now, _**let's execute the new pipeline programmatically here...**_

> ### Wait! You must be wondering...
> Do I have to keep re-running everything from beginning every time?!

No, you don't. 

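**SageMaker Pipelines can cache results from previous executions.** When a step runs with the same arguments as a previous successful run of that step, and that run is still within the configured expiration window, the cached result is reused. Executions become a lot faster, and you stop paying for steps that would generate exactly the same outputs!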

Let's run this 3rd iteration of the pipeline caching both data preparation and training steps:

In [None]:
from sagemaker.workflow.steps import CacheConfig

# Cache for 30 minutes
cache_config = CacheConfig(enable_caching=True, expire_after="T30m")
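A simplified model of the caching idea follows. It is not SageMaker's implementation. Here the cache key is a hash of the JSON-serialized step arguments, and the expiry is a plain `timedelta` instead of the duration string passed to `CacheConfig`. The class name and sample values are hypothetical. The sketch also shows why changing an argument such as the instance type forces a step to run again.

```python
import hashlib
import json
from datetime import datetime, timedelta


class StepCache:
    """Toy step cache: key = hash of step name + arguments, entries expire."""

    def __init__(self, expire_after):
        self.expire_after = expire_after
        self._entries = {}  # key -> (finished_at, outputs)

    @staticmethod
    def key(step_name, arguments):
        payload = json.dumps({"step": step_name, "args": arguments}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def lookup(self, step_name, arguments, now):
        entry = self._entries.get(self.key(step_name, arguments))
        if entry is not None and now - entry[0] <= self.expire_after:
            return entry[1]  # cache hit: reuse the previous outputs
        return None  # cache miss: the step has to run

    def store(self, step_name, arguments, outputs, finished_at):
        self._entries[self.key(step_name, arguments)] = (finished_at, outputs)


cache = StepCache(expire_after=timedelta(minutes=30))
t0 = datetime(2024, 1, 1, 12, 0)
args = {"InstanceType": "ml.m5.xlarge", "InputDataUrl": "s3://my-bucket/raw"}
cache.store("CustomerChurnProcess", args, {"train": "s3://my-bucket/processed/train"}, finished_at=t0)

print(cache.lookup("CustomerChurnProcess", args, now=t0 + timedelta(minutes=10)))
# {'train': 's3://my-bucket/processed/train'}  (hit)
print(cache.lookup("CustomerChurnProcess", args, now=t0 + timedelta(minutes=45)))  # None (expired)
changed = dict(args, InstanceType="ml.c5.xlarge")
print(cache.lookup("CustomerChurnProcess", changed, now=t0 + timedelta(minutes=10)))  # None (new key)
```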

In [None]:
# Same data preparation step as before, now with caching enabled
step_process = ProcessingStep(
 name="CustomerChurnProcess", # choose any name
 processor=sklearn_processor,
 outputs=[
 ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
 ProcessingOutput(
 output_name="validation", source="/opt/ml/processing/validation"
 ),
 ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
 ],
 code=s3_dataprep_code_uri,
 job_arguments=["--input-data", input_data],
 cache_config=cache_config
)

In [None]:
# Same training step as before, now with caching enabled
step_train = TrainingStep(
 name="CustomerChurnTrain",
 estimator=xgb_train,
 inputs={
 "train": TrainingInput(
 s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
 "train"
 ].S3Output.S3Uri,
 content_type="text/csv"
 ),
 "validation": TrainingInput(
 s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
 "validation"
 ].S3Output.S3Uri,
 content_type="text/csv"
 )
 },
 cache_config=cache_config
)

In [None]:
# Processing step for evaluation
step_eval = ProcessingStep(
 name="CustomerChurnEval",
 processor=script_eval,
 inputs=[
 ProcessingInput(
 source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
 destination="/opt/ml/processing/model",
 ),
 ProcessingInput(
 source=step_process.properties.ProcessingOutputConfig.Outputs[
 "test"
 ].S3Output.S3Uri,
 destination="/opt/ml/processing/test",
 ),
 ],
 outputs=[
 ProcessingOutput(
 output_name="evaluation", source="/opt/ml/processing/evaluation"
 ),
 ],
 code=s3_evaluation_code_uri,
 property_files=[evaluation_report],
 cache_config=cache_config
 )

Update the pipeline definition so that all 3 steps use the 30-minute cache configuration:

In [None]:
pipeline = Pipeline(
 name=pipeline_name,
 parameters=[
 input_data,
 processing_instance_type,
 processing_instance_count,
 training_instance_type,
 ],
 steps=[step_process, step_train, step_eval],
 sagemaker_session=sagemaker_session,
)

# Update the pipeline
pipeline.upsert(role_arn=role)

In [None]:
execution = pipeline.start()

Ok, so you should see:



This is the first execution with caching enabled, so there are no cached results yet and we have to wait for every step one more time.

3. Wait a few minutes for the data prep, training and evaluation steps to finish:




---



# 4. Create approve model and save to model registry steps

In [None]:
from sagemaker.workflow.conditions import (
 ConditionGreaterThanOrEqualTo,
)
from sagemaker.workflow.condition_step import (
 ConditionStep,
 JsonGet,
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model_metrics import (
 MetricsSource,
 ModelMetrics,
)

Add a new input parameter for the model registration step:

In [None]:
model_approval_status = ParameterString(
 name="ModelApprovalStatus",
 default_value="PendingManualApproval", # ModelApprovalStatus can be set to a default of "Approved" if you don't want manual approval.
)

Create register model step:

In [None]:
# Model metrics to associate with the RegisterModel step (disabled in this notebook; the final pipeline script below enables them)
'''
model_metrics = ModelMetrics(
 model_statistics=MetricsSource(
 s3_uri="{}/evaluation.json".format(
 step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
 "S3Uri"
 ]
 ),
 content_type="application/json",
 )
)
'''

In [None]:
model_package_group_name="CustomerChurnPackageGroup"

# Register model step that will be conditionally executed
step_register = RegisterModel(
 name="CustomerChurnRegisterModel",
 estimator=xgb_train,
 model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
 content_types=["text/csv"],
 response_types=["text/csv"],
 inference_instances=["ml.t2.medium", "ml.m5.large"],
 transform_instances=["ml.m5.large"],
 model_package_group_name=model_package_group_name,
 approval_status=model_approval_status,
 #model_metrics=model_metrics,
)

Create a condition step that checks for **accuracy greater than or equal to 0.8**:

In [None]:
# Condition step for evaluating model quality and branching execution
cond_gte = ConditionGreaterThanOrEqualTo( # You can change the condition here
 left=JsonGet(
 step=step_eval,
 property_file=evaluation_report,
 json_path="binary_classification_metrics.accuracy.value", # This should follow the structure of your report_dict defined in the evaluate.py file.
 ),
 right=0.8, # You can change the threshold here
)
step_cond = ConditionStep(
 name="CustomerChurnAccuracyCond",
 conditions=[cond_gte],
 if_steps=[step_register],
 else_steps=[],
)
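At run time, the condition step reads the `EvaluationReport` property file, follows the dotted `json_path`, and compares the value with the threshold. The following local sketch mimics that check in plain Python. `json_get` is a hypothetical helper that handles only dotted keys (not full JSONPath), and the report values are made up.

```python
def json_get(document, dotted_path):
    """Follow a dotted path such as 'a.b.c' through nested dicts."""
    value = document
    for key in dotted_path.split("."):
        value = value[key]  # a KeyError means the path does not match the report
    return value


def passes_threshold(report, json_path, threshold):
    """Mirror ConditionGreaterThanOrEqualTo(left=JsonGet(...), right=threshold)."""
    return json_get(report, json_path) >= threshold


report = {"binary_classification_metrics": {"accuracy": {"value": 0.82}}}  # hypothetical
path = "binary_classification_metrics.accuracy.value"
print(passes_threshold(report, path, 0.8))  # True  -> if_steps (register the model)
print(passes_threshold(report, path, 0.9))  # False -> else_steps (nothing, in this pipeline)
```

A mismatch between the keys written by the evaluation script and the `json_path` is a common cause of a failing condition step. Checking the report locally like this can catch it early.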

## Create the fourth and final iteration of the Pipeline (updating the definition)

We will update the pipeline with the final approve model (condition) and save model steps, resulting in a pipeline with 5 steps: data prep, training, evaluation, approval, and save to registry.

In [None]:
pipeline = Pipeline(
 name=pipeline_name,
 parameters=[
 input_data,
 processing_instance_type,
 processing_instance_count,
 training_instance_type,
 model_approval_status,
 ],
 steps=[step_process, step_train, step_eval, step_cond],
 sagemaker_session=sagemaker_session,
 )

In [None]:
pipeline.upsert(role_arn=role)

Let's start the final execution:

In [None]:
execution = pipeline.start()


 
# 5. End of pipeline creation

Thanks to caching, this execution should be faster.

Once the execution has finished, let's get the final result of the pipeline by reading the evaluation report:

In [None]:
evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
 step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
))
json.loads(evaluation_json)

Ok, so you should see in the execution:



If we look at the condition and register model steps, we can see that the model passed the accuracy check (accuracy was 0.95 > 0.8):

Since the model passed the check, it was saved in the **SageMaker Model Registry**:



Select our model group:



Open its details:



Select the model that was saved in the Model Registry:




Here we can see the model details (if you click on the metrics, you can visualize the AUC and accuracy metrics).



Now, we can manually approve the model:



After selecting the approved status and clicking on `Update status`, the model will be updated:
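The same status change can also be made programmatically through the SageMaker API, using the `list_model_packages` and `update_model_package` methods of the boto3 SageMaker client. The following is a minimal sketch that assumes you want to approve the most recently registered version in a model package group. The helper name is hypothetical. The client is passed in as a parameter, so the function can be exercised with a stub instead of a real `boto3.client("sagemaker")`.

```python
def approve_latest_model_package(sm_client, group_name, description="Approved from notebook"):
    """Approve the newest model package in `group_name` and return its ARN.

    `sm_client` is expected to behave like boto3.client("sagemaker").
    """
    response = sm_client.list_model_packages(
        ModelPackageGroupName=group_name,
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    summaries = response.get("ModelPackageSummaryList", [])
    if not summaries:
        raise ValueError(f"No model packages found in group {group_name!r}")
    arn = summaries[0]["ModelPackageArn"]
    sm_client.update_model_package(
        ModelPackageArn=arn,
        ModelApprovalStatus="Approved",
        ApprovalDescription=description,
    )
    return arn


# Example (requires AWS credentials):
# import boto3
# approve_latest_model_package(boto3.client("sagemaker"), "CustomerChurnPackageGroup")
```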



We can also see the metrics of the model we just approved:



and its details:



#### Run again with caching, overriding some input parameters:

In [None]:
# Obs.: If we want to override the input parameters with other ones:

execution = pipeline.start(
 parameters=dict(
 ProcessingInstanceType="ml.c5.xlarge",
 ModelApprovalStatus="Approved", # Would approve automatically
 )
)
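`pipeline.start(parameters=...)` takes the parameter names declared on the pipeline. A typo in a name or an unsupported approval status is easy to miss, so a small pre-flight check can help. The following is a hypothetical helper, not part of the SDK. It hard-codes the 5 parameter names declared above. The allowed statuses are the model package approval values `Approved`, `Rejected` and `PendingManualApproval`.

```python
DECLARED_PARAMETERS = {
    "InputDataUrl",
    "ProcessingInstanceCount",
    "ProcessingInstanceType",
    "TrainingInstanceType",
    "ModelApprovalStatus",
}
APPROVAL_STATUSES = {"Approved", "Rejected", "PendingManualApproval"}


def check_overrides(overrides, declared=DECLARED_PARAMETERS):
    """Raise ValueError for unknown parameter names or an invalid approval status."""
    unknown = set(overrides) - set(declared)
    if unknown:
        raise ValueError(f"Unknown pipeline parameters: {sorted(unknown)}")
    status = overrides.get("ModelApprovalStatus")
    if status is not None and status not in APPROVAL_STATUSES:
        raise ValueError(f"Invalid ModelApprovalStatus: {status!r}")
    return dict(overrides)


overrides = check_overrides(
    {"ProcessingInstanceType": "ml.c5.xlarge", "ModelApprovalStatus": "Approved"}
)
# execution = pipeline.start(parameters=overrides)
```

In the notebook, the declared names could instead be collected from the pipeline object itself, for example with `{p.name for p in pipeline.parameters}`.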

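Steps whose arguments did not change can reuse their cached results, so this execution should be faster (check the `Elapsed time`). Note that we changed `ProcessingInstanceType`, which is part of the data preparation step's configuration, so that step is expected to run again instead of hitting the cache: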




Since we overrode the input `ModelApprovalStatus` with "Approved", this time the model is approved automatically when it is saved to the Model Registry:



Let's compare the models. Just select both, right-click and then choose `Compare models`:



As expected, both executions produced models with the same metrics:



In [None]:
# # Obs.: if we wanted to stop pipeline execution:
# execution.stop()

In [None]:
# # Obs.: if we wanted to delete the whole pipeline:
# pipeline.delete()

Let's put the whole pipeline code into a Python script:

In [None]:
%%writefile my_labs_solutions/pipeline_definition.py
import os
import json
from time import strftime, gmtime

import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import (
 ProcessingStep,
 TrainingStep,
)
from sagemaker.processing import (
 ProcessingInput,
 ProcessingOutput,
 ScriptProcessor,
)
from sagemaker.workflow.parameters import (
 ParameterInteger,
 ParameterString,
)

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.conditions import (
 ConditionGreaterThanOrEqualTo,
)
from sagemaker.workflow.condition_step import (
 ConditionStep,
 JsonGet,
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.model_metrics import (
 MetricsSource,
 ModelMetrics,
)

from .dataprep_solution import get_dataprep_processor
from .modeling_solution import get_modeling_estimator
from .evaluation_solution import get_evaluation_processor

BASE_DIR = os.path.dirname(os.path.realpath(__file__))

def get_my_solutions_vars():
    # Resolved relative to the current working directory (assumed to be the modelbuild repo root)
    vars_path = os.path.join(".", "pipelines", "my_labs_solutions", "my-solution-vars.json")

    with open(vars_path, "r") as f:
        my_vars = json.load(f)

    return my_vars

def get_pipeline(region,
 role=None,
 default_bucket=None,
 model_package_group_name="MLOpsCustomerChurnPackageGroup", # Choose any name
 pipeline_name="MLOpsFinalChurnMLPipeline", # You can find your pipeline name in the Studio UI (project -> Pipelines -> name)
 base_job_prefix="CustomerChurn", # Choose any name
 ) -> Pipeline:
 
 # Get config vars
 my_vars = get_my_solutions_vars()
 bucket = my_vars["bucket"]
 prefix = my_vars["prefix"]
 region = my_vars["region"]
 docker_image_name = my_vars["docker_image_name"]
 s3uri_raw = my_vars["s3uri_raw"]
 s3_dataprep_code_uri = my_vars["s3_dataprep_code_uri"]
 s3_modeling_code_uri = my_vars["s3_modeling_code_uri"]
 train_script_name = my_vars["train_script_name"]
 s3_evaluation_code_uri = my_vars["s3_evaluation_code_uri"]
 role = my_vars["role"]

 sagemaker_session = sagemaker.session.Session()

 # Parameters for data preparation step
 input_data = ParameterString(
 name="InputDataUrl",
 default_value=s3uri_raw # S3 URI where we stored the raw data
 )
 processing_instance_count = ParameterInteger(
 name="ProcessingInstanceCount", default_value=1
 )
 processing_instance_type = ParameterString(
 name="ProcessingInstanceType", default_value="ml.m5.xlarge"
 )

 # Add an input parameter to define the training instance type
 training_instance_type = ParameterString(
 name="TrainingInstanceType", default_value="ml.m5.xlarge"
 )
 model_approval_status = ParameterString(
 name="ModelApprovalStatus",
 default_value="PendingManualApproval", # ModelApprovalStatus can be set to a default of "Approved" if you don't want manual approval.
 )


 # Cache for 30 minutes
 cache_config = CacheConfig(enable_caching=True, expire_after="T30m")

 sklearn_processor = get_dataprep_processor(processing_instance_type, processing_instance_count, role)

 # Processing step for feature engineering
 step_process = ProcessingStep(
 name="CustomerChurnProcess", # choose any name
 processor=sklearn_processor,
 outputs=[
 ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
 ProcessingOutput(
 output_name="validation", source="/opt/ml/processing/validation"
 ),
 ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
 ],
 code=s3_dataprep_code_uri,
 job_arguments=["--input-data", input_data],
 cache_config=cache_config
 )


 xgb_train = get_modeling_estimator(bucket,
 prefix,
 s3_modeling_code_uri, 
 docker_image_name,
 role,
 entry_point_script = train_script_name)


 step_train = TrainingStep(
 name="CustomerChurnTrain",
 estimator=xgb_train,
 inputs={
 "train": TrainingInput(
 s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
 "train"
 ].S3Output.S3Uri,
 content_type="text/csv"
 ),
 "validation": TrainingInput(
 s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
 "validation"
 ].S3Output.S3Uri,
 content_type="text/csv"
 )
 },
 cache_config=cache_config
 ) 


 evaluation_report = PropertyFile(
 name="EvaluationReport",
 output_name="evaluation",
 path="evaluation.json",
 )

 script_eval = get_evaluation_processor(docker_image_name, role)

 # Processing step for evaluation
 step_eval = ProcessingStep(
 name="CustomerChurnEval",
 processor=script_eval,
 inputs=[
 ProcessingInput(
 source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
 destination="/opt/ml/processing/model",
 ),
 ProcessingInput(
 source=step_process.properties.ProcessingOutputConfig.Outputs[
 "test"
 ].S3Output.S3Uri,
 destination="/opt/ml/processing/test",
 ),
 ],
 outputs=[
 ProcessingOutput(
 output_name="evaluation", source="/opt/ml/processing/evaluation"
 ),
 ],
 code=s3_evaluation_code_uri,
 property_files=[evaluation_report],
 cache_config=cache_config
 )


 # Model metrics that will be associated with RegisterModel step
 model_metrics = ModelMetrics(
 model_statistics=MetricsSource(
 s3_uri="{}/evaluation.json".format(
 step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
 "S3Uri"
 ]
 ),
 content_type="application/json",
 )
 )

 #model_package_group_name="CustomerChurnPackageGroup"

 # Register model step that will be conditionally executed
 step_register = RegisterModel(
 name="CustomerChurnRegisterModel",
 estimator=xgb_train,
 model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
 content_types=["text/csv"],
 response_types=["text/csv"],
 inference_instances=["ml.t2.medium", "ml.m5.large"],
 transform_instances=["ml.m5.large"],
 model_package_group_name=model_package_group_name,
 approval_status=model_approval_status,
 model_metrics=model_metrics,
 )


 # Condition step for evaluating model quality and branching execution
 cond_gte = ConditionGreaterThanOrEqualTo( # You can change the condition here
 left=JsonGet(
 step=step_eval,
 property_file=evaluation_report,
 json_path="binary_classification_metrics.accuracy.value", # This should follow the structure of your report_dict defined in the evaluate.py file.
 ),
 right=0.8, # You can change the threshold here
 )
 step_cond = ConditionStep(
 name="CustomerChurnAccuracyCond",
 conditions=[cond_gte],
 if_steps=[step_register],
 else_steps=[],
 )



 # Experiment configs
 create_date = lambda: strftime("%Y-%m-%d-%H-%M-%S", gmtime())

 experiment_name=f"pipeline-customer-churn-prediction-xgboost-{create_date()}"
 trial_name=f"pipeline-framework-trial-{create_date()}"

 pipeline_experiment_config = PipelineExperimentConfig(
 experiment_name = experiment_name,
 trial_name = trial_name
 )


 pipeline = Pipeline(
 name=pipeline_name,
 parameters=[
 input_data,
 processing_instance_type,
 processing_instance_count,
 training_instance_type,
 model_approval_status,
 ],
 steps=[step_process, step_train, step_eval, step_cond],
 sagemaker_session=sagemaker_session,
 )
 
 return pipeline
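`get_pipeline` expects `my-solution-vars.json` to contain every key it reads, from `bucket` through `role`. A missing key only surfaces as a `KeyError` once CodeBuild runs the script. The following stricter loader is a hypothetical alternative to `get_my_solutions_vars` that reports all missing keys at once. It takes the file path as an argument. One option is to resolve that path relative to the script's own directory, which assumes the JSON file sits next to `pipeline_definition.py`.

```python
import json

# Keys read by get_pipeline() from my-solution-vars.json.
REQUIRED_KEYS = (
    "bucket", "prefix", "region", "docker_image_name", "s3uri_raw",
    "s3_dataprep_code_uri", "s3_modeling_code_uri", "train_script_name",
    "s3_evaluation_code_uri", "role",
)


def load_solution_vars(path):
    """Load the lab variables and fail early, listing every missing key."""
    with open(path, "r") as f:
        my_vars = json.load(f)
    missing = [k for k in REQUIRED_KEYS if k not in my_vars]
    if missing:
        raise KeyError(f"{path} is missing keys: {', '.join(missing)}")
    return my_vars


# Inside pipeline_definition.py, one option would be:
# my_vars = load_solution_vars(os.path.join(BASE_DIR, "my-solution-vars.json"))
```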



# Customizing the Build/Train/Deploy MLOps Project Template

SageMaker Projects introduce MLOps templates that automatically provision the underlying resources needed to enable 
CI/CD capabilities for your Machine Learning Development Lifecycle (MLDC). You can use a number of built-in 
templates or create your own custom templates.

In this workshop we will use one of the **pre-built MLOps templates** to bootstrap your ML project and establish a CI/CD 
pattern from seed code.

### MLOps Template for Build, Train, and Deploy

> Imagine now that you are a data scientist that just joined the company. You need to get access to the ML resources.

To get started with SageMaker Projects, [they must be first enabled in the SageMaker Studio console](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-projects-studio-updates.html). 
This can be done for existing users or while creating new ones:



Within Amazon SageMaker Studio, you can now select “Projects” from a drop-down menu on the “Components and registries” 
tab as shown below:



From the projects page you’ll have the option to launch a pre-configured SageMaker MLOps template. Click on `Create project` and we'll select the build, train and deploy template:



Name the project `ChurnProject`.

> NOTE: Launching this template will kick off a model building pipeline by default and will train a regression model. This will incur a small cost.

Once the project is created from the MLOps template, the following architecture will be deployed:




## Modifying IAM Role for Code Build

Attach the `AdminRole` permissions to the CodeBuild role used by the project.

## Modifying the Seed Code for Custom Use Case

After your project has been created the architecture shown above will be deployed and the visualization of the 
Pipeline will be available in the “Pipelines” drop down menu within SageMaker Studio.

In order to modify the seed code from this launched template, we’ll first need to clone the AWS CodeCommit 
repositories to our local SageMaker Studio instance. From the list of projects, select the one that was just 
created. Under the “Repositories” tab you can select the hyperlinks to locally clone the AWS CodeCommit repos:




### Clone the `...modelbuild` repo (click on `clone repo...`)

The SageMaker project template creates these repositories.

In the `...-modelbuild` repository there's the code for preprocessing, training, and evaluating the model. This pre-built template includes another example for a regression model related to the [UCI Abalone dataset](https://archive.ics.uci.edu/ml/datasets/abalone):




**In our case we want to create a pipeline for predicting Churn (previous labs).** We can modify these files in order to solve our own customer churn use-case.

---

### Modifying the code for the Churn problem

This is the sample structure of the Project (Abalone):




#### Let's use everything we just built:

In the `...modelbuild` repo:

1. Replace `codebuild-buildspec.yml` in your current Studio project (Abalone) with the one found in [modelbuild/codebuild-buildspec.yml](modelbuild/codebuild-buildspec.yml) (Churn).

The final `codebuild-buildspec.yml` should look like this one (note the comment on the first line):




2. Go to `pipelines` and delete the `abalone` directory.




3. Cut the `my_labs_solutions` directory and paste it into the `...modelbuild/pipelines` directory.





In the end the `...modelbuild` repo should look like this:



## Trigger a new training Pipeline Execution through git commit

By committing these changes to the AWS CodeCommit repository (easily done in the SageMaker Studio source control tab), a 
new pipeline execution will be triggered, since an Amazon EventBridge rule monitors the repository for commits. After a few moments, 
we can monitor the execution by selecting the pipeline inside the SageMaker Project.
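An EventBridge rule matches incoming events against a JSON pattern. The rule created by the template is not shown in this lab. The following sketch assumes a typical pattern for pushes to a CodeCommit branch (`source` `aws.codecommit`, `detail-type` `CodeCommit Repository State Change`) with a hypothetical branch name `main`. It implements only the simplest EventBridge matching rule: every pattern field must be present, and its value must be one of the listed values. Real EventBridge patterns support more operators than this.

```python
# Assumed, simplified rule pattern (not read from the project's actual rule).
PATTERN = {
    "source": ["aws.codecommit"],
    "detail-type": ["CodeCommit Repository State Change"],
    "detail": {
        "event": ["referenceCreated", "referenceUpdated"],
        "referenceType": ["branch"],
        "referenceName": ["main"],  # hypothetical branch name
    },
}


def matches(pattern, event):
    """Simplified EventBridge matching: lists of exact values, nested dicts."""
    for field, expected in pattern.items():
        if field not in event:
            return False
        if isinstance(expected, dict):
            if not isinstance(event[field], dict) or not matches(expected, event[field]):
                return False
        elif event[field] not in expected:
            return False
    return True


push_event = {
    "source": "aws.codecommit",
    "detail-type": "CodeCommit Repository State Change",
    "detail": {"event": "referenceUpdated", "referenceType": "branch", "referenceName": "main"},
}
print(matches(PATTERN, push_event))  # True
tag_event = dict(push_event, detail=dict(push_event["detail"], referenceType="tag"))
print(matches(PATTERN, tag_event))   # False
```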

Go to the directory of the `...modelbuild/pipelines` repo and click on the git icon to commit and push the changes:



This triggers the training pipeline. Go to the `“Pipelines”` tab inside the SageMaker Project and click on our only pipeline. You'll see:



Select the most recent execution:




## Trigger the ModelDeploy Pipeline

Once the training pipeline has completed, we can go to the `“Model groups”` tab inside the SageMaker Project and inspect the metadata attached to the model artifacts. If everything looks good, we can manually approve the model:





This approval will trigger the ModelDeploy pipeline (in CodePipeline):



After we deploy to a staging environment and run some tests, we will have to **approve the deployment to production** by approving in the `ApproveDeployment` stage:





Finally, if we go back to Studio, we will see the Production endpoint for real-time inference.



---