# SageMaker Pipelines Customer Churn Prediction using Contact Centre Data

---------------------------------
`This notebook should work well with the Python 3 (Data Science) kernel in SageMaker Studio`

---------------------------------

Amazon SageMaker Model Building Pipelines offers machine learning (ML) application developers and operations engineers the ability to orchestrate SageMaker jobs and author reproducible ML pipelines. It also enables them to deploy custom-built models for real-time inference with low latency, run offline inference with Batch Transform, and track the lineage of artifacts. They can institute sound operational practices for deploying and monitoring production workflows, deploying model artifacts, and tracking artifact lineage through a simple interface, adhering to safety and best-practice paradigms for ML application development.

The SageMaker Pipelines service supports a SageMaker Pipeline domain-specific language (DSL), which is a declarative JSON specification. This DSL defines a directed acyclic graph (DAG) of pipeline parameters and SageMaker job steps. The SageMaker Python Software Development Kit (SDK) streamlines the generation of the pipeline DSL using constructs that engineers and scientists are already familiar with.
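
To make the DAG idea concrete, the sketch below models a heavily simplified, hypothetical pipeline definition as a Python dictionary. It then derives a valid execution order from each step's `DependsOn` list. The real definition generated by the SageMaker Python SDK has more fields and also infers dependencies from property references between steps. The step names and default value here are illustrative only.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical, simplified definition: only the fields needed to show the DAG.
definition = {
    "Parameters": [{"Name": "InstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"}],
    "Steps": [
        {"Name": "CreateDataset", "Type": "Processing"},
        {"Name": "Train", "Type": "Training", "DependsOn": ["CreateDataset"]},
        {"Name": "Evaluate", "Type": "Processing", "DependsOn": ["Train"]},
        {"Name": "CheckAccuracy", "Type": "Condition", "DependsOn": ["Evaluate"]},
    ],
}


def execution_order(definition):
    """Return step names in an order that respects every DependsOn edge."""
    graph = {step["Name"]: set(step.get("DependsOn", [])) for step in definition["Steps"]}
    # static_order() raises graphlib.CycleError if the graph is not acyclic.
    return list(TopologicalSorter(graph).static_order())


print(execution_order(definition))  # ['CreateDataset', 'Train', 'Evaluate', 'CheckAccuracy']
```

The acyclicity requirement is what makes such an order always exist. A definition with a cycle has no valid order, so the sorter rejects it.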

## SageMaker Pipelines

SageMaker Pipelines supports the following activities, which are demonstrated in this notebook:

* Pipelines - A DAG of steps and conditions to orchestrate SageMaker jobs and resource creation.
* Processing job steps - A simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation.
* Training job steps - An iterative process that teaches a model to make predictions by presenting examples from a training dataset.
* Conditional execution steps - A step that provides conditional execution of branches in a pipeline.
* Register model steps - A step that creates a model package resource in the Model Registry that can be used to create deployable models in Amazon SageMaker.
* Create model steps - A step that creates a model for use in transform steps or later publication as an endpoint.
* Clarify steps - A ClarifyCheck step that runs a model explainability check by launching a processing job that uses the SageMaker Clarify prebuilt container.
* Parametrized Pipeline executions - Enables variation in pipeline executions according to specified parameters.
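
Parametrized executions work by declaring each parameter with a default in the pipeline definition and letting `start(parameters=...)` override some of them. The sketch below mimics that resolution in plain Python. The parameter names match the feature ingestion pipeline used later (`InstanceType`, `InputDataUrl`). The default values and the `resolve_parameters` helper are hypothetical, and the SageMaker service performs the real resolution for you.

```python
def resolve_parameters(defaults, overrides=None):
    """Merge execution-time overrides into declared defaults.

    As a safety check in this sketch, names that were never declared are rejected.
    """
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise ValueError(f"Undeclared pipeline parameters: {sorted(unknown)}")
    return {**defaults, **overrides}


# Hypothetical defaults declared in a pipeline definition.
defaults = {"InstanceType": "ml.m5.xlarge", "InputDataUrl": "s3://example-bucket/data/dataset.csv"}

resolved = resolve_parameters(defaults, {"InstanceType": "ml.m5.4xlarge"})
print(resolved["InstanceType"])  # ml.m5.4xlarge (overridden)
print(resolved["InputDataUrl"])  # s3://example-bucket/data/dataset.csv (the default)
```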

## Solution
`TODO:
discuss the solution and show two different pipelines`

### Environment Setup

Note:

The following policies need to be attached to the execution role that you used to run this notebook:

* AmazonSageMakerFullAccess
* AmazonSageMakerFeatureStoreAccess
* AmazonS3FullAccess

Import libraries, set up logging, and define a few variables.

In [None]:
import pandas as pd
import json
import os
import logging
from pathlib import Path

import boto3
import sagemaker
from sagemaker.session import Session
from sagemaker import get_execution_role
from sagemaker.feature_store.feature_definition import FeatureDefinition
from sagemaker.feature_store.feature_definition import FeatureTypeEnum
from sagemaker.feature_store.feature_group import FeatureGroup

from features_ingestion_pipeline.feature_ingestion_pipeline import create_pipeline
from build_pipeline.model_build_pipeline import get_pipeline
from batch_pipeline.batch_transform_pipeline import get_batch_pipeline

from time import gmtime, strftime
import time
import uuid

In [None]:
role = get_execution_role()

region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)
sagemaker_session = sagemaker.Session()

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

# You can configure this with your own bucket name, e.g.
# bucket = <my-own-storage-bucket>
bucket = sagemaker_session.default_bucket()
prefix = 'DEMO-xgboost-customer-churn-connect'
base_job_prefix = 'Demo-xgboost-churn-connect'

s3_client = boto3.client("s3")
s3_uploader = sagemaker.s3.S3Uploader


Set up a logger

In [None]:
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

In [None]:
# data upload to s3
local_path = Path("data")
data_uri_prefix = s3_uploader.upload(local_path.as_posix(), f"s3://{bucket}/{prefix}/data")
input_data_url = data_uri_prefix + "/dataset.csv"

In [None]:
%store input_data_url

In [None]:
%store
%store -r

## Setup SageMaker Feature Store

SageMaker Feature Store is a SageMaker capability that makes it easy for customers to create and manage curated features for machine learning (ML) development. It serves as the single source of truth to store, retrieve, remove, track, share, discover, and control access to features. SageMaker Feature Store supports data ingestion through a high-throughput (high TPS) API and data consumption through the online and offline stores.

### Terminology
* `Feature group` - A FeatureGroup is the main Feature Store resource that contains the metadata for all the data stored in Amazon SageMaker Feature Store. A feature group is a logical grouping of features, defined in the feature store, to describe records. A feature group's definition is composed of a list of feature definitions, a record identifier name, and configurations for its online and offline store.

* `Feature definition` - A FeatureDefinition consists of a name and one of the following data types: Integral, String, or Fractional. A FeatureGroup contains a list of feature definitions.

* `Record identifier name` - Each feature group is defined with a record identifier name. The record identifier name must refer to one of the names of a feature defined in the feature group's feature definitions.

* `Event time` - A point in time when a new event occurs that corresponds to the creation or update of a record in a feature group. All records in the feature group must have a corresponding EventTime. It can be used to track changes to a record over time. The online store contains the record corresponding to the last EventTime for a record identifier name, whereas the offline store contains all historic records.

* `Online store` - The low-latency, high-availability cache for a feature group that enables real-time lookup of records. The online store allows quick access to the latest value for a Record via the GetRecord API. A feature group contains an OnlineStoreConfig controlling where the data is stored.

* `Offline store` - The OfflineStore stores historical data in your S3 bucket. It is used when low (sub-second) latency reads are not needed, for example, when you want to store and serve features for exploration, model training, and batch inference. A feature group contains an OfflineStoreConfig controlling where the data is stored.
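
The difference between the two stores can be sketched with in-memory stand-ins. The offline store keeps every record version. The online store keeps only the version with the latest event time for each record identifier. This is a toy model of the behavior described above, not the Feature Store API. The `ingest` helper and the sample records are hypothetical.

```python
def ingest(records, id_name="customerID", time_name="event_time"):
    """Simulate an append-only offline store and a latest-per-ID online store."""
    offline, online = [], {}
    for record in records:
        offline.append(record)  # offline store: full history
        key = record[id_name]
        current = online.get(key)
        # Online store: keep the record with the newest event time.
        # Ties go to the record ingested last (an assumption of this sketch).
        if current is None or record[time_name] >= current[time_name]:
            online[key] = record
    return offline, online


records = [
    {"customerID": 1, "event_time": 1000.0, "CustServ_Calls": 1},
    {"customerID": 2, "event_time": 1005.0, "CustServ_Calls": 0},
    {"customerID": 1, "event_time": 2000.0, "CustServ_Calls": 4},
    {"customerID": 1, "event_time": 1500.0, "CustServ_Calls": 2},  # arrives late, older event
]
offline, online = ingest(records)
print(len(offline))                 # 4
print(online[1]["CustServ_Calls"])  # 4
```

The late-arriving record for customer 1 ends up only in the history. It does not replace the newer value served for real-time lookups.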

### Define Feature Group
Select the record identifier and event time feature names. Both are required parameters for feature group creation.
* **Record identifier name** is the name of the feature, defined in the feature group's feature definitions, whose value uniquely identifies a Record in the feature group.
* **Event time feature name** is the name of the EventTime feature of a Record in the FeatureGroup. An EventTime is a timestamp that represents the point in time when a new event occurs that corresponds to the creation or update of a Record in the FeatureGroup. All Records in the FeatureGroup must have a corresponding EventTime.

<div class="alert alert-info"> 💡 The record identifier and event time feature names are required 
to create the feature group. After filling in the values, you can choose <b>Run Selected Cell and All Below</b> 
from the <b>Run</b> menu in the menu bar. 
</div>

In [None]:
record_identifier_feature_name = "customerID"
if record_identifier_feature_name is None:
   raise SystemExit("Select a column name as the feature group record identifier.")

event_time_feature_name = "event_time"
if event_time_feature_name is None:
   raise SystemExit("Select a column name as the event time feature name.")

### Feature Definitions
The following is a list of the feature names and feature types of the final dataset that will be produced 
when your data flow is used to process your input dataset. These are automatically generated from the 
step `Custom Pyspark` from `Source: Answers.Csv`. To save from a different step, go to Data Wrangler to 
select a new step to export.

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

1. You can select a subset of the features. By default all columns of the result dataframe will be used as 
features.
2. You can change the Data Wrangler data type to one of the Feature Store supported types 
(<b>Integral</b>, <b>Fractional</b>, or <b>String</b>). The default type is set to <b>String</b>. 
This means that, if a column in your dataset is not a <b>float</b> or <b>long</b> type, it will default 
to <b>String</b> in your Feature Store.

For <b>Event Time</b> features, make sure the format follows the feature store
<strong>
    <a style="color: #0397a7 " href="https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-quotas.html#feature-store-data-types">
    <u>Event Time feature format</u>
    </a>
</strong>
</div>
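
Two event time representations are common. One is a Unix epoch value in seconds for a **Fractional** feature, which is how `event_time` is typed in the schema below. The other is an ISO-8601 UTC string for a **String** feature. The sketch below produces both, assuming the event time starts as a timezone-aware `datetime`. Check the linked format reference for the exact string patterns Feature Store accepts. The `to_event_time` helper is hypothetical.

```python
from datetime import datetime, timezone


def to_event_time(dt):
    """Return (epoch_seconds, iso_string) for a timezone-aware datetime."""
    if dt.tzinfo is None:
        raise ValueError("Use a timezone-aware datetime so the event time is unambiguous")
    dt_utc = dt.astimezone(timezone.utc)
    epoch_seconds = dt_utc.timestamp()                  # for a Fractional feature
    iso_string = dt_utc.strftime("%Y-%m-%dT%H:%M:%SZ")  # for a String feature
    return epoch_seconds, iso_string


epoch, iso = to_event_time(datetime(2022, 1, 2, 3, 4, 5, tzinfo=timezone.utc))
print(epoch)  # 1641092645.0
print(iso)    # 2022-01-02T03:04:05Z
```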

In [None]:
column_schemas = [
    {
        "name": "Churn_true",
        "type": "long"
    },
    {
        "name": "Account_Length",
        "type": "long"
    },
    {
        "name": "customerID",
        "type": "long"
    },
    {
        "name": "VMail_Message",
        "type": "long"
    },
    {
        "name": "Day_Mins",
        "type": "float"
    },
    {
        "name": "Day_Calls",
        "type": "long"
    },
    {
        "name": "Eve_Mins",
        "type": "float"
    },
    {
        "name": "Eve_Calls",
        "type": "long"
    },
    {
        "name": "Night_Mins",
        "type": "float"
    },
    {
        "name": "Night_Calls",
        "type": "long"
    },
    {
        "name": "Intl_Mins",
        "type": "float"
    },
    {
        "name": "Intl_Calls",
        "type": "long"
    },
    {
        "name": "CustServ_Calls",
        "type": "long"
    },
    {
        "name": "pastSenti_nut",
        "type": "long"
    },
    {
        "name": "pastSenti_pos",
        "type": "long"
    },
    {
        "name": "pastSenti_neg",
        "type": "long"
    },
    {
        "name": "mth_remain",
        "type": "long"
    },
    {
        "name": "Int_l_Plan_no",
        "type": "long"
    },
    {
        "name": "Int_l_Plan_yes",
        "type": "long"
    },
    {
        "name": "VMail_Plan_no",
        "type": "long"
    },
    {
        "name": "VMail_Plan_yes",
        "type": "long"
    },
    {
        "name": "event_time",
        "type": "float"
    }
]

Below we create the SDK input for those feature definitions. Some Data Wrangler schema types are not supported by Feature Store. The code below maps any such type to `default_feature_type`, which is set to String.

In [None]:
default_feature_type = FeatureTypeEnum.STRING
column_to_feature_type_mapping = {
    "float": FeatureTypeEnum.FRACTIONAL,
    "long": FeatureTypeEnum.INTEGRAL
}

feature_definitions = [
    FeatureDefinition(
        feature_name=column_schema['name'], 
        feature_type=column_to_feature_type_mapping.get(column_schema['type'], default_feature_type)
    ) for column_schema in column_schemas
]
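
It can be worth checking, before calling `create()`, that the record identifier and event time names actually appear among the feature definitions. Feature Store requires both to refer to defined features, and the event time must be Fractional or String. The helper below is a hypothetical pre-flight check that works on the same `column_schemas` shape used above. It also flags duplicate feature names. A shortened schema is inlined so the example runs on its own.

```python
def check_feature_schema(column_schemas, record_identifier_name, event_time_name):
    """Return a list of problems found in the schema (empty means none were found)."""
    names = [column["name"] for column in column_schemas]
    types = {column["name"]: column["type"] for column in column_schemas}
    problems = []

    duplicates = sorted({name for name in names if names.count(name) > 1})
    if duplicates:
        problems.append(f"duplicate feature names: {duplicates}")

    for label, name in (("record identifier", record_identifier_name), ("event time", event_time_name)):
        if name not in types:
            problems.append(f"{label} '{name}' is not a defined feature")

    # "long" maps to Integral in the mapping above; an event time must be Fractional or String.
    if types.get(event_time_name) == "long":
        problems.append(f"event time '{event_time_name}' must map to Fractional or String, not Integral")
    return problems


# Shortened copy of the schema above, for a self-contained example.
schema = [
    {"name": "Churn_true", "type": "long"},
    {"name": "customerID", "type": "long"},
    {"name": "event_time", "type": "float"},
]
print(check_feature_schema(schema, "customerID", "event_time"))   # []
print(check_feature_schema(schema, "customer_id", "event_time"))  # ["record identifier 'customer_id' is not a defined feature"]
```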

## Configure Feature Group

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

1. <b>feature_group_name</b>: name of the feature group.
1. <b>feature_store_offline_s3_uri</b>: SageMaker FeatureStore writes the data in the OfflineStore of a FeatureGroup to an S3 location owned by you.
1. <b>enable_online_store</b>: controls if online store is enabled. Enabling the online store allows quick access to the latest value for a Record via the GetRecord API.
1. <b>iam_role</b>: IAM role for executing the processing job.
</div>

In [None]:
# flow name and a unique ID for this export (used later as the processing job name for the export)
flow_name = "contact-center-data"
flow_export_id = f"{strftime('%d-%H-%M-%S', gmtime())}-{str(uuid.uuid4())[:8]}"
flow_export_name = f"flow-{flow_export_id}"

# feature group name, with flow_name and a unique ID. You can give it a customized name
feature_group_name = f"fg-{flow_name}-{str(uuid.uuid4())[:8]}"
print(f"Feature Group Name: {feature_group_name}")

# SageMaker FeatureStore writes the data in the OfflineStore of a FeatureGroup to an
# S3 location owned by you.
feature_store_offline_s3_uri = 's3://' + bucket

# controls if online store is enabled. Enabling the online store allows quick access to 
# the latest value for a Record via the GetRecord API.
enable_online_store = True

### Initialize & Create Feature Group

In [None]:
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

In [None]:
# Feature group is initialized and created below
feature_group = FeatureGroup(
    name=feature_group_name, sagemaker_session=feature_store_session, feature_definitions=feature_definitions)

feature_group.create(
    s3_uri=feature_store_offline_s3_uri,
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=enable_online_store
)

In [None]:
def wait_for_feature_group_creation_complete(feature_group):
    """Helper function to wait for the creation of a feature group to complete"""
    response = feature_group.describe()
    status = response.get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        response = feature_group.describe()
        status = response.get("FeatureGroupStatus")

    if status != "Created":
        print(f"Failed to create feature group, response: {response}")
        failureReason = response.get("FailureReason", "")
        raise SystemExit(
            f"Failed to create feature group {feature_group.name}, status: {status}, reason: {failureReason}"
        )
    print(f"FeatureGroup {feature_group.name} successfully created.")

wait_for_feature_group_creation_complete(feature_group=feature_group)
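
The helper above polls until the status leaves `Creating`, with no upper bound on how long it waits. A variant with a timeout keeps a notebook from hanging indefinitely if creation stalls. The sketch below takes a hypothetical `describe_fn` callable, for example `feature_group.describe`, so that it can be exercised without AWS. The default timeout and poll interval are arbitrary assumptions.

```python
import time


def wait_for_status(describe_fn, status_key="FeatureGroupStatus", in_progress=("Creating",),
                    poll_seconds=5.0, timeout_seconds=600.0):
    """Poll describe_fn() until status_key leaves in_progress; return the final response.

    Raises TimeoutError if the status is still in progress after timeout_seconds.
    """
    deadline = time.monotonic() + timeout_seconds
    response = describe_fn()
    while response.get(status_key) in in_progress:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{status_key} still {response.get(status_key)!r} after {timeout_seconds}s")
        time.sleep(poll_seconds)
        response = describe_fn()
    return response


# Usage with a fake describe function that reports "Created" on the third call.
statuses = iter(["Creating", "Creating", "Created"])
final = wait_for_status(lambda: {"FeatureGroupStatus": next(statuses)}, poll_seconds=0)
print(final["FeatureGroupStatus"])  # Created
```

As in the original helper, the caller should still check whether the final status is `Created` rather than a failure status.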

Now that the feature group is created, you will create a feature ingestion pipeline that runs a processing job to process your data at scale and ingest the transformed data into this feature group.

In [None]:
feature_group_name = feature_group.name
%store feature_group_name

## Feature Ingestion Pipeline Overview

The feature ingestion pipeline shows how to:

* Define a set of Pipeline parameters that can be used to parametrize a SageMaker Pipeline.
* Define a Processing step that uses a Data Wrangler processor to process the input data based on the Data Wrangler flow file.
* Define and create a Pipeline definition in a DAG, with the defined parameters and steps.

Please see `features_ingestion_pipeline/feature_ingestion_pipeline.py` for details.


In [None]:
flow_file_path = 'features_ingestion_pipeline/contact-center-data.flow'
fg_ingest_pipeline_name = f"demo-feature-ingestion-pipeline-{strftime('%d-%m', gmtime())}"

In [None]:
feature_ingestion_pipeline = create_pipeline(
    role,
    fg_ingest_pipeline_name,
    sagemaker_session=sagemaker_session,
    flow_file_path=flow_file_path,
    feature_group_name=feature_group_name
)

### Submit the pipeline definition to the SageMaker Pipeline service
Note: If an existing pipeline has the same name, it will be overwritten.

In [None]:
feature_ingestion_pipeline.upsert(role_arn=role)


### View the entire pipeline definition
Viewing the pipeline definition with all the string variables interpolated may help debug pipeline bugs. It is commented out here due to length.

In [None]:
# json.loads(feature_ingestion_pipeline.describe()['PipelineDefinition'])

In [None]:
parameters = { 
    "InstanceType": "ml.m5.4xlarge", 
    "InputDataUrl": input_data_url
}
execution = feature_ingestion_pipeline.start(parameters=parameters)

In [None]:
execution.wait()
execution.describe()

![feature ingestion pipeline](img/feature-ingestion-pipeline.png)

## Model Build Pipeline Overview

The model build pipeline shows how to:

* Define a set of Pipeline parameters that can be used to parametrize a SageMaker Pipeline.
* Define a Processing step that extracts data from the feature store to create the train, validation, and test datasets.
* Define a Training step that trains a model on the preprocessed train dataset.
* Define a Processing step that evaluates the trained model's performance on the test dataset.
* Define a Create Model step that creates a model from the model artifacts produced by training.
* Define a Clarify check step that performs model explainability check.
* Define a Conditional step that measures a condition based on output from prior steps and conditionally executes other steps.
* Define a Register Model step that creates a model package from the estimator and model artifacts used to train the model.
* Define and create a Pipeline definition in a DAG, with the defined parameters and steps.
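
The Conditional step compares a metric from the evaluation step's output against a threshold. It runs the registration branch only when the check passes. The sketch below shows that decision in plain Python, using a dotted path similar to the `json_path` passed to the SageMaker SDK's `JsonGet`. The report layout (`binary_classification_metrics` → `auc` → `value`) and the thresholds are assumptions for illustration. The metric and threshold the build pipeline actually uses are defined in its own code.

```python
import json

# Hypothetical evaluation report, in the style of a JSON file written by an evaluation step.
report_json = '{"binary_classification_metrics": {"auc": {"value": 0.82}}}'


def get_metric(report, path):
    """Follow a dotted path such as 'binary_classification_metrics.auc.value' into a nested dict."""
    value = report
    for key in path.split("."):
        value = value[key]
    return value


def should_register(report_json, metric_path, threshold):
    """Mimic a greater-than-or-equal-to condition: True selects the 'register' branch."""
    metric = get_metric(json.loads(report_json), metric_path)
    return metric >= threshold


print(should_register(report_json, "binary_classification_metrics.auc.value", 0.75))  # True
print(should_register(report_json, "binary_classification_metrics.auc.value", 0.90))  # False
```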



## A SageMaker Pipeline

The pipeline that you create follows a typical machine learning (ML) application pattern of preprocessing, training, evaluation, model creation, and model registration.


## Preparation


Let's start by specifying:

- The S3 bucket and prefix that you want to use for training and model data. These should be in the same Region as the notebook, training, and hosting. This notebook reuses the `bucket` and `prefix` defined during environment setup.
- The IAM role ARN used to give training and hosting access to your data. See the documentation for how to create these. Note: if more than one role is required for the notebook, training, and/or hosting, replace the `get_execution_role()` call with the appropriate full IAM role ARN string(s).

In [None]:
with open('data_meta.json') as file:
    dataset_dict = json.load(file)
dataset_dict
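
`data_meta.json` supplies the label name and feature names passed to the build pipeline (`label_name`, `features_names`). The dataset-creation step typically produces files in the format expected by SageMaker's built-in XGBoost algorithm. That format is CSV with the label in the first column and no header row. The hypothetical helper below sketches this: it orders columns label-first and makes a deterministic train/validation/test split. The 70/20/10 ratios, the seed, and the sample rows are assumptions, not values taken from `create_dataset.py`.

```python
import csv
import io
import random


def to_xgboost_csv_splits(rows, label_name, features_names, ratios=(0.7, 0.2, 0.1), seed=42):
    """Split dict rows into train/validation/test CSV strings with the label as the first column."""
    shuffled = rows[:]
    random.Random(seed).shuffle(shuffled)  # deterministic shuffle for reproducibility
    n_train = round(len(shuffled) * ratios[0])
    n_val = round(len(shuffled) * ratios[1])
    splits = {
        "train": shuffled[:n_train],
        "validation": shuffled[n_train:n_train + n_val],
        "test": shuffled[n_train + n_val:],
    }
    outputs = {}
    for split_name, split_rows in splits.items():
        buffer = io.StringIO()
        writer = csv.writer(buffer, lineterminator="\n")
        for row in split_rows:
            # No header row; label first, then features in a fixed order.
            writer.writerow([row[label_name]] + [row[name] for name in features_names])
        outputs[split_name] = buffer.getvalue()
    return outputs


rows = [{"Churn_true": i % 2, "Day_Mins": 100.0 + i, "CustServ_Calls": i % 5} for i in range(10)]
splits = to_xgboost_csv_splits(rows, "Churn_true", ["Day_Mins", "CustServ_Calls"])
print({name: text.count("\n") for name, text in splits.items()})  # {'train': 7, 'validation': 2, 'test': 1}
```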

In [None]:
script_create_dataset = './build_pipeline/create_dataset.py'
script_evaluation = './build_pipeline/evaluation.py'
pipeline_build_name = f"demo-customer-churn-build-pipeline-{strftime('%d-%m', gmtime())}"
mpg_name = "ChurnModelPackage"

In [None]:
pipeline_build = get_pipeline(
    role,
    pipeline_build_name,
    sagemaker_session=sagemaker_session,
    base_job_prefix=base_job_prefix,
    bucket=bucket,
    prefix=prefix,
    label_name=dataset_dict["label_name"],
    features_names=dataset_dict["features_names"],
    model_package_group_name=mpg_name,
    customers_fg_name=feature_group_name,
    script_create_dataset=script_create_dataset,
    script_evaluation=script_evaluation,
)

In [None]:
pipeline_build.upsert(role_arn=role)

In [None]:
# json.loads(pipeline_build.describe()['PipelineDefinition'])

In [None]:
build_execution = pipeline_build.start()

In [None]:
build_execution.wait()
build_execution.describe()

![model build pipeline](img/model-build-pipeline.png)

### Lineage

Review the lineage of the artifacts generated by the pipeline.

In [None]:
from sagemaker.lineage.visualizer import LineageTableVisualizer

# Reuse the session created during environment setup (`time` is already imported above)
viz = LineageTableVisualizer(sagemaker_session)
for execution_step in reversed(build_execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

## Batch Inference

Perform batch transform using the approved model and data stored in the offline feature store. You can choose to use a Lambda step or a callback step to retrieve the latest approved model artifacts info. More details can be found in this [GitHub repo](https://github.com/aws-samples/sagemaker-pipelines-callback-step-for-batch-transform). In this example, we use a Lambda step to get the latest approved model version from the model registry.

![batch pipeline](img/batch_pipeline.png)

Before using the latest model version registered in the model registry, we need to make sure the batch transform uses an approved model version. In this example, we manually approve the model version.

In [None]:
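# Sort newest first so that index 0 is the most recently registered version
list_model_packages_response = sagemaker_client.list_model_packages(
    ModelPackageGroupName=mpg_name,
    SortBy="CreationTime",
    SortOrder="Descending",
)
model_version_arn = list_model_packages_response["ModelPackageSummaryList"][0][
    "ModelPackageArn"
]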
print(model_version_arn)

In [None]:
describe_response = sagemaker_client.describe_model_package(ModelPackageName=model_version_arn)
if describe_response['ModelApprovalStatus']!='Approved':
    model_package_update_input_dict = {
        "ModelPackageArn": model_version_arn,
        "ModelApprovalStatus": "Approved",
    }
    model_package_update_response = sagemaker_client.update_model_package(**model_package_update_input_dict)
    print(f"Update model package arn {model_version_arn} to approved!")
else:
    print("The latest model version is approved")
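
The Lambda step's job is to find the latest approved version in the model package group. That amounts to filtering and sorting the model package summaries returned by `list_model_packages`. The sketch below does this on a hand-written list shaped like `ModelPackageSummaryList`, using the `ModelPackageArn`, `ModelApprovalStatus`, and `CreationTime` fields. The ARNs and timestamps are made up. This is not the code in `batch_pipeline/lambda_step_code.py`, which may differ.

```python
from datetime import datetime, timezone


def latest_approved_arn(summaries):
    """Return the ARN of the most recently created approved package, or None if none is approved."""
    approved = [s for s in summaries if s.get("ModelApprovalStatus") == "Approved"]
    if not approved:
        return None
    return max(approved, key=lambda s: s["CreationTime"])["ModelPackageArn"]


# Made-up summaries; boto3 returns CreationTime as a datetime.
summaries = [
    {"ModelPackageArn": "arn:aws:sagemaker:us-east-1:111122223333:model-package/churnmodelpackage/1",
     "ModelApprovalStatus": "Approved", "CreationTime": datetime(2022, 5, 1, tzinfo=timezone.utc)},
    {"ModelPackageArn": "arn:aws:sagemaker:us-east-1:111122223333:model-package/churnmodelpackage/2",
     "ModelApprovalStatus": "Approved", "CreationTime": datetime(2022, 5, 3, tzinfo=timezone.utc)},
    {"ModelPackageArn": "arn:aws:sagemaker:us-east-1:111122223333:model-package/churnmodelpackage/3",
     "ModelApprovalStatus": "PendingManualApproval", "CreationTime": datetime(2022, 5, 4, tzinfo=timezone.utc)},
]
print(latest_approved_arn(summaries))  # prints the ARN ending in churnmodelpackage/2
```

You can also let the API do the filtering. To do so, pass `ModelApprovalStatus="Approved"` together with `SortBy="CreationTime"` and `SortOrder="Descending"` to `list_model_packages`.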

### Setting up the custom IAM Role

The Lambda function needs an IAM role that allows it to call SageMaker, for example to look up the latest approved model version in the Model Registry. The role ARN must be provided to the LambdaStep.

A helper function in `iam_helper.py` is available to create the Lambda function role. Note that the role uses the AWS managed policy `AmazonSageMakerFullAccess`. In production, replace it with a least-privilege IAM policy, as recommended by AWS IAM best practices.
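
A least-privilege alternative could start from the sketch below. It builds a trust policy that lets AWS Lambda assume the role. It also builds an inline permissions policy limited to reading model packages and writing logs. The action list is an assumption based on this notebook's description of the Lambda, which looks up the latest approved model version. If the Lambda code calls other SageMaker APIs, add them. Where possible, scope `Resource` to your account and Region rather than `*`. Nothing here is created in AWS; the hypothetical helper only returns JSON documents.

```python
import json


def lambda_role_policies(resource="*"):
    """Return (trust_policy, permissions_policy) as JSON strings for a model-lookup Lambda."""
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }
    permissions_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {   # read-only access to the Model Registry (assumed to be all the Lambda needs)
                "Effect": "Allow",
                "Action": ["sagemaker:ListModelPackages", "sagemaker:DescribeModelPackage"],
                "Resource": resource,
            },
            {   # basic CloudWatch Logs permissions for the function's own logs
                "Effect": "Allow",
                "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
                "Resource": "*",
            },
        ],
    }
    return json.dumps(trust_policy, indent=2), json.dumps(permissions_policy, indent=2)


trust_json, permissions_json = lambda_role_policies()
print(permissions_json)
```

With boto3, documents like these are what the IAM client's `create_role(AssumeRolePolicyDocument=...)` and `put_role_policy(PolicyDocument=...)` calls accept.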

In [None]:
from iam_helper import create_lambda_role

lambda_role = create_lambda_role("lambda-deployment-role")

In [None]:
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())
lambda_function_name = f"lambdaBatchTransformPipelineLambda_{current_time}"
pipeline_batch_name = "demo-customer-churn-batch-pipeline"
lambda_script = "./batch_pipeline/lambda_step_code.py"
lambda_handler = "lambda_step_code.handler"
script_create_batch_dataset = "./batch_pipeline/create_batch_inference_dataset.py"

In [None]:
pipeline_batch = get_batch_pipeline(
    role,
    pipeline_batch_name,
    sagemaker_session=sagemaker_session,
    base_job_prefix=base_job_prefix,
    bucket=bucket,
    prefix=prefix,
    features_names=dataset_dict["features_names"],
    model_package_group_name=mpg_name,
    customers_fg_name=feature_group_name,
    script_create_batch_dataset=script_create_batch_dataset,
    lambda_role=lambda_role,
    lambda_function_name=lambda_function_name,
    lambda_script=lambda_script,
    lambda_handler=lambda_handler
)

In [None]:
pipeline_batch.upsert(role_arn=role)

In [None]:
batch_execution = pipeline_batch.start()

In [None]:
batch_execution.wait()
batch_execution.describe()

### Clean Up Resources

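After running the demo, remove the resources that were created. The cleanup call below is wrapped in a string so that it does not run by accident; remove the surrounding triple quotes to run it. You can also delete all the objects in the project's S3 directory by passing the keyword argument `delete_s3_objects=True`.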

In [None]:
from demo_helper import delete_project_resources

In [None]:
"""
delete_project_resources(
    sagemaker_boto_client=sagemaker_client,
    pipeline_name=pipeline_build_name,
    mpg_name=mpg_name,
    prefix=prefix,
    fg_name=feature_group_name,
    delete_s3_objects=False,
    bucket_name=bucket)
"""

# GitHub Resource
This demo is available on GitHub: 