{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Implement ML pipeline Using the AWS Step Functions Data Science SDK\n", "\n", "1. [Introduction](#Introduction)\n", "1. [Setup](#setup)\n", "1. [Create Resources](#Create-Resources)\n", "1. [Build a Machine Learning Workflow](#Build-a-Machine-Learning-Workflow)\n", "1. [Run the Workflow](#Run-the-Workflow)\n", "1. [Clean Up](#Clean-Up)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 1. Introduction\n", "\n", "This notebook describes how to use the AWS Step Functions Data Science SDK to create a machine learning pipeline across data preparation, model training, model evaluation and model register. The defintion of workflow as beflow:\n", "\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 2. Setup\n", "\n", "First, we'll need to install and load all the required modules. Then we'll create fine-grained IAM roles for the Lambda, Glue, and Step Functions resources that we will create. The IAM roles grant the services permissions within your AWS environment." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: stepfunctions in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (2.3.0)\n", "Requirement already satisfied: sagemaker>=2.1.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from stepfunctions) (2.86.2)\n", "Requirement already satisfied: boto3>=1.14.38 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from stepfunctions) (1.21.42)\n", "Requirement already satisfied: pyyaml in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from stepfunctions) (5.4.1)\n", "Requirement already satisfied: jmespath<2.0.0,>=0.7.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from boto3>=1.14.38->stepfunctions) (0.10.0)\n", "Requirement already satisfied: botocore<1.25.0,>=1.24.42 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from boto3>=1.14.38->stepfunctions) (1.24.42)\n", "Requirement already satisfied: s3transfer<0.6.0,>=0.5.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from boto3>=1.14.38->stepfunctions) (0.5.0)\n", "Requirement already satisfied: protobuf>=3.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=2.1.0->stepfunctions) (3.15.2)\n", "Requirement already satisfied: packaging>=20.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=2.1.0->stepfunctions) (21.3)\n", "Requirement already satisfied: numpy>=1.9.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=2.1.0->stepfunctions) (1.19.5)\n", "Requirement already satisfied: attrs==20.3.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=2.1.0->stepfunctions) (20.3.0)\n", "Requirement already satisfied: pandas in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=2.1.0->stepfunctions) (1.1.5)\n", "Requirement already satisfied: pathos in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=2.1.0->stepfunctions) (0.2.8)\n", "Requirement already satisfied: importlib-metadata>=1.4.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=2.1.0->stepfunctions) (4.8.3)\n", "Requirement already satisfied: google-pasta in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=2.1.0->stepfunctions) (0.2.0)\n", "Requirement already satisfied: smdebug-rulesconfig==1.0.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=2.1.0->stepfunctions) (1.0.1)\n", "Requirement already satisfied: protobuf3-to-dict>=0.1.5 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from sagemaker>=2.1.0->stepfunctions) (0.1.5)\n", "Requirement already satisfied: python-dateutil<3.0.0,>=2.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore<1.25.0,>=1.24.42->boto3>=1.14.38->stepfunctions) (2.8.1)\n", "Requirement already satisfied: urllib3<1.27,>=1.25.4 in 
/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore<1.25.0,>=1.24.42->boto3>=1.14.38->stepfunctions) (1.26.7)\n", "Requirement already satisfied: typing-extensions>=3.6.4 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from importlib-metadata>=1.4.0->sagemaker>=2.1.0->stepfunctions) (3.7.4.3)\n", "Requirement already satisfied: zipp>=0.5 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from importlib-metadata>=1.4.0->sagemaker>=2.1.0->stepfunctions) (3.4.0)\n", "Requirement already satisfied: pyparsing!=3.0.5,>=2.0.2 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from packaging>=20.0->sagemaker>=2.1.0->stepfunctions) (2.4.7)\n", "Requirement already satisfied: six>=1.9 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from protobuf>=3.1->sagemaker>=2.1.0->stepfunctions) (1.15.0)\n", "Requirement already satisfied: pytz>=2017.2 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from pandas->sagemaker>=2.1.0->stepfunctions) (2021.1)\n", "Requirement already satisfied: pox>=0.3.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from pathos->sagemaker>=2.1.0->stepfunctions) (0.3.0)\n", "Requirement already satisfied: ppft>=1.6.6.4 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from pathos->sagemaker>=2.1.0->stepfunctions) (1.6.6.4)\n", "Requirement already satisfied: dill>=0.3.4 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from pathos->sagemaker>=2.1.0->stepfunctions) (0.3.4)\n", "Requirement already satisfied: multiprocess>=0.70.12 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from pathos->sagemaker>=2.1.0->stepfunctions) (0.70.12.2)\n" ] } ], "source": [ "import sys\n", "\n", "!{sys.executable} -m pip install --upgrade stepfunctions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Import the Required Modules" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import uuid\n", "import logging\n", "import stepfunctions\n", "import boto3\n", "import sagemaker\n", "from sagemaker import get_execution_role\n", "\n", "from sagemaker.amazon.amazon_estimator import image_uris\n", "from sagemaker.inputs import TrainingInput\n", "\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "\n", "from sagemaker.s3 import S3Uploader\n", "from stepfunctions import steps\n", "from stepfunctions.steps import TrainingStep, TransformStep\n", "from stepfunctions.inputs import ExecutionInput\n", "from stepfunctions.workflow import Workflow\n", "\n", "session = sagemaker.Session()\n", "stepfunctions.set_stream_logger(level=logging.INFO)\n", "\n", "region = boto3.Session().region_name\n", "bucket = session.default_bucket()\n", "id = uuid.uuid4().hex\n", "\n", "# SageMaker Execution Role\n", "sagemaker_execution_role = (\n", " sagemaker.get_execution_role()\n", ")\n", "\n", "# Create a unique name for the AWS Glue job to be created. If you change the\n", "# default name, you may need to change the Step Functions execution role.\n", "glue_job_prefix = \"glue-customer-churn-etl\"\n", "glue_job_name = f\"{glue_job_prefix}-{id}\"\n", "\n", "# Create a unique name for the AWS Lambda function to be created. 
If you change\n", "# the default name, you may need to change the Step Functions execution role.\n", "query_function_prefix = \"query-evaluation-result\"\n", "query_function_name = f\"{query_function_prefix}-{id}\"\n", "\n", "prefix = 'sagemaker/DEMO-xgboost-customer-churn-connect'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we'll create fine-grained IAM roles for the Lambda, Glue, and Step Functions resources. The IAM roles grant the services permissions within your AWS environment.\n", "\n", "### Add permissions to your notebook role in IAM\n", "\n", "The IAM role assumed by your notebook requires permission to create and run workflows in AWS Step Functions. If this notebook is running on a SageMaker notebook instance, do the following to provide IAM permissions to the notebook:\n", "\n", "1. Open the Amazon [SageMaker console](https://console.aws.amazon.com/sagemaker/). \n", "2. Select **Notebook instances** and choose the name of your notebook instance.\n", "3. Under **Permissions and encryption** select the role ARN to view the role on the IAM console.\n", "4. Copy and save the IAM role ARN for later use. \n", "5. Choose **Attach policies** and search for `AWSStepFunctionsFullAccess`.\n", "6. Select the check box next to `AWSStepFunctionsFullAccess` and choose **Attach policy**.\n", "\n", "We also need to provide permissions that allow the notebook instance to create an AWS Lambda function and an AWS Glue job. We will edit the managed policy attached to our role directly to incorporate these specific permissions:\n", "\n", "1. Under **Permissions policies** expand the AmazonSageMaker-ExecutionPolicy-******** policy and choose **Edit policy**.\n", "2. Select **Add additional permissions**. Choose **IAM** for Service and **PassRole** for Actions.\n", "3. Under Resources, choose **Specific**. Select **Add ARN** and enter `query_training_status-role` for **Role name with path*** and choose **Add**. You will create this role later on in this notebook.\n", "4. Select **Add additional permissions** a second time. Choose **Lambda** for Service, **Write** for Access level, and **All resources** for Resources.\n", "5. Select **Add additional permissions** a final time. Choose **Glue** for Service, **Write** for Access level, and **All resources** for Resources.\n", "6. Choose **Review policy** and then **Save changes**." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Configure Execution Roles\n", "\n", "Create an IAM role for Step Functions." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/boto3/compat.py:88: PythonDeprecationWarning: Boto3 will no longer support Python 3.6 starting May 30, 2022. To continue receiving service updates, bug fixes, and security updates please upgrade to Python 3.7 or later. 
More information can be found here: https://aws.amazon.com/blogs/developer/python-support-policy-updates-for-aws-sdks-and-tools/\n", " warnings.warn(warning, PythonDeprecationWarning)\n" ] } ], "source": [ "from setup_iam_roles import create_sfn_role\n", "\n", "sfn_role_name = \"AmazonSageMaker-StepFunctionsWorkflowExecutionRole\"\n", "workflow_execution_role = create_sfn_role(sfn_role_name,\n", " sagemaker_execution_role,\n", " glue_job_prefix)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create an IAM role for the AWS Glue job." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "from setup_iam_roles import create_glue_role\n", "\n", "glue_role_name = \"AWS-Glue-S3-Bucket-Access\"\n", "glue_role_arn = create_glue_role(glue_role_name, bucket)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create an IAM role for querying evaluation results and registering the model." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "from setup_iam_roles import create_lambda_role\n", "\n", "query_lambda_role_name = \"query-modelregistry-lambda-role\"\n", "lambda_role = create_lambda_role(query_lambda_role_name, bucket)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepare the Dataset\n", "This notebook uses the XGBoost algorithm to automate the classification of unhappy customers for telecommunication service providers. The goal is to identify customers who may cancel their service soon so that you can entice them to stay. This is known as customer churn prediction.\n", "\n", "The dataset we use is publicly available and was mentioned in the book [Discovering Knowledge in Data](https://www.amazon.com/dp/0470908742/) by Daniel T. Larose. It is attributed by the author to the University of California Irvine Repository of Machine Learning Datasets." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "train_prefix = \"train\"\n", "val_prefix = \"validation\"\n", "test_prefix = \"test\"\n", "\n", "raw_data = f\"s3://{bucket}/{prefix}/input\"\n", "processed_data = f\"s3://{bucket}/{prefix}/processed\"\n", "\n", "train_data = f\"{processed_data}/{train_prefix}/\"\n", "validation_data = f\"{processed_data}/{val_prefix}/\"\n", "test_data = f\"{processed_data}/{test_prefix}/\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Upload the dataset to the S3 bucket." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "upload: ../data/churn_processed.csv to s3://sagemaker-us-east-1-822507008821/sagemaker/DEMO-xgboost-customer-churn-connect/input/churn_processed.csv\n" ] } ], "source": [ "!aws s3 cp ../data/churn_processed.csv $raw_data/churn_processed.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 3. Create Resources\n", "In the following steps we'll create the Glue job and Lambda functions that are called from the Step Functions workflow."
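, "\n", "The preprocessing script (`code/glue_preprocessing.py`) ships with this example and is not reproduced in the notebook. As a rough, hypothetical sketch only (assuming a simple 70/20/10 split; the `INPUT_DIR`/`PROCESSED_DIR` argument names match the ones the ETL step passes later in this notebook), it might look like the following:\n", "\n", "```python\n", "import sys\n", "\n", "import awswrangler as wr\n", "from awsglue.utils import getResolvedOptions\n", "\n", "args = getResolvedOptions(sys.argv, [\"INPUT_DIR\", \"PROCESSED_DIR\"])\n", "\n", "# Read the raw CSV uploaded earlier and shuffle it.\n", "df = wr.s3.read_csv(f\"{args['INPUT_DIR']}/churn_processed.csv\")\n", "df = df.sample(frac=1.0, random_state=42)\n", "\n", "# Split 70/20/10 into the prefixes the training and evaluation steps expect.\n", "n = len(df)\n", "splits = {\n", "    \"train\": df[: int(0.7 * n)],\n", "    \"validation\": df[int(0.7 * n) : int(0.9 * n)],\n", "    \"test\": df[int(0.9 * n) :],\n", "}\n", "for name, part in splits.items():\n", "    # SageMaker's XGBoost expects headerless CSV with the label in the first column.\n", "    wr.s3.to_csv(part, f\"{args['PROCESSED_DIR']}/{name}/{name}.csv\", header=False, index=False)\n", "```"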
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the AWS Glue Job for Data Preprocessing" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "glue_script_location = S3Uploader.upload(\n", " local_path=\"code/glue_preprocessing.py\",\n", " desired_s3_uri=f\"s3://{bucket}/{prefix}/pipeline/glue_preprossing.py\",\n", " sagemaker_session=session,\n", ")\n", "\n", "glue_client = boto3.client(\"glue\")\n", "\n", "response = glue_client.create_job(\n", " Name=glue_job_name,\n", " Description='Prepare data for SageMaker training',\n", " Role=glue_role_arn,\n", " ExecutionProperty={\n", " 'MaxConcurrentRuns': 2\n", " },\n", " Command={\n", " 'Name': 'glueetl',\n", " 'ScriptLocation': glue_script_location,\n", " },\n", " DefaultArguments={\n", " \"--job-bookmark-option\": \"job-bookmark-enable\",\n", " \"--enable-metrics\": \"\",\n", " \"--additional-python-modules\": \"pyarrow==2,awswrangler==2.9.0,fsspec==0.7.4\"\n", " },\n", " MaxRetries=0,\n", " Timeout=60,\n", " MaxCapacity=10.0,\n", " GlueVersion='2.0'\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the AWS Lambda Function for Querying Evalution Result" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'s3://sagemaker-us-east-1-822507008821/sagemaker/DEMO-xgboost-customer-churn-connect/lambdaFunctions/query_evaluation_result.zip'" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import zipfile\n", "\n", "zip_name = \"query_evaluation_result.zip\"\n", "lambda_source_code = \"./code/query_evaluation_result.py\"\n", "\n", "zf = zipfile.ZipFile(zip_name, mode=\"w\")\n", "zf.write(lambda_source_code, arcname=lambda_source_code.split(\"/\")[-1])\n", "zf.close()\n", "\n", "S3Uploader.upload(\n", " local_path=zip_name,\n", " desired_s3_uri=f\"s3://{bucket}/{prefix}/lambdaFunctions\",\n", " sagemaker_session=session,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "***It takes time to take effect after creating Lambda Role, please run `lambda_client.create_function`again if you come across the following error***\n", "```\n", "InvalidParameterValueException: An error occurred (InvalidParameterValueException) when calling the CreateFunction operation: The role defined for the function cannot be assumed by Lambda.\n", "```" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "lambda_client = boto3.client(\"lambda\")\n", "\n", "response = lambda_client.create_function(\n", " FunctionName=query_function_name,\n", " Runtime=\"python3.7\",\n", " Role=lambda_role,\n", " Handler=\"query_evaluation_result.lambda_handler\",\n", " Code={\"S3Bucket\": bucket, \"S3Key\": f\"{prefix}/lambdaFunctions/{zip_name}\"},\n", " Description=\"Queries a Processing Job for evalution and return the results.\",\n", " Timeout=15,\n", " MemorySize=128,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the AWS Lambda Function for Model Registry" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'s3://sagemaker-us-east-1-822507008821/sagemaker/DEMO-xgboost-customer-churn-connect/lambdaFunctions/register_model.zip'" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "zip_name = \"register_model.zip\"\n", "lambda_source_code = \"./code/register_model.py\"\n", "\n", "zf = zipfile.ZipFile(zip_name, 
mode=\"w\")\n", "zf.write(lambda_source_code, arcname=lambda_source_code.split(\"/\")[-1])\n", "zf.close()\n", "\n", "S3Uploader.upload(\n", " local_path=zip_name,\n", " desired_s3_uri=f\"s3://{bucket}/{prefix}/lambdaFunctions\",\n", " sagemaker_session=session,\n", ")" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "model_register_function_name = \"model-register-{}\".format(id)\n", "response = lambda_client.create_function(\n", " FunctionName=model_register_function_name,\n", " Runtime=\"python3.7\",\n", " Role=lambda_role,\n", " Handler=\"register_model.lambda_handler\",\n", " Code={\"S3Bucket\": bucket, \"S3Key\": f\"{prefix}/lambdaFunctions/{zip_name}\"},\n", " Description=\"Register model.\",\n", " Timeout=15,\n", " MemorySize=128,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Configure the AWS SageMaker Estimator" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.parameters import (\n", " ParameterFloat,\n", " ParameterInteger,\n", " ParameterString,\n", ")\n", "\n", "image_uri = sagemaker.image_uris.retrieve(\n", " framework=\"xgboost\",\n", " region=region,\n", " version=\"1.0-1\",\n", " py_version=\"py3\",\n", ")\n", "\n", "train_instance_count = 1\n", "train_instance_type = \"ml.m5.xlarge\"\n", "\n", "model_output = f\"s3://{bucket}/{prefix}/model\"\n", "\n", "xgb_train = sagemaker.estimator.Estimator(\n", " image_uri=image_uri,\n", " instance_type=train_instance_type,\n", " instance_count=train_instance_count,\n", " output_path=model_output,\n", " base_job_name=f\"{prefix}-train\",\n", " sagemaker_session=session,\n", " role=sagemaker_execution_role,\n", ")\n", "\n", "# Set some hyper parameters\n", "# https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost_hyperparameters.html\n", "xgb_train.set_hyperparameters(\n", " max_depth=5,\n", " eta=0.2,\n", " gamma=4,\n", " min_child_weight=6,\n", " subsample=0.8,\n", " silent=0,\n", " objective=\"binary:logistic\",\n", " num_round=100,\n", " eval_metric='auc'\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "## 4. Build a Machine Learning Workflow" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can use a state machine workflow to create a model retraining pipeline. The AWS Data Science Workflows SDK provides several AWS SageMaker workflow steps that you can use to construct an ML pipeline. 
In this tutorial you will create the following steps:\n", "\n", "* [**ETLStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.GlueStartJobRunStep) - Starts an AWS Glue job to extract the latest data from our source database and prepare our data.\n", "* [**TrainingStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) - Creates the training step and passes the defined estimator.\n", "* [**ModelStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) - Creates a model in SageMaker using the artifacts created during the TrainingStep.\n", "* [**LambdaStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) - Creates the task state step within our workflow that calls a Lambda function.\n", "* [**ChoiceStateStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Choice) - Creates the choice state step within our workflow.\n", "* [**EndpointConfigStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointConfigStep) - Creates the endpoint config step to define the new configuration for our endpoint.\n", "* [**EndpointStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointStep) - Creates the endpoint step to update our model endpoint.\n", "* [**FailStateStep**](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) - Creates the fail state step within our workflow." ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "# SageMaker expects unique names for each job, model and endpoint.\n", "# If these names are not unique the execution will fail.\n", "execution_input = ExecutionInput(\n", " schema={\n", " \"TrainingJobName\": str,\n", " \"GlueJobName\": str,\n", " \"QueryLambdaFunctionName\": str,\n", " \"RegisterLambdaFunctionName\": str,\n", " \"EvaluationStep\": str,\n", " \"ModelName\": str,\n", " \"EndpointName\": str,\n", " \"EndpointConfigName\": str\n", " }\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create an ETL step with AWS Glue\n", "In the following cell, we create a Glue step that runs the AWS Glue job we created above. The Glue job extracts the latest data from our source database, removes unnecessary columns, splits the data into training, validation, and test sets, and saves the data in CSV format to S3. Glue performs this extraction, transformation, and load (ETL) in a serverless fashion, so there are no compute resources to configure and manage. See the [GlueStartJobRunStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.GlueStartJobRunStep) Compute step in the AWS Step Functions Data Science SDK documentation." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "etl_step = steps.GlueStartJobRunStep(\n", " \"GlueJobPreprocessing\",\n", " parameters={\n", " \"JobName\": execution_input[\"GlueJobName\"],\n", " \"Arguments\": {\n", " # Specify any arguments needed based on bucket and keys (e.g. 
input/output S3 locations)\n", " '--job-bookmark-option': 'job-bookmark-enable',\n", " '--additional-python-modules': 'pyarrow==2,awswrangler==2.9.0,fsspec==0.7.4',\n", " # Custom arguments below\n", " '--INPUT_DIR': raw_data,\n", " '--PROCESSED_DIR': processed_data\n", " },\n", " },\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create a SageMaker Training Step \n", "\n", "In the following cell, we create the training step and pass the estimator we defined above. See [TrainingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) in the AWS Step Functions Data Science SDK documentation to learn more." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "train_job_name = \"regression-{}\".format(id)\n", "\n", "training_step = steps.TrainingStep(\n", " \"ModelTraining\",\n", " estimator=xgb_train,\n", " data={\n", " \"train\": TrainingInput(train_data, content_type=\"text/csv\"),\n", " \"validation\": TrainingInput(validation_data, content_type=\"text/csv\"),\n", " },\n", " job_name=execution_input[\"TrainingJobName\"],\n", " wait_for_completion=True,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create a Model Step" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "model_step = steps.ModelStep(\n", " \"SaveModel\",\n", " model=training_step.get_expected_model(),\n", " model_name=execution_input[\"ModelName\"],\n", " result_path=\"$.ModelStepResults\",\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create a ProcessingStep for Evaluation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Upload the evaluation script to the S3 bucket."
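, "\n", "The evaluation script itself (`code/evaluation.py`) ships with this example. As a hypothetical sketch only (assuming the container paths configured in the ProcessingStep below, an `xgboost` module available in the processing container, and an output file named `evaluation.json` whose `trainingMetrics` key the query Lambda reads later), it might look like:\n", "\n", "```python\n", "import json\n", "import pickle\n", "import tarfile\n", "\n", "import pandas as pd\n", "import xgboost\n", "from sklearn.metrics import roc_auc_score\n", "\n", "# Unpack the model artifact staged by the \"model_uri\" ProcessingInput.\n", "with tarfile.open(\"/opt/ml/processing/model/model.tar.gz\") as tar:\n", "    tar.extractall(path=\"/opt/ml/processing/model\")\n", "with open(\"/opt/ml/processing/model/xgboost-model\", \"rb\") as f:\n", "    model = pickle.load(f)\n", "\n", "# Headerless CSV with the label in the first column, as produced by the Glue job.\n", "df = pd.read_csv(\"/opt/ml/processing/test/test.csv\", header=None)\n", "y_true, features = df.iloc[:, 0], df.iloc[:, 1:]\n", "y_score = model.predict(xgboost.DMatrix(features.values))\n", "\n", "report = {\"trainingMetrics\": float(roc_auc_score(y_true, y_score))}\n", "with open(\"/opt/ml/processing/evaluation/evaluation.json\", \"w\") as f:\n", "    json.dump(report, f)\n", "```"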
] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "input_evaluation_code = session.upload_data(\n", " \"./code/evaluation.py\",\n", " bucket=bucket,\n", " key_prefix=f\"{prefix}/code\",\n", ")" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "model_data_s3_uri = f\"{model_output}/{train_job_name}/output/model.tar.gz\"\n", "output_model_evaluation_s3_uri = f\"s3://{bucket}/{prefix}/evaluation/\"\n", "\n", "inputs_evaluation = [\n", " ProcessingInput(\n", " source=test_data,\n", " destination=\"/opt/ml/processing/test\",\n", " input_name=\"test-data\",\n", " ),\n", " ProcessingInput(\n", " source=model_data_s3_uri,\n", " destination=\"/opt/ml/processing/model\",\n", " input_name=\"model_uri\",\n", " ),\n", " ProcessingInput(\n", " source=input_evaluation_code,\n", " destination=\"/opt/ml/processing/input/code\",\n", " input_name=\"code\",\n", " ),\n", "]\n", "\n", "outputs_evaluation = [\n", " ProcessingOutput(\n", " source=\"/opt/ml/processing/evaluation\",\n", " destination=output_model_evaluation_s3_uri,\n", " output_name=\"evaluation\",\n", " ),\n", "]" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "model_evaluation_processor = SKLearnProcessor(\n", " framework_version=\"0.20.0\",\n", " role=sagemaker_execution_role,\n", " instance_type=\"ml.m5.xlarge\",\n", " instance_count=1,\n", " max_runtime_in_seconds=1200,\n", ")" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "processing_evaluation_step = steps.ProcessingStep(\n", " \"Evaluation\",\n", " processor=model_evaluation_processor,\n", " job_name=execution_input[\"RegisterLambdaFunctionName\"],\n", " inputs=inputs_evaluation,\n", " outputs=outputs_evaluation,\n", " container_entrypoint=[\"python3\", \"/opt/ml/processing/input/code/evaluation.py\"],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create a Lambda Step For Querying Evaluation Results\n", "In the following cell, we define a lambda step that will invoke the previously created lambda function as part of our Step Function workflow. See [LambdaStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) in the AWS Step Functions Data Science SDK documentation to learn more." ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "lambda_step = steps.compute.LambdaStep(\n", " \"QueryEvaluationResults\",\n", " parameters={\n", " \"FunctionName\": execution_input[\"QueryLambdaFunctionName\"],\n", " \"Payload\": {\n", " \"EvaluationResult\": output_model_evaluation_s3_uri,\n", " \"S3ModelArtifacts\": model_data_s3_uri\n", " },\n", " },\n", ")\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create a Choice State Step \n", "In the following cell, we create a choice step in order to build a dynamic workflow. This choice step branches based off of the results of our Evaluation step: did the training job fail or should the model be registered into Model Registry? We will add specfic rules to this choice step later on in section 8 of this notebook." 
] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "check_accuracy_step = steps.states.Choice(\"CheckEvaluation\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If the accuracy of model is higher than threshold, you can ether registry the model or deploy model directly. We provide both options in this tutorial, but you may need to choose one of them in your case." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Lambda Step for model registry" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "model_package_group_name = f\"CustomerChurnModelPackage\"\n", "\n", "model_register_step = steps.compute.LambdaStep(\n", " \"RegisterModel\",\n", " parameters={\n", " \"FunctionName\": execution_input[\"RegisterLambdaFunctionName\"],\n", " \"Payload\": {\n", " \"S3ModelArtifacts.$\": \"$.Payload.S3ModelArtifacts\",\n", " \"EvaluationResult\": output_model_evaluation_s3_uri,\n", " \"ImageUri\": image_uri,\n", " \"ModelPackageGroupName\": model_package_group_name\n", " },\n", " }\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create an Endpoint Configuration Step" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "endpoint_config_step = steps.EndpointConfigStep(\n", " \"CreateModelEndpointConfig\",\n", " endpoint_config_name=execution_input[\"EndpointConfigName\"],\n", " model_name=execution_input[\"ModelName\"],\n", " initial_instance_count=1,\n", " instance_type=\"ml.m4.xlarge\",\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the Model Endpoint Step" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "endpoint_step = steps.EndpointStep(\n", " \"CreateModelEndpoint\",\n", " endpoint_name=execution_input[\"EndpointName\"],\n", " endpoint_config_name=execution_input[\"EndpointConfigName\"],\n", " update=False,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the Fail State Step\n", "In addition, we create a Fail step which proceeds from our choice state if the validation accuracy of our model is lower than the threshold we define. See [FailStateStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) in the AWS Step Functions Data Science SDK documentation to learn more." ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [], "source": [ "fail_step = steps.states.Fail(\n", " \"Model Accuracy Too Low\", comment=\"Validation accuracy lower than threshold\"\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Add Rules to Choice State\n", "In the cells below, we add a threshold rule to our choice state. Therefore, if the validation accuracy of our model is below 0.90, we move to the Fail State. If the validation accuracy of our model is above 0.90, we move to the register model step." 
] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "threshold_rule = steps.choice_rule.ChoiceRule.NumericGreaterThanEquals(\n", " variable=lambda_step.output()[\"Payload\"][\"trainingMetrics\"], value=0.90\n", ")\n", "\n", "check_accuracy_step.add_choice(rule=threshold_rule, next_step=model_register_step)\n", "model_register_step.next(endpoint_config_step)\n", "endpoint_config_step.next(endpoint_step)\n", "check_accuracy_step.default_choice(next_step=fail_step)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Link all the Steps Together\n", "Finally, create your workflow definition by chaining all of the steps together that we've created. See [Chain](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.states.Chain) in the AWS Step Functions Data Science SDK documentation to learn more." ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [], "source": [ "workflow_definition = steps.Chain(\n", " [etl_step, training_step, model_step, processing_evaluation_step, lambda_step, check_accuracy_step]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 5. Run the Workflow\n", "Create your workflow using the workflow definition above, and render the graph with [render_graph](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.render_graph):" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": [ "workflow = Workflow(\n", " name=\"MyInferenceRoutine_{}\".format(id),\n", " definition=workflow_definition,\n", " role=workflow_execution_role,\n", " execution_input=execution_input,\n", ")" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "
\n", " \n", " \n", " \n", "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "execution_count": 31, "metadata": {}, "output_type": "execute_result" } ], "source": [ "workflow.render_graph()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the workflow in AWS Step Functions with [create](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.create).\n", "\n", "Please run this cell again when encounting the following error.\n", "```\n", "ClientError: An error occurred (AccessDeniedException) when calling the CreateStateMachine operation: Neither the global service principal states.amazonaws.com, nor the regional one is authorized to assume the provided role.\n", "```" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[32m[INFO] Workflow created successfully on AWS Step Functions.\u001b[0m\n" ] }, { "data": { "text/plain": [ "'arn:aws:states:us-east-1:822507008821:stateMachine:MyInferenceRoutine_296da389c5134e1ea71c5a963cbea190'" ] }, "execution_count": 32, "metadata": {}, "output_type": "execute_result" } ], "source": [ "workflow.create()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the workflow with [execute](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.execute):" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[32m[INFO] Workflow execution started successfully on AWS Step Functions.\u001b[0m\n" ] } ], "source": [ "model_name = f\"xgb-churn-pred-model-{id}\"\n", "endpoint_name = f\"xgbChurnPredEndpoint{id}\"\n", "endpoint_config_name = f\"xgbChurnPredEndpointConf{id}\"\n", "\n", "execution = workflow.execute(\n", " inputs={\n", " \"TrainingJobName\": train_job_name, # Each Sagemaker Job requires a unique name,\n", " \"GlueJobName\": glue_job_name,\n", " \"QueryLambdaFunctionName\": query_function_name,\n", " \"RegisterLambdaFunctionName\": model_register_function_name,\n", " \"EvaluationStep\": f\"processingstep-{id}\",\n", " \"ModelName\": model_name,\n", " \"EndpointName\": endpoint_name,\n", " \"EndpointConfigName\": endpoint_config_name\n", " }\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Render workflow progress with the [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress). This generates a snapshot of the current state of your workflow as it executes. This is a static image therefore you must run the cell again to check progress:" ] }, { "cell_type": "code", "execution_count": 34, "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/html": [ "\n", "\n", "
\n", " \n", " \n", "
\n", "
    \n", "
  • \n", "
    \n", " Success\n", "
  • \n", "
  • \n", "
    \n", " Failed\n", "
  • \n", "
  • \n", "
    \n", " Cancelled\n", "
  • \n", "
  • \n", "
    \n", " In Progress\n", "
  • \n", "
  • \n", "
    \n", " Caught Error\n", "
  • \n", "
\n", "
\n", "\n", " \n", " Inspect in AWS Step Functions \n", "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "execution_count": 34, "metadata": {}, "output_type": "execute_result" } ], "source": [ "execution.render_progress()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use [list_executions](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.list_executions) to list all executions for a specific workflow:" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", "
NameStatusStartedEnd Time
\n", " b1219731-3692-4d0d-b393-e37698b6850e\n", " RUNNINGJun 19, 2022 03:09:42.578 AM-
\n" ], "text/plain": [ "" ] }, "execution_count": 35, "metadata": {}, "output_type": "execute_result" } ], "source": [ "workflow.list_executions(html=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 6. Clean Up\n", "When you are done, make sure to clean up your AWS account by deleting resources you won't be reusing. Uncomment the code below and run the cell to delete the Glue job, Lambda function, and Step Function." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "from setup_iam_roles import delete_role\n", "\n", "workflow.delete()\n", "\n", "# Delete sagemaker resources\n", "sagemaker_client = boto3.client('sagemaker')\n", "try:\n", " sagemaker_client.delete_endpoint(EndpointName=endpoint_name)\n", "except Exception as e:\n", " print(f'Removing EndpointName {endpoint_name} failed. {e}')\n", "\n", "try:\n", " sagemaker_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)\n", "except Exception as e:\n", " print(f'Removing EndpointConfigName {endpoint_config_name} failed. {e}')\n", "\n", "try:\n", " sagemaker_client.delete_model(ModelName=model_name)\n", "except Exception as e:\n", " print(f'Removing ModelName {model_name} failed. {e}')\n", "\n", "try:\n", " sagemaker_client.delete_model_package(ModelPackageName=model_package_group_name)\n", "except Exception as e:\n", " print(f'Removing ModelPackageName {model_package_group_name} failed. {e}')\n", "\n", "# Delete roles\n", "for role_name in [sfn_role_name, glue_role_name, query_lambda_role_name]:\n", " delete_role(role_name)\n", "\n", "# Delete glue job\n", "glue_client.delete_job(JobName=glue_job_name)\n", "\n", "# Delete lambda functions\n", "try:\n", " lambda_client.delete_function(FunctionName=query_function_name)\n", "except Exception as e:\n", " print(e)\n", "\n", "try:\n", " lambda_client.delete_function(FunctionName=model_register_function_name)\n", "except Exception as e:\n", " print(e)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 4 }