# Implement an ML Pipeline Using AWS Glue Workflows

1. [Introduction](#Introduction)
1. [Setup](#Setup)
1. [Build a Machine Learning Workflow](#Build-a-Machine-Learning-Workflow)
1. [Run the Workflow](#Run-the-Workflow)
1. [Workflow Result](#Workflow-Result)
1. [Clean Up](#Clean-Up)

---

## Introduction

This notebook describes how to use an AWS Glue workflow with PySpark and Python shell scripts to build a machine learning pipeline that covers data preparation, model training, model evaluation, and model deployment. The workflow is defined as follows:

<div align="center"><img width=600 src="images/glue_workflow_pipeline.png"></div>

## Setup

### IAM Permission and Role

Running this notebook and the Glue workflow requires two IAM roles:

* **IAM role for the SageMaker (Studio) notebook**, that is, the notebook's execution role:
  * Open the Amazon [SageMaker console](https://console.aws.amazon.com/sagemaker/).
  * Find the SageMaker execution role (open the notebook instance details, or the user profile details under the SageMaker Studio domain).
  * Open that role in IAM and attach the following managed IAM policy to it:
    * `arn:aws:iam::aws:policy/AWSGlueConsoleSageMakerNotebookFullAccess`
* **IAM role for the Glue jobs**, which read data from S3 and run model training on SageMaker:
  * The role `AWS-Glue-S3-SageMaker-Access` is created by the script below.



### Import the Required Modules

In [None]:
import os
import sys
import uuid
import logging
import boto3
import time
from datetime import datetime

import sagemaker

from sagemaker.s3 import S3Uploader, S3Downloader

sys.path.insert(0, os.path.abspath("./code"))  # make the helper modules in ./code importable
import setup_iam_roles

session = sagemaker.Session()

region = boto3.Session().region_name
bucket = session.default_bucket()

id = uuid.uuid4().hex

# SageMaker Execution Role
sagemaker_execution_role = sagemaker.get_execution_role()

# Create a unique name for the AWS Glue job to be created.
glue_job_prefix = "customer-churn-etl"
glue_job_name = f"{glue_job_prefix}-{id}"

# Create a unique name for an AWS Lambda function (not used elsewhere in this
# notebook).
query_function_prefix = "query-evaluation-result"
query_function_name = f"{query_function_prefix}-{id}"

# S3 data folder prefix
prefix = 'sagemaker/DEMO-xgboost-customer-churn'

Create an IAM role for the Glue jobs that:
* Grants access to the S3 bucket
* Allows running SageMaker training jobs and model deployment

In [None]:
glue_role_name = "AWS-Glue-S3-SageMaker-Access"
glue_role_arn = setup_iam_roles.create_glue_role(glue_role_name, bucket)
glue_role_arn
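The helper `setup_iam_roles.create_glue_role` lives in `./code` and is not reproduced here. As a rough idea of what such a helper might do, the hypothetical sketch below builds a trust policy that lets AWS Glue assume the role, creates the role through an injected IAM client, and attaches managed policies. The two policy ARNs are assumptions; the real helper may instead scope S3 access to `bucket` with an inline policy. Passing `sagemaker_execution_role` to SageMaker from inside the job typically also requires `iam:PassRole` on that role, which this sketch does not add.

```python
import json

# Hypothetical sketch; not the contents of ./code/setup_iam_roles.py.
GLUE_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "glue.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# Assumed managed policies; a production role should be scoped more tightly.
ASSUMED_POLICY_ARNS = [
    "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
    "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
]


def create_glue_role_sketch(iam_client, role_name, policy_arns=ASSUMED_POLICY_ARNS):
    """Create an IAM role that AWS Glue can assume, attach policies, return its ARN."""
    role = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(GLUE_TRUST_POLICY),
        Description="Glue access to S3 and SageMaker (sketch)",
    )
    for arn in policy_arns:
        iam_client.attach_role_policy(RoleName=role_name, PolicyArn=arn)
    return role["Role"]["Arn"]


# Usage (requires AWS credentials):
# create_glue_role_sketch(boto3.client("iam"), "my-glue-role")
```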

### Prepare the Dataset
This notebook uses the XGBoost algorithm to automate the classification of unhappy customers for telecommunication service providers. The goal is to identify customers who may cancel their service soon so that you can entice them to stay. This is known as customer churn prediction.

The dataset we use is publicly available and was mentioned in the book [Discovering Knowledge in Data](https://www.amazon.com/dp/0470908742/) by Daniel T. Larose. It is attributed by the author to the University of California Irvine Repository of Machine Learning Datasets.

In [None]:
train_prefix = "train"
val_prefix = "validation"
test_prefix = "test"

raw_data = f"s3://{bucket}/{prefix}/input"
batch_transform_output = f"s3://{bucket}/{prefix}/batch_transform"
processed_data = f"s3://{bucket}/{prefix}/processed"

train_data = f"{processed_data}/{train_prefix}/"
validation_data = f"{processed_data}/{val_prefix}/"
test_data = f"{processed_data}/{test_prefix}/"

Upload the dataset to the S3 bucket.

In [None]:
S3Uploader.upload(
    local_path="../data/churn_processed.csv",
    desired_s3_uri=raw_data,
    sagemaker_session=session,
)

## Build a Machine Learning Workflow

We use a Glue workflow as the orchestration engine and Glue jobs for the individual steps: data preprocessing, and model training and deployment.

* [**Glue Workflow**](https://docs.aws.amazon.com/glue/latest/dg/workflows_overview.html) - Orchestrates the jobs and triggers that make up the ML workflow.
* [**Glue Job**](https://docs.aws.amazon.com/glue/latest/dg/author-job.html) - Runs the business logic of a step, as a Spark ETL script or a Python shell script.
* [**Glue Trigger**](https://docs.aws.amazon.com/glue/latest/dg/trigger-job.html) - Starts Glue jobs, either on demand or when a condition (such as a preceding job succeeding) is met.

Once the Glue workflow is created, you can view its details in AWS Glue Console / Workflows / (select the created workflow). It should look similar to this:

<div align="center"><img width=500 src="images/glue_workflow.png"></div>
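Besides the console, the same graph can be read with `glue_client.get_workflow(Name=..., IncludeGraph=True)`, whose response contains `Workflow.Graph` with `Nodes` (each carrying `Type`, `Name` and `UniqueId`) and `Edges` (`SourceId`, `DestinationId`). The helper below is a small sketch that turns that graph into readable `source -> destination` lines; it only parses the response dict, so it can be tried on a hand-written sample before any workflow exists.

```python
def describe_edges(graph):
    """Return 'TYPE:name -> TYPE:name' strings for each edge in a Glue workflow graph."""
    # Map each node's UniqueId to a readable label such as "JOB:DataProcessingJob-...".
    labels = {
        node["UniqueId"]: f"{node['Type']}:{node['Name']}"
        for node in graph.get("Nodes", [])
    }
    return [
        f"{labels[edge['SourceId']]} -> {labels[edge['DestinationId']]}"
        for edge in graph.get("Edges", [])
    ]


# Usage once the workflow and its triggers exist (requires AWS credentials):
# graph = glue_client.get_workflow(Name=glue_workflow_name, IncludeGraph=True)["Workflow"]["Graph"]
# print("\n".join(describe_edges(graph)))
```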

### Create AWS Glue Workflow

#### Create Glue Workflow Object


In [None]:
glue_client = boto3.client("glue")

In [None]:
glue_workflow_name = f"CustomerChurnMLWorkflow-{id}"
response = glue_client.create_workflow(
    Name=glue_workflow_name,
    Description='AWS Glue workflow to process data and create training jobs'
)

#### Create Glue Jobs 

In [None]:
# Data Processing Job
data_processing_script_path = S3Uploader.upload(
    local_path="./code/glue_preprocessing.py",
    desired_s3_uri=f"s3://{bucket}/{prefix}/glue/scripts",
    sagemaker_session=session,
)
data_processing_job_name = f"DataProcessingJob-{id}"
response = glue_client.create_job(
    Name=data_processing_job_name,
    Description='Preparing data for SageMaker training',
    Role=glue_role_arn,
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': data_processing_script_path,
    },
    DefaultArguments={
        "--job-bookmark-option": "job-bookmark-enable",
        "--enable-metrics": "",
        "--additional-python-modules": "pyarrow==2,awswrangler==2.9.0,fsspec==0.7.4",
        "--enable-continuous-cloudwatch-log": "true"
    },
    MaxRetries=0,
    Timeout=60,
    MaxCapacity=10.0,
    GlueVersion='2.0'
)

In [None]:
# Model Training & Deployment Job
model_training_deployment_script_path = S3Uploader.upload(
    local_path="./code/model_training_deployment.py",
    desired_s3_uri=f"s3://{bucket}/{prefix}/glue/scripts",
    sagemaker_session=session
)

model_training_deployment_job_name = f"ModelTrainingDeploymentJob-{id}"
response = glue_client.create_job(
    Name=model_training_deployment_job_name,
    Description='Model training and deployment',
    Role=glue_role_arn,
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'pythonshell',
        'ScriptLocation': model_training_deployment_script_path,
        'PythonVersion': '3'
    },
    DefaultArguments={
        "--job-bookmark-option": "job-bookmark-enable",
        "--enable-metrics": "",
        "--additional-python-modules": "scikit-learn==0.23.1,pandas==1.3.5,numpy==1.21.6",
        "--enable-continuous-cloudwatch-log": "true"
    },
    MaxRetries=0,
    Timeout=60,
    MaxCapacity=1,
    GlueVersion='1.0'
)
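`--additional-python-modules` takes a comma-separated list of pip requirement strings, and an exact pin needs `==`; a single `=` is not a valid pip version specifier and is rejected by pip. A quick, hypothetical pre-flight check such as the one below can catch that kind of typo before the job is created. It only validates the `name==version` form used in this notebook, not the full pip requirement grammar.

```python
import re

# Accepts only exact pins such as "pandas==1.3.5" (an assumption matching this notebook's usage).
_PIN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*==[A-Za-z0-9.*+!-]+$")


def check_module_pins(modules):
    """Split a comma-separated module string and raise ValueError on malformed pins."""
    pins = [m.strip() for m in modules.split(",") if m.strip()]
    bad = [p for p in pins if not _PIN.match(p)]
    if bad:
        raise ValueError(f"Malformed module pins: {bad}")
    return pins


# Usage before calling create_job:
# check_module_pins("scikit-learn==0.23.1,pandas==1.3.5,numpy==1.21.6")
```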

In [None]:
model_output_path = f"s3://{bucket}/{prefix}/output"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
)

processed_data, sagemaker_execution_role, image_uri, model_output_path

#### Create Glue Triggers

In [None]:
data_processing_trigger_name = f'TriggerDataProcessingJob-{id}'
response = glue_client.create_trigger(
    Name=data_processing_trigger_name,
    Description='Triggering Data Processing Job',
    Type='ON_DEMAND',
    WorkflowName=glue_workflow_name,
    Actions=[
        {
            'JobName': data_processing_job_name,
            'Arguments': {
                '--INPUT_DIR': raw_data,
                '--PROCESSED_DIR': processed_data
            },
        },
    ]
)
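The trigger passes `--INPUT_DIR` and `--PROCESSED_DIR` to the data processing job. `glue_preprocessing.py` is not reproduced here; inside a Glue job such arguments are usually read with `awsglue.utils.getResolvedOptions(sys.argv, [...])`, which returns a dict keyed by the names without the leading `--`. Because `awsglue` exists only in the Glue runtime, the sketch below falls back to `argparse` so the same parsing can be exercised locally; that fallback is an assumption for local testing, not Glue behavior.

```python
import argparse


def resolve_args(argv, names):
    """Return {name: value} for each --name in argv (Glue-style job arguments)."""
    try:
        # Available only inside the AWS Glue job runtime.
        from awsglue.utils import getResolvedOptions
        return getResolvedOptions(argv, names)
    except ImportError:
        # Local stand-in: parse only the requested --name value pairs, ignore the rest.
        parser = argparse.ArgumentParser()
        for name in names:
            parser.add_argument(f"--{name}", required=True)
        known, _unknown = parser.parse_known_args(argv[1:])
        return {name: getattr(known, name) for name in names}


# Inside the preprocessing script this might be called as:
# args = resolve_args(sys.argv, ["INPUT_DIR", "PROCESSED_DIR"])
```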


In [None]:
model_train_deploy_trigger_name = f'TriggerModelTrainingDeploymentJob-{id}'
response = glue_client.create_trigger(
    Name=model_train_deploy_trigger_name,
    Description='Triggering Model Training Deployment Job',
    WorkflowName=glue_workflow_name,
    Type='CONDITIONAL',
    StartOnCreation=True,
    Predicate={
        'Conditions': [
            {
                'LogicalOperator': 'EQUALS',
                'JobName': data_processing_job_name,
                'State': 'SUCCEEDED'
            },
        ]
    },
    Actions=[
        {
            'JobName': model_training_deployment_job_name,
            'Arguments': {
                '--train_input_path': processed_data,
                '--model_output_path': model_output_path,
                '--algorithm_image': image_uri,
                '--role_arn': sagemaker_execution_role
            }
        }
    ]
)
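The conditional trigger above waits on a single job. If the pipeline grows, for example with a separate evaluation job, a trigger can wait on several jobs at once by listing more conditions and setting the predicate's `Logical` field to `AND` (all conditions must match) or `ANY`. The helper below is a hypothetical convenience for building that `Predicate` structure; `TriggerEvaluationJob` and `EvaluationJob` in the usage comment are placeholder names.

```python
def succeeded_predicate(job_names, logical="AND"):
    """Build a Glue trigger Predicate that fires when the given jobs reach SUCCEEDED."""
    if logical not in ("AND", "ANY"):
        raise ValueError("logical must be 'AND' or 'ANY'")
    return {
        "Logical": logical,
        "Conditions": [
            {"LogicalOperator": "EQUALS", "JobName": name, "State": "SUCCEEDED"}
            for name in job_names
        ],
    }


# Hypothetical usage with placeholder names:
# glue_client.create_trigger(
#     Name="TriggerEvaluationJob", WorkflowName=glue_workflow_name,
#     Type="CONDITIONAL", StartOnCreation=True,
#     Predicate=succeeded_predicate([data_processing_job_name, model_training_deployment_job_name]),
#     Actions=[{"JobName": "EvaluationJob"}],
# )
```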


## Run the Workflow

Start a workflow run, passing the endpoint name and the evaluation threshold as workflow run properties, and then poll the run status until it finishes.

In [None]:
# Endpoint name with a month-day-hour-minute suffix, e.g. "gw-customer-churn-endpoint-1-31-14-5"
def generate_endpoint_name():
    now = datetime.now()
    timestamp_suffix = f"{now.month}-{now.day}-{now.hour}-{now.minute}"

    return f"gw-customer-churn-endpoint-{timestamp_suffix}"
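SageMaker endpoint names must be at most 63 characters and match `^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}`. Because `generate_endpoint_name` resolves only to the minute, two runs started within the same minute would get the same name. One possible variation, sketched below, adds seconds and a short random suffix and checks the name before use; the six-character suffix and the function name are arbitrary choices, not part of the original notebook.

```python
import re
import uuid
from datetime import datetime

# Pattern for SageMaker endpoint names; the 63-character limit is checked separately.
ENDPOINT_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")


def generate_unique_endpoint_name(prefix="gw-customer-churn-endpoint", now=None):
    """Return '<prefix>-<MM-DD-HH-MM-SS>-<6 hex chars>' after validating it."""
    now = now or datetime.now()
    name = f"{prefix}-{now:%m-%d-%H-%M-%S}-{uuid.uuid4().hex[:6]}"
    if len(name) > 63 or not ENDPOINT_NAME_PATTERN.match(name):
        raise ValueError(f"Invalid SageMaker endpoint name: {name}")
    return name
```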


In [None]:
# Start a workflow run; its run properties are shared with every job in the run
endpoint_name = generate_endpoint_name()

response = glue_client.start_workflow_run(
    Name=glue_workflow_name,
    RunProperties={
        'endpoint_name': endpoint_name,
        'evaluation_threshold': "0.90"  # run property values must be strings
    }
)
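`RunProperties` are stored on the workflow run and shared by every job in it. When a job runs as part of a workflow, Glue passes it `--WORKFLOW_NAME` and `--WORKFLOW_RUN_ID`, which the job can use with `get_workflow_run_properties` to read values such as `endpoint_name` and `evaluation_threshold`. The training script is not shown here, so the function below is only a sketch of that pattern; `read_run_settings` is a hypothetical name, and the Glue client is passed in so the logic can be tried without AWS.

```python
def read_run_settings(glue_client, workflow_name, run_id):
    """Fetch the endpoint name and evaluation threshold from a Glue workflow run."""
    props = glue_client.get_workflow_run_properties(
        Name=workflow_name, RunId=run_id
    )["RunProperties"]
    # Run property values are strings, so the threshold is converted explicitly.
    return props["endpoint_name"], float(props["evaluation_threshold"])


# Inside a workflow-triggered job (sketch):
# args = getResolvedOptions(sys.argv, ["WORKFLOW_NAME", "WORKFLOW_RUN_ID"])
# endpoint_name, threshold = read_run_settings(
#     boto3.client("glue"), args["WORKFLOW_NAME"], args["WORKFLOW_RUN_ID"])
```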

In [None]:
def check_workflow_state(workflow_name, run_id):
    resp = glue_client.get_workflow_run(
        Name=workflow_name,
        RunId=run_id,
        IncludeGraph=True
    )
    return resp['Run']['Status']

print('Checking workflow state:')
while True:
    workflow_status = check_workflow_state(glue_workflow_name, response['RunId'])
    if workflow_status in ['COMPLETED', 'STOPPED', 'ERROR']:
        print(workflow_status)
        break
    else:
        print('.')
    time.sleep(30)
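A workflow run status of `COMPLETED` means the run finished; it does not by itself guarantee that every job succeeded. The `Run` returned by `get_workflow_run` also carries a `Statistics` block with counters such as `FailedActions`, `StoppedActions` and `TimeoutActions`. The sketch below treats a run as successful only if it completed and none of those failure counters is non-zero; counting stopped actions as failures is a judgment call, and counters missing from the response are treated as zero.

```python
FAILURE_COUNTERS = ("FailedActions", "StoppedActions", "TimeoutActions", "ErroredActions")


def run_succeeded(run):
    """Return True if a Glue workflow run completed and no action failed, stopped or timed out."""
    if run.get("Status") != "COMPLETED":
        return False
    stats = run.get("Statistics", {})
    # Counters absent from the response are treated as zero.
    return all(stats.get(counter, 0) == 0 for counter in FAILURE_COUNTERS)


# Usage after the polling loop above:
# run = glue_client.get_workflow_run(Name=glue_workflow_name, RunId=response["RunId"])["Run"]
# print("Succeeded" if run_succeeded(run) else f"Check the job logs: {run.get('Statistics')}")
```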

## Workflow Result

When the workflow run finishes, the trained model is deployed as a SageMaker real-time endpoint if it meets the evaluation threshold. For details, see the CloudWatch logs of the Glue jobs.
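To spot-check a deployed endpoint, one option is to send a CSV row through the SageMaker runtime `invoke_endpoint` API. The built-in XGBoost container accepts `text/csv` input without a header or label column and returns one score per row; assuming the training script used a binary objective such as `binary:logistic`, that score is a churn probability. The feature row must match the columns produced by the preprocessing job, which are not shown here, so the values in the usage comment are placeholders. The 0.5 decision cutoff is an arbitrary assumption and unrelated to `evaluation_threshold`.

```python
def predict_churn(runtime_client, endpoint_name, features, threshold=0.5):
    """Send one CSV feature row to an endpoint; return (score, predicted_label)."""
    body = ",".join(str(value) for value in features)
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="text/csv",
        Body=body,
    )
    # The response Body is a streaming object; read and decode it, then parse the score.
    score = float(response["Body"].read().decode("utf-8").strip())
    return score, int(score >= threshold)


# Usage (placeholder feature values; requires the endpoint to exist):
# runtime = boto3.client("sagemaker-runtime")
# print(predict_churn(runtime, endpoint_name, [0.1, 0.2, 0.3]))
```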

## Clean Up

When you are done, clean up your AWS account by deleting the resources you won't reuse. The cells below delete the Glue jobs, triggers, and workflow, and then the SageMaker endpoint.

In [None]:
# Delete the jobs
for job_name in [data_processing_job_name, model_training_deployment_job_name]:
    glue_client.delete_job(JobName=job_name)

# Delete the triggers
for trigger_name in [data_processing_trigger_name, model_train_deploy_trigger_name]:
    glue_client.delete_trigger(Name=trigger_name)

# Delete the workflow
response = glue_client.delete_workflow(
    Name=glue_workflow_name
)


In [None]:
sagemaker_client = boto3.Session().client('sagemaker')

sagemaker_client.delete_endpoint(
    EndpointName=endpoint_name
)
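`delete_endpoint` removes only the endpoint itself. Every endpoint also has an endpoint configuration and at least one SageMaker model, and the call above fails if the model never met the threshold and no endpoint was created. The sketch below looks up the endpoint configuration with `describe_endpoint`, collects the model names from its `ProductionVariants`, and deletes all three, returning an empty list if the endpoint cannot be described. Treating any `ClientError` from `describe_endpoint` as "endpoint not found" is a simplifying assumption.

```python
def delete_endpoint_resources(sm_client, endpoint_name):
    """Delete an endpoint plus its endpoint config and models; return the deleted model names."""
    try:
        endpoint = sm_client.describe_endpoint(EndpointName=endpoint_name)
    except sm_client.exceptions.ClientError:
        # Most likely the endpoint was never created (the model did not meet the threshold).
        return []
    config_name = endpoint["EndpointConfigName"]
    config = sm_client.describe_endpoint_config(EndpointConfigName=config_name)
    model_names = [variant["ModelName"] for variant in config["ProductionVariants"]]

    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    for model_name in model_names:
        sm_client.delete_model(ModelName=model_name)
    return model_names


# Usage in place of the delete_endpoint cell above:
# delete_endpoint_resources(boto3.client("sagemaker"), endpoint_name)
```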
