{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Implement ML pipeline Using the AWS Glue Workflow\n", "\n", "1. [Introduction](#Introduction)\n", "1. [Setup](#Setup)\n", "1. [Build a Machine Learning Workflow](#Build-a-Machine-Learning-Workflow)\n", "1. [Run the Workflow](#Run-the-Workflow)\n", "1. [Evaluate the deployed model](#Evaluate-the-deployed-model)\n", "1. [Clean Up](#Clean-Up)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "## Introduction\n", "\n", "This notebook describes how to use Glue Workflow with PySpark scripts to create a machine learning pipeline across data preparation, model training, model evaluation and model register. The defintion of workflow as beflow:\n", "\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### IAM Permission and Role\n", "\n", "* Required IAM roles on services.\n", "\n", "To execute the notebook and Glue Workflow, we will need to manage access control for services.\n", "\n", " * IAM role for SageMaker (Studio) Notebook - the execution role configuration\n", " * Open the Amazon [SageMaker console](https://console.aws.amazon.com/sagemaker/). \n", " * Get the SageMaker execution role from console (via opening SageMaker Notebook Instance detail or opening user profile detail under SageMaker Studio domain)\n", " * Open the SageMaker execution role from IAM, and attached below managed IAM policy for it:\n", " * arn:aws:iam::aws:policy/AWSGlueConsoleSageMakerNotebookFullAccess\n", " \n", " * IAM role for Glue job to execute data access from S3 and model training on SageMaker\n", " * With executing a script to create role `AWS-Glue-S3-SageMaker-Access` below\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Import the Required Modules" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import sys\n", "import uuid\n", "import logging\n", "import boto3\n", "import time\n", "from datetime import datetime\n", "\n", "import sagemaker\n", "\n", "from sagemaker.s3 import S3Uploader, S3Downloader\n", "\n", "sys.path.insert( 0, os.path.abspath(\"./code\") )\n", "import setup_iam_roles\n", "\n", "session = sagemaker.Session()\n", "\n", "region = boto3.Session().region_name\n", "bucket = session.default_bucket()\n", "\n", "id = uuid.uuid4().hex\n", "\n", "# SageMaker Execution Role\n", "sagemaker_execution_role = sagemaker.get_execution_role()\n", "\n", "# Create a unique name for the AWS Glue job to be created. If you change the\n", "# default name, you may need to change the Step Functions execution role.\n", "glue_job_prefix = \"customer-churn-etl\"\n", "glue_job_name = f\"{glue_job_prefix}-{id}\"\n", "\n", "# Create a unique name for the AWS Lambda function to be created. If you change\n", "# the default name, you may need to change the Step Functions execution role.\n", "query_function_prefix = \"query-evaluation-result\"\n", "query_function_name = f\"{query_function_prefix}-{id}\"\n", "\n", "# S3 data folder prefix\n", "prefix = 'sagemaker/DEMO-xgboost-customer-churn'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create an IAM role for Glue Job\n", "* Providing access on the S3 bucket\n", "* Executing SageMaker training job and model deployment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "glue_role_name = \"AWS-Glue-S3-SageMaker-Access\"\n", "glue_role_arn = setup_iam_roles.create_glue_role(glue_role_name, bucket)\n", "glue_role_arn" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepare the Dataset\n", "This notebook uses the XGBoost algorithm to automate the classification of unhappy customers for telecommunication service providers. The goal is to identify customers who may cancel their service soon so that you can entice them to stay. This is known as customer churn prediction.\n", "\n", "The dataset we use is publicly available and was mentioned in the book [Discovering Knowledge in Data](https://www.amazon.com/dp/0470908742/) by Daniel T. Larose. It is attributed by the author to the University of California Irvine Repository of Machine Learning Datasets." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_prefix = \"train\"\n", "val_prefix = \"validation\"\n", "test_prefix = \"test\"\n", "\n", "raw_data = f\"s3://{bucket}/{prefix}/input\"\n", "batch_transform_output = f\"s3://{bucket}/{prefix}/batch_transform\"\n", "processed_data = f\"s3://{bucket}/{prefix}/processed\"\n", "\n", "train_data = f\"{processed_data}/{train_prefix}/\"\n", "validation_data = f\"{processed_data}/{val_prefix}/\"\n", "test_data = f\"{processed_data}/{test_prefix}/\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Upload data to `S3 Bucket`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "S3Uploader.upload(\n", " local_path=\"../data/churn_processed.csv\",\n", " desired_s3_uri=f\"{raw_data}\",\n", " sagemaker_session=session,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Build a Machine Learning Workflow" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We are going to use Glue Workflow as the orchestration engine, Glue Job for the data preprocessing and model training/deployment as the steps\n", "\n", "* [**Glue Workflow**](https://docs.aws.amazon.com/glue/latest/dg/workflows_overview.html) - Orchestration engine for ML workflow.\n", "* [**Glue Job**](https://docs.aws.amazon.com/glue/latest/dg/author-job.html) - Business logic for ETL or python shell.\n", "* [**Glue Trigger**](https://docs.aws.amazon.com/glue/latest/dg/trigger-job.html) - Triggers Glue Job as steps.\n", "\n", "Once the Glue Workflow is created, you may view the the detail via: AWS Glue Console / Workflow / (To select the created workflow). It should be similar like:\n", "\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create AWS Glue Workflow\n", "\n", "#### Create Glue Workflow Object\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "glue_client = boto3.client(\"glue\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "glue_workflow_name = f\"CustomerChurnMLWorkflow-{id}\"\n", "response = glue_client.create_workflow(\n", " Name=glue_workflow_name,\n", " Description='AWS Glue workflow to process data and create training jobs'\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Glue Jobs " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Data Processing Job\n", "data_processing_script_path = S3Uploader.upload(\n", " local_path=\"./code/glue_preprocessing.py\",\n", " desired_s3_uri=f\"s3://{bucket}/{prefix}/glue/scripts\",\n", " sagemaker_session=session,\n", ")\n", "data_processing_job_name = f\"DataProcessingJob-{id}\"\n", "response = glue_client.create_job(\n", " Name=data_processing_job_name,\n", " Description='Preparing data for SageMaker training',\n", " Role=glue_role_arn,\n", " ExecutionProperty={\n", " 'MaxConcurrentRuns': 2\n", " },\n", " Command={\n", " 'Name': 'glueetl',\n", " 'ScriptLocation': data_processing_script_path,\n", " },\n", " DefaultArguments={\n", " \"--job-bookmark-option\": \"job-bookmark-enable\",\n", " \"--enable-metrics\": \"\",\n", " \"--additional-python-modules\": \"pyarrow==2,awswrangler==2.9.0,fsspec==0.7.4\",\n", " \"--enable-continuous-cloudwatch-log\": \"true\"\n", " },\n", " MaxRetries=0,\n", " Timeout=60,\n", " MaxCapacity=10.0,\n", " GlueVersion='2.0'\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Model Training & Deployment Job\n", "model_training_deployment_script_path = S3Uploader.upload(\n", " local_path=\"./code/model_training_deployment.py\",\n", " desired_s3_uri=f\"s3://{bucket}/{prefix}/glue/scripts\",\n", " sagemaker_session=session\n", ")\n", "\n", "model_training_deployment_job_name = f\"ModelTrainingDeploymentJob-{id}\"\n", "response = glue_client.create_job(\n", " Name=model_training_deployment_job_name,\n", " Description='Model training and deployment',\n", " Role=glue_role_arn,\n", " ExecutionProperty={\n", " 'MaxConcurrentRuns': 2\n", " },\n", " Command={\n", " 'Name': 'pythonshell',\n", " 'ScriptLocation': model_training_deployment_script_path,\n", " 'PythonVersion': '3'\n", " },\n", " DefaultArguments={\n", " \"--job-bookmark-option\": \"job-bookmark-enable\",\n", " \"--enable-metrics\": \"\",\n", " \"--additional-python-modules\": \"scikit-learn==0.23.1,pandas==1.3.5,numpy=1.21.6\",\n", " \"--enable-continuous-cloudwatch-log\": \"true\"\n", " },\n", " MaxRetries=0,\n", " Timeout=60,\n", " MaxCapacity=1,\n", " GlueVersion='1.0'\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_output_path = f\"s3://{bucket}/{prefix}/output\"\n", "image_uri = sagemaker.image_uris.retrieve(\n", " framework=\"xgboost\",\n", " region=region,\n", " version=\"1.0-1\",\n", " py_version=\"py3\",\n", ")\n", "\n", "processed_data, sagemaker_execution_role, image_uri, model_output_path" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Glue Triggers" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_processing_trigger_name = 
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Glue Triggers" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_processing_trigger_name = f'TriggerDataProcessingJob-{id}'\n", "response = glue_client.create_trigger(\n", "    Name=data_processing_trigger_name,\n", "    Description='Triggering Data Processing Job',\n", "    Type='ON_DEMAND',\n", "    WorkflowName=glue_workflow_name,\n", "    Actions=[\n", "        {\n", "            'JobName': data_processing_job_name,\n", "            'Arguments': {\n", "                '--INPUT_DIR': raw_data,\n", "                '--PROCESSED_DIR': processed_data\n", "            },\n", "        },\n", "    ]\n", ")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_train_deploy_trigger_name = f'TriggerModelTrainingDeploymentJob-{id}'\n", "response = glue_client.create_trigger(\n", "    Name=model_train_deploy_trigger_name,\n", "    Description='Triggering Model Training Deployment Job',\n", "    WorkflowName=glue_workflow_name,\n", "    Type='CONDITIONAL',\n", "    StartOnCreation=True,\n", "    Predicate={\n", "        'Conditions': [\n", "            {\n", "                'LogicalOperator': 'EQUALS',\n", "                'JobName': data_processing_job_name,\n", "                'State': 'SUCCEEDED'\n", "            },\n", "        ]\n", "    },\n", "    Actions=[\n", "        {\n", "            'JobName': model_training_deployment_job_name,\n", "            'Arguments': {\n", "                '--train_input_path': processed_data,\n", "                '--model_output_path': model_output_path,\n", "                '--algorithm_image': image_uri,\n", "                '--role_arn': sagemaker_execution_role\n", "            }\n", "        }\n", "    ]\n", ")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run the Workflow" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start a run of the Glue workflow created above, passing run properties that the job scripts can read at runtime: the name of the endpoint to create and the evaluation threshold the model must meet before deployment." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Generate a unique endpoint name from the current time\n", "def generate_endpoint_name():\n", "    current_time = datetime.now()\n", "    timestamp_suffix = str(current_time.month) + \"-\" + str(current_time.day) + \"-\" + str(current_time.hour) + \"-\" + str(current_time.minute)\n", "\n", "    return f\"gw-customer-churn-endpoint-{timestamp_suffix}\"\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# start the workflow run\n", "endpoint_name = generate_endpoint_name()\n", "\n", "response = glue_client.start_workflow_run(\n", "    Name=glue_workflow_name,\n", "    RunProperties={\n", "        'endpoint_name': endpoint_name,\n", "        'evaluation_threshold': \"0.90\"  # minimum evaluation score required before deployment\n", "    }\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def check_workflow_state(workflow_name, run_id):\n", "    resp = glue_client.get_workflow_run(\n", "        Name=workflow_name,\n", "        RunId=run_id,\n", "        IncludeGraph=True\n", "    )\n", "    return resp['Run']['Status']\n", "\n", "print('Checking workflow state:')\n", "while True:\n", "    workflow_status = check_workflow_state(glue_workflow_name, response['RunId'])\n", "    if workflow_status in ['COMPLETED', 'STOPPED', 'ERROR']:\n", "        print(workflow_status)\n", "        break\n", "    else:\n", "        print('.')\n", "        time.sleep(30)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Workflow Result\n", "\n", "Once the workflow run finishes, if the trained model meets the evaluation threshold, it is deployed as a SageMaker real-time endpoint. For more detail, refer to the CloudWatch logs of the Glue jobs." ] },
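{ "cell_type": "markdown", "metadata": {}, "source": [ "## Evaluate the deployed model\n", "\n", "Once the endpoint is in service, you can send a few rows from the processed test set to it. The cell below is a minimal sketch: it assumes the preprocessing job wrote headerless CSV files with the label in the first column (the convention for the built-in XGBoost algorithm); adjust the column slicing if your processed layout differs." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import io\n", "\n", "import pandas as pd\n", "\n", "runtime = boto3.client(\"sagemaker-runtime\")\n", "\n", "# Read one processed test file from S3. Assumption: headerless CSV with the\n", "# label in the first column, as the built-in XGBoost algorithm expects.\n", "test_file = S3Downloader.list(test_data, sagemaker_session=session)[0]\n", "df = pd.read_csv(\n", "    io.StringIO(S3Downloader.read_file(test_file, sagemaker_session=session)),\n", "    header=None,\n", ")\n", "\n", "# Send the first five rows (without the label column) as a CSV payload.\n", "payload = df.iloc[:5, 1:].to_csv(header=False, index=False)\n", "response = runtime.invoke_endpoint(\n", "    EndpointName=endpoint_name,\n", "    ContentType=\"text/csv\",\n", "    Body=payload,\n", ")\n", "print(response[\"Body\"].read().decode())  # churn scores for the sample rows" ] },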
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Clean Up" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When you are done, make sure to clean up your AWS account by deleting resources you won't be reusing. Run the cells below to delete the Glue jobs, triggers, and workflow, and then the SageMaker endpoint." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "# delete the jobs\n", "for job_name in [data_processing_job_name, model_training_deployment_job_name]:\n", "    glue_client.delete_job(JobName=job_name)\n", "\n", "# delete the triggers\n", "for trigger_name in [data_processing_trigger_name, model_train_deploy_trigger_name]:\n", "    glue_client.delete_trigger(Name=trigger_name)\n", "\n", "# delete the workflow\n", "response = glue_client.delete_workflow(\n", "    Name=glue_workflow_name\n", ")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# delete the SageMaker endpoint created by the workflow\n", "sagemaker_client = boto3.Session().client('sagemaker')\n", "\n", "sagemaker_client.delete_endpoint(\n", "    EndpointName=endpoint_name\n", ")\n" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-southeast-2:452832661640:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }