{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Walkthrough MLOps Demo\n", "\n", "This notebook walks you through the whole steps in MLOps with SageMaker.\n", "\n", "- [1.Prepare the environment](#envpreparation)\n", "- [2.Data preparation](#datapreparation)\n", "- [3.Feature ingestion](#featureingestion)\n", "- [4.Model building](#modelbuilding)\n", "- [5.Asynchronous inference](#asyncinfer)\n", "- [6.Real-time inference](#realtimeinfer)\n", "- [7.Cleanup](#cleanup)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 1. Prepare the environment" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "!pip install pandas==1.1.5\n", "!pip install awswrangler" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "import logging\n", "import boto3\n", "import io\n", "import glob\n", "import os\n", "import re\n", "from time import strftime,gmtime\n", "from botocore.exceptions import ClientError\n", "import urllib\n", "import sys\n", "import pandas as pd\n", "import awswrangler as wr\n", "import time\n", "\n", "import sagemaker\n", "from sagemaker import get_execution_role" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "logger = logging.getLogger(name='project')\n", "sagemaker_session = sagemaker.Session()\n", "boto_session = sagemaker_session.boto_session\n", "sagemaker_client = boto_session.client('sagemaker')\n", "sm_runtime = boto3.Session().client('sagemaker-runtime')\n", "region = sagemaker_session.boto_region_name\n", "\n", "role = get_execution_role()\n", "\n", "client = boto3.client('sts')\n", "account = client.get_caller_identity()['Account']\n", "\n", "bucket = sagemaker_session.default_bucket()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "project_name = # <--- fill here" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 2. Data preparation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Download the data from [Standord AI Lab](https://ai.stanford.edu/~amaas/data/sentiment/). We stage data with SageMaker processing." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile ./processing/data_preparation.py\n", "\n", "import string\n", "import os\n", "import glob\n", "import re\n", "import pandas as pd\n", "import time\n", "import subprocess\n", "import argparse\n", "\n", "punc_list = string.punctuation # you can self define list of punctuation to remove here\n", "\n", "def remove_punctuation(text):\n", " \"\"\"\n", " This function takes strings containing self defined punctuations and returns\n", " strings with punctuations removed.\n", " Input(string): one tweet, contains punctuations in the self-defined list\n", " Output(string): one tweet, self-defined punctuations removed\n", " \"\"\"\n", " translator = str.maketrans(\"\", \"\", punc_list)\n", " return text.translate(translator)\n", "\n", "def staging_data(data_dir):\n", " for data_type in [\"train\", \"test\"]:\n", " data_list = []\n", " for label in [\"neg\", \"pos\"]:\n", " data_path = os.path.join(data_dir, data_type, label)\n", " for files in glob.glob(data_path + '/*.txt'):\n", " data_id = files.split('/')[-1].replace('.txt', '')\n", " with open(files, 'r') as f:\n", " line = f.readline()\n", " line = remove_punctuation(line)\n", " line = re.sub(\"\\s+\", \" \", line)\n", " data_list.append([data_id, line, label])\n", " \n", " data_df = pd.DataFrame(data_list, columns=[\"index\", \"text\", \"label\"])\n", " data_df[\"event_time\"] = time.time()\n", " data_df[\"data_type\"] = data_type\n", " #data_df.reset_index(inplace=True)\n", " data_df.to_csv(f'/opt/ml/processing/output/raw/{data_type}.csv', index=False)\n", "\n", "if __name__ == \"__main__\":\n", " parser = argparse.ArgumentParser()\n", " parser.add_argument(\"--raw-data-url\", type=str, required=True)\n", " args, _ = parser.parse_known_args()\n", " \n", " subprocess.run(f\"wget {args.raw_data_url} -O aclImdb_v1.tar.gz && tar --no-same-owner -xzf aclImdb_v1.tar.gz && rm aclImdb_v1.tar.gz\", shell=True)\n", " \n", " data_dir = f\"{os.getcwd()}/aclImdb\"\n", " staging_data(data_dir)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "\n", "sklearn_processor = SKLearnProcessor(\n", " framework_version=\"0.20.0\", role=role, instance_type=\"ml.m5.xlarge\", instance_count=1\n", ")\n", "\n", "sklearn_processor.run(\n", " code='processing/data_preparation.py',\n", " arguments = ['--raw-data-url', 'https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz'],\n", " outputs=[ProcessingOutput(output_name=\"raw_data\", source='/opt/ml/processing/output/raw')]\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "preprocessing_job_description = sklearn_processor.jobs[-1].describe()\n", "\n", "output_config = preprocessing_job_description[\"ProcessingOutputConfig\"]\n", "raw_data_dir = output_config[\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n", "raw_data_dir" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 ls $raw_data_dir/" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_df = wr.s3.read_csv(path=f\"{raw_data_dir}/train.csv\")\n", "train_df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sample_data_dir = \"./data\"\n", "if not 
os.path.exists(sample_data_dir):\n",
"    os.makedirs(sample_data_dir)\n",
"train_df[\"text\"][:50].to_csv(f\"{sample_data_dir}/sample_imdb.csv\", header=None, index=None)"
] }, { "cell_type": "markdown", "metadata": {}, "source": [
"\n",
"## 3. Feature ingestion"
] }, { "cell_type": "markdown", "metadata": {}, "source": [
"### 3.1 Launch the feature ingestion pipeline"
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"imdb_pipeline_name = f'{project_name}-imdb-preprocessing'\n",
"\n",
"imdb_pipeline_execution = sagemaker_client.start_pipeline_execution(\n",
"    PipelineName=imdb_pipeline_name,\n",
"    PipelineExecutionDisplayName=\"ManualExecution\",\n",
"    PipelineParameters=[\n",
"        {\"Name\": \"InputDataUrl_train\", \"Value\": f'{raw_data_dir}/train.csv'},\n",
"        {\"Name\": \"InputDataUrl_test\", \"Value\": f'{raw_data_dir}/test.csv'},\n",
"    ],\n",
")"
] }, { "cell_type": "markdown", "metadata": {}, "source": [
"### 3.2 Verify feature ingestion"
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"featurestore_runtime = boto_session.client(\n",
"    service_name=\"sagemaker-featurestore-runtime\", region_name=region\n",
")"
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"feature_group_name = \"imdb\"\n",
"response = featurestore_runtime.get_record(\n",
"    FeatureGroupName=feature_group_name,\n",
"    RecordIdentifierValueAsString=\"3142_1\",\n",
")"
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"record = response[\"Record\"]\n",
"df = pd.DataFrame(record).set_index('FeatureName').transpose()\n",
"df[\"text\"].tolist()[0]"
] }, { "cell_type": "markdown", "metadata": {}, "source": [
"\n",
"## 4. Model building\n",
"With the data in the feature store, you can now start the model building pipeline. You can leave the parameters at their default values."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"build_pipeline_name = f'{project_name}-build-byoc'\n",
"\n",
"build_pipeline_execution = sagemaker_client.start_pipeline_execution(\n",
"    PipelineName=build_pipeline_name,\n",
"    PipelineExecutionDisplayName=\"ManualExecution\",\n",
"    PipelineParameters=[\n",
"        {\"Name\": \"TokenizerModelS3URI\", \"Value\": \"None\"},\n",
"    ],\n",
")"
] }, { "cell_type": "markdown", "metadata": {}, "source": [
"If `ModelApprovalStatus` is set to `PendingManualApproval` (as it is below), you have to manually set the model package status to `Approved` before it can be deployed."
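, "\n", "\n",
"A model package only appears in the registry after the build pipeline execution finishes. The next cell is a minimal polling sketch; it assumes only the `build_pipeline_execution` handle returned by `start_pipeline_execution` above."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
"# Optional: wait for the build pipeline execution started above to finish\n",
"# before looking for a model package in the registry.\n",
"execution_arn = build_pipeline_execution[\"PipelineExecutionArn\"]\n",
"\n",
"while True:\n",
"    status = sagemaker_client.describe_pipeline_execution(\n",
"        PipelineExecutionArn=execution_arn\n",
"    )[\"PipelineExecutionStatus\"]\n",
"    print(f\"Build pipeline execution status: {status}\")\n",
"    if status not in (\"Executing\", \"Stopping\"):\n",
"        break\n",
"    time.sleep(60)"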
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_client = boto_session.client('sagemaker')\n", "\n", "model_package_group_name = f\"{project_name}-imdb\"\n", "\n", "model_list = sagemaker_client.list_model_packages(ModelPackageGroupName=model_package_group_name)[\"ModelPackageSummaryList\"]\n", "model_package_arn = model_list[0][\"ModelPackageArn\"]\n", "model_package_arn" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_package_update_input_dict = {\n", " \"ModelPackageArn\" : model_package_arn,\n", " \"ModelApprovalStatus\" : \"Approved\"\n", "}\n", "model_package_update_response = sagemaker_client.update_model_package(**model_package_update_input_dict)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The status change of model will trigger endpoint serving, you can check the status of deploying endpoints in [AWS CodePipeline console](https://us-east-1.console.aws.amazon.com/codesuite/codepipeline/pipelines?). In this solution, we deploy endpoints for both real-time inference and [asychronous inference](https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## 5. Asynchronous inference" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5.1 Testing batch inference with asynchronous inference" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "input_s3_location = f\"s3://{bucket}/{project_name}/sample_data/sample_imdb.csv\"\n", "\n", "!aws s3 cp ./data/sample_imdb.csv $input_s3_location" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "async_endpoint_name = f\"{project_name}-byoc-asynchronous-async\"\n", "\n", "response = sm_runtime.invoke_endpoint_async(\n", " EndpointName=async_endpoint_name, \n", " InputLocation=input_s3_location\n", ")\n", "output_location = response['OutputLocation']\n", "print(f\"OutputLocation: {output_location}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_output(output_location):\n", " output_url = urllib.parse.urlparse(output_location)\n", " bucket = output_url.netloc\n", " key = output_url.path[1:]\n", " while True:\n", " try:\n", " return sagemaker_session.read_s3_file(bucket=output_url.netloc, key_prefix=output_url.path[1:])\n", " except ClientError as e:\n", " if e.response['Error']['Code'] == 'NoSuchKey':\n", " print(\"waiting for output...\")\n", " time.sleep(2)\n", " continue\n", " raise" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "output = get_output(output_location)\n", "print(f\"Output size in bytes: {((sys.getsizeof(output)))}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "async_infer_res = \"./data/async_res.json\"\n", "\n", "!aws s3 cp $output_location $async_infer_res\n", "\n", "with open(async_infer_res, 'r') as f:\n", " async_res = json.load(f)\n", "async_res" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5.2 Testing auto-scaling with multiple invocations (comming soon!)\n", "We enable auto scaling by monitoring the metric `ApproximateBacklogSizePerInstance`. You can find more details about [Asynchronous Inference Endpoint Metrics](https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference-monitor.html). 
Jump to [CloudWatch](https://console.aws.amazon.com/cloudwatch/home), search for the endpoint name in the `Metrics` console, select `SageMaker > EndpointName` under `AWS namespaces`, and you will find `ApproximateBacklogSizePerInstance`.\n", "\n", "