{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Automate Retraining of Models using SageMaker Pipelines\n", "\n", "## Learning Objectives\n", "1. Construct a [SageMaker Pipeline](https://aws.amazon.com/sagemaker/pipelines/) that consists of a data preprocessing step and a model training step.\n", "2. Execute a SageMaker Pipeline manually.\n", "3. (Optional) Build infrastructure with [CloudFormation](https://aws.amazon.com/cloudformation/) and [AWS Lambda](https://aws.amazon.com/lambda/) so that the pipeline runs automatically, in an event-driven manner, whenever new data is uploaded to S3.\n", "\n", "\n", "## Introduction\n", "This workshop shows how you can build and deploy SageMaker Pipelines for multistep ML workflows. In this example, we will build a pipeline that:\n", "\n", " 1. Deduplicates the underlying data\n", " \n", " 2. Trains a model with the SageMaker built-in XGBoost algorithm\n", "\n", "Models commonly need to be retrained when new data arrives. This notebook also shows how to set up a Lambda function that triggers the retraining pipeline whenever new data comes in.\n", "\n", "Please use the `Python 3 (Data Science)` kernel for this workshop." 
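] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The event-driven retraining described above follows a simple pattern: an S3 upload event invokes a Lambda function, which starts a new pipeline execution with the uploaded file as the `InputData` parameter. The function deployed later in this notebook comes from `lambda.zip`, and its code is not shown here. The next cell is therefore only a hedged sketch of that pattern, not the workshop's implementation. The names `build_pipeline_parameters` and `lambda_handler`, the `PIPELINE_NAME` environment variable, and its default value are illustrative assumptions. The last lines exercise the parameter mapping with a hand-written event and make no AWS calls." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Hypothetical sketch of an S3-triggered handler; NOT the code shipped in lambda.zip\n", "import os\n", "import urllib.parse\n", "\n", "import boto3\n", "\n", "\n", "def build_pipeline_parameters(event):\n", "    \"\"\"Map the first S3 record of an event to the pipeline's InputData parameter.\"\"\"\n", "    s3_info = event[\"Records\"][0][\"s3\"]\n", "    bucket = s3_info[\"bucket\"][\"name\"]\n", "    # Object keys arrive URL-encoded in S3 event notifications (e.g. spaces as '+')\n", "    key = urllib.parse.unquote_plus(s3_info[\"object\"][\"key\"])\n", "    return [{\"Name\": \"InputData\", \"Value\": f\"s3://{bucket}/{key}\"}]\n", "\n", "\n", "def lambda_handler(event, context):\n", "    # PIPELINE_NAME is an assumed environment variable, not one defined by this workshop\n", "    pipeline_name = os.environ.get(\"PIPELINE_NAME\", \"ObesityModelRetrainingPipeLine\")\n", "    client = boto3.client(\"sagemaker\")\n", "    response = client.start_pipeline_execution(\n", "        PipelineName=pipeline_name,\n", "        PipelineParameters=build_pipeline_parameters(event),\n", "    )\n", "    return {\"PipelineExecutionArn\": response[\"PipelineExecutionArn\"]}\n", "\n", "\n", "# Local check with a hand-written event (no AWS call is made)\n", "sample_event = {\n", "    \"Records\": [\n", "        {\"s3\": {\"bucket\": {\"name\": \"my-input-bucket\"}, \"object\": {\"key\": \"new+data.csv\"}}}\n", "    ]\n", "}\n", "print(build_pipeline_parameters(sample_event))\n", "# [{'Name': 'InputData', 'Value': 's3://my-input-bucket/new data.csv'}]"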
] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import json\n", "import logging\n", "import os\n", "import pandas\n", "import sagemaker\n", "from sagemaker.workflow.parameters import ParameterString\n", "from sagemaker.workflow.steps import ProcessingStep, TrainingStep\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.workflow.pipeline import Pipeline\n", "from sagemaker.inputs import TrainingInput\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.estimator import Estimator\n", "from time import gmtime, strftime\n", "\n", "# Configure logging once; re-running the cell does not add duplicate handlers\n", "logger = logging.getLogger(\"log\")\n", "if not logger.handlers:\n", " logger.setLevel(logging.INFO)\n", " logger.addHandler(logging.StreamHandler())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, get the SageMaker session, the default S3 bucket, the execution role (permissions), and the AWS Region. We also create a timestamped pipeline name and S3 prefix for this run." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "session = sagemaker.Session()\n", "default_bucket = session.default_bucket()\n", "role = sagemaker.get_execution_role()\n", "region = boto3.Session().region_name\n", "s3_client = boto3.client(\"s3\", region_name=region)\n", "\n", "current_timestamp = strftime(\"%m-%d-%H-%M\", gmtime())\n", "pipeline_name = f\"my-pipeline-{current_timestamp}\"\n", "prefix = f\"pipeline-lab{current_timestamp}\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Transfer Data into Your Account\n", "\n", "Copy the Obesity dataset (which contains duplicate rows) and the Lambda deployment package (`lambda.zip`) from the workshop S3 bucket into your default SageMaker bucket." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "copy_source = {\n", " \"Bucket\": \"aws-hcls-ml\",\n", " \"Key\": \"workshop/immersion_day_workshop_data_DO_NOT_DELETE/data/ObesityDataSet_with_duplicates.csv\",\n", "}\n", "s3_client.copy(\n", " copy_source, default_bucket, f\"{prefix}/ObesityDataSet_with_duplicates.csv\"\n", ")\n", "\n", "copy_source = {\n", 
\"Bucket\": \"aws-hcls-ml\",\n", " \"Key\": \"workshop/immersion_day_workshop_data_DO_NOT_DELETE/kick_off_sagemaker_pipelines_lambda/other_material/lambda.zip\",\n", "}\n", "s3_client.copy(copy_source, default_bucket, f\"{prefix}/lambda.zip\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Define the Pipeline\n", "\n", "First we will create a preprocessing step that simply removes duplicate rows from the dataset. The `preprocessing.py` script is written locally and then wrapped in a SageMaker Pipelines processing step. The input file location is a pipeline parameter (`InputData`), so each execution can point at a different file." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "input_data = ParameterString(\n", " name=\"InputData\",\n", " default_value=f\"s3://{default_bucket}/{prefix}/ObesityDataSet_with_duplicates.csv\",\n", ")" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Writing preprocessing.py\n" ] } ], "source": [ "%%writefile preprocessing.py\n", "import os\n", "import pandas\n", "\n", "base_dir = \"/opt/ml/processing/input\"\n", "output_dir = \"/opt/ml/processing/output\"\n", "\n", "# Use the first CSV file found in the input directory\n", "the_files = os.listdir(base_dir)\n", "the_file = [i for i in the_files if i.endswith(\".csv\")][0]\n", "print(the_file)\n", "\n", "df_1 = pandas.read_csv(f\"{base_dir}/{the_file}\", engine=\"python\")\n", "# Drop rows that are exact duplicates across all columns (keeps the first occurrence)\n", "df_2 = df_1.drop_duplicates()\n", "print(f\"Removed {len(df_1) - len(df_2)} duplicate rows\")\n", "\n", "# Omit the index and header row: the built-in XGBoost algorithm expects header-less CSV\n", "df_2.to_csv(f\"{output_dir}/deduped_{the_file}\", index=False, header=False)\n" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "# Processing job configuration: scikit-learn container on a single instance\n", "\n", "sklearn_processor = SKLearnProcessor(\n", " framework_version=\"0.23-1\",\n", " instance_type=\"ml.t3.medium\",\n", " instance_count=1,\n", " base_job_name=\"sklearn-obesity-dedup\",\n", " role=role,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we will wrap the preprocessing script in a SageMaker Pipelines `ProcessingStep`. The step reads the file given by the `InputData` parameter and exposes the deduplicated data as an output named `deduplicated`." 
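] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before building the step, you can sanity-check the deduplication logic locally. With default arguments, `DataFrame.drop_duplicates()` keeps the first occurrence of each row that is identical across all columns. The toy values in the next cell are made up for illustration and are not taken from the Obesity dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas\n", "\n", "# Toy data: row 2 duplicates row 0 exactly; row 3 differs only in Weight\n", "toy = pandas.DataFrame(\n", "    {\n", "        \"Age\": [21, 23, 21, 21],\n", "        \"Height\": [1.62, 1.52, 1.62, 1.62],\n", "        \"Weight\": [64.0, 56.0, 64.0, 65.0],\n", "    }\n", ")\n", "deduped = toy.drop_duplicates()\n", "print(len(toy), \"->\", len(deduped))  # 4 -> 3\n", "print(deduped.index.tolist())  # [0, 1, 3]"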
] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "step_process = ProcessingStep(\n", " name=\"deduplication-process\",\n", " processor=sklearn_processor,\n", " inputs=[\n", " ProcessingInput(source=input_data, destination=\"/opt/ml/processing/input\"),\n", " ],\n", " outputs=[\n", " ProcessingOutput(output_name=\"deduplicated\", source=\"/opt/ml/processing/output\")\n", " ],\n", " code=\"preprocessing.py\",\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Define the Model\n", "Next we define the training job: a SageMaker `Estimator` that runs the built-in XGBoost algorithm and writes the trained model artifact to `model_path` in S3." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "# Define the model training parameters\n", "model_path = f\"s3://{default_bucket}/{prefix}/myPipelineTrain\"\n", "\n", "# Look up the built-in XGBoost container image for this Region\n", "image_uri = sagemaker.image_uris.retrieve(\n", " framework=\"xgboost\",\n", " region=region,\n", " version=\"1.0-1\",\n", " py_version=\"py3\",\n", " instance_type=\"ml.m5.large\",\n", ")\n", "xgb_train = Estimator(\n", " image_uri=image_uri,\n", " instance_type=\"ml.m5.large\",\n", " instance_count=1,\n", " output_path=model_path,\n", " role=role,\n", ")\n", "xgb_train.set_hyperparameters(\n", " objective=\"reg:squarederror\",  # replaces the deprecated reg:linear\n", " num_round=50,\n", " max_depth=5,\n", " eta=0.2,\n", " gamma=4,\n", " min_child_weight=6,\n", " subsample=0.7,\n", " silent=0,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Turn the model training into a SageMaker Pipelines `TrainingStep`." 
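] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `TrainingInput` objects used by the training step do not contain concrete S3 paths. `step_process.properties.ProcessingOutputConfig.Outputs[\"deduplicated\"].S3Output.S3Uri` is a placeholder that SageMaker resolves to the processing step's actual output location when the pipeline runs. Referencing it is also what makes the training step depend on the processing step. Assuming SageMaker Python SDK v2, where pipeline property objects expose an `expr` attribute, you can print the reference that ends up in the pipeline definition:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Print the run-time reference (not a real S3 path) that the training step will receive\n", "deduped_uri = step_process.properties.ProcessingOutputConfig.Outputs[\n", "    \"deduplicated\"\n", "].S3Output.S3Uri\n", "print(deduped_uri.expr)"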
] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "# Define the training step. Both channels read the deduplicated output of the\n", "# processing step; for simplicity, the same data is used for validation.\n", "\n", "step_train = TrainingStep(\n", " name=\"model-training\",\n", " estimator=xgb_train,\n", " inputs={\n", " \"train\": TrainingInput(\n", " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n", " \"deduplicated\"\n", " ].S3Output.S3Uri,\n", " content_type=\"text/csv\",\n", " ),\n", " \"validation\": TrainingInput(\n", " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n", " \"deduplicated\"\n", " ].S3Output.S3Uri,\n", " content_type=\"text/csv\",\n", " ),\n", " },\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create the Pipeline\n", "\n", "`upsert` creates the pipeline, or updates its definition if a pipeline with the same name already exists." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'PipelineArn': 'arn:aws:sagemaker:us-east-1:111918798052:pipeline/obesitymodelretrainingpipeline',\n", " 'ResponseMetadata': {'RequestId': '3d83e959-f472-41e1-bca3-e518d87770d1',\n", " 'HTTPStatusCode': 200,\n", " 'HTTPHeaders': {'x-amzn-requestid': '3d83e959-f472-41e1-bca3-e518d87770d1',\n", " 'content-type': 'application/x-amz-json-1.1',\n", " 'content-length': '98',\n", " 'date': 'Mon, 30 Jan 2023 15:09:38 GMT'},\n", " 'RetryAttempts': 0}}" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Create a two-step data processing and model training pipeline\n", "\n", "# Fixed pipeline name used from here on (overrides the timestamped name set earlier)\n", "pipeline_name = \"ObesityModelRetrainingPipeLine\"\n", "pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " input_data,\n", " ],\n", " steps=[step_process, step_train],\n", ")\n", "pipeline.upsert(role_arn=role)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run the Pipeline On Separate Files\n", "\n", "Each execution below passes a different S3 file as the `InputData` parameter." ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### Execution 1" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "\n", 
"s3uri = f\"s3://{default_bucket}/{prefix}/ObesityDataSet_with_duplicates.csv\"\n", "client = boto3.client(\"sagemaker\")\n", "PipelineParameters = [\n", " {\"Name\": \"InputData\", \"Value\": s3uri},\n", "]\n", "response = client.start_pipeline_execution(\n", " PipelineName=pipeline_name, PipelineParameters=PipelineParameters\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Execution 2\n", "\n", "Copy the same dataset to a second key, then start another execution that reads from it." ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "copy_source = {\n", " \"Bucket\": \"aws-hcls-ml\",\n", " \"Key\": \"workshop/immersion_day_workshop_data_DO_NOT_DELETE/data/ObesityDataSet_with_duplicates.csv\",\n", "}\n", "s3_client.copy(\n", " copy_source, default_bucket, f\"{prefix}/Second_File_ObesityDataSet_with_duplicates.csv\"\n", ")\n" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "second_s3uri = f\"s3://{default_bucket}/{prefix}/Second_File_ObesityDataSet_with_duplicates.csv\"\n", "PipelineParameters = [\n", " {\"Name\": \"InputData\", \"Value\": second_s3uri},\n", "]\n", "response = client.start_pipeline_execution(\n", " PipelineName=pipeline_name, PipelineParameters=PipelineParameters\n", ")" ] }, { "attachments": { "image.png": { "image/png": "" } }, "cell_type": "markdown", "metadata": {}, "source": [ "## (Optional) Deploy a CloudFormation Template to Trigger Retraining\n", "\n", "Now we will deploy a CloudFormation template that starts the pipeline automatically whenever a new file is uploaded to an S3 bucket. The stack sets up the input bucket and a Lambda function, deployed from `lambda.zip`, that starts a pipeline execution when a file lands in that bucket.\n", "\n", "The architecture looks like this:\n", "\n", "![image.png](attachment:image.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "NOTE: To run the following steps, you must first grant your SageMaker execution role the following IAM permissions (for example, through an attached IAM policy):\n", "- cloudformation:CreateStack\n", "- cloudformation:DeleteStack\n", "- cloudformation:DescribeStacks\n", "- iam:CreateRole\n", "- 
iam:DeleteRole\n", "- iam:DeleteRolePolicy\n", "- iam:GetRole\n", "- iam:GetRolePolicy\n", "- iam:PassRole\n", "- iam:PutRolePolicy\n", "- lambda:AddPermission\n", "- lambda:CreateFunction\n", "- lambda:GetFunction\n", "- lambda:DeleteFunction" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create a new CloudFormation stack to trigger retraining with new data.\n", "# The template file cfn_sagemaker_pipelines.yaml must be in the notebook's working directory.\n", "\n", "stack_name = \"sagemaker-automated-retraining\"\n", "\n", "with open(\"cfn_sagemaker_pipelines.yaml\") as f:\n", " template_str = f.read()\n", "cfn = boto3.client(\"cloudformation\")\n", "cfn.create_stack(\n", " StackName=stack_name,\n", " TemplateBody=template_str,\n", " Capabilities=[\"CAPABILITY_IAM\"],\n", " Parameters=[\n", " {\"ParameterKey\": \"StaticCodeBucket\", \"ParameterValue\": default_bucket},\n", " {\"ParameterKey\": \"StaticCodeKey\", \"ParameterValue\": f\"{prefix}/lambda.zip\"},\n", " ],\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Wait until stack creation is complete\n", "waiter = cfn.get_waiter(\"stack_create_complete\")\n", "waiter.wait(StackName=stack_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Identify the S3 bucket for triggering the training pipeline\n", "# (assumes the bucket name is the stack's first output)\n", "input_bucket_name = cfn.describe_stacks(StackName=stack_name)[\"Stacks\"][0][\"Outputs\"][0][\"OutputValue\"]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Copy the training data to the input bucket to start a new pipeline execution\n", "copy_source = {\n", " \"Bucket\": default_bucket,\n", " \"Key\": f\"{prefix}/ObesityDataSet_with_duplicates.csv\",\n", "}\n", "s3_client.copy(copy_source, input_bucket_name, \"ObesityDataSet_with_duplicates.csv\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### (Optional) Verify the Triggered Execution" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "1. 
Check that the `InputBucket` contains the newly copied file\n", "2. Examine the new pipeline execution in the SageMaker Studio console" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional cleanup: uncomment the next line to delete all objects in the input bucket\n", "#!aws s3 rm --recursive s3://{input_bucket_name}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Closing" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook we demonstrated how to create a SageMaker pipeline for data processing and model training, run it manually on different input files, and trigger it automatically with an S3 event." ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }