# SageMaker Pipelines workshop

This notebook works well with the `Python 3 (Data Science)` kernel on SageMaker Studio.

---

This workshop is based on the [Amazon SageMaker Drift Detection project](https://github.com/aws-samples/amazon-sagemaker-drift-detection) on GitHub, available in the aws-samples organization.

Amazon SageMaker Model Building Pipelines offers machine learning (ML) application developers and operations engineers the ability to orchestrate SageMaker jobs and author reproducible ML pipelines. It also enables them to deploy custom-built models for real-time, low-latency inference, run offline inference with Batch Transform, and track the lineage of artifacts. They can institute sound operational practices for deploying and monitoring production workflows, deploying model artifacts, and tracking artifact lineage through a simple interface, adhering to safety and best-practice paradigms for ML application development.

The SageMaker Pipelines service supports a SageMaker Pipeline domain-specific language (DSL), which is a declarative JSON specification. This DSL defines a directed acyclic graph (DAG) of pipeline parameters and SageMaker job steps. The SageMaker Python Software Development Kit (SDK) streamlines the generation of the pipeline DSL using constructs that engineers and scientists are already familiar with.
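
To make the idea of a JSON-defined DAG concrete, here is a minimal sketch in plain Python. The dictionary layout (`Parameters`, `Steps`, `DependsOn`) is a simplified, hypothetical stand-in for the real pipeline definition, which carries many more fields, and `topological_order` is an illustrative helper rather than part of the SageMaker SDK.

```python
import json


def topological_order(steps):
    """Return step names so that every step comes after its dependencies."""
    remaining = {step["Name"]: set(step["DependsOn"]) for step in steps}
    order = []
    while remaining:
        # A step is ready once all of its dependencies have been scheduled.
        ready = sorted(name for name, deps in remaining.items() if deps <= set(order))
        if not ready:
            raise ValueError("cycle detected: the steps do not form a DAG")
        order.extend(ready)
        for name in ready:
            del remaining[name]
    return order


# Simplified, hypothetical definition (not the full SageMaker schema).
definition = {
    "Parameters": [
        {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 2},
    ],
    "Steps": [
        {"Name": "EvaluateModel", "DependsOn": ["PreprocessData", "TrainModel"]},
        {"Name": "TrainModel", "DependsOn": ["PreprocessData"]},
        {"Name": "PreprocessData", "DependsOn": []},
    ],
}

# The definition is declarative JSON, so it survives a serialization round trip.
restored = json.loads(json.dumps(definition))
execution_order = topological_order(restored["Steps"])
print(execution_order)
```

The order in which steps are listed does not matter; only the dependencies determine when each step can run.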

## SageMaker Pipelines

SageMaker Pipelines supports the following activities, which are demonstrated in this notebook:

* Pipelines - A DAG of steps and conditions to orchestrate SageMaker jobs and resource creation.
* Processing job steps - A simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation.
* Training job steps - An iterative process that teaches a model to make predictions by presenting examples from a training dataset.
* Conditional execution steps - A step that provides conditional execution of branches in a pipeline.
* Register model steps - A step that creates a model package resource in the Model Registry that can be used to create deployable models in Amazon SageMaker.
* Create model steps - A step that creates a model for use in transform steps or later publication as an endpoint.
* Transform job steps - A batch transform to preprocess datasets to remove noise or bias that interferes with training or inference from a dataset, get inferences from large datasets, and run inference when a persistent endpoint is not needed.
* Parametrized Pipeline executions - Enables variation in pipeline executions according to specified parameters.

## Notebook Overview

This notebook shows how to:

* Define a set of Pipeline parameters that can be used to parametrize a SageMaker Pipeline.
* Define a Processing step that performs cleaning, feature engineering, and splitting the input data into train and test data sets.
* Define a Training step that trains a model on the preprocessed train data set.
* Define a Processing step that evaluates the trained model's performance on the test dataset.
* Define a Create Model step that creates a model from the model artifacts used in training.
* Define a Transform step that performs batch transformation based on the model that was created.
* Define a Register Model step that creates a model package from the estimator and model artifacts used to train the model.
* Define a Conditional step that measures a condition based on output from prior steps and conditionally executes other steps.
* Define and create a Pipeline definition in a DAG, with the defined parameters and steps.
* Start a Pipeline execution and wait for execution to complete.
* Download the model evaluation report from the S3 bucket for examination.
* Start a second Pipeline execution.

We will consider three teams in this scenario:
* Engineering team - ML or data engineering
* Data science team - developing the models
* DevOps team - automating and integrating the pipeline

## A SageMaker Pipeline

The pipeline that you create follows a typical machine learning (ML) application pattern of preprocessing, training, evaluation, model creation, batch transformation, and model registration:

![A typical ML Application pipeline](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/a78c7255cbe7892408d8ea2b15a7a2117703befc/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-full.png)

In [None]:
import numpy as np 
import pandas as pd 
import json
import os

import boto3
import sagemaker
import sagemaker.session
from sagemaker import get_execution_role

from sagemaker.estimator import Estimator
from sagemaker.debugger import Rule, rule_configs
from sagemaker.inputs import TrainingInput
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.model_metrics import (
 MetricsSource,
 ModelMetrics,
)
from sagemaker.processing import (
 ProcessingInput,
 ProcessingOutput,
 Processor,
 ScriptProcessor,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.s3 import S3Uploader
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

from sagemaker.workflow.parameters import (
 ParameterInteger,
 ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
 ProcessingStep,
 TrainingStep,
 CacheConfig,
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.utils import name_from_base

In [None]:
role = get_execution_role()

region = boto3.Session().region_name

sagemaker_session = sagemaker.Session()
from sagemaker.workflow.pipeline_context import PipelineSession
pipeline_session = PipelineSession()

base_job_prefix = 'Demo-random-forest-dm-pipeline'

bucket = sagemaker_session.default_bucket()
lab_pipelines_prefix = f"{base_job_prefix}/demo-pipelines"


## Dataset
We reuse the [direct marketing dataset](https://sagemaker-sample-data-us-west-2.s3-us-west-2.amazonaws.com/autopilot/direct_marketing/bank-additional.zip) downloaded in the notebook [F1-xgboost_direct_marketing_sagemaker.ipynb](./F1-xgboost_direct_marketing_sagemaker.ipynb).

> If you haven't run [F1-xgboost_direct_marketing_sagemaker.ipynb](./F1-xgboost_direct_marketing_sagemaker.ipynb), please open it and execute the code cells that download the data.

Now let's read the data into a pandas DataFrame and take a look.

In [None]:
# Load the direct marketing dataset downloaded earlier
data_file = "./bank-additional/bank-additional-full.csv"
data = pd.read_csv(data_file)
pd.set_option('display.max_columns', 500) # Make sure we can see all of the columns
pd.set_option('display.max_rows', 20) # Keep the output on one page
data

## Preparation


Let's start by specifying:

- The S3 bucket and prefix that you want to use for training and model data. These should be in the same Region as the notebook instance, training, and hosting.
- The IAM role ARN used to give training and hosting access to your data. See the documentation for how to create these. If more than one role is required for notebook instances, training, and/or hosting, replace the `get_execution_role()` call with the appropriate full IAM role ARN string(s).

In [None]:
# processing input / output path
processing_input_path = f's3://{bucket}/{lab_pipelines_prefix}/processing/input'
processing_output_path = f's3://{bucket}/{lab_pipelines_prefix}/processing/output'

# upload data files 
S3Uploader.upload(data_file, processing_input_path, sagemaker_session=sagemaker_session)
input_data = f'{processing_input_path}/bank-additional-full.csv'

### Define Parameters to Parametrize Pipeline Execution

Define Pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.

The supported parameter types include:

* `ParameterString` - represents a `str` Python type
* `ParameterInteger` - represents an `int` Python type
* `ParameterFloat` - represents a `float` Python type

These parameters support providing a default value, which can be overridden on pipeline execution. The default value specified should be an instance of the type of the parameter.
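
The default-and-override behavior can be sketched without the SDK. In the snippet below, `Param` and `resolve` are hypothetical names invented for illustration; they only mimic the idea that each parameter has a type and a default, and that an execution may override some of them.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Param:
    """Toy stand-in for a pipeline parameter (not the SDK class)."""
    name: str
    python_type: type
    default_value: Any


def resolve(params: List[Param], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Merge per-execution overrides into the defaults, checking names and types."""
    unknown = set(overrides) - {p.name for p in params}
    if unknown:
        raise KeyError(f"unknown parameters: {sorted(unknown)}")
    resolved = {}
    for p in params:
        value = overrides.get(p.name, p.default_value)
        if not isinstance(value, p.python_type):
            raise TypeError(f"{p.name} expects {p.python_type.__name__}")
        resolved[p.name] = value
    return resolved


params = [
    Param("ProcessingInstanceCount", int, 2),
    Param("ProcessingInstanceType", str, "ml.m5.xlarge"),
    Param("ModelApprovalStatus", str, "PendingManualApproval"),
]

# Override one value for this execution; the others keep their defaults.
resolved = resolve(params, {"ProcessingInstanceCount": 1})
print(resolved)
```

With the SDK, overrides are supplied when an execution starts, for example through the `parameters` argument of `Pipeline.start`, keyed by the parameter `name`.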

The parameters defined in this workflow include:

* `processing_instance_type` - The `ml.*` instance type of the processing job.
* `processing_instance_count` - The instance count of the processing job.
* `training_instance_type` - The `ml.*` instance type of the training job.
* `model_approval_status` - The approval status to register the trained model with for CI/CD purposes (`"PendingManualApproval"` is the default).
* `input_data` - The S3 URI of the input bank customer data.
* `model_output` - The S3 URI prefix for model output (defaults to a `model` folder under the pipeline prefix).

![Define Parameters](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/a78c7255cbe7892408d8ea2b15a7a2117703befc/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-1.png)

In [None]:
# parameters for pipeline execution
 
input_data = ParameterString(
 name="InputDataUrl",
 default_value=input_data,
)

processing_instance_count = ParameterInteger(
 name="ProcessingInstanceCount", default_value=2
)
processing_instance_type = ParameterString(
 name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
 name="TrainingInstanceType", default_value="ml.m5.xlarge"
)
model_approval_status = ParameterString(
 name="ModelApprovalStatus", default_value="PendingManualApproval"
)
model_output = ParameterString(
 name="ModelOutputUrl",
 default_value=f"s3://{bucket}/{lab_pipelines_prefix}/model",
)


In [None]:
# Cache step results for one hour (ISO 8601 duration); expire_after cannot take a pipeline parameter
cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")

### Define a Processing Step for Feature Engineering

First, develop a preprocessing script that is specified in the Processing step.

The Processing step executes the script on the input data. The Training step uses the preprocessed training features and labels to train a model. The Evaluation step uses the trained model and preprocessed test features and labels to evaluate the model.

![Define a Processing Step for Feature Engineering](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/a78c7255cbe7892408d8ea2b15a7a2117703befc/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-2.png)

Next, create an instance of an `SKLearnProcessor` processor and use that in our `ProcessingStep`.

You also specify the `framework_version` to use throughout this notebook.

Note the `processing_instance_type` and `processing_instance_count` parameters used by the processor instance.

In [None]:
# processing step for feature engineering
sklearn_processor = SKLearnProcessor(
 framework_version="1.0-1",
 instance_type=processing_instance_type,
 instance_count=processing_instance_count,
 base_job_name=f"{base_job_prefix}-sklearn-preprocess",
 sagemaker_session=pipeline_session,
 role=role,
)

Finally, use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that runs when the pipeline executes the step. This is similar to a processor instance's `run` method in the Python SDK.

Note that the `input_data` parameter passed into `ProcessingStep` is the input data for the step. The processor instance reads this data when it runs.

Also, note the `"train"`, `"validation"`, `"test"`, and `"batchtransform"` named channels specified in the output configuration for the processing job. Step `Properties` can be used in subsequent steps and resolve to their runtime values at execution. Specifically, this usage is called out when you define the training step.
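
The preprocessing script derives all three splits from a single ratio: the first `train_test_split_ratio` share of the shuffled rows goes to training, 80% of the remainder to validation, and what is left to test. The sketch below reproduces only that boundary arithmetic in plain Python; `split_boundaries` and the row count of 1005 are hypothetical, chosen for illustration.

```python
def split_boundaries(n_rows, train_ratio=0.7, validation_share=0.8):
    """Return (train_end, validation_end) row indices for a shuffled dataset."""
    # Validation takes `validation_share` of whatever training leaves over.
    validation_end_ratio = train_ratio + (1 - train_ratio) * validation_share
    return int(train_ratio * n_rows), int(validation_end_ratio * n_rows)


n_rows = 1005  # hypothetical row count, not the size of the real dataset
rows = list(range(n_rows))

train_end, validation_end = split_boundaries(n_rows)
train = rows[:train_end]
validation = rows[train_end:validation_end]
test = rows[validation_end:]

print(len(train), len(validation), len(test))
```

With the default ratio of 0.7 this yields roughly 70% train, 24% validation, and 6% test, with no row shared between splits.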

In [None]:
%%writefile ./preprocessing.py

import argparse
import os

import pandas as pd
import numpy as np

PROCESSING_INPUT_DIR = "/opt/ml/processing/input"
PROCESSING_OUTPUT_DIR = "/opt/ml/processing/output"

def preprocess(args): 
 
 input_data_path = os.path.join(args.processing_input_dir, "bank-additional-full.csv")
 
 print(f"Reading input data from {input_data_path}")
 data = pd.read_csv(input_data_path)
 
 # Indicator variable to capture when pdays takes a value of 999
 data['no_previous_contact'] = np.where(data['pdays'] == 999, 1, 0) 

 # Indicator for individuals not actively employed
 data['not_working'] = data['job'].isin(['student', 'retired', 'unemployed']).astype(int)

 # remove unnecessary data
 data = data.drop(
 ['duration', 
 'emp.var.rate', 
 'cons.price.idx', 
 'cons.conf.idx', 
 'euribor3m', 
 'nr.employed'
 ], 
 axis=1)

 # Convert categorical variables to sets of indicators
 model_data = pd.get_dummies(data) 

 # Replace "y_no" and "y_yes" with a single label column, and bring it to the front:
 model_data = pd.concat([model_data['y_yes'], model_data.drop(['y_no', 'y_yes'], axis=1)], axis=1)
 
 # Randomly sort the data then split 
 validation_test_split_ratio = args.train_test_split_ratio + (1 - args.train_test_split_ratio) * 0.8
 train_data, validation_data, test_data = np.split(
 model_data.sample(frac=1, random_state=1729), 
 [int(args.train_test_split_ratio * len(model_data)), 
 int(validation_test_split_ratio * len(model_data))]) 
 print(f"total dataset length:{len(model_data)}")
 print(f"train data length:{len(train_data)}")
 print(f"validation data length:{len(validation_data)}")
 print(f"test data length:{len(test_data)}")
 
 # output to local folder
 train_data_output_path = os.path.join(args.processing_output_dir, "train/train.csv")
 train_data.to_csv(train_data_output_path, header=True, index=False)

 validation_data_output_path = os.path.join(args.processing_output_dir, "validation/validation.csv")
 validation_data.to_csv(validation_data_output_path, header=True, index=False)
 
 test_data_output_path = os.path.join(args.processing_output_dir, "test/test.csv")
 test_data.to_csv(test_data_output_path, header=True, index=False)

 # For batch transform
 bt_data_output_path = os.path.join(args.processing_output_dir, "batchtransform/data.csv")
 test_data.to_csv(bt_data_output_path, header=False, index=False)

 
if __name__ == "__main__":
 parser = argparse.ArgumentParser()
 parser.add_argument("--train-test-split-ratio", type=float, default=0.7)
 parser.add_argument("--processing-input-dir", type=str, default=PROCESSING_INPUT_DIR)
 parser.add_argument("--processing-output-dir", type=str, default=PROCESSING_OUTPUT_DIR)
 args, _ = parser.parse_known_args()
 
 print(f"Received arguments: {args}")

 preprocess(args)
 

In [None]:
process_script = './preprocessing.py'

In [None]:
step_process = ProcessingStep(
 name="PreprocessData",
 processor=sklearn_processor,
 inputs=[
 ProcessingInput(
 source=input_data,
 destination="/opt/ml/processing/input",
 s3_data_distribution_type="FullyReplicated",
 )
 ],
 outputs=[
 ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train"),
 ProcessingOutput(output_name="validation", source="/opt/ml/processing/output/validation"),
 ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test"),
 ProcessingOutput(output_name="batchtransform", source="/opt/ml/processing/output/batchtransform"),
 ],
 code=process_script,
 cache_config=cache_config,
)

### Define a Training Step to Train a Model

In this section, use a scikit-learn Random Forest classifier to train on this dataset. Configure an estimator for the Random Forest algorithm and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves the model to `model_dir` so that it can be hosted later.

The model path where the models from training will be saved is also specified.

Note the `training_instance_type` parameter may be used in multiple places in the pipeline. In this case, the `training_instance_type` is passed into the estimator.

![Define a Training Step to Train a Model](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/a78c7255cbe7892408d8ea2b15a7a2117703befc/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-3.png)

In [None]:
%%writefile ./train.py
# Python Built-Ins:
import argparse
import os

# External Dependencies:
# - joblib: a set of tools that provide lightweight pipelining in Python.
# - NumPy: support for large, multi-dimensional arrays and matrices,
#   along with a large collection of high-level mathematical functions to operate on them.
# - pandas: data structures and operations for manipulating numerical tables and time series.

import joblib
import numpy as np
import pandas as pd
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score,recall_score, f1_score

def train(args):

 print("loading training data")
 train_data_path = os.path.join(args.train_dir, "train.csv")
 train_data = pd.read_csv(train_data_path, index_col=None)
 X_train, y_train = train_data.iloc[:, 1:], train_data.iloc[:, 0]
 
 validation_data_path = os.path.join(args.validation_dir, "validation.csv")
 validation_data = pd.read_csv(validation_data_path, index_col=None)
 X_val, y_val = validation_data.iloc[:, 1:], validation_data.iloc[:, 0]

 print("model training")
 
 hyperparameters = {
 "bootstrap": [True],
 "max_depth": [12, 13],
 "max_features": [13, 14],
 "n_estimators": [100, 150]
 }

 model = RandomForestClassifier()

 grid_search = GridSearchCV(model, hyperparameters, cv=5, n_jobs=-1)
 grid_search.fit(X_train, y_train)
 best_model = grid_search.best_estimator_
 print(best_model)
 
 
 # Evaluation on Validation data.
 y_pred = grid_search.predict(X_val)
 accuracy = accuracy_score(y_val, y_pred)
 print(f"accuracy: {accuracy}")
 precision = precision_score(y_val, y_pred)
 print(f"precision: {precision}")
 recall = recall_score(y_val, y_pred)
 print(f"recall: {recall}")
 f1 = f1_score(y_val, y_pred)
 print(f"f1: {f1}")
 
 
 # Saving model
 path = os.path.join(args.model_dir, "model.joblib")
 print("Saving model to {}".format(path))
 joblib.dump(best_model, path)
 
if __name__ == '__main__':

 #------------------------------- parsing input parameters (from command line)
 print('extracting arguments')
 parser = argparse.ArgumentParser()

 # RandomForest hyperparameters
 parser.add_argument('--n-estimators', type=int, nargs="+", default=[100, 150])
 parser.add_argument('--max-depth', type=int, nargs="+", default=[12, 13])
 parser.add_argument('--max-features', type=int, nargs="+", default=[13, 14])
 parser.add_argument('--bootstrap', type=bool, nargs="+", default=[True])

 # Data, model, and output directories
 parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
 parser.add_argument('--train-dir', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
 parser.add_argument('--validation-dir', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))

 args, _ = parser.parse_known_args()
 
 print(f"Received arguments: {args}") 
 
 train(args)

In [None]:
# training step for generating model artifacts

from sagemaker.sklearn.estimator import SKLearn

sklearn_estimator = SKLearn(
 entry_point='./train.py',
 role=get_execution_role(),
 instance_count=1,
 instance_type=training_instance_type,
 framework_version="1.0-1",
 base_job_name='randomforest-scikit',
 metric_definitions=[
 { 'Name': 'accuracy', 'Regex': 'accuracy: ([0-9\\.]+)' },
 { 'Name': 'precision', 'Regex': 'precision: ([0-9\\.]+)' },
 { 'Name': 'recall', 'Regex': 'recall: ([0-9\\.]+)' },
 { 'Name': 'f1', 'Regex': 'f1: ([0-9\\.]+)' },
 ],
 hyperparameters={
 'n_estimators': [100, 200]
 },
 max_run=20*60, # Maximum allowed active runtime (in seconds)
 # spot instances
 # use_spot_instances=True, # Use spot instances to reduce cost
 # max_wait=30*60, # Maximum clock time (including spot delays)
 disable_profiler=False,
 sagemaker_session=pipeline_session
)

Finally, use the estimator instance to construct a `TrainingStep`, passing the `properties` of the prior `ProcessingStep` as the `TrainingStep` inputs. The step runs when the pipeline executes, much like calling an estimator's `fit` method in the Python SDK.

Pass the `S3Uri` of the `"train"` and `"validation"` output channels to the `TrainingStep`. The `"test"` output channel is used later for model evaluation in the pipeline. The `properties` attribute of a Pipeline step matches the object model of the corresponding response of a describe call. These properties can be referenced as placeholder values and are resolved at runtime. For example, the `ProcessingStep` `properties` attribute matches the object model of the [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response object.
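
One way to picture a placeholder that is "resolved at runtime" is an object that records the path used to reach it instead of holding a value. The `PropertyRef` class below is a toy written for this explanation, not the SDK implementation, and the `{"Get": ...}` shape of its output is meant only to resemble the serialized form and should be treated as illustrative.

```python
class PropertyRef:
    """Toy placeholder that records an access path instead of holding a value."""

    def __init__(self, path):
        self._path = path

    def __getattr__(self, name):
        # Only called for attributes that do not exist; extend the recorded path.
        if name.startswith("_"):
            raise AttributeError(name)
        return PropertyRef(f"{self._path}.{name}")

    def __getitem__(self, key):
        # Indexing by output name is recorded as part of the path as well.
        return PropertyRef(f"{self._path}[{key!r}]")

    def expr(self):
        """Return a serializable expression to be resolved later by the service."""
        return {"Get": self._path}


# Hypothetical root for the step named "PreprocessData".
properties = PropertyRef("Steps.PreprocessData")
train_uri = properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri

print(train_uri.expr())
```

Nothing is looked up when the expression is built; the actual S3 URI only exists once the processing job has run.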

In [None]:
step_train = TrainingStep(
 name="TrainModel",
 estimator=sklearn_estimator,
 inputs={
 "train": TrainingInput(
 s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
 content_type="text/csv",
 ),
 "validation": TrainingInput(
 s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
 content_type="text/csv",
 ),
 },
 cache_config=cache_config,
)

### Define a Model Evaluation Step to Evaluate the Trained Model

First, develop an evaluation script that is specified in a Processing step that performs the model evaluation.

After pipeline execution, you can examine the resulting `evaluation.json` for analysis.

The evaluation script runs in the SKLearn container and does the following:

* Load the model.
* Read the test data.
* Issue predictions against the test data.
* Save the evaluation report to the evaluation directory.
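
The report written by the evaluation script is a small nested JSON document with a top-level `metrics` key. The sketch below writes such a report to a temporary directory and reads one metric back through a dotted path; the metric values are made up, and `json_get` is a hypothetical helper that only illustrates how a path such as `metrics.accuracy` addresses a value in the file.

```python
import json
import os
import tempfile


def json_get(document, path):
    """Follow a dotted path such as 'metrics.accuracy' through nested dicts."""
    value = document
    for key in path.split("."):
        value = value[key]
    return value


# Made-up metric values with the same layout the evaluation script produces.
report = {"metrics": {"accuracy": 0.91, "precision": 0.65, "recall": 0.4, "f1": 0.5}}

with tempfile.TemporaryDirectory() as evaluation_dir:
    report_path = os.path.join(evaluation_dir, "evaluation.json")
    with open(report_path, "w") as f:
        json.dump(report, f)
    with open(report_path) as f:
        loaded = json.load(f)

accuracy = json_get(loaded, "metrics.accuracy")
print(accuracy)
```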

![Define a Model Evaluation Step to Evaluate the Trained Model](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/a78c7255cbe7892408d8ea2b15a7a2117703befc/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-4.png)

Next, create an instance of a `SKLearnProcessor` processor and use it in the `ProcessingStep`.

Note the `processing_instance_type` parameter passed into the processor.

In [None]:
# processing step for evaluation
sklearn_postprocessor = SKLearnProcessor(
 framework_version="1.0-1", 
 role=role, 
 instance_type=processing_instance_type,
 instance_count=1,
 base_job_name=f"{base_job_prefix}/postprocess-eval",
 sagemaker_session=pipeline_session,
)

Use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that runs when the pipeline executes the step. This is similar to a processor instance's `run` method in the Python SDK.

Specifically, the `S3ModelArtifacts` from the `step_train` `properties` and the `S3Uri` of the `"test"` output channel of the `step_process` `properties` are passed into the inputs. The `TrainingStep` and `ProcessingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) and [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response objects, respectively.

In [None]:
%%writefile ./evaluation.py

import os
import json
import logging
from pathlib import Path
import tarfile
import argparse
import joblib

import numpy as np
import pandas as pd

from sklearn.metrics import accuracy_score, precision_score,recall_score, f1_score, classification_report


logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

PROCESSING_INPUT_DIR = '/opt/ml/processing/test'
PROCESSING_MODEL_DIR = '/opt/ml/processing/model'
PROCESSING_EVALUATION_DIR = '/opt/ml/processing/evaluation'

def postprocess(args):
    # The training job delivers the model as model.tar.gz; unpack it if present.
    model_path = Path(args.processing_model_dir) / 'model.tar.gz'
    if model_path.exists():
        logger.info('Extracting model from path: {}'.format(model_path))
        with tarfile.open(model_path) as tar:
            tar.extractall(path=args.processing_model_dir)

    for file in os.listdir(args.processing_model_dir):
        logger.info(file)

    logger.debug("Loading the model.")
    model = joblib.load(os.path.join(args.processing_model_dir, "model.joblib"))

    logger.debug("Reading test data.")
    test_data_path = os.path.join(args.processing_input_dir, 'test.csv')

    # index_col=None keeps every column as data, so the label stays in column 0.
    test_data = pd.read_csv(test_data_path, index_col=None)
    print(test_data.head())

    logger.debug("Separating labels from features.")
    y_test = test_data.iloc[:, 0].values
    test_data.drop(test_data.columns[0], axis=1, inplace=True)
    X_test = test_data.values

    logger.info("Performing predictions against test data.")
    y_pred = model.predict(X_test)

    logger.info(f"predictions: {y_pred}")

    print('Creating classification evaluation report')
    accuracy = accuracy_score(y_test, y_pred)
    print(f"accuracy: {accuracy}")
    precision = precision_score(y_test, y_pred)
    print(f"precision: {precision}")
    recall = recall_score(y_test, y_pred)
    print(f"recall: {recall}")
    f1 = f1_score(y_test, y_pred)
    print(f"f1: {f1}")

    report_dict = {
        'metrics': {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1
        },
    }

    evaluation_output_path = os.path.join(args.processing_evaluation_dir, 'evaluation.json')
    print('Saving classification report to {}'.format(evaluation_output_path))

    with open(evaluation_output_path, 'w') as f:
        f.write(json.dumps(report_dict))


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--processing-input-dir", type=str, default=PROCESSING_INPUT_DIR)
    parser.add_argument("--processing-model-dir", type=str, default=PROCESSING_MODEL_DIR)
    parser.add_argument("--processing-evaluation-dir", type=str, default=PROCESSING_EVALUATION_DIR)
    args, _ = parser.parse_known_args()

    print(f"Received arguments: {args}")

    postprocess(args)

In [None]:
postprocess_script = './evaluation.py'

evaluation_report = PropertyFile(
 name="EvaluationReport",
 output_name="evaluation",
 path="evaluation.json",
)
step_eval = ProcessingStep(
 name="EvaluateModel",
 processor=sklearn_postprocessor,
 inputs=[
 ProcessingInput(
 source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
 destination="/opt/ml/processing/model",
 ),
 ProcessingInput(
 source=step_process.properties.ProcessingOutputConfig.Outputs[
 "test"
 ].S3Output.S3Uri,
 destination="/opt/ml/processing/test",
 ),
 ],
 outputs=[
 ProcessingOutput(
 output_name="evaluation", source="/opt/ml/processing/evaluation"
 ),
 ],
 code=postprocess_script,
 property_files=[evaluation_report],
 cache_config=cache_config,

)

### Define a Create Model Step to Create a Model

In order to perform batch transformation using the example model, create a SageMaker model. 

Specifically, pass in the `S3ModelArtifacts` from the `TrainingStep`, `step_train` properties. The `TrainingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object.

![Define a Create Model Step and Batch Transform to Process Data in Batch at Scale](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/a78c7255cbe7892408d8ea2b15a7a2117703befc/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-5.png)

In [None]:
%%writefile ./inference.py
import os
import joblib

# inference functions ---------------
def model_fn(model_dir):
 clf = joblib.load(os.path.join(model_dir, "model.joblib"))
 return clf


In [None]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.retry import (
 StepRetryPolicy,
 StepExceptionTypeEnum,
 SageMakerJobExceptionTypeEnum,
 SageMakerJobStepRetryPolicy,
)

from sagemaker.model import Model

from sagemaker.sklearn.model import SKLearnModel

model = SKLearnModel(
 model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
 framework_version="1.0-1",
 py_version='py3',
 role=get_execution_role(),
 entry_point='./inference.py',
 sagemaker_session=pipeline_session,
)

step_create_model = ModelStep(
 name="DirectMarketingCreateModel",
 step_args=model.create(instance_type="ml.m5.xlarge"),
 retry_policies=[
 StepRetryPolicy(exception_types=[StepExceptionTypeEnum.THROTTLING], max_attempts=3)
 ]
)

### Define a Transform Step to Perform Batch Transformation

Now that a model instance is defined, create a `Transformer` instance with the appropriate model type, compute instance type, and desired output S3 URI.

Specifically, pass in the `ModelName` from the `properties` of the `ModelStep`, `step_create_model`. These `properties` match the object model of the [DescribeModel](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeModel.html) response object.

In [None]:
from sagemaker.transformer import Transformer

batch_output = f"s3://{bucket}/{lab_pipelines_prefix}/batchtransform"
transformer = Transformer(
 model_name=step_create_model.properties.ModelName,
 instance_type="ml.m5.xlarge",
 instance_count=1,
 output_path=batch_output,
 accept="text/csv",
 assemble_with="Line", 
 sagemaker_session=pipeline_session
)

Pass in the transformer instance and a `TransformInput` that points at the `"batchtransform"` output channel of the processing step. More details on `TransformInput` can be found [here](https://github.com/aws/sagemaker-python-sdk/blob/2594ffb3eaefaf55936b71ea0c38442135223602/src/sagemaker/inputs.py#L140). We use `input_filter` to filter out the first column, which is the label column, in the test data. We also set the batch job to join the input source with the batch job output. See the documentation on [associating inference with input records](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform-data-processing.html) for more details.
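
The effect of `input_filter="$[1:]"` combined with `join_source="Input"` on a single CSV record can be sketched in plain Python. The real service evaluates JSONPath expressions; `transform_record` and `fake_predict` below are hypothetical stand-ins that handle only this one case, and the sample record is invented.

```python
def transform_record(line, predict):
    """Mimic input_filter="$[1:]" plus join_source="Input" for one CSV line."""
    columns = line.split(",")
    # input_filter: drop column 0 (the label) before the record reaches the model.
    features = columns[1:]
    prediction = predict(features)
    # join_source="Input": append the prediction to the full original record.
    return ",".join(columns + [str(prediction)])


def fake_predict(features):
    """Hypothetical model: predicts 1 when the first feature exceeds 40."""
    return 1 if float(features[0]) > 40 else 0


line = "0,56,1,999"  # invented record: label first, then three features
joined = transform_record(line, fake_predict)
print(joined)
```

Because the label survives in the joined output, predictions can later be compared against it without a separate lookup.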

In [None]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
 name="DirectMarketingTransform",
 transformer=transformer,
 inputs=TransformInput(
 data=step_process.properties.ProcessingOutputConfig.Outputs[
 "batchtransform"
 ].S3Output.S3Uri,
 content_type="text/csv", 
 split_type="Line",
 input_filter="$[1:]",
 join_source="Input",
 ),
 cache_config=cache_config,
)


### Define a Register Model Step to Create a Model Package

Use the `SKLearnModel` instance defined for the create model step to register the model: `model.register` produces the arguments for a `ModelStep`. The result of executing this step in a pipeline is a model package. A model package is a reusable model artifacts abstraction that packages all ingredients required for inference. Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.

A model package group is a collection of model packages. A model package group can be created for a specific ML business problem, and new versions of the model packages can be added to it. Typically, customers are expected to create a ModelPackageGroup for a SageMaker pipeline so that model package versions can be added to the group for every SageMaker Pipeline run.

Calling `model.register` is similar to calling an estimator instance's `register` method in the Python SDK.

Specifically, pass in the `S3ModelArtifacts` from the `TrainingStep`, `step_train` properties. The `TrainingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object.

Note that the specific model package group name provided in this notebook can be used in the model registry and CI/CD work with SageMaker Projects.

In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.model_step import ModelStep

# Attach the evaluation report written by step_eval to the model package
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
                "S3Uri"
            ]
        ),
        content_type="application/json",
    )
)
model_package_group_name = "DirectMarketingRandomForestModelGroup"

# Register model step that will be conditionally executed
register_arg = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(
    name="RegisterModel",
    step_args=register_arg,
)
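
Once the pipeline has run at least once, the versions in the model package group can be inspected. The helper below is a minimal sketch, assuming a boto3 SageMaker client is passed in; `list_package_versions` is a name invented for this example, and it reads only the first page of results.

```python
def list_package_versions(sm_client, group_name):
    """Return (version, approval status, ARN) for each package in a group, newest first."""
    response = sm_client.list_model_packages(
        ModelPackageGroupName=group_name,
        SortBy="CreationTime",
        SortOrder="Descending",
    )
    return [
        (p["ModelPackageVersion"], p["ModelApprovalStatus"], p["ModelPackageArn"])
        for p in response["ModelPackageSummaryList"]
    ]


# In the notebook (assumes boto3 and AWS credentials are available):
#   import boto3
#   versions = list_package_versions(
#       boto3.client("sagemaker"), "DirectMarketingRandomForestModelGroup"
#   )
```

Each pipeline run that passes the condition step should add one more entry to this list.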


### Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation and Register a Model in the Model Registry

In this step, the model is registered only if its accuracy, as determined by the evaluation step `step_eval`, is greater than or equal to a specified threshold. A `ConditionStep` enables conditional execution in the pipeline DAG based on the conditions of the step properties.

In the following section, you:

* Define a `ConditionGreaterThanOrEqualTo` on the accuracy value found in the output of the evaluation step, `step_eval`.
* Use the condition in the list of conditions in a `ConditionStep`.
* Pass the register step (`step_register`), the model creation step (`step_create_model`), and the transform step (`step_transform`) into the `if_steps` of the `ConditionStep`. These steps are executed only if the condition evaluates to `True`.

![Define a Condition Step to Check Accuracy and Conditionally Execute Steps](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/a78c7255cbe7892408d8ea2b15a7a2117703befc/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-6.png)

In [None]:
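from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet

# Condition for evaluating model quality and branching execution:
# true when metrics.accuracy in the evaluation report is at least 0.55
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy",
    ),
    right=0.55,
)

# Register the model, create it, and run batch transform only when the condition holds
step_cond = ConditionStep(
    name="CheckEvaluation",
    conditions=[cond_gte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[],
)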
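
To make the condition concrete, here is a small plain-Python sketch of the check that the condition step above describes: look up `metrics.accuracy` in the evaluation report and compare it with the threshold. The report contents are hypothetical, and `json_get` is a simplified stand-in written for this example, not the SDK's `JsonGet`, which is resolved by the Pipelines service at run time.

```python
import json


def json_get(document: dict, json_path: str):
    """Resolve a dotted path such as "metrics.accuracy" against a parsed JSON document."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


# Hypothetical contents of evaluation.json; the real file is written by step_eval.
report = json.loads('{"metrics": {"accuracy": 0.61}}')

accuracy = json_get(report, "metrics.accuracy")
run_if_steps = accuracy >= 0.55  # same comparison as ConditionGreaterThanOrEqualTo

print(accuracy, run_if_steps)
```

With an accuracy below 0.55 the comparison would be false, and the pipeline would take the (empty) `else_steps` branch instead.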

### Define a Pipeline of Parameters, Steps, and Conditions

In this section, combine the steps into a Pipeline so it can be executed.

A pipeline requires a `name`, `parameters`, and `steps`. Names must be unique within an `(account, region)` pair.

Note:

* All of the parameters used in the definitions must be present.
* Steps passed into the pipeline do not have to be listed in the order of execution. The SageMaker Pipelines service derives the execution order from the _data dependency_ DAG between the steps.
* Steps must be unique across the pipeline step list and all condition step if/else lists.

![Define a Pipeline of Parameters, Steps, and Conditions](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/a78c7255cbe7892408d8ea2b15a7a2117703befc/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-7.png)

In [None]:
from sagemaker.workflow.pipeline import Pipeline

# Pipeline instance: the condition step brings in its if_steps, so they are not listed again
pipeline = Pipeline(
    name="DirectMarketing-ModelBuildPipeline",
    parameters=[
        input_data,
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        model_output,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
)
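
The note above about step ordering can be illustrated with a topological sort. The sketch below uses Python's standard `graphlib` module (Python 3.9 or later) and hypothetical step names. It only shows the idea of deriving an execution order from data dependencies and says nothing about how the service implements it.

```python
from graphlib import TopologicalSorter

# Hypothetical step names; each maps to the set of steps whose outputs it consumes.
# The entries are deliberately listed out of execution order.
dependencies = {
    "CheckEvaluation": {"Evaluate"},
    "Evaluate": {"Process", "Train"},
    "Train": {"Process"},
    "Process": set(),
}

# The listing order of the keys does not matter; only the edges do.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Because every step here depends on the one before it, there is exactly one valid order, regardless of how the dictionary is written.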

#### (Optional) Examining the pipeline definition

The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and the parameters and step properties resolve correctly.

In [None]:
import json

definition = json.loads(pipeline.definition())
definition
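
The full definition can be long, so a compact summary is sometimes easier to scan. The following is a minimal sketch, assuming the parsed definition has top-level `Parameters` and `Steps` lists whose entries carry `Name` and `Type` keys. `summarize_definition` and the sample dictionary are written for this example and stand in for the real output.

```python
def summarize_definition(definition: dict) -> dict:
    """Collect parameter names and top-level step names and types from a pipeline definition."""
    return {
        "parameters": [p["Name"] for p in definition.get("Parameters", [])],
        "steps": [(s["Name"], s["Type"]) for s in definition.get("Steps", [])],
    }


# Hand-written stand-in for the much larger dictionary returned by
# json.loads(pipeline.definition()); the values here are illustrative only.
sample_definition = {
    "Version": "2020-12-01",
    "Parameters": [{"Name": "ModelApprovalStatus", "Type": "String"}],
    "Steps": [{"Name": "CheckEvaluation", "Type": "Condition", "Arguments": {}}],
}

print(summarize_definition(sample_definition))
```

In the notebook, the same function could be called on the `definition` dictionary loaded in the cell above.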


### Submit the pipeline to SageMaker and start execution

Submit the pipeline definition to the Pipeline service. The role passed in will be used by the Pipeline service to create all the jobs defined in the steps.

In [None]:
pipeline.upsert(role_arn=role)

### Pipeline Operations: Examining and Waiting for Pipeline Execution

Start a pipeline execution, then describe it and wait for it to complete.

In [None]:
execution = pipeline.start()

In [None]:
execution.describe()

In [None]:
# Wait for the execution to complete.
execution.wait()
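
Conceptually, waiting amounts to polling the execution status until it reaches a terminal state. The sketch below shows that idea with a hypothetical helper, `wait_for_status`. It is not how the SDK implements `wait()`, and the delay and attempt limits are arbitrary values chosen for the example.

```python
import time

TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_status(describe, delay_seconds=30, max_attempts=60):
    """Poll `describe()` until PipelineExecutionStatus is terminal, then return it."""
    for _ in range(max_attempts):
        status = describe()["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(delay_seconds)
    raise TimeoutError("pipeline execution did not finish in time")


# In the notebook: wait_for_status(execution.describe)
```

Returning the final status, rather than only blocking, makes it straightforward to branch on `Succeeded` versus `Failed` in later cells.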

### Examining the Evaluation

Examine the resulting model evaluation after the pipeline completes. Download the resulting `evaluation.json` file from S3 and print the report.

In [None]:
from pprint import pprint

evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))

### Lineage

Review the lineage of the artifacts generated by the pipeline.

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

### Parametrized Executions

You can run additional executions of the pipeline and specify different pipeline parameters. The `parameters` argument is a dictionary that maps parameter names to the values that override their defaults.

Based on the performance of the model, you might want to kick off another pipeline execution on a compute-optimized instance type and set the model approval status to "Approved" automatically. This means that the model package version generated by the `RegisterModel` step is automatically ready for deployment through CI/CD pipelines, such as with SageMaker Projects.

In [None]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        ModelApprovalStatus="Approved",
    )
)
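
The effect of the overrides can be pictured as a dictionary merge on top of the pipeline defaults. The sketch below is illustrative only: the default values and the `TrainingInstanceType` name are assumptions, and rejecting unknown names is a choice made in this example, not a statement about how the service validates input.

```python
def resolve_parameters(defaults: dict, overrides: dict) -> dict:
    """Overlay per-execution overrides on pipeline defaults, rejecting unknown names."""
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"not a pipeline parameter: {sorted(unknown)}")
    return {**defaults, **overrides}


# Hypothetical defaults; the real ones come from the parameter definitions earlier.
defaults = {
    "ProcessingInstanceType": "ml.m5.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
    "TrainingInstanceType": "ml.m5.xlarge",
}
overrides = {"ProcessingInstanceType": "ml.c5.xlarge", "ModelApprovalStatus": "Approved"}

effective = resolve_parameters(defaults, overrides)
print(effective)
```

Parameters that are not overridden keep their defaults for that execution.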

In [None]:
execution.wait()

In [None]:
execution.list_steps()
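
The list returned by `list_steps()` contains one dictionary per step. A minimal sketch for condensing it into a name-to-status mapping follows, assuming each entry has `StepName` and `StepStatus` keys and, for failed steps, a `FailureReason`. `summarize_steps` is a helper invented for this example.

```python
def summarize_steps(steps):
    """Map each step name to its status, adding the failure reason when present."""
    summary = {}
    for step in steps:
        status = step["StepStatus"]
        if "FailureReason" in step:
            status = f'{status}: {step["FailureReason"]}'
        summary[step["StepName"]] = status
    return summary


# In the notebook: summarize_steps(execution.list_steps())
```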