# SageMaker Pipelines Customer Churn Prediction

Amazon SageMaker Model Building Pipelines offers machine learning (ML) application developers and operations engineers the ability to orchestrate SageMaker jobs and author reproducible ML pipelines. It also enables them to deploy custom-built models for real-time, low-latency inference, run offline inference with Batch Transform, and track the lineage of artifacts. They can institute sound operational practices for deploying and monitoring production workflows, deploying model artifacts, and tracking artifact lineage through a simple interface, adhering to safety and best-practice paradigms for ML application development.

The SageMaker Pipelines service supports a SageMaker Pipeline domain-specific language (DSL), which is a declarative JSON specification. This DSL defines a directed acyclic graph (DAG) of pipeline parameters and SageMaker job steps. The SageMaker Python Software Development Kit (SDK) streamlines the generation of the pipeline DSL using constructs that engineers and scientists are already familiar with.

## SageMaker Pipelines

SageMaker Pipelines supports the following activities, which are demonstrated in this notebook:

* Pipelines - A DAG of steps and conditions to orchestrate SageMaker jobs and resource creation.
* Processing job steps - A simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation.
* Training job steps - An iterative process that teaches a model to make predictions by presenting examples from a training dataset.
* Conditional execution steps - A step that provides conditional execution of branches in a pipeline.
* Register model steps - A step that creates a model package resource in the Model Registry that can be used to create deployable models in Amazon SageMaker.
* Create model steps - A step that creates a model for use in transform steps or later publication as an endpoint.
* Parametrized Pipeline executions - Enables variation in pipeline executions according to specified parameters.

## Notebook Overview

This notebook shows how to:

* Define a set of Pipeline parameters that can be used to parametrize a SageMaker Pipeline.
* Define a Processing step that extracts data from feature store to create the train, validation and test data sets.
* Define a Training step that trains a model on the preprocessed train data set.
* Define a Processing step that evaluates the trained model's performance on the test dataset.
* Define a Conditional step that measures a condition based on output from prior steps and conditionally executes other steps.
* Define a Register Model step that creates a model package from the estimator and model artifacts used to train the model.
* Define and create a Pipeline definition in a DAG, with the defined parameters and steps.
* Start a Pipeline execution and wait for execution to complete.
* Download the model evaluation report from the S3 bucket for examination.
* Deploy registered models for real-time inference.
* Invoke the real-time endpoint using features retrieved from the SageMaker Online Feature Store.


### Data

Mobile operators have historical records on which customers ultimately ended up churning and which continued using the service. We can use this historical information to construct an ML model of one mobile operator's churn using a process called training. After training the model, we can pass the profile information of an arbitrary customer (the same profile information that we used to train the model) to the model, and have the model predict whether this customer is going to churn. Of course, we expect the model to make mistakes. After all, predicting the future is tricky business!

The dataset we will use is synthetically generated, but indicative of the types of features you'd see in this use case. By modern standards, it's a relatively small dataset of only 5,000 records, stored in the SageMaker offline Feature Store. Each record uses 23 attributes to describe the profile of a customer of an unknown US mobile operator. The attributes are:
- `customerID`: Unique identifier of each customer
- `Account length`: the number of days that this account has been active,
- `Intl_Plan`: whether the customer has an international calling plan: yes/no,
- `VMail_Plan`: whether the customer has a voice mail feature: yes/no,
- `VMail_Message`: the average number of voice mail messages per month,
- `Day_Mins`: the total number of calling minutes used during the day,
- `Day_Calls`: the total number of calls placed during the day,
- `Eve Mins`, `Eve Calls`: the total calling minutes and number of calls placed during the evening,
- `Night Mins`, `Night Calls`: the total calling minutes and number of calls placed during nighttime,
- `Intl_Mins`, `Intl Calls`: the total calling minutes and number of international calls,
- `CustServ_Calls`: the number of calls placed to Customer Service,
- `pastSenti_pos`: the number of past calls with positive sentiment,
- `pastSenti_nut`: the number of past calls with neutral sentiment,
- `pastSenti_neg`: the number of past calls with negative sentiment,
- `sentiment`: sentiment of the real-time call,
- `Churn_true`: whether the customer left the service (this is the label)

## Section 1: Model Training using SageMaker Pipelines

The pipeline that you create follows a typical machine learning (ML) application pattern of preprocessing, training, evaluation, and model registration:

![A typical ML Application pipeline](./img/pipeline-full.png)

## Preparation


Let's start by importing necessary packages and specifying:

- The S3 bucket and prefix that you want to use for training and model data. This should be within the same region as the Notebook Instance, training, and hosting.
- The IAM role ARN used to give training and hosting access to your data. See the documentation for how to create these. Note: if more than one role is required for notebook instances, training, and/or hosting, replace the `get_execution_role()` call with the appropriate full IAM role ARN string(s).

In [None]:
# cell-00
# install additional packages needed for this workshop
%pip install graphviz -q

In [None]:
# cell-01
import json
import os
import io
from io import StringIO

import boto3
import sagemaker
import sagemaker.session
from sagemaker import get_execution_role

from typing import Dict
from pprint import pprint
import datetime
import pandas as pd

from sagemaker.dataset_definition.inputs import (
    AthenaDatasetDefinition,
    DatasetDefinition,
)
from sagemaker.workflow.clarify_check_step import (
    DataBiasCheckConfig,
    ClarifyCheckStep,
    ModelBiasCheckConfig,
    ModelExplainabilityCheckConfig,
    ClarifyCheckConfig,
)
from sagemaker.clarify import (
    DataConfig,
    BiasConfig,
    ModelConfig,
    ModelPredictedLabelConfig,
    SHAPConfig,
)
from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.inputs import TrainingInput
from sagemaker.lambda_helper import Lambda
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    Processor,
    ScriptProcessor,
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import (
    ParameterFloat,
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.model import Model

from sagemaker.workflow.step_collections import EstimatorTransformer, RegisterModel
from sagemaker.workflow.steps import CacheConfig, ProcessingStep, Step, TrainingStep
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.functions import JsonGet
from sagemaker.utils import name_from_base

import utilities as utl
import time
import uuid
import csv
from urllib.parse import urlparse

In [None]:
# cell-02
role = get_execution_role()

sagemaker_session = sagemaker.Session()
pipeline_session = sagemaker.workflow.pipeline_context.PipelineSession()
region = sagemaker_session.boto_region_name
sagemaker_client = sagemaker_session.sagemaker_client
sagemaker_runtime = boto3.client("sagemaker-runtime")

bucket = sagemaker_session.default_bucket()
prefix = "sagemaker/DEMO-xgboost-customer-churn-connect"
base_job_prefix = "Demo-xgboost-churn-connect"

s3_client = boto3.client("s3")

fg_name = "fg-contact-center-data"  # this is the feature group name that has already been created during account setup

### Define Parameters to Parametrize Pipeline Execution

Define Pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.

The supported parameter types include:

* `ParameterString` - represents a `str` Python type
* `ParameterInteger` - represents an `int` Python type
* `ParameterFloat` - represents a `float` Python type

These parameters support providing a default value, which can be overridden on pipeline execution. The default value specified should be an instance of the type of the parameter.

The parameters defined in this workflow include:

* `processing_instance_type` - The `ml.*` instance type of the processing job.
* `processing_instance_count` - The instance count of the processing job.
* `train_instance_type` - The `ml.*` instance type of the training job.
* `train_instance_count` - The instance count of the training job.
* `model_approval_status` - What approval status to register the trained model with for CI/CD purposes ( "PendingManualApproval" is the default).
* `model_output` - The S3 bucket URI location of the model output path
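
As a rough illustration of how defaults and overrides interact, the sketch below resolves a set of execution-time overrides against default values in plain Python. It does not use the SageMaker SDK: the `resolve_parameters` helper and the override values are hypothetical and exist only for this example, and rejecting unknown names or mismatched types is a choice made in this sketch. In the notebook, overrides would typically be supplied as a dictionary keyed by parameter name when starting an execution, for example `pipeline.start(parameters={"ProcessingInstanceCount": 2})`.

```python
# Illustrative only: a plain-Python model of default values plus overrides.
# The parameter names match those defined in this notebook; the helper
# itself is an assumption made for this sketch, not part of the SDK.
defaults = {
    "ProcessingInstanceCount": 1,
    "ProcessingInstanceType": "ml.m5.xlarge",
    "TrainingInstanceCount": 1,
    "TrainingInstance": "ml.m5.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
}


def resolve_parameters(defaults, overrides):
    """Return the effective parameters, rejecting unknown names and type mismatches."""
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"Unknown pipeline parameters: {sorted(unknown)}")
    for name, value in overrides.items():
        expected = type(defaults[name])
        if not isinstance(value, expected):
            raise TypeError(
                f"{name} expects {expected.__name__}, got {type(value).__name__}"
            )
    # Overrides win over defaults; untouched parameters keep their defaults.
    return {**defaults, **overrides}


effective = resolve_parameters(
    defaults,
    {"ProcessingInstanceCount": 2, "ModelApprovalStatus": "Approved"},
)
print(effective["ProcessingInstanceCount"], effective["ModelApprovalStatus"])
```

Parameters that are not overridden, such as `TrainingInstance` here, keep the default value declared in the pipeline definition.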

![Define Parameters](./img/pipeline-1.png)

In [None]:
# cell-03
# parameters for pipeline execution

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount", default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)

train_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1,
)
train_instance_type = ParameterString(
    name="TrainingInstance",
    default_value="ml.m5.xlarge",
)


model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
    enum_values=[
        "PendingManualApproval",
        "Approved",
    ],
)
model_output = ParameterString(
    name="ModelOutputUrl",
    default_value=f"s3://{bucket}/{prefix}/model",
)


### Define a Processing Step for dataset creation from the SageMaker Feature Store

First, develop a preprocessing script that is specified in the Processing step.

In this example, we have already created the `create_dataset.py` in the `scripts` folder, which contains the dataset creation script, and stored the path to the script. You can use the following cell to load the stored value.

The Processing step executes the script on the input data. The Training step uses the preprocessed training features and labels to train a model. The Evaluation step uses the trained model and preprocessed test features and labels to evaluate the model.

![Define a Processing Step for Feature Engineering](./img/pipeline-2.png)

Next, create an instance of an `SKLearnProcessor` processor and use that in our `ProcessingStep`.

You also specify the `framework_version` to use throughout this notebook.

Note the `processing_instance_type` and `processing_instance_count` parameters used by the processor instance.

When creating a [Processing job](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateProcessingJob.html), you specify the `ProcessingInputs` parameter, which tells the SageMaker service where to get the input data. If the data is already available on S3, you can use `S3Input` to define the inputs for the processing job. In this example, however, the data is stored in the offline Feature Store, so we use a [DatasetDefinition](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DatasetDefinition.html), which supports data sources that can be queried through Athena or Redshift. Specifically, we use the [AthenaDatasetDefinition](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_AthenaDatasetDefinition.html) option: it executes a SQL query and writes the resulting dataset to S3, where it becomes available as an input to the processing job.

In [None]:
# cell-04
# processing step for dataset creation
dataset_dict = {
    "customers_fg_name": fg_name, # fill your fg name that was created in your account, for example "fg-contact-center-data-352ebe3e"
    "label_name": "churn_true",
    "features_names": [
        'account_length', 
        'vmail_message', 
        'day_mins',
        'day_calls', 
        'eve_mins', 
        'eve_calls', 
        'night_mins', 
        'night_calls',
        'intl_mins', 
        'intl_calls', 
        'custserv_calls', 
        'sentiment',
        'pastsenti_nut', 
        'pastsenti_pos', 
        'pastsenti_neg', 
        'mth_remain',
        'int_l_plan_no', 
        'int_l_plan_yes', 
        'vmail_plan_no', 
        'vmail_plan_yes',
    ],
}

customers_fg_info = utl.get_fg_info(
    dataset_dict["customers_fg_name"],
    sagemaker_session=sagemaker_session,
)

label_name = dataset_dict["label_name"]
features_names = dataset_dict["features_names"]
training_columns = [label_name] + features_names
training_columns_string = ", ".join(f'"{c}"' for c in training_columns)

query_string = f"""SELECT DISTINCT {training_columns_string}
    FROM "{customers_fg_info.table_name}" 
"""
print(query_string)

In [None]:
# cell-05
athena_data_path = "/opt/ml/processing/athena"
script_path = './scripts/create_dataset.py'

create_dataset_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}-create-dataset",
    sagemaker_session=pipeline_session,
    role=role,
)

In [None]:
# cell-06
data_sources = [
    ProcessingInput(
        input_name="athena_dataset",
        dataset_definition=DatasetDefinition(
            local_path=athena_data_path,
            data_distribution_type="FullyReplicated",
            athena_dataset_definition=AthenaDatasetDefinition(
                catalog=customers_fg_info.catalog,
                database=customers_fg_info.database,
                query_string=query_string,
                output_s3_uri=Join(
                    on="/",
                    values=[
                        "s3:/",
                        bucket,
                        prefix,
                        ExecutionVariables.PIPELINE_EXECUTION_ID,
                        "raw_dataset",
                    ],
                ),
                output_format="PARQUET",
            ),
        ),
    )
]

Finally, use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that runs when the pipeline executes the step. This is similar to a processor instance's `run` method in the Python SDK. Note the `"train_data"`, `"validation_data"`, and `"test_data"` named outputs specified in the output configuration for the processing job. Step `properties` can be used in subsequent steps and resolve to their runtime values at execution. This usage is called out again when you define the training step.
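
The `Join` function used for the S3 destinations is resolved by the pipeline at execution time, so its result is not visible while the notebook runs. As a plain-Python approximation of what the resolved value should look like, the sketch below joins the same parts with `/`. The bucket name and execution ID are made-up placeholders, and `join_s3_uri` is a helper written for this illustration. Note that the first element is `"s3:/"` with a single slash, because the join itself supplies the second one.

```python
# Plain-Python approximation of Join(on="/", values=[...]).
# The bucket and execution ID are hypothetical placeholders, not real resources.
def join_s3_uri(*parts):
    """Join URI parts with '/', mirroring Join(on='/')."""
    return "/".join(parts)


train_uri = join_s3_uri(
    "s3:/",  # single slash: the join adds the second one
    "example-bucket",  # stands in for `bucket`
    "sagemaker/DEMO-xgboost-customer-churn-connect",  # `prefix` from this notebook
    "example-execution-id",  # stands in for ExecutionVariables.PIPELINE_EXECUTION_ID
    "train_dataset",
)
print(train_uri)
```

Including the execution ID in the path keeps the datasets of different pipeline runs in separate S3 locations.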

In [None]:
# cell-07
step_args = create_dataset_processor.run(
    inputs=data_sources,
    outputs=[
        ProcessingOutput(
            output_name="train_data",
            source="/opt/ml/processing/output/train",
            destination=Join(
                on="/",
                values=[
                    "s3:/",
                    bucket,
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "train_dataset",
                ],
            ),
        ),
        ProcessingOutput(
            output_name="validation_data",
            source="/opt/ml/processing/output/validation",
            destination=Join(
                on="/",
                values=[
                    "s3:/",
                    bucket,
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "validation_dataset",
                ],
            ),
        ),
        ProcessingOutput(
            output_name="test_data",
            source="/opt/ml/processing/output/test",
            destination=Join(
                on="/",
                values=[
                    "s3:/",
                    bucket,
                    prefix,
                    ExecutionVariables.PIPELINE_EXECUTION_ID,
                    "test_dataset",
                ],
            ),
        ),
    ],
    arguments=[
        "--athena-data",
        athena_data_path,
    ],
    code=script_path,
)

create_dataset_step = ProcessingStep(
   name="CreateDataSet",
   step_args=step_args,
)


### Define a Training Step to Train a Model

In this section, use Amazon SageMaker's [XGBoost Algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) to train on this dataset. Configure an Estimator for the XGBoost algorithm and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to `model_output` so that it can be hosted later. 

The model path where the models from training will be saved is also specified.

Note that the `train_instance_type` parameter may be used in multiple places in the pipeline. In this case, `train_instance_type` and `train_instance_count` are passed into the estimator.

![Define a Training Step to Train a Model](./img/pipeline-3.png)

In [None]:
# cell-08
# training step for generating model artifacts
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
)
xgb_train = sagemaker.estimator.Estimator(
    image_uri=image_uri,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    output_path=model_output,
    base_job_name=f"{base_job_prefix}-train",
    sagemaker_session=pipeline_session,
    role=role,
)

# Set some hyper parameters
# https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost_hyperparameters.html
xgb_train.set_hyperparameters(
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.8,
    silent=0,
    objective="binary:logistic",
    num_round=100,
    eval_metric='auc'
)


Finally, use the estimator instance to construct a `TrainingStep`, passing the `properties` of the prior `ProcessingStep` as the `TrainingStep` inputs. The step runs when the pipeline executes it, which is similar to calling an estimator's `fit` method in the Python SDK.

Pass the `S3Uri` of the `"train_data"` and `"validation_data"` outputs to the `TrainingStep` as its `train` and `validation` channels. The `"test_data"` output is used later for model evaluation in the pipeline. The `properties` attribute of a Pipeline step matches the object model of the corresponding response of a describe call. These properties can be referenced as placeholder values and are resolved at runtime. For example, the `ProcessingStep` `properties` attribute matches the object model of the [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response object.
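
To make the placeholder mechanism more concrete, the sketch below looks up an output URI in a dictionary shaped like the `ProcessingOutputConfig` portion of a `DescribeProcessingJob` response. The response content is a trimmed, hypothetical example, and `output_s3_uri` is a helper written for this illustration. The pipeline performs the equivalent lookup itself when it resolves `properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri`.

```python
# A trimmed, hypothetical DescribeProcessingJob-style response.
# Only the fields needed for the lookup are included; the URIs are made up.
describe_response = {
    "ProcessingOutputConfig": {
        "Outputs": [
            {
                "OutputName": "train_data",
                "S3Output": {"S3Uri": "s3://example-bucket/prefix/exec-1/train_dataset"},
            },
            {
                "OutputName": "validation_data",
                "S3Output": {"S3Uri": "s3://example-bucket/prefix/exec-1/validation_dataset"},
            },
        ]
    }
}


def output_s3_uri(response, output_name):
    """Return the S3 URI of the named processing output."""
    for output in response["ProcessingOutputConfig"]["Outputs"]:
        if output["OutputName"] == output_name:
            return output["S3Output"]["S3Uri"]
    raise KeyError(f"No processing output named {output_name!r}")


train_s3_uri = output_s3_uri(describe_response, "train_data")
print(train_s3_uri)
```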

In [None]:
# cell-09
train_step_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
                "train_data"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
                "validation_data"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)

step_train = TrainingStep(
    name="TrainModel",
    step_args=train_step_args,
)

### Define a Model Evaluation Step to Evaluate the Trained Model

First, develop an evaluation script that is specified in a Processing step that performs the model evaluation.

After pipeline execution, you can examine the resulting `evaluation.json` for analysis.

The evaluation script uses `xgboost` to do the following:

* Load the model.
* Read the test data.
* Issue predictions against the test data.
* Build a classification report, including mae, mse, rmse and r2 metrics.
* Save the evaluation report to the evaluation directory.
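
The evaluation script itself is not shown in this notebook. As a hedged sketch of the reporting part only, the code below computes a ROC AUC from hypothetical labels and scores in pure Python and builds a report with the `binary_classification_metrics.roc_auc.value` layout that the condition step reads later. The sample data and the `roc_auc` helper are assumptions for illustration; the actual script may compute its metrics differently.

```python
import json


def roc_auc(labels, scores):
    """ROC AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties between a positive and a negative score count as half a win.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("ROC AUC needs at least one example of each class")
    wins = sum((p > n) + 0.5 * (p == n) for p in positives for n in negatives)
    return wins / (len(positives) * len(negatives))


# Hypothetical test labels and predicted churn probabilities.
labels = [1, 0, 1, 0, 1]
scores = [0.9, 0.2, 0.7, 0.6, 0.4]

auc = roc_auc(labels, scores)

# Report layout assumed from the JSON path used by the condition step.
report = {"binary_classification_metrics": {"roc_auc": {"value": auc}}}
report_json = json.dumps(report, indent=2)
print(report_json)
```

In a processing job, a report like this would be written as `evaluation.json` under the directory configured as the evaluation output.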

![Define a Model Evaluation Step to Evaluate the Trained Model](./img/pipeline-4.png)

Next, create an instance of a `ScriptProcessor` processor and use it in the `ProcessingStep`.

Note the `processing_instance_type` parameter passed into the processor.

In [None]:
# cell-10
# processing step for evaluation
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_prefix}/script-eval",
    sagemaker_session=pipeline_session,
    role=role,
)


Use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that will be executed when the pipeline invokes pipeline execution. This is similar to a processor instance's `run` method in the Python SDK.

Specifically, the `S3ModelArtifacts` from the `step_train` `properties` and the `S3Uri` of the `"test_data"` output of the `create_dataset_step` `properties` are passed into the inputs. The `TrainingStep` and `ProcessingStep` `properties` attributes match the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) and [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response objects, respectively.

In [None]:
# cell-11
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)
eval_step_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
                "test_data"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation", source="/opt/ml/processing/evaluation"
        ),
    ],
    code="./scripts/evaluation.py",
)

step_eval = ProcessingStep(
    name="EvaluateModel",
    step_args=eval_step_args,
    property_files=[evaluation_report],

)

### Define a Register Model Step to Create a Model Package

Use the model artifacts produced by the training step to construct a `Model`, and call its `register` method inside a `ModelStep`. The result of executing this step in a pipeline is a model package. A model package is a reusable abstraction of model artifacts that packages all the ingredients required for inference. Primarily, it consists of an inference specification that defines the inference image to use, along with an optional model weights location.

A model package group is a collection of model packages. A model package group can be created for a specific ML business problem, and new versions of the model packages can be added to it. Typically, customers are expected to create a ModelPackageGroup for a SageMaker pipeline so that model package versions can be added to the group for every SageMaker Pipeline run.

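Calling `model.register` under a `PipelineSession` does not register the model immediately; it returns the step arguments that the `ModelStep` uses when the pipeline runs. The arguments are the same as those of a `Model` instance's `register` method in the Python SDK.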

Specifically, pass in the `S3ModelArtifacts` from the `TrainingStep`, `step_train` properties. The `TrainingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object.

Note that the specific model package group name provided in this notebook can be used in the model registry and CI/CD work with SageMaker Projects.

![Define Parameters](./img/pipeline-7.png)

In [None]:
# cell-12
# register model step that will be conditionally executed
model = Model(
    image_uri=xgb_train.image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
                "S3Uri"
            ]
        ),
        content_type="application/json",
    )
)

model_package_group_name = "CustomerChurnModelPackage"
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.t2.large", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)


step_register = ModelStep(
    name="RegisterModel",
    step_args=register_args,
)

### Define a Condition Step to Check Accuracy and Conditionally Register a Model in the Model Registry

In this step, the model is registered only if the ROC AUC of the model, as determined by the evaluation step `step_eval`, is at least a specified threshold (0.90 in this notebook). A `ConditionStep` enables pipelines to support conditional execution in the pipeline DAG based on the conditions of the step properties.

In the following section, you:

* Define a `ConditionGreaterThanOrEqualTo` on the [roc_auc](https://en.wikipedia.org/wiki/Receiver_operating_characteristic) value found in the output of the evaluation step, `step_eval`.
* Use the condition in the list of conditions in a `ConditionStep`.
* Pass the `ModelStep` that registers the model, `step_register`, into the `if_steps` of the `ConditionStep`; these steps are executed only if the condition evaluates to `True`.
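
The decision that the condition step makes can be approximated in a few lines of plain Python. The sketch below walks a dotted path through an evaluation report and compares the result with the threshold. The `json_get` helper and the report value of 0.93 are hypothetical and only mimic the behaviour described above; they are not the SDK's `JsonGet` implementation.

```python
# Illustrative stand-in for JsonGet plus ConditionGreaterThanOrEqualTo.
def json_get(document, json_path):
    """Follow a dotted path such as 'a.b.c' through nested dictionaries."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


# Hypothetical evaluation report; a real one is produced by the evaluation step.
evaluation = {"binary_classification_metrics": {"roc_auc": {"value": 0.93}}}

threshold = 0.90  # same threshold as the condition step in this notebook
roc_auc_value = json_get(evaluation, "binary_classification_metrics.roc_auc.value")
should_register = roc_auc_value >= threshold
print(should_register)
```

When the comparison is false, the pipeline runs the `else_steps` instead, which is an empty list in this notebook, so nothing is registered.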

![Define a Condition Step to Check Accuracy and Conditionally Execute Steps](./img/pipeline-8.png)

In [None]:
# cell-13
# condition step for evaluating model quality and branching execution
# Register the model only when ROC AUC on the test set is at least 0.90
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="binary_classification_metrics.roc_auc.value",
    ),
    right=0.90,
)
step_cond = ConditionStep(
    name="CheckEvaluation",
    conditions=[cond_gte],
    if_steps=[step_register],
    else_steps=[],
)

### Define a Pipeline of Parameters, Steps, and Conditions

In this section, combine the steps into a Pipeline so it can be executed.

A pipeline requires a `name`, `parameters`, and `steps`. Names must be unique within an `(account, region)` pair.

Note:

* All of the parameters used in the definitions must be present.
* Steps passed into the pipeline do not have to be listed in the order of execution. The SageMaker Pipelines service resolves the execution order from the _data dependency_ DAG of the steps.
* Steps must be unique across the pipeline step list and all condition step if/else lists.
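
To illustrate how an execution order can be derived from data dependencies alone, the sketch below runs a topological sort over a simplified dependency map of this notebook's steps using the standard library `graphlib` module (Python 3.9 or later). The dependency map is hand-written for this example from the step definitions above, and the step names are simplified; the service builds its own graph from the pipeline definition.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
# Hand-written approximation of this notebook's pipeline, for illustration only.
dependencies = {
    "CreateDataSet": set(),
    "TrainModel": {"CreateDataSet"},
    "EvaluateModel": {"CreateDataSet", "TrainModel"},
    "CheckEvaluation": {"EvaluateModel"},
    "RegisterModel": {"CheckEvaluation", "TrainModel"},
}

# static_order() yields every step after all of its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because the order comes from the dependencies, shuffling the entries of the dictionary (or the `steps` list passed to `Pipeline`) does not change which step runs first.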

![Define a Pipeline of Parameters, Steps, and Conditions](./img/pipeline-9.png)

In [None]:
# cell-14
# pipeline instance
pipeline_name = "Demo-customer-churn-pipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        train_instance_count,
        train_instance_type,
        model_approval_status,
        model_output
    ],
    steps=[create_dataset_step, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
)

### Submit the pipeline to SageMaker and start execution

Submit the pipeline definition to the Pipeline service. The role passed in will be used by the Pipeline service to create all the jobs defined in the steps.

Once the pipeline starts execution, describe the execution to confirm its status and settings.

In [None]:
# cell-15
pipeline.upsert(role_arn=role)

# start the pipeline
execution = pipeline.start()

# describe the pipeline execution
execution.describe()

### Pipeline Operations: Waiting for Pipeline Execution
Wait for the execution to complete.

In [None]:
# cell-16
execution.wait()

### Examining the Evaluation

Examine the resulting model evaluation after the pipeline completes. Download the resulting `evaluation.json` file from S3 and print the report.

In [None]:
# cell-17
evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
    step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
))
pprint(json.loads(evaluation_json))

In [None]:
# cell-18
# list the model packages in the model package group and get the model version arn
list_model_packages_response = sagemaker_client.list_model_packages(
    ModelPackageGroupName=model_package_group_name
)
model_version_arn = list_model_packages_response["ModelPackageSummaryList"][0][
    "ModelPackageArn"
]
print("registered model version arn: {}".format(model_version_arn))

### Update Model Approval Status
After you create a model version, you typically want to evaluate its performance before you deploy it to a production endpoint. If it performs to your requirements, you can update the approval status of the model version to `Approved`. Setting the status to Approved can initiate CI/CD deployment for the model. If the model version does not perform to your requirements, you can update the approval status to `Rejected`.

In [None]:
# cell-19
model_package_update_input_dict = {
    "ModelPackageArn": model_version_arn,
    "ModelApprovalStatus": "Approved",
}
model_package_update_response = sagemaker_client.update_model_package(**model_package_update_input_dict)

### The End of Section 1
Congratulations on successfully building and executing the SageMaker pipeline.
Please refer to the instructions for the next step.

----------------
## Section 2: Deploy registered models for real-time inference
Next, we will create a SageMaker real-time endpoint for the registered model version.

After the model package is approved, we can create a SageMaker Model using the approved model version.

In [None]:
# cell-20
now = f"{datetime.datetime.now():%Y-%m-%d-%H-%M-%S}"
model_name = f"sagemaker-endpoint-customerchurn-{now}"
primary_container = {
    "ModelPackageName": model_version_arn,
}

create_model_response = sagemaker_client.create_model(
    ModelName=model_name, ExecutionRoleArn=role, PrimaryContainer=primary_container
)

print("Model arn : {}".format(create_model_response["ModelArn"]))

### Create an Endpoint Config from the model
This will create an endpoint configuration that Amazon SageMaker hosting services uses to deploy models. In the configuration, you identify one or more models, created using the `CreateModel` API, to deploy and the resources that you want Amazon SageMaker to provision. Then you call the `CreateEndpoint` API.

In [None]:
# cell-21
deploy_instance_type = "ml.m5.xlarge"
endpoint_config_name = f"sagemaker-endpoint-customerchurn-{now}"
endpoint_config_response = sagemaker_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "VariantName": "AllTrafficVariant",
            "ModelName": model_name,
            "InitialInstanceCount": 1,
            "InstanceType": deploy_instance_type,
            "InitialVariantWeight": 1,
        },
    ],
)
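
The configuration above routes all traffic to a single variant. Because an endpoint configuration can identify more than one model, the sketch below shows how a `ProductionVariants` list for several models might be assembled. The helper name `build_production_variants` and the model names `churn-model-a` and `churn-model-b` are hypothetical. The dictionary keys are the same ones used in the cell above, and SageMaker distributes traffic in proportion to each variant's weight.

```python
def build_production_variants(model_weights, instance_type="ml.m5.xlarge", instance_count=1):
    """Build a ProductionVariants list from a {model_name: traffic_weight} mapping."""
    variants = []
    for index, (model_name, weight) in enumerate(model_weights.items(), start=1):
        variants.append(
            {
                "VariantName": f"Variant{index}",
                "ModelName": model_name,
                "InitialInstanceCount": instance_count,
                "InstanceType": instance_type,
                "InitialVariantWeight": weight,
            }
        )
    return variants


# Hypothetical model names: send 90% of traffic to one model and 10% to the other.
variants = build_production_variants({"churn-model-a": 0.9, "churn-model-b": 0.1})
for variant in variants:
    print(variant["VariantName"], variant["ModelName"], variant["InitialVariantWeight"])
```

The resulting list could be passed as `ProductionVariants=variants` to `create_endpoint_config`, provided each named model already exists.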

In [None]:
# cell-22
endpoint_name = "sagemaker-endpoint-xgboost-customerchurn"
create_endpoint_response = sagemaker_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name,
)

In [None]:
# cell-23
# wait for the endpoint creation to be successful
describe_endpoint_response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)

while describe_endpoint_response["EndpointStatus"] == "Creating":
    describe_endpoint_response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
    print(describe_endpoint_response["EndpointStatus"])
    time.sleep(15)
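
The loop above exits as soon as the status is no longer `Creating`, whether the endpoint ended up `InService` or `Failed`, and it has no upper bound on the wait. The following is a minimal sketch of a stricter variant. The helper name `wait_for_endpoint` and its parameters are hypothetical, and the demo uses a stand-in function rather than a real AWS call. boto3 also provides a built-in waiter, `sagemaker_client.get_waiter("endpoint_in_service")`, which may be simpler if you do not need custom handling.

```python
import time


def wait_for_endpoint(describe_fn, poll_seconds=15, timeout_seconds=1800, sleep_fn=time.sleep):
    """Poll describe_fn until the endpoint leaves the "Creating" status.

    describe_fn is a zero-argument callable that returns a dict containing
    "EndpointStatus", for example:
        lambda: sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
    Raises RuntimeError if the endpoint does not end up InService and
    TimeoutError if it is still being created after timeout_seconds.
    """
    waited = 0
    while True:
        response = describe_fn()
        status = response["EndpointStatus"]
        if status != "Creating":
            if status != "InService":
                reason = response.get("FailureReason", "no reason reported")
                raise RuntimeError(f"Endpoint ended in status {status}: {reason}")
            return response
        if waited >= timeout_seconds:
            raise TimeoutError(f"Endpoint still creating after {waited} seconds")
        sleep_fn(poll_seconds)
        waited += poll_seconds


# Demo with a stand-in describe function so the sketch runs without AWS access.
fake_statuses = iter(["Creating", "Creating", "InService"])
result = wait_for_endpoint(
    lambda: {"EndpointStatus": next(fake_statuses)},
    sleep_fn=lambda seconds: None,  # skip real sleeping in the demo
)
print(result["EndpointStatus"])
```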

### The End of Section 2
Congratulations on successfully deploying the trained model to a SageMaker real-time endpoint.
Please refer to the instructions for the next step.

-------------------
## Section 3: Model Inference - Invoke the real-time endpoint using features retrieved from the SageMaker Online Feature Store
We will test the endpoint by fetching features from the SageMaker Online Feature Store and using them to invoke the endpoint:
- fetch the original dataset, which is stored on S3
- read the customer ID and the churn status
- use the customer ID to fetch the features from the Online Feature Store
- invoke the endpoint using a boto3 API call

In [None]:
# cell-24
# read the csv file that was used to ingest data into the Feature Store
s3 = boto3.resource("s3")
s3_obj = s3.Object(bucket_name=bucket, key=f"{prefix}/input/churn_processed.csv")
body = s3_obj.get()['Body']
csv_string = body.read().decode('utf-8')
df = pd.read_csv(StringIO(csv_string))
df.head()

In [None]:
# cell-25
# fetch the features from Online Feature Store with the first customer ID in the csv file
boto_session = boto3.Session()
customerid = str(df.iloc[0]["customerID"])
churn_state = df.iloc[0]["Churn"]
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)
fsresponse = featurestore_runtime.get_record(FeatureGroupName=fg_name, RecordIdentifierValueAsString=customerid)
Record = fsresponse["Record"]

df_record = pd.DataFrame(Record).set_index('FeatureName').transpose()
df_record.drop(['event_time','customerID'], inplace=True, axis=1)

In [None]:
# cell-26
# invoke the endpoint with the features received from the Online Feature Store
csv_buffer = io.StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=",")
csv_writer.writerow(df_record.values[0][1:])

response = sagemaker_runtime.invoke_endpoint(
  EndpointName=endpoint_name, ContentType="text/csv", Body=csv_buffer.getvalue().rstrip("\r\n")
)
churn_pred = response["Body"].read().decode("ascii")
print(f"churn prediction score is {churn_pred} and the customer churn status is {churn_state}")
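
The two cells above turn a Feature Store record into a CSV payload by going through a DataFrame and then skipping the first remaining column by position. The sketch below does the same conversion by feature name, which is less sensitive to column order. It relies on the fact that `get_record` returns a list of entries with `FeatureName` and `ValueAsString` keys. The helper name `record_to_csv_payload`, the sample record, and the assumption that the skipped column is the `Churn` label are hypothetical.

```python
import csv
import io


def record_to_csv_payload(record, drop=("customerID", "event_time", "Churn")):
    """Convert a Feature Store record into a single CSV line.

    `record` is a list of {"FeatureName": ..., "ValueAsString": ...} dicts.
    The names in `drop` are assumptions about which features are not
    model inputs; adjust them to match your feature group.
    """
    values = [item["ValueAsString"] for item in record if item["FeatureName"] not in drop]
    buffer = io.StringIO()
    csv.writer(buffer, delimiter=",").writerow(values)
    return buffer.getvalue().rstrip("\r\n")


# Hypothetical record for illustration; real feature names will differ.
sample_record = [
    {"FeatureName": "customerID", "ValueAsString": "1"},
    {"FeatureName": "Churn", "ValueAsString": "0"},
    {"FeatureName": "tenure", "ValueAsString": "12"},
    {"FeatureName": "MonthlyCharges", "ValueAsString": "29.85"},
    {"FeatureName": "event_time", "ValueAsString": "2021-01-01T00:00:00Z"},
]
print(record_to_csv_payload(sample_record))  # prints: 12,29.85
```

The returned string could be passed directly as the `Body` argument of `invoke_endpoint`, as long as the remaining features are in the order the model expects.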

-------------
## Congratulations!!! You have now successfully finished the model training and deployment lab. 

### Please move on to the next part of the workshop.

## Appendix

### Clean Up 
At the end of the workshop, uncomment the cell below and run it to clean up the SageMaker resources created during the workshop.

In [None]:
# # cell-27
# # delete endpoint
# sagemaker_client.delete_endpoint(EndpointName=endpoint_name)

# # delete pipeline
# sagemaker_client.delete_pipeline(PipelineName=pipeline_name)

# # delete feature group
# sagemaker_client.delete_feature_group(FeatureGroupName=fg_name)

# # delete model package
# model_packages = sagemaker_client.list_model_packages(ModelPackageGroupName=model_package_group_name)["ModelPackageSummaryList"]
# for mp in model_packages:
#     sagemaker_client.delete_model_package(ModelPackageName=mp["ModelPackageArn"])
#     print(f"\nDeleted model package: {mp['ModelPackageArn']}")
#     time.sleep(1)
# sagemaker_client.delete_model_package_group(ModelPackageGroupName=model_package_group_name)

# # delete s3 objects
# delete_s3_objects = False # change this value to True if you would like to delete the s3 objects created by this project
# if delete_s3_objects and bucket is not None:
#     s3 = boto3.resource("s3")
#     s3_bucket = s3.Bucket(bucket)  # keep `bucket` as the bucket name string
#     s3_bucket.objects.filter(Prefix=f"{prefix}/").delete()
#     print(f"\nDeleted contents of {bucket}/{prefix}")
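
The cell above deletes the endpoint but not the endpoint configuration or the model created in Section 2. The following is a minimal sketch of a helper that removes all three. The function name `delete_hosting_resources` is hypothetical, and catching every exception is a simplification chosen for the workshop setting; `delete_endpoint`, `delete_endpoint_config`, and `delete_model` are existing boto3 SageMaker client calls.

```python
def delete_hosting_resources(client, endpoint_name, endpoint_config_name, model_name):
    """Delete the endpoint, then its configuration and model.

    Returns the names of the calls that succeeded. A failed call is
    reported and skipped so that the remaining resources are still removed.
    """
    steps = [
        ("delete_endpoint", {"EndpointName": endpoint_name}),
        ("delete_endpoint_config", {"EndpointConfigName": endpoint_config_name}),
        ("delete_model", {"ModelName": model_name}),
    ]
    deleted = []
    for method_name, kwargs in steps:
        try:
            getattr(client, method_name)(**kwargs)
            deleted.append(method_name)
        except Exception as err:  # broad on purpose; narrow this in real code
            print(f"{method_name} skipped: {err}")
    return deleted


# Usage in this notebook (left commented out, like the cell above):
# delete_hosting_resources(sagemaker_client, endpoint_name, endpoint_config_name, model_name)
```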