{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Walkthrough MLOps Demo\n", "\n", "This notebook walks you through the whole steps in MLOps with SageMaker.\n", "\n", "- [1.Prepare the environment](#envpreparation)\n", "- [2.Data preparation](#datapreparation)\n", "- [3.Feature ingestion](#featureingestion)\n", "- [4.Model building](#modelbuilding)\n", "- [5.Asynchronous inference](#asyncinfer)\n", "- [6.Real-time inference](#realtimeinfer)\n", "- [7.Cleanup](#cleanup)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 1. Prepare the environment" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "!pip install pandas==1.1.5\n", "!pip install awswrangler" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "import logging\n", "import boto3\n", "import io\n", "import glob\n", "import os\n", "import re\n", "from time import strftime,gmtime\n", "from botocore.exceptions import ClientError\n", "import urllib\n", "import sys\n", "import pandas as pd\n", "import awswrangler as wr\n", "import time\n", "\n", "import sagemaker\n", "from sagemaker import get_execution_role" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "logger = logging.getLogger(name='project')\n", "sagemaker_session = sagemaker.Session()\n", "boto_session = sagemaker_session.boto_session\n", "sagemaker_client = boto_session.client('sagemaker')\n", "sm_runtime = boto3.Session().client('sagemaker-runtime')\n", "region = sagemaker_session.boto_region_name\n", "\n", "role = get_execution_role()\n", "\n", "client = boto3.client('sts')\n", "account = client.get_caller_identity()['Account']\n", "\n", "bucket = sagemaker_session.default_bucket()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "project_name = # <--- fill here" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 2. Data preparation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Download the data from [Standord AI Lab](https://ai.stanford.edu/~amaas/data/sentiment/). We stage data with SageMaker processing." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile ./processing/data_preparation.py\n", "\n", "import string\n", "import os\n", "import glob\n", "import re\n", "import pandas as pd\n", "import time\n", "import subprocess\n", "import argparse\n", "\n", "punc_list = string.punctuation # you can self define list of punctuation to remove here\n", "\n", "def remove_punctuation(text):\n", " \"\"\"\n", " This function takes strings containing self defined punctuations and returns\n", " strings with punctuations removed.\n", " Input(string): one tweet, contains punctuations in the self-defined list\n", " Output(string): one tweet, self-defined punctuations removed\n", " \"\"\"\n", " translator = str.maketrans(\"\", \"\", punc_list)\n", " return text.translate(translator)\n", "\n", "def staging_data(data_dir):\n", " for data_type in [\"train\", \"test\"]:\n", " data_list = []\n", " for label in [\"neg\", \"pos\"]:\n", " data_path = os.path.join(data_dir, data_type, label)\n", " for files in glob.glob(data_path + '/*.txt'):\n", " data_id = files.split('/')[-1].replace('.txt', '')\n", " with open(files, 'r') as f:\n", " line = f.readline()\n", " line = remove_punctuation(line)\n", " line = re.sub(\"\\s+\", \" \", line)\n", " data_list.append([data_id, line, label])\n", " \n", " data_df = pd.DataFrame(data_list, columns=[\"index\", \"text\", \"label\"])\n", " data_df[\"event_time\"] = time.time()\n", " data_df[\"data_type\"] = data_type\n", " #data_df.reset_index(inplace=True)\n", " data_df.to_csv(f'/opt/ml/processing/output/raw/{data_type}.csv', index=False)\n", "\n", "if __name__ == \"__main__\":\n", " parser = argparse.ArgumentParser()\n", " parser.add_argument(\"--raw-data-url\", type=str, required=True)\n", " args, _ = parser.parse_known_args()\n", " \n", " subprocess.run(f\"wget {args.raw_data_url} -O aclImdb_v1.tar.gz && tar --no-same-owner -xzf aclImdb_v1.tar.gz && rm aclImdb_v1.tar.gz\", shell=True)\n", " \n", " data_dir = f\"{os.getcwd()}/aclImdb\"\n", " staging_data(data_dir)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "\n", "sklearn_processor = SKLearnProcessor(\n", " framework_version=\"0.20.0\", role=role, instance_type=\"ml.m5.xlarge\", instance_count=1\n", ")\n", "\n", "sklearn_processor.run(\n", " code='processing/data_preparation.py',\n", " arguments = ['--raw-data-url', 'https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz'],\n", " outputs=[ProcessingOutput(output_name=\"raw_data\", source='/opt/ml/processing/output/raw')]\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "preprocessing_job_description = sklearn_processor.jobs[-1].describe()\n", "\n", "output_config = preprocessing_job_description[\"ProcessingOutputConfig\"]\n", "raw_data_dir = output_config[\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n", "raw_data_dir" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 ls $raw_data_dir/" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_df = wr.s3.read_csv(path=f\"{raw_data_dir}/train.csv\")\n", "train_df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sample_data_dir = \"./data\"\n", "if not 
"if not os.path.exists(sample_data_dir):\n", "    os.makedirs(sample_data_dir)\n", "train_df[\"text\"][:50].to_csv(f\"{sample_data_dir}/sample_imdb.csv\", header=None, index=None)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "<a id='featureingestion'></a>\n", "## 3. Feature ingestion" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3.1 Launch the feature ingestion pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "imdb_pipeline_name = f'{project_name}-imdb-preprocessing'\n", "\n", "imdb_pipeline_execution = sagemaker_client.start_pipeline_execution(\n", "    PipelineName=imdb_pipeline_name,\n", "    PipelineExecutionDisplayName=\"ManualExecution\",\n", "    PipelineParameters=[\n", "        {\"Name\": \"InputDataUrl_train\", \"Value\": f'{raw_data_dir}/train.csv'},\n", "        {\"Name\": \"InputDataUrl_test\", \"Value\": f'{raw_data_dir}/test.csv'},\n", "    ],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3.2 Verify feature ingestion" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "featurestore_runtime = boto_session.client(\n", "    service_name=\"sagemaker-featurestore-runtime\", region_name=region\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "feature_group_name = \"imdb\"\n", "response = featurestore_runtime.get_record(\n", "    FeatureGroupName=feature_group_name,\n", "    RecordIdentifierValueAsString=\"3142_1\",\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "record = response[\"Record\"]\n", "df = pd.DataFrame(record).set_index('FeatureName').transpose()\n", "df[\"text\"].tolist()[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "<a id='modelbuilding'></a>\n", "## 4. Model building\n", "With the data in the feature store, you can now start the model building pipeline. You can keep the default parameter values." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "build_pipeline_name = f'{project_name}-build-byoc'\n", "\n", "build_pipeline_execution = sagemaker_client.start_pipeline_execution(\n", "    PipelineName=build_pipeline_name,\n", "    PipelineExecutionDisplayName=\"ManualExecution\",\n", "    PipelineParameters=[\n", "        {\"Name\": \"TokenizerModelS3URI\", \"Value\": \"None\"},\n", "    ],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If the pipeline registers the model with `ModelApprovalStatus` set to `PendingManualApproval`, you must manually set its status to `Approved`, as shown in the cells below."
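] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The build pipeline takes some time to complete. Before approving the model, you can wait for the execution to finish; below is a minimal polling sketch using the `PipelineExecutionArn` returned by `start_pipeline_execution` above." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Minimal sketch: poll the build pipeline execution until it reaches a terminal state.\n", "execution_arn = build_pipeline_execution[\"PipelineExecutionArn\"]\n", "\n", "while True:\n", "    status = sagemaker_client.describe_pipeline_execution(\n", "        PipelineExecutionArn=execution_arn\n", "    )[\"PipelineExecutionStatus\"]\n", "    print(status)\n", "    if status not in (\"Executing\", \"Stopping\"):\n", "        break\n", "    time.sleep(30)"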
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_client = boto_session.client('sagemaker')\n", "\n", "model_package_group_name = f\"{project_name}-imdb\"\n", "\n", "model_list = sagemaker_client.list_model_packages(ModelPackageGroupName=model_package_group_name)[\"ModelPackageSummaryList\"]\n", "model_package_arn = model_list[0][\"ModelPackageArn\"]\n", "model_package_arn" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_package_update_input_dict = {\n", " \"ModelPackageArn\" : model_package_arn,\n", " \"ModelApprovalStatus\" : \"Approved\"\n", "}\n", "model_package_update_response = sagemaker_client.update_model_package(**model_package_update_input_dict)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The status change of model will trigger endpoint serving, you can check the status of deploying endpoints in [AWS CodePipeline console](https://us-east-1.console.aws.amazon.com/codesuite/codepipeline/pipelines?). In this solution, we deploy endpoints for both real-time inference and [asychronous inference](https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 5. Asynchronous inference" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5.1 Testing batch inference with asynchronous inference" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "input_s3_location = f\"s3://{bucket}/{project_name}/sample_data/sample_imdb.csv\"\n", "\n", "!aws s3 cp ./data/sample_imdb.csv $input_s3_location" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "async_endpoint_name = f\"{project_name}-byoc-asynchronous-async\"\n", "\n", "response = sm_runtime.invoke_endpoint_async(\n", " EndpointName=async_endpoint_name, \n", " InputLocation=input_s3_location\n", ")\n", "output_location = response['OutputLocation']\n", "print(f\"OutputLocation: {output_location}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_output(output_location):\n", " output_url = urllib.parse.urlparse(output_location)\n", " bucket = output_url.netloc\n", " key = output_url.path[1:]\n", " while True:\n", " try:\n", " return sagemaker_session.read_s3_file(bucket=output_url.netloc, key_prefix=output_url.path[1:])\n", " except ClientError as e:\n", " if e.response['Error']['Code'] == 'NoSuchKey':\n", " print(\"waiting for output...\")\n", " time.sleep(2)\n", " continue\n", " raise" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "output = get_output(output_location)\n", "print(f\"Output size in bytes: {((sys.getsizeof(output)))}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "async_infer_res = \"./data/async_res.json\"\n", "\n", "!aws s3 cp $output_location $async_infer_res\n", "\n", "with open(async_infer_res, 'r') as f:\n", " async_res = json.load(f)\n", "async_res" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5.2 Testing auto-scaling with multiple invocations (comming soon!)\n", "We enable auto scaling by monitoring the metric `ApproximateBacklogSizePerInstance`. You can find more details about [Asynchronous Inference Endpoint Metrics](https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference-monitor.html). 
In the [CloudWatch console](https://console.aws.amazon.com/cloudwatch/home), search for the endpoint name in the `Metrics` search box, select `SageMaker > EndpointName` under `AWS namespaces`, and you will find `ApproximateBacklogSizePerInstance`.\n", "\n", "
*(Screenshot: asynchronous inference endpoint metrics in CloudWatch)*
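\n", "\n", "Below is a minimal sketch of a target-tracking scaling policy on this metric. The variant name `AllTraffic`, the capacity bounds, and the threshold values are illustrative assumptions, not values taken from this project." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Minimal sketch: target-tracking auto-scaling on ApproximateBacklogSizePerInstance.\n", "# Assumptions: the endpoint variant is named 'AllTraffic'; capacities, target value,\n", "# and cooldowns are illustrative only.\n", "autoscaling = boto3.client(\"application-autoscaling\")\n", "resource_id = f\"endpoint/{async_endpoint_name}/variant/AllTraffic\"\n", "\n", "autoscaling.register_scalable_target(\n", "    ServiceNamespace=\"sagemaker\",\n", "    ResourceId=resource_id,\n", "    ScalableDimension=\"sagemaker:variant:DesiredInstanceCount\",\n", "    MinCapacity=1,\n", "    MaxCapacity=2,\n", ")\n", "\n", "autoscaling.put_scaling_policy(\n", "    PolicyName=f\"{async_endpoint_name}-backlog-scaling\",\n", "    ServiceNamespace=\"sagemaker\",\n", "    ResourceId=resource_id,\n", "    ScalableDimension=\"sagemaker:variant:DesiredInstanceCount\",\n", "    PolicyType=\"TargetTrackingScaling\",\n", "    TargetTrackingScalingPolicyConfiguration={\n", "        \"TargetValue\": 5.0,\n", "        \"CustomizedMetricSpecification\": {\n", "            \"MetricName\": \"ApproximateBacklogSizePerInstance\",\n", "            \"Namespace\": \"AWS/SageMaker\",\n", "            \"Dimensions\": [{\"Name\": \"EndpointName\", \"Value\": async_endpoint_name}],\n", "            \"Statistic\": \"Average\",\n", "        },\n", "        \"ScaleInCooldown\": 120,\n", "        \"ScaleOutCooldown\": 120,\n", "    },\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "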
\n", "\n", "For the auto scaling setting, you can refer to [Run computer vision inference on large videos with Amazon SageMaker asynchronous endpoints](https://aws.amazon.com/blogs/machine-learning/run-computer-vision-inference-on-large-videos-with-amazon-sagemaker-asynchronous-endpoints/)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def upload_file(input_location):\n", " prefix = f\"{project_name}/input\"\n", " return sagemaker_session.upload_data(\n", " input_location,\n", " bucket=bucket,\n", " key_prefix=prefix,\n", " extra_args={\"ContentType\": \"text/libsvm\"},\n", " )" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "inferences = []\n", "\n", "async_endpoint_name = \"mlops-byoc02-byoc-asynchronous-async\"\n", "\n", "input_file = \"./data/sample_imdb.csv\"\n", "for i in range(100):\n", " response = sm_runtime.invoke_endpoint_async(\n", " EndpointName=async_endpoint_name, InputLocation=input_s3_location\n", " )\n", " output_location = response[\"OutputLocation\"]\n", " inferences += [(input_file, output_location)]\n", " time.sleep(0.5)\n", "\n", "for input_file, output_location in inferences:\n", " output = get_output(output_location)\n", " print(f\"Input File: {input_file}, Output: {output}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 6. Real-time inference\n", "\n", "### 6.1 Testing real-time inference endpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sample_df = pd.read_csv('./data/sample_imdb.csv', header=None)\n", "sample_df.columns = [\"text\"]\n", "sample_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Error will occur when parameter of request is too long, where asynchronous inference would be an alternative." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sample_list = sample_df[\"text\"].values.tolist()[:5]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_record = pd.DataFrame({\"inputs\": sample_list})\n", "csv_file = io.StringIO()\n", "df_record.to_csv(csv_file, sep=\",\", header=False, index=False)\n", "payload_as_csv = csv_file.getvalue()\n", "\n", "endpoint_name = f\"{project_name}-byoc\"\n", "\n", "response = sm_runtime.invoke_endpoint(\n", " EndpointName=endpoint_name,\n", " Body= payload_as_csv,\n", " ContentType = 'text/csv'\n", ")\n", "\n", "body = response[\"Body\"].read()\n", "msg = body.decode(\"utf-8\")\n", "data = json.loads(msg)\n", "data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 6.2 Request API Gateway\n", "Jump to https://console.aws.amazon.com/apigateway/main/apis to find the name(`{project_name}_api`) of API Gateway. Then select `prod->GET->get-{project_name}-byoc->GET` in `Stages` console. You will find the invoke URL like below:\n", "\n", "```\n", "https://tkga9zza0a.execute-api.{region}.amazonaws.com/prod/get-{project_name}-byoc\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!curl -X GET https:///prod/get--byoc?index=3142_1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 7. Cleanup\n", "\n", "Jump to https://console.aws.amazon.com/cloudformation/home to delete the stacks created in this project, or run the following cell to delete all stacks in this project. 
, { "cell_type": "markdown", "metadata": {}, "source": [ "<a id='cleanup'></a>\n", "## 7. Cleanup\n", "\n", "Jump to https://console.aws.amazon.com/cloudformation/home to delete the stacks created in this project, or run the following cell. Deleting the stacks removes all resources created in this project." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws cloudformation delete-stack --stack-name $project_name-FeatureStore\n", "!aws cloudformation delete-stack --stack-name $project_name-BuildModelStack\n", "!aws cloudformation delete-stack --stack-name $project_name-ServingStack" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }