# SageMaker Pipelines EMR Step With Cluster Lifecycle Management


---

This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook. 

![This us-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-2/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

---


This notebook shows how to run an [EMR step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-emr) from a SageMaker Pipeline. The example requires a cluster configuration: the EMR step uses it to create an EMR cluster, runs the job on that cluster, and then terminates the cluster.

The pipeline includes the following steps:
* Preprocess the Abalone dataset with PySpark on EMR
* Train an XGBoost model
* Evaluate the model performance
* Register the model in the Model Registry if the evaluation meets the quality threshold


The registration step is built with a `ModelStep` and runs only when the condition on the evaluation metric passes.

## Contents

1. [Prerequisites](#Prerequisites)
1. [Configuration Setup](#Configuration-Setup)
1. [Parameters](#Parameters)
1. [Data Preparation](#Data-Preparation)
1. [Model Training and Evaluation](#Model-Training-and-Evaluation)
1. [Model Registry](#Model-registry)
1. [Execute the Pipeline](#Execute-the-Pipeline)
1. [Cleanup](#Cleanup)


## Prerequisites

To run this notebook you will need:
* EMR roles
* IAM policies which enable the notebook to run a step on an Amazon EMR cluster 
 
#### EMR roles
You'll need:
* Service role for Amazon EMR (EMR role) - this is passed as the `ServiceRole` parameter
* Service role for cluster EC2 instances (EC2 instance profile) - this is passed as the `JobFlowRole` parameter 

See ['EMR IAM roles'](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-iam-roles.html) for more details.
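
As a minimal sketch, the two role ARNs can be assembled from the account ID with plain string formatting. The role names `EMR_DefaultRole_V2` and `EMR_EC2_DefaultRole` are the ones this notebook uses later; the helper name `build_emr_role_arns` and the sample account ID are hypothetical, and your account may use different role names.

```python
import re


# Hypothetical helper: builds the ARNs that are passed as `ServiceRole`
# and `JobFlowRole`. The default role names are assumptions; substitute
# your own if your account uses different ones.
def build_emr_role_arns(
    account_id: str,
    service_role_name: str = "EMR_DefaultRole_V2",
    instance_profile_name: str = "EMR_EC2_DefaultRole",
) -> dict:
    # AWS account IDs are 12-digit strings.
    if not re.fullmatch(r"\d{12}", account_id):
        raise ValueError(f"expected a 12-digit account ID, got {account_id!r}")
    return {
        "ServiceRole": f"arn:aws:iam::{account_id}:role/{service_role_name}",
        "JobFlowRole": f"arn:aws:iam::{account_id}:instance-profile/{instance_profile_name}",
    }


arns = build_emr_role_arns("123456789012")  # sample account ID, not a real one
print(arns["ServiceRole"])
print(arns["JobFlowRole"])
```

Note that the EC2 role is referenced through its instance profile ARN, while the service role uses a regular role ARN.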

#### IAM policy
The notebook execution role needs permissions to run a step on an Amazon EMR cluster. Attach the AWS managed policy `AmazonSageMakerPipelinesIntegrations` to the notebook execution role.


## Setup 

### Setup Dependencies

In [None]:
%pip install --upgrade sagemaker

In [None]:
import os
import json

import boto3
import sagemaker
import sagemaker.session

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
)
from sagemaker.workflow.functions import (
    JsonGet,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.workflow.model_step import ModelStep
from sagemaker.model import Model

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig
from sagemaker.workflow.pipeline_context import PipelineSession

## Configuration Setup

Next, create the session objects from the SageMaker Python SDK and collect the values the pipeline needs, such as the execution role, the region, and the default S3 bucket.

In [None]:
# Create the SageMaker Session

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
region = sagemaker_session.boto_region_name
boto_session = boto3.Session(region_name=region)
sagemaker_client = sagemaker_session.sagemaker_client
default_bucket = sagemaker_session.default_bucket()


account = boto_session.client("sts").get_caller_identity()["Account"]


pipeline_session = PipelineSession(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    default_bucket=default_bucket,
)

## Parameters


In [None]:
model_package_group_name = "AbalonePackageGroup"
pipeline_name = "EMRStepPipeline"
base_job_prefix = "emr-step-pipeline"
processing_instance_type = "ml.m5.xlarge"
training_instance_type = "ml.m5.xlarge"
BASE_DIR = "code"

job_flow_role = f"arn:aws:iam::{account}:instance-profile/EMR_EC2_DefaultRole"
service_role = f"arn:aws:iam::{account}:role/EMR_DefaultRole_V2"

In [None]:
# Define variables and parameters needed for the Pipeline steps
# parameters for pipeline execution

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputDataUrl",
    default_value=f"s3://sagemaker-example-files-prod-{region}/datasets/tabular/uci_abalone/abalone.csv",
)

output_path = f"s3://{default_bucket}/{base_job_prefix}/prep"

## Data Preparation

A PySpark job on EMR prepares the data for the training job. The script `preprocess.py` featurizes the dataset and splits it into train, validation, and test datasets.
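
The split can be pictured as a per-row random assignment. The pure-Python sketch below illustrates the idea behind weighted random splitting: each row draws a uniform number and lands in the bucket whose cumulative-weight interval contains it. It is an illustration only, not Spark's implementation; the helper name `random_split` and the seed are hypothetical. The 0.7/0.15/0.15 weights match those used in `preprocess.py`.

```python
import random
from itertools import accumulate


def random_split(rows, weights, seed=0):
    """Assign each row to one bucket with probability proportional to its weight."""
    total = sum(weights)
    # Upper bound of each bucket's interval on [0, 1), roughly [0.7, 0.85, 1.0].
    bounds = [w / total for w in accumulate(weights)]
    rng = random.Random(seed)
    buckets = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        # Pick the first bucket whose upper bound exceeds the draw.
        # The last bucket is the fallback in case of floating-point rounding.
        index = next((i for i, b in enumerate(bounds) if draw < b), len(bounds) - 1)
        buckets[index].append(row)
    return buckets


train, valid, test = random_split(range(1000), [0.7, 0.15, 0.15])
print(len(train), len(valid), len(test))
```

Because every row is assigned independently, the resulting sizes only approximate the requested proportions.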

The output of this step is used as the input to the TrainingStep.
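
The EMR step launches the script through `command-runner.jar`, which runs the argument list as a command on the cluster. As a rough sketch, the list can be joined into the equivalent shell command to see what the cluster will run. The S3 locations below are placeholders, not paths produced by this notebook.

```python
import shlex

# Placeholder S3 locations, for illustration only.
script_uri = "s3://example-bucket/emr-step-pipeline/app/preprocess.py"
input_uri = "s3://example-bucket/input/abalone.csv"
output_uri = "s3://example-bucket/emr-step-pipeline/prep"

# Same shape as the `args` list given to the EMR step configuration:
# the command, its options, the script, then the script's own arguments.
args = [
    "spark-submit",
    "--deploy-mode",
    "cluster",
    script_uri,
    "--input",
    input_uri,
    "--output",
    output_uri,
]

# shlex.join quotes each argument where needed and joins them with spaces.
command = shlex.join(args)
print(command)
```

Everything after the script URI is passed to `preprocess.py` itself, which is why the script parses `--input` and `--output`.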

The dataset you use is the [UCI Machine Learning Abalone Dataset](https://archive.ics.uci.edu/ml/datasets/abalone) [1]. The aim for this task is to determine the age of an abalone snail from its physical measurements. At the core, this is a regression problem.

The dataset contains several features: length (the longest shell measurement), diameter (the diameter perpendicular to length), height (the height with meat in the shell), whole_weight (the weight of whole abalone), shucked_weight (the weight of meat), viscera_weight (the gut weight after bleeding), shell_weight (the weight after being dried), sex ('M', 'F', 'I' where 'I' is Infant), and rings (integer).

The number of rings is a good proxy for age (age is roughly rings + 1.5). Obtaining it, however, requires cutting the shell through the cone, staining the section, and counting the rings under a microscope, which is time-consuming. The other physical measurements are easier to obtain, so you use them to build a model that predicts the `rings` variable.
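
The relationship between rings and age is a simple offset. The sketch below applies it to a few sample ring counts; the helper name `rings_to_age` is hypothetical, and the unit (years) follows the UCI dataset description.

```python
def rings_to_age(rings: float) -> float:
    """Approximate abalone age in years from the ring count (rings + 1.5)."""
    return rings + 1.5


# Sample ring counts, chosen for illustration only.
for rings in (4, 9, 15):
    print(rings, "rings ->", rings_to_age(rings), "years")
```

Since the offset is constant, a model that predicts `rings` can be converted to an age estimate after the fact.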

The next cells write the PySpark preprocessing script to a local `code` directory and upload it to S3 so that the EMR step can run it.
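
`preprocess.py` writes header-less CSV rows with the label (`rings`) in the first column, followed by the seven scaled numeric features and the three one-hot columns for `sex`. The sketch below assembles one such row in plain Python to show the layout. The category order and the standard deviations are made-up values, since the real ones are learned from the data by the Spark pipeline, and the scaling assumes the scaler divides by the standard deviation without centering.

```python
NUMERICAL_FEATURES = [
    "length", "diameter", "height", "whole_weight",
    "shucked_weight", "viscera_weight", "shell_weight",
]
# Assumed category order; the Spark pipeline derives its own order from the data.
SEX_CATEGORIES = ["M", "F", "I"]


def one_hot(value, categories):
    """Return a list with 1.0 at the position of `value` and 0.0 elsewhere."""
    return [1.0 if value == c else 0.0 for c in categories]


def build_row(record, std_by_feature):
    """Lay out one output row: label, scaled numeric features, one-hot sex."""
    scaled = [record[name] / std_by_feature[name] for name in NUMERICAL_FEATURES]
    return [record["rings"]] + scaled + one_hot(record["sex"], SEX_CATEGORIES)


# Made-up record and standard deviations, for illustration only.
record = {
    "sex": "F", "length": 0.5, "diameter": 0.25, "height": 0.125,
    "whole_weight": 1.0, "shucked_weight": 0.5, "viscera_weight": 0.25,
    "shell_weight": 0.25, "rings": 9.0,
}
stds = {name: 0.5 for name in NUMERICAL_FEATURES}

row = build_row(record, stds)
print(len(row), row)
```

Putting the label first matters because the SageMaker XGBoost algorithm expects the target in the first column of CSV training data, with no header row.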

> [1] Dua, D. and Graff, C. (2019). [UCI Machine Learning Repository](http://archive.ics.uci.edu/ml). Irvine, CA: University of California, School of Information and Computer Science.

In [None]:
%mkdir -p code

In [None]:
%%writefile code/preprocess.py

from argparse import ArgumentParser

from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    Imputer,
    OneHotEncoder,
    StandardScaler,
    StringIndexer,
    VectorAssembler,
)
from pyspark.ml.functions import vector_to_array
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType


def process(args):
    print("Starting spark session")
    spark = SparkSession.builder.appName("preprocess").getOrCreate()
    # Ask the output committer not to write _SUCCESS marker files
    spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

    schema = StructType(
        [
            StructField("sex", StringType(), True),
            StructField("length", DoubleType(), True),
            StructField("diameter", DoubleType(), True),
            StructField("height", DoubleType(), True),
            StructField("whole_weight", DoubleType(), True),
            StructField("shucked_weight", DoubleType(), True),
            StructField("viscera_weight", DoubleType(), True),
            StructField("shell_weight", DoubleType(), True),
            StructField("rings", DoubleType(), True),
        ]
    )

    print("Reading source data")
    df = spark.read.csv(args.input, schema=schema).fillna("missing", subset=["sex"])

    numerical_features = [
        "length",
        "diameter",
        "height",
        "whole_weight",
        "shucked_weight",
        "viscera_weight",
        "shell_weight",
    ]
    print("Performing feature engineering")
    pipeline = Pipeline(
        stages=[
            # Index and one-hot encode the categorical column
            StringIndexer(inputCol="sex", outputCol="cat_sex"),
            OneHotEncoder(
                inputCols=["cat_sex"],
                outputCols=["feature_sex"],
                dropLast=False,
            ),
            # Fill missing numeric values with the column median
            Imputer(
                inputCols=numerical_features,
                outputCols=[f"impute_{c}" for c in numerical_features],
                strategy="median",
            ),
            # Assemble the numeric columns into one vector and scale it
            VectorAssembler(
                inputCols=[f"impute_{c}" for c in numerical_features], outputCol="vector"
            ),
            StandardScaler(inputCol="vector", outputCol="features"),
        ]
    )
    print("Fitting transformers")
    model = pipeline.fit(df)
    print("Transforming source data")
    df_out = (
        model.transform(df)
        .select(
            "rings",
            vector_to_array(F.col("features")).alias("features"),
            vector_to_array(F.col("feature_sex")).alias("feature_sex"),
        )
        # Flatten the arrays into plain columns, label first.
        # The Abalone data has three sex categories, hence range(3).
        .select(
            [F.col("rings")]
            + [F.col("features")[idx] for idx in range(len(numerical_features))]
            + [F.col("feature_sex")[idx] for idx in range(3)]
        )
    )

    # shuffle
    # split train/test/valid
    # write out to csvs without headers or indices
    print("Writing train/valid/test splits")
    train, valid, test = df_out.orderBy(F.rand()).randomSplit([0.7, 0.15, 0.15])
    prefix = args.output
    train.repartition(1).write.mode("overwrite").csv(f"{prefix}/train")
    valid.repartition(1).write.mode("overwrite").csv(f"{prefix}/valid")
    test.repartition(1).write.mode("overwrite").csv(f"{prefix}/test")

    spark.stop()


if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("--input")
    parser.add_argument("--output")
    args, _ = parser.parse_known_args()
    process(args)

In [None]:
script = sagemaker_session.upload_data("code/preprocess.py", key_prefix=f"{base_job_prefix}/app")

In [None]:
# Preprocess the data with a PySpark script.
# Split the dataset into train, validation, and test datasets.
# Run the script as a step in a job flow on EMR.
emr_config = EMRStepConfig(
    jar="command-runner.jar",
    args=[
        "spark-submit",
        "--deploy-mode",
        "cluster",
        script,
        "--input",
        input_data,
        "--output",
        output_path,
    ],
)


step_emr = EMRStep(
    name="EMRStep",
    cluster_id=None,
    step_config=emr_config,
    display_name="Preprocess",
    description="preprocess data for XGBoost",
    cluster_config={
        "Applications": [
            {
                "Name": "Spark",
            }
        ],
        "Instances": {
            "InstanceGroups": [
                {"InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": "m5.2xlarge"},
                {"InstanceRole": "CORE", "InstanceCount": 2, "InstanceType": "m5.2xlarge"},
            ]
        },
        "BootstrapActions": [],
        "ReleaseLabel": "emr-6.6.0",
        "JobFlowRole": job_flow_role,
        "ServiceRole": service_role,
    },
)

## Model Training and Evaluation

We will now train an XGBoost model using the SageMaker Python SDK and the output of the EMR Step.

In [None]:
# training step for generating model artifacts
model_path = f"s3://{sagemaker_session.default_bucket()}/{base_job_prefix}/AbaloneTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    base_job_name=f"{base_job_prefix}/abalone-train",
    sagemaker_session=pipeline_session,
    role=role,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0,
)
step_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=f"{output_path}/train",
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=f"{output_path}/valid",
            content_type="text/csv",
        ),
    },
)
step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=step_args,
)

In [None]:
step_train.add_depends_on([step_emr])

#### Evaluating the model

Use a processing job to evaluate the model produced by the TrainingStep. The job writes the mean squared error (MSE) to an evaluation report; if the MSE is at or below the threshold, the model is registered.
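
The evaluation script writes a JSON report, and the condition step later reads `regression_metrics.mse.value` from it and compares it with a threshold of 6.0. The following plain-Python sketch mirrors that flow on made-up numbers; `resolve_json_path` is a hypothetical stand-in for the pipeline's JSON lookup, not SageMaker code.

```python
import json
import statistics

# Made-up labels and predictions, for illustration only.
y_true = [9.0, 10.0, 7.0, 12.0]
y_pred = [8.0, 11.0, 7.0, 10.0]

errors = [t - p for t, p in zip(y_true, y_pred)]
mse = sum(e * e for e in errors) / len(errors)
# Population standard deviation of the residuals (same convention as numpy.std).
std = statistics.pstdev(errors)

# Same structure as the report written by the evaluation script.
report = {"regression_metrics": {"mse": {"value": mse, "standard_deviation": std}}}
report_json = json.dumps(report)


def resolve_json_path(document: dict, path: str):
    """Walk a dot-separated path such as 'regression_metrics.mse.value'."""
    node = document
    for key in path.split("."):
        node = node[key]
    return node


value = resolve_json_path(json.loads(report_json), "regression_metrics.mse.value")
should_register = value <= 6.0  # threshold used by the condition step
print(value, should_register)
```

With these sample numbers the MSE is 1.5, so the condition passes and the registration branch would run.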

In [None]:
%%writefile code/evaluate.py

"""Evaluation script for measuring mean squared error."""
import glob
import json
import logging
import pathlib
import pickle
import tarfile

import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


if __name__ == "__main__":
    logger.debug("Starting evaluation.")
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    logger.debug("Loading xgboost model.")
    with open("xgboost-model", "rb") as f:
        model = pickle.load(f)

    logger.debug("Reading test data.")
    # Use the first CSV file found in the test input directory
    test_path = "/opt/ml/processing/test/"
    test_file = glob.glob(f"{test_path}/*.csv")[0]
    df = pd.read_csv(test_file, header=None)

    logger.debug("Separating labels from features.")
    # The label (rings) is the first column; the rest are features
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)

    logger.info("Performing predictions against test data.")
    predictions = model.predict(X_test)

    logger.debug("Calculating mean squared error.")
    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    logger.info("Writing out evaluation report with mse: %f", mse)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

In [None]:
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_prefix}/script-abalone-eval",
    sagemaker_session=pipeline_session,
    role=role,
)
step_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=f"{output_path}/test",
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code=os.path.join(BASE_DIR, "evaluate.py"),
)
evaluation_report = PropertyFile(
    name="AbaloneEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)
step_eval = ProcessingStep(
    name="EvaluateAbaloneModel",
    step_args=step_args,
    property_files=[evaluation_report],
)

In [None]:
step_eval.add_depends_on([step_emr])

## Model registry

In [None]:
# register model step that will be conditionally executed
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(
    name="RegisterAbaloneModel",
    step_args=step_args,
)

# condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=6.0,
)
step_cond = ConditionStep(
    name="CheckMSEAbaloneEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register],
    else_steps=[],
)

In [None]:
# Use the same pipeline name across executions for cache usage.
# Only Parameter objects are listed here; the instance types are plain strings, not pipeline parameters.

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        model_approval_status,
        input_data,
    ],
    steps=[step_emr, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
)

## Execute the Pipeline

In [None]:
definition = json.loads(pipeline.definition())
definition

In [None]:
pipeline.upsert(role_arn=role)

In [None]:
execution = pipeline.start()

In [None]:
execution.wait()

## Cleanup
Run the following cell to delete the resource created in this notebook:
* SageMaker Pipeline


In [None]:
# Delete the Pipeline
sagemaker_client.delete_pipeline(PipelineName=pipeline_name)

## Notebook CI Test Results

This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.

![This us-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-1/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This us-east-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-2/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This us-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-1/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This ca-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ca-central-1/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This sa-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/sa-east-1/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This eu-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-1/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This eu-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-2/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This eu-west-3 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-3/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This eu-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-central-1/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This eu-north-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-north-1/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This ap-southeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-1/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This ap-southeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-2/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This ap-northeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-1/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This ap-northeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-2/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)

![This ap-south-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-south-1/sagemaker-pipelines|tabular|emr-step|sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)
