# Build a SageMaker Pipeline

---

## Contents

1. [Introduction](#Introduction)
1. [Setup](#Setup)
1. [Sourcing the libraries](#Sourcing-the-libraries)
1. [Task 1: Set Up Global Variables](#Task-1:-Set-Up-Global-Variables)
1. [Task 2: Define the Step Function Execution Inputs](#Task-2:-Define-the-Step-Function-Execution-Inputs)
1. [Task 3: Define the Processing Step](#Task-3:-Define-the-Processing-Step)
1. [Task 4: Define the Training Step](#Task-4:-Define-the-Training-Step)
1. [Task 5: Define the Endpoint Steps](#Task-5:-Define-the-Endpoint-Steps)
1. [Task 6: Define the Step Function Workflow](#Task-6:-Define-the-Step-Function-Workflow)
1. [Task 7: Start and Monitor the Step Function Execution](#Task-7:-Start-and-Monitor-the-Step-Function-Execution)


---

## Introduction

This notebook demonstrates how to use Amazon SageMaker XGBoost to process data, train a model, and host an endpoint. We orchestrate the steps using [AWS Step Functions](https://aws.amazon.com/step-functions/?step-functions.sort-by=item.additionalFields.postDateTime&step-functions.sort-order=desc).

We use the [Predictive Maintenance Dataset](https://static.us-east-1.prod.workshops.aws/public/6f2f7cb1-bfda-4b34-ae39-928502784393/static/datasets/maintenance_dataset.csv), originally from the [UCI data repository](http://archive.ics.uci.edu/ml). More details about the original dataset can be found [here](https://archive.ics.uci.edu/ml/datasets/AI4I+2020+Predictive+Maintenance+Dataset).

---


## Setup

### Add a policy to your SageMaker role in IAM

**If you are running this notebook on an Amazon SageMaker notebook instance**, the IAM role assumed by your notebook instance needs permission to create and run workflows in AWS Step Functions. To provide this permission to the role, do the following.

1. Open the Amazon [SageMaker console](https://console.aws.amazon.com/sagemaker/).
2. Select **Notebook instances** and choose the name of your notebook instance.
3. Under **Permissions and encryption**, select the role ARN to view the role on the IAM console.
4. Choose **Attach policies** and search for `AWSStepFunctionsFullAccess`.
5. Select the check box next to `AWSStepFunctionsFullAccess` and choose **Attach policy**.

If you are running this notebook in a local environment, the SDK uses the credentials and Region from your AWS CLI configuration. For more information, see [Configuring the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html).

Next, create an execution role in IAM for Step Functions. 

### Create an execution role for Step Functions

You need an execution role so that you can create and execute workflows in Step Functions.

1. Go to the [IAM console](https://console.aws.amazon.com/iam/)
2. Select **Roles** and then **Create role**.
3. Under **Choose the service that will use this role** select **Step Functions**
4. Choose **Next** until you can enter a **Role name**
5. Enter a name such as `AmazonSageMaker-StepFunctionsWorkflowExecutionRole` and then select **Create role**
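
Choosing **Step Functions** as the trusted service gives the role a trust policy that lets the Step Functions service assume it. As a rough sketch for orientation only (the console creates this for you, and the exact document it generates may differ), the policy has roughly this shape:

```python
import json

# Trust policy that allows the Step Functions service to assume the role.
# "states.amazonaws.com" is the Step Functions service principal.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "states.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

print(json.dumps(trust_policy, indent=2))
```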


Attach policies to the role you created. The following steps attach AWS managed policies that grant full access to SageMaker, CloudWatch, and CloudWatch Events; as a good practice, you should grant access only to the resources you need.

1. Under the **Add Permissions** tab, click **Attach policy**. Select the following AWS Managed Policies:
    - `AmazonSageMakerFullAccess`
    - `CloudWatchFullAccess`
    - `CloudWatchEventsFullAccess`

2. Click **Attach Policy** 
3. Copy and save the **Role ARN** at the top of the **Summary**. We will use it in [Task 1](#Task-1:-Set-Up-Global-Variables).

---

## Sourcing the libraries

In [None]:
# cell 1

import sys
!{sys.executable} -m pip install stepfunctions

In [None]:
# cell 2

import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor

from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.pipeline import PipelineModel
from sagemaker.s3 import S3Uploader

import stepfunctions
from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

import boto3
import time

---
## Task 1: Set Up Global Variables

**Steps**
* Put the name of your bucket and the object key of your file into `S3_BUCKET` and `KEY` respectively in `cell 3`, as you did in the previous notebook.
* Optional: change the `PREFIX` name in `cell 3`.
* Compare the ARN saved in the [Setup](#Setup) with the ARN printed in `cell 4`. They should be the same.

### Set S3 bucket and data prefix

In [None]:
# cell 3

# Specify where the input data lives and the prefix used for uploads to S3
S3_BUCKET = 'YOUR_S3_BUCKET' # YOUR_S3_BUCKET
KEY = "YOUR_OBJECT_ON_S3" # YOUR_OBJECT_ON_S3
PREFIX = 'pred-maintenance'
input_data = f"s3://{S3_BUCKET}/{KEY}"
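
Leaving a placeholder in `S3_BUCKET` or `KEY` only fails much later, inside the processing job. A minimal sketch of a fail-fast check is shown here; `check_s3_settings` is a hypothetical helper that is not part of the notebook, and the bucket and key in the example call are made up:

```python
def check_s3_settings(bucket: str, key: str) -> str:
    """Return the s3:// URI for bucket/key, or raise if a placeholder is left in."""
    placeholders = {"YOUR_S3_BUCKET", "YOUR_OBJECT_ON_S3"}
    if bucket in placeholders or key in placeholders:
        raise ValueError("Replace the S3_BUCKET and KEY placeholders before continuing.")
    if bucket.startswith("s3://") or "/" in bucket:
        raise ValueError("S3_BUCKET must be a bare bucket name, without 's3://' or a path.")
    return f"s3://{bucket}/{key.lstrip('/')}"


# Example values are made up for illustration.
print(check_s3_settings("my-example-bucket", "datasets/maintenance_dataset.csv"))
```

In the notebook, the call would be `check_s3_settings(S3_BUCKET, KEY)`.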

### Set role and global vars

We set the global variables used by the SageMaker resources in `cell 4`:
* `sagemaker_session` is an object that manages interactions with the Amazon SageMaker APIs and any other AWS services needed.
* `region` is the AWS Region where the resources are created.
* `role` is the IAM role that the instances use when executing a job.

As in `ImmersionDay_Notebook2_SageMaker_Resources.ipynb`, the data processing, model training, and inference steps in this notebook run in [SageMaker Docker containers](https://docs.aws.amazon.com/sagemaker/latest/dg/docker-containers.html). For each step, a new instance is started to execute the code. The instance specifications are defined in `cell 4`.

To get familiar with the [SageMaker](https://docs.aws.amazon.com/sagemaker/index.html) resources, also look at:
* [SageMaker Python SDK Documentation](https://sagemaker.readthedocs.io/en/stable/#)
* [SageMaker Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/whatis.html)

In [None]:
# cell 4

# Get a SageMaker-compatible role used by this function and the session.
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = get_execution_role()

# the step functions execution role
account = boto3.client('sts').get_caller_identity().get('Account')
workflow_execution_role = "arn:aws:iam::{}:role/AmazonSageMaker-StepFunctionsWorkflowExecutionRole".format(account)
print("The Step Function Execution Role ARN: ", workflow_execution_role)

# Set your instance count and type
instance_type = 'ml.m5.xlarge'
instance_count = 1
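
The workflow execution role ARN is composed from the account ID and the role name, so it only matches the ARN saved during Setup if the role was created with exactly that name and without an IAM path. A small sketch of the comparison follows; `build_role_arn` is a hypothetical helper and the account ID is a made-up example:

```python
def build_role_arn(account_id: str, role_name: str) -> str:
    """Compose an IAM role ARN in the standard `aws` partition."""
    return f"arn:aws:iam::{account_id}:role/{role_name}"


# Hypothetical values: use the ARN you saved during Setup and your own account ID.
saved_arn = "arn:aws:iam::111122223333:role/AmazonSageMaker-StepFunctionsWorkflowExecutionRole"
computed_arn = build_role_arn("111122223333", "AmazonSageMaker-StepFunctionsWorkflowExecutionRole")

if saved_arn.strip() != computed_arn:
    raise ValueError(f"ARN mismatch: saved {saved_arn!r}, computed {computed_arn!r}")
print("Role ARNs match")
```

If the two differ, set `workflow_execution_role` to the saved ARN instead of the composed string.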

## Task 2: Define the Step Function Execution Inputs
We orchestrate the steps using [AWS Step Functions](https://aws.amazon.com/step-functions/?step-functions.sort-by=item.additionalFields.postDateTime&step-functions.sort-order=desc). To get familiar with Step Functions, also look at:
* [Step Functions Developer Guide](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html)
* [Step Function Python SDK Documentation](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/#).

Throughout the lab, you can refer to the [Step Function Python SDK Documentation](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/#) and the [SageMaker Python SDK Documentation](https://sagemaker.readthedocs.io/en/stable/#) for a better understanding of the inputs and outputs.

In `cell 5`, we define the execution schema and the execution input placeholders.

In [None]:
# cell 5

# define the execution input schema
schema = {
    "ProcessingJobName": str,
    "TrainingJobName": str,
    "ModelName": str,
    "EndpointName": str,
}

# define the execution input placeholders, which must be passed to the state machine in this format
execution_input = ExecutionInput(schema=schema)
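
The schema states that every execution must supply these four keys, each as a string. To make that contract concrete, here is an illustrative local check written in plain Python; `validate_inputs` is a hypothetical helper and is not the SDK's own validation logic:

```python
def validate_inputs(inputs: dict, schema: dict) -> list:
    """Return a list of problems; an empty list means the inputs fit the schema."""
    problems = []
    for key, expected_type in schema.items():
        if key not in inputs:
            problems.append(f"missing key: {key}")
        elif not isinstance(inputs[key], expected_type):
            problems.append(f"{key} should be {expected_type.__name__}")
    for key in inputs:
        if key not in schema:
            problems.append(f"unexpected key: {key}")
    return problems


schema = {
    "ProcessingJobName": str,
    "TrainingJobName": str,
    "ModelName": str,
    "EndpointName": str,
}

# EndpointName is left out on purpose to show a reported problem.
example = {"ProcessingJobName": "job-1", "TrainingJobName": "job-1", "ModelName": "job-1"}
print(validate_inputs(example, schema))
```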

---
## Task 3: Define the Processing Step

In this task, we run a processing script using a [SageMaker Processing Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing). To do so, we:
1. Instantiate the Processor by specifying the Docker image location and the instance specifications.
1. Define the Processing Step by specifying the script to run, the inputs, and the outputs.

Before you can run this code, please update the file found under `src/preprocess.py` by adding the required code snippets from your notebook `ImmersionDay_Notebook1_GettingStarted.ipynb`.

**Steps**

* Open the notebook `ImmersionDay_Notebook1_GettingStarted.ipynb`
* Open the file `src/preprocess.py`
* Fill in the `ACTION` parts of the `preprocess.py` file by copying and pasting your code from the notebook
    * Here we copy the `feature_column_names` and `label_column` as well as the `train_test_split`

### Retrieve the XGBoost image from SageMaker and instantiate the Processor
In `cell 6`, we:
* Retrieve the location of the XGBoost Docker image from [Amazon Elastic Container Registry (Amazon ECR)](https://aws.amazon.com/ecr/).
* Define the Processor by specifying the location of the Docker image and the instance specification.

In [None]:
# cell 6

framework_version = "1.3-1"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version=framework_version,
    py_version="py3",
    instance_type=instance_type)

script_processor = ScriptProcessor(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=instance_count,
    base_job_name=PREFIX,
    command=["python3"],
    sagemaker_session=sagemaker_session,
    role=role,
    volume_size_in_gb=10,
)

### Creating a Processing Step

In [None]:
# cell 7

# Upload the preprocessing script to S3 under a timestamped prefix
gid = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
input_processing_code = sagemaker_session.upload_data(
    "src/preprocess.py",
    bucket=S3_BUCKET,
    key_prefix=f"{PREFIX}/{gid}/code",
)


# SageMaker Processing
inputs = [
    ProcessingInput(
        source=input_processing_code,
        destination="/opt/ml/processing/input/code",
        input_name="code",
    ),
]
preprocess_step = steps.ProcessingStep(
    'Preprocess Data',
    processor=script_processor,
    job_name=execution_input['ProcessingJobName'],
    inputs=inputs,
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=f"s3://{S3_BUCKET}/stepfunctions/output/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=f"s3://{S3_BUCKET}/stepfunctions/output/validation"),
    ],
    container_arguments=["--input-data", input_data],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/preprocess.py"],
    wait_for_completion=True
)
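
The step passes `--input-data` to the script and collects whatever the script writes under `/opt/ml/processing/train` and `/opt/ml/processing/validation`. The contents of `src/preprocess.py` are not shown in this notebook, so the following is only a sketch of how such a script might read its arguments; the `--train-dir` and `--validation-dir` flags are hypothetical additions, and the bucket in the example call is made up:

```python
import argparse


def parse_args(argv=None):
    """Parse the arguments that the processing step passes to the script."""
    parser = argparse.ArgumentParser()
    # Matches container_arguments=["--input-data", input_data] in the step definition.
    parser.add_argument("--input-data", type=str, required=True)
    # Hypothetical optional flags; the defaults mirror the ProcessingOutput sources.
    parser.add_argument("--train-dir", type=str, default="/opt/ml/processing/train")
    parser.add_argument("--validation-dir", type=str, default="/opt/ml/processing/validation")
    return parser.parse_args(argv)


# Example bucket and key are made up for illustration.
args = parse_args(["--input-data", "s3://my-example-bucket/maintenance_dataset.csv"])
print(args.input_data, args.train_dir, args.validation_dir)
```

Files written to the two output directories are uploaded to the S3 destinations declared in the `ProcessingOutput` entries when the job finishes.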


---
## Task 4: Define the Training Step
Before you can run this code, please update the file found under `src/train.py` by adding the required code snippets from your notebook `ImmersionDay_Notebook1_GettingStarted.ipynb`.

**Steps**

* Open the notebook `ImmersionDay_Notebook1_GettingStarted.ipynb`
* Open the file `src/train.py`
* Fill the `ACTION` parts of the `train.py` file by copy & pasting your code from the notebook
    * Here we copy the `numeric_features` and `categorical_features`
    * Create an `X_train`, `y_train`, `X_val` and `y_val` from your datasets
    * Add your model code

### Create your training container/estimator in SageMaker

In `cell 8`, we define an XGBoost Estimator by specifying:
* the location of the training script
* the hyperparameters
* the role and instance specification
* output path on S3

In [None]:
# cell 8

# Set the training script (entry point)
entry_point = 'src/train.py'

# Set the hyperparameters for this estimator
hyperparameters = {
    'n_estimators': 100,
    'max_depth': 10,
    'random_state': 42
}

xgb_estimator = XGBoost(
    entry_point=entry_point,
    hyperparameters=hyperparameters,
    role=role,
    instance_count=instance_count,
    instance_type=instance_type,
    framework_version=framework_version,
    sagemaker_session=sagemaker_session,
    output_path=f's3://{S3_BUCKET}/{PREFIX}/model/',)
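
When the estimator runs a custom entry point, the entries of the `hyperparameters` dict reach the script as command-line arguments, and the container exposes data and model locations through `SM_*` environment variables. The contents of `src/train.py` are not shown here, so this is a sketch under those assumptions rather than the workshop's actual script; `parse_train_args` is a hypothetical name:

```python
import argparse
import os


def parse_train_args(argv=None):
    """Parse hyperparameters and SageMaker-provided paths for a training script."""
    parser = argparse.ArgumentParser()
    # Hyperparameters: names match the keys of the `hyperparameters` dict.
    parser.add_argument("--n_estimators", type=int, default=100)
    parser.add_argument("--max_depth", type=int, default=10)
    parser.add_argument("--random_state", type=int, default=42)
    # Paths: read from the SM_* environment variables when present;
    # the fallbacks are the conventional container locations.
    parser.add_argument("--model-dir", default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
    parser.add_argument("--train", default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
    parser.add_argument("--validation", default=os.environ.get("SM_CHANNEL_VALIDATION", "/opt/ml/input/data/validation"))
    # parse_known_args tolerates extra arguments the container may add.
    args, _unknown = parser.parse_known_args(argv)
    return args


args = parse_train_args(["--n_estimators", "50", "--max_depth", "6"])
print(args.n_estimators, args.max_depth, args.random_state)
```

The `train` and `validation` channel names correspond to the keys of the `data` dict passed to the training step.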

### Create your Training Step

In [None]:
# cell 9

# Set the model training step
train_step = steps.TrainingStep(
    'Train Model',
    estimator=xgb_estimator,
    data={
        "train": TrainingInput(
            s3_data=preprocess_step.output()['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri'],
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            # Outputs[1] is the "validation" output; Outputs[0] is "train"
            s3_data=preprocess_step.output()['ProcessingOutputConfig']['Outputs'][1]['S3Output']['S3Uri'],
            content_type="text/csv",
        ),
    },
    job_name=execution_input['TrainingJobName'],
    wait_for_completion=True
)
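
The processing outputs appear in `ProcessingOutputConfig` in the order they were declared in `cell 7`: `train` first, `validation` second. The following sketch shows that mapping on a sample dictionary with the same shape; `output_uri` is a hypothetical helper and the bucket name is made up. Inside the state machine the values are placeholders resolved at run time, so this function illustrates the structure and cannot be applied to `preprocess_step.output()` directly:

```python
def output_uri(processing_output_config: dict, name: str) -> str:
    """Return the S3 URI of the processing output with the given OutputName."""
    for output in processing_output_config["Outputs"]:
        if output["OutputName"] == name:
            return output["S3Output"]["S3Uri"]
    raise KeyError(f"no processing output named {name!r}")


# Sample with the same shape as the job's ProcessingOutputConfig; bucket name is made up.
sample_config = {
    "Outputs": [
        {"OutputName": "train", "S3Output": {"S3Uri": "s3://my-example-bucket/stepfunctions/output/train"}},
        {"OutputName": "validation", "S3Output": {"S3Uri": "s3://my-example-bucket/stepfunctions/output/validation"}},
    ]
}

print(output_uri(sample_config, "validation"))
```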


---
## Task 5: Define the Endpoint Steps

Run through the cells below to:
1. Create a Model Step in `cell 10`, which [creates a model in SageMaker](https://docs.aws.amazon.com/sagemaker/latest/dg/API_CreateModel.html) from the trained estimator.
1. Create an Endpoint Configuration Step in `cell 11`, which [creates an endpoint configuration in SageMaker](https://docs.aws.amazon.com/sagemaker/latest/dg/API_CreateEndpointConfig.html).
1. Create an Endpoint Step in `cell 12` to create or update an endpoint in SageMaker.

### Create your Model Step

In [None]:
# cell 10

# Save the model to SageMaker
model_step = steps.ModelStep(
    'RegisterModel',
    model=train_step.get_expected_model(),
    model_name=execution_input['ModelName']
)

### Create your Endpoint Configuration Step

In [None]:
# cell 11

endpoint_config_step = steps.EndpointConfigStep(
    "Create Endpoint Config",
    endpoint_config_name=execution_input["ModelName"],
    model_name=execution_input["ModelName"],
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
)

### Create your Endpoint Step

In [None]:
# cell 12

endpoint_step = steps.EndpointStep(
    "Create Endpoint",
    endpoint_name=execution_input["EndpointName"],
    endpoint_config_name=execution_input["ModelName"],
)


---
## Task 6: Define the Step Function Workflow

Run through the cells below to:
1. Define the order of the steps in `cell 13`.
1. Create the Step Functions workflow in `cell 14`.

In [None]:
# cell 13

# Defining the steps order
workflow_definition = steps.Chain(
    [
        preprocess_step,
        train_step,
        model_step,
        endpoint_config_step,
        endpoint_step
    ]
)
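
A chain is a linear sequence: in the resulting state machine definition, each state points to its successor with `Next`, and the last state is marked with `End`. The sketch below builds a simplified definition of that shape from the step names used in this notebook. `chain_to_states` is a hypothetical helper, and real Task states also carry `Resource` and `Parameters` fields that are omitted here:

```python
import json


def chain_to_states(names):
    """Build a simplified state machine definition that runs states in order."""
    states = {}
    for i, name in enumerate(names):
        state = {"Type": "Task"}  # real Task states also carry Resource and Parameters
        if i + 1 < len(names):
            state["Next"] = names[i + 1]
        else:
            state["End"] = True
        states[name] = state
    return {"StartAt": names[0], "States": states}


definition = chain_to_states(
    ["Preprocess Data", "Train Model", "RegisterModel", "Create Endpoint Config", "Create Endpoint"]
)
print(json.dumps(definition, indent=2))
```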

In [None]:
# cell 14

workflow = Workflow(
    name="sagemaker-pred-maintenance-pipeline",
    definition=workflow_definition,
    role=workflow_execution_role,
    execution_input=execution_input,
)

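# Create the state machine
workflow.create()

# Apply the current definition and role, which matters when the state machine already existed
workflow.update(definition=workflow_definition, role=workflow_execution_role)
# Updates are eventually consistent; pause briefly before starting an execution
time.sleep(10)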

---
## Task 7: Start and Monitor the Step Function Execution

* Start the pipeline in `cell 15`.
* Monitor progress in the [Step Functions console](https://eu-west-2.console.aws.amazon.com/states/home?region=eu-west-2#/statemachines/view/arn:aws:states:eu-west-2:273786305532:stateMachine:sagemaker-pred-maintenance-pipeline). The link targets the `eu-west-2` Region and a specific account ID; adjust both to match your environment.

In [None]:
# cell 15

gid = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
inputs = {
    "TrainingJobName": "sagemaker-xgboost-job-{}".format(gid),
    "ModelName": "sagemaker-xgboost-job-{}".format(gid),
    "EndpointName": "sagemaker-xgboost-job-{}".format(gid),
    "ProcessingJobName": "sagemaker-xgboost-job-{}".format(gid),
}

execution = workflow.execute(inputs=inputs)
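
Besides the console, progress can be followed from the notebook by polling the execution status. The sketch below assumes that `execution.describe()` returns a dictionary with a `status` key, as the Step Functions `DescribeExecution` API response does; `wait_for_execution` is a hypothetical helper, and a stand-in callable replaces the real execution so the example runs without AWS access:

```python
import time

TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(describe, poll_seconds=30, timeout_seconds=3600, sleep=time.sleep):
    """Poll `describe()` until the execution reaches a terminal status.

    `describe` is any zero-argument callable returning a dict with a "status" key.
    """
    waited = 0
    while True:
        status = describe()["status"]
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"execution still {status} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds


# Stand-in for `execution.describe`, so the sketch runs without AWS access.
fake_responses = iter([{"status": "RUNNING"}, {"status": "RUNNING"}, {"status": "SUCCEEDED"}])
print(wait_for_execution(lambda: next(fake_responses), poll_seconds=0, sleep=lambda s: None))
```

In the notebook, the call would be `wait_for_execution(execution.describe)`. Processing, training, and endpoint creation each take several minutes, so a generous timeout is sensible.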