{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Targeting Direct Marketing with Amazon SageMaker XGBoost\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook. \n", "\n", "![This us-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-2/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "_**Deploy a trained Gradient Boosted Trees model in SageMaker: A Binary Prediction Problem With Unbalanced Classes**_\n", "\n", "---\n", "## Deployments with bring your own model and custom inference script\n", "With Amazon SageMaker, you can deploy your machine learning (ML) models to make predictions, also known as inference. SageMaker provides a broad selection of ML infrastructure and model deployment options to help meet all your ML inference needs. It is a fully managed service and integrates with MLOps tools, so you can scale your model deployment, reduce inference costs, manage models more effectively in production, and reduce operational burden.\n", "\n", "After you’ve built and trained a machine learning model, you can use SageMaker Inference to start getting predictions, or inferences, from your model. With SageMaker Inference, you can either set up an endpoint that returns inferences or run Asynchronous inference workloads\n", "\n", "To get started with SageMaker Inference, see the following sections and review the Inference options https://docs.aws.amazon.com/sagemaker/latest/dg/deploy-model.html#deploy-model-options to determine which feature best fits your use case.\n", "\n", "#### Background for Model and data set\n", "Direct marketing, either through mail, email, phone, etc., is a common tactic to acquire customers. Because resources and a customer's attention is limited, the goal is to only target the subset of prospects who are likely to engage with a specific offer. Predicting those potential customers based on readily available information like demographics, past interactions, and environmental factors is a common machine learning problem.\n", "\n", "The data set is available at https://sagemaker-sample-data-us-west-2.s3-us-west-2.amazonaws.com/autopilot/direct_marketing/bank-additional.zip\n", "\n", "#### For purpose of this notebook we wil execute the following steps\n", "* Visualize the data set used to train the model\n", "* Upload the test data set to S3 for leveraging during the test runs of the Inferencing\n", "* Set up the following End points\n", " * Real-time Inference endpoint\n", " * Serverless Inference endpoint\n", " * Asynchronous Inference endpoint\n", "* Investigate the model latency times and pros and cons of the approach\n", "\n", "Optoinal section:\n", "* Scaling options and showcase how to scale endpoints in SageMaker\n", "\n", "---\n", "\n", "### Preparation\n", "\n", "_This notebook was created and tested on an ml.m4.xlarge notebook instance._\n", "\n", "Let's start by specifying:\n", "\n", "- The S3 bucket and prefix that you want to use for training and model data. 
This should be within the same region as the Notebook Instance, training, and hosting.\n", "- The IAM role arn used to give training and hosting access to your data. See the documentation for how to create these. Note, if more than one role is required for notebook instances, training, and/or hosting, please replace the boto regexp with the appropriate full IAM role arn string(s)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "isConfigCell": true, "tags": [] }, "outputs": [], "source": [ "# cell 01\n", "import sagemaker\n", "\n", "bucket = sagemaker.Session().default_bucket()\n", "prefix = \"sagemaker/DEMO-xgboost-dm\"\n", "\n", "# Define IAM role\n", "import boto3\n", "import re\n", "from sagemaker import get_execution_role\n", "\n", "role = get_execution_role()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's bring in the Python libraries that we'll use throughout the analysis" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# cell 02\n", "import numpy as np # For matrix operations and numerical processing\n", "import pandas as pd # For munging tabular data\n", "import matplotlib.pyplot as plt # For charts and visualizations\n", "from IPython.display import Image # For displaying images in the notebook\n", "from IPython.display import display # For displaying outputs in the notebook\n", "from time import gmtime, strftime # For labeling SageMaker models, endpoints, etc.\n", "import sys # For writing outputs to notebook\n", "import math # For ceiling function\n", "import json # For parsing hosting outputs\n", "import os # For manipulating filepath names\n", "import sagemaker\n", "import zipfile # Amazon SageMaker's Python SDK provides many helper functions\n", "from sagemaker.multidatamodel import MultiDataModel\n", "import time" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prerequisite\n", "\n", "Upload the model and the data to S3 for the deployments. We will simulate the bring-your-own-model concept, which assumes the model is already in S3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Visualize the data we are going to use to run predictions on. 
The model will essentially predict the last column" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# cell 03\n", "data = pd.read_csv(\"./data_xgb/bank-additional.csv\")\n", "pd.set_option(\"display.max_columns\", 500) # Make sure we can see all of the columns\n", "pd.set_option(\"display.max_rows\", 20) # Keep the output on one page\n", "data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload the Model and test data artifacts into S3 for simulations" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 04\n", "import sagemaker\n", "\n", "import boto3\n", "import os\n", "import time\n", "import json\n", "import re" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 05\n", "\n", "prefix = \"sagemaker/DEMO-xgboost-dm\"\n", "role = sagemaker.get_execution_role() # execution role for the endpoint\n", "sess = sagemaker.session.Session() # sagemaker session for interacting with different AWS APIs\n", "def_bucket = sess.default_bucket() # bucket to house artifacts\n", "model_bucket = sess.default_bucket() # bucket to house artifacts\n", "\n", "region = sess.boto_region_name # public accessor for the session region\n", "account_id = sess.account_id()\n", "\n", "s3_client = boto3.client(\"s3\")\n", "sm_client = boto3.client(\"sagemaker\")\n", "smr_client = boto3.client(\"sagemaker-runtime\")\n", "\n", "print(role)\n", "print(region)\n", "\n", "# - upload the model -- use this if you want to test with your own trained model\n", "# s3_model_path = sess.upload_data(\n", "# \"./models_xgb/model.tar.gz\",\n", "# def_bucket,\n", "# \"sagemaker/DEMO-xgboost-dm/output/xgboost-2023-01-20-01-45-52-042/output\",\n", "# ) # - file, bucket, key_prefix\n", "\n", "s3_model_path = (\n", " f\"s3://sagemaker-examples-files-prod-{region}/models/xgboost/hosting/alloptions/model.tar.gz\"\n", ")\n", "\n", "print(s3_model_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Upload the Testing data and the Ground Truth\n", "\n", "The full data set is a 4119 × 58 matrix" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 06\n", "\n", "# - Upload the Test Data and the Ground truth\n", "\n", "s3_test_path = sess.upload_data(\n", " \"./data_xgb/test_x.csv\",\n", " def_bucket,\n", " \"sagemaker/DEMO-xgboost-dm/output/xgboost-2023-01-20-01-45-52-042/data\",\n", ") # - file, bucket, key_prefix\n", "\n", "s3_y_path = sess.upload_data(\n", " \"./data_xgb/test_y.csv\",\n", " def_bucket,\n", " \"sagemaker/DEMO-xgboost-dm/output/xgboost-2023-01-20-01-45-52-042/data\",\n", ") # - file, bucket, key_prefix\n", "\n", "orig_bank_data = sess.upload_data(\n", " \"./data_xgb/bank-additional.csv\",\n", " def_bucket,\n", " \"sagemaker/DEMO-xgboost-dm/output/xgboost-2023-01-20-01-45-52-042/data\",\n", ") # - file, bucket, key_prefix\n", "print(orig_bank_data)\n", "print(s3_test_path)\n", "print(s3_y_path)" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### Bring in the container, with the specific version, which will be used to run the model\nThe container version here needs to be the same as the one that was used to train the model" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# cell 07\n", "# container = 
sagemaker.image_uris.retrieve(region=boto3.Session().region_name, framework='xgboost', version='latest')\n", "container = sagemaker.image_uris.retrieve(\n", " region=boto3.Session().region_name, framework=\"xgboost\", version=\"1.5-1\"\n", ")\n", "\n", "container" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create the inference script. SageMaker allows you to combine a pre-trained model with your own inference script" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 08\n", "!mkdir -p code_inference" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%writefile code_inference/model.py\n", "\n", "import json\n", "import os\n", "import pickle as pkl\n", "\n", "import numpy as np\n", "import sagemaker_xgboost_container.encoder as xgb_encoders\n", "import xgboost as xgb\n", "\n", "\n", "def model_fn(model_dir):\n", " \"\"\"\n", " Deserialize and return fitted model.\n", " \"\"\"\n", " model_file = \"xgboost-model\"\n", " booster = pkl.load(open(os.path.join(model_dir, model_file), \"rb\"))\n", " print(f\"Model:loaded={booster}:\", flush=True)\n", " return booster\n", "\n", "\n", "def input_fn(request_body, request_content_type):\n", " \"\"\"\n", " The SageMaker XGBoost model server receives the request data body and the content type,\n", " and invokes the `input_fn`.\n", " Return a DMatrix (an object that can be passed to predict_fn).\n", " \"\"\"\n", " if request_content_type == \"text/libsvm\":\n", " return xgb_encoders.libsvm_to_dmatrix(request_body)\n", " elif request_content_type == \"text/csv\":\n", " input_np = np.array(request_body).reshape((1, -1))\n", " print(f\"Model:input:text:shape={input_np.shape}::\", flush=True)\n", " # return input_np\n", " return xgb.DMatrix(input_np)\n", " else:\n", " raise ValueError(\"Input Content type {} is not supported.\".format(request_content_type))\n", "\n", "\n", "def predict_fn(input_data, model):\n", " \"\"\"\n", " SageMaker XGBoost model server invokes `predict_fn` on the return value of `input_fn`.\n", " \"\"\"\n", " prediction = model.predict(input_data)\n", " print(f\"Model:prediction:shape={prediction.shape}:\", flush=True)\n", " return prediction # output\n", "\n", "\n", "def output_fn(predictions, content_type):\n", " \"\"\"\n", " After invoking predict_fn, the model server invokes `output_fn`.\n", " \"\"\"\n", " if content_type == \"text/csv\":\n", " return \",\".join(str(x) for x in predictions[0])\n", " else:\n", " raise ValueError(\"Return Content type {} is not supported.\".format(content_type))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "## Hosting\n", "\n", "### Real-time Inference endpoint\n", "Now that we've loaded the pre-trained `xgboost` model into S3, let's deploy a model that's hosted behind a real-time endpoint."
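, "\n", "Under the hood, the `deploy()` call used below automates three lower-level API calls: it creates a SageMaker Model (packaging the model artifact together with `code_inference/model.py`), an Endpoint Config, and an Endpoint. As a rough, hedged sketch (the resource names below are illustrative assumptions; `deploy()` generates unique names and repacks the inference script for you):\n", "\n", "```python\n", "# Illustrative sketch only -- deploy() does this (and the script repacking) for us.\n", "sm = boto3.client(\"sagemaker\")\n", "sm.create_model(\n", "    ModelName=\"xgboost-dm-demo\",  # hypothetical name\n", "    PrimaryContainer={\"Image\": container, \"ModelDataUrl\": s3_model_path},\n", "    ExecutionRoleArn=role,\n", ")\n", "sm.create_endpoint_config(\n", "    EndpointConfigName=\"xgboost-dm-demo-config\",\n", "    ProductionVariants=[\n", "        {\n", "            \"VariantName\": \"AllTraffic\",\n", "            \"ModelName\": \"xgboost-dm-demo\",\n", "            \"InstanceType\": \"ml.m4.xlarge\",\n", "            \"InitialInstanceCount\": 1,\n", "        }\n", "    ],\n", ")\n", "sm.create_endpoint(EndpointName=\"xgboost-dm-demo\", EndpointConfigName=\"xgboost-dm-demo-config\")\n", "```"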
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 10\n", "from sagemaker.xgboost.model import XGBoostModel\n", "\n", "xgboost_model = XGBoostModel(\n", " model_data=s3_model_path,\n", " role=role,\n", " entry_point=\"model.py\",\n", " source_dir=\"code_inference\",\n", " framework_version=\"latest\", # \"1.0-1\"\n", ")\n", "\n", "xgboost_model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### This cell can take a couple of minutes please be patient" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 11\n", "xgb_predictor = xgboost_model.deploy(instance_type=\"ml.m4.xlarge\", initial_instance_count=1)\n", "\n", "xgb_predictor" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 12\n", "print(xgb_predictor.serializer)\n", "print(xgb_predictor.deserializer)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Change the default serializers which is libsvm to CSV since we will be dealing with a CSV data set" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# cell 13\n", "xgb_predictor.serializer = (\n", " sagemaker.serializers.CSVSerializer()\n", ") # sagemaker.serializers.LibSVMSerializer\n", "xgb_predictor.deserializer = sagemaker.deserializers.CSVDeserializer()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "## Evaluation\n", "There are many ways to compare the performance of a machine learning model, but let's start by simply comparing actual to predicted values. In this case, we're simply predicting whether the customer subscribed to a term deposit (`1`) or not (`0`), which produces a simple confusion matrix.\n", "\n", "First we'll need to determine how we pass data into and receive data from our endpoint. Our data is currently stored as NumPy arrays in memory of our notebook instance. To send it in an HTTP POST request, we'll serialize it as a CSV string and then decode the resulting CSV.\n", "\n", "*Note: For inference with CSV format, SageMaker XGBoost requires that the data does NOT include the target variable.*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have a couple of options here , we can:\n", "1. Loop over our test dataset and split it into mini batches and send that into the model\n", "1. Or we could send the entire payload as is into the model. This is governed by the payload size which cannot exceed 6MB\n", "1. Snce our payload is smaller we can use the entire batch as is to feed\n", "1. This can be configured by the batch_size_to_run variable. 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# cell 14\n", "test_x = pd.read_csv(\"./data_xgb/test_x.csv\", names=[f\"{i}\" for i in range(58)])\n", "test_y = pd.read_csv(\"./data_xgb/test_y.csv\", names=[\"y\"])\n", "print(test_x.shape)\n", "print(test_y.shape)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# cell 15\n", "batch_size_to_run = 20\n", "# - create a batch_size_to_run row Test data to be used for predictions -- .drop(test_x.columns[0], axis=1)\n", "test_array = test_x.iloc[:batch_size_to_run, :].to_numpy()\n", "y_array = test_y[\"y\"].values[:batch_size_to_run]\n", "test_array.shape # columns in this test data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# cell 16\n", "# xgb_predictor.predict(test_array[0].tolist())\n", "# xgb_predictor.predict(test_array.tolist())\n", "\n", "predictions = xgb_predictor.predict(test_array)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# cell 17\n", "\n", "print(len(predictions[0]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we'll check our confusion matrix to see how well we predicted versus actuals." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# cell 18\n", "# pd.crosstab(index=test_y['y'].values, columns=np.round(predictions), rownames=['actuals'], colnames=['predictions'])\n", "pd.crosstab(\n", " index=y_array,\n", " columns=np.round(np.array(predictions[0], dtype=np.float32)),\n", " rownames=[\"actuals\"],\n", " colnames=[\"predictions\"],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Since our batch size is 20, the confusion matrix above covers just those 20 potential customers. Please see [here](https://docs.aws.amazon.com/machine-learning/latest/dg/multiclass-model-insights.html) for more details on the confusion matrix.\n", "\n", "Run the cell below to get P95 latency numbers for our model" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# cell 19\n", "\n", "# - get p95 numbers for Latency\n", "\n", "import numpy as np\n", "import time\n", "\n", "print(f\"Starting invocation for model::Real:Time please wait batch_size={batch_size_to_run}:.....\")\n", "results = []\n", "for i in range(0, 10):\n", " start = time.time()\n", " xgb_predictor.predict(test_array)\n", " results.append((time.time() - start) * 1000)\n", "print(\"\\nPredictions for model latency: \\n\")\n", "print(\"\\nP95: \" + str(np.percentile(results, 95)) + \" ms\\n\")\n", "print(\"P90: \" + str(np.percentile(results, 90)) + \" ms\\n\")\n", "print(\"Average: \" + str(np.average(results)) + \" ms\\n\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Serverless Invocations\n", "\n", "Create a Serverless Inference endpoint, show the initial cold-start issue, and then see how much faster subsequent calls run.\n", "\n", "For serverless we need to specify only the maximum concurrency and the memory size for the model. 
For further reading you can refer to https://docs.aws.amazon.com/sagemaker/latest/dg/serverless-endpoints.html" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 20\n", "serverless_inf_config = sagemaker.serverless.ServerlessInferenceConfig(\n", " memory_size_in_mb=1024, # 68 KB is the size of the model\n", " max_concurrency=5, # max invocations concurrently\n", ")\n", "serverless_inf_config" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# cell 21\n", "\n", "xgb_serverless_predictor = xgboost_model.deploy(\n", " instance_type=\"ml.m4.xlarge\",\n", " initial_instance_count=1,\n", " serverless_inference_config=serverless_inf_config,\n", ")\n", "\n", "xgb_serverless_predictor" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 22\n", "print(xgb_serverless_predictor.serializer)\n", "print(xgb_serverless_predictor.deserializer)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### As with real-time, we will change the serializers to match our data sets" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# cell 23\n", "xgb_serverless_predictor.serializer = (\n", " sagemaker.serializers.CSVSerializer()\n", ") # sagemaker.serializers.LibSVMSerializer\n", "xgb_serverless_predictor.deserializer = sagemaker.deserializers.CSVDeserializer()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Cold start for the serverless endpoint\n", "\n", "The very first time a request is sent, the serverless endpoint will spin up an instance and then run the prediction on it. To save cost for the customer, serverless will spin down all the instances after a certain period of inactivity. See https://docs.aws.amazon.com/sagemaker/latest/dg/serverless-endpoints-monitoring.html for more details."
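, "\n", "To quantify the cold start, here is a hedged sketch of pulling the serverless `ModelSetupTime` metric from CloudWatch (this assumes the default `AllTraffic` variant name and that a cold start happened within the last hour; it is illustrative and not executed in this lab):\n", "\n", "```python\n", "# Sketch: average/max time spent provisioning compute for recent cold starts.\n", "import datetime\n", "\n", "cw = boto3.client(\"cloudwatch\")\n", "stats = cw.get_metric_statistics(\n", "    Namespace=\"AWS/SageMaker\",\n", "    MetricName=\"ModelSetupTime\",\n", "    Dimensions=[\n", "        {\"Name\": \"EndpointName\", \"Value\": xgb_serverless_predictor.endpoint_name},\n", "        {\"Name\": \"VariantName\", \"Value\": \"AllTraffic\"},\n", "    ],\n", "    StartTime=datetime.datetime.utcnow() - datetime.timedelta(hours=1),\n", "    EndTime=datetime.datetime.utcnow(),\n", "    Period=60,\n", "    Statistics=[\"Average\", \"Maximum\"],\n", ")\n", "print(stats[\"Datapoints\"])\n", "```"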
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%time\n", "# Cell 24\n", "\n", "# xgb_predictor.predict(test_array[0].tolist())\n", "# xgb_predictor.predict(test_array.tolist())\n", "\n", "predictions_serverless = xgb_serverless_predictor.predict(test_array)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Subsequent invocations will be a lot faster" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%time\n", "# Cell 25\n", "\n", "# xgb_predictor.predict(test_array[0].tolist())\n", "# xgb_predictor.predict(test_array.tolist())\n", "\n", "predictions_serverless = xgb_serverless_predictor.predict(test_array)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 26\n", "print(len(predictions_serverless[0]))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 27\n", "# pd.crosstab(index=test_y['y'].values, columns=np.round(predictions), rownames=['actuals'], colnames=['predictions'])\n", "pd.crosstab(\n", " index=y_array,\n", " columns=np.round(np.array(predictions_serverless[0], dtype=np.float32)),\n", " rownames=[\"actuals\"],\n", " colnames=[\"predictions\"],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run invocations for the endpoint and get P95 numbers" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%time\n", "# cell 28\n", "\n", "# - get p95 numbers for Latency\n", "\n", "import numpy as np\n", "import time\n", "\n", "print(f\"Starting invocation for model::Serverless please wait batch_size={batch_size_to_run}:.....\")\n", "results = []\n", "for i in range(0, 10):\n", " start = time.time()\n", " xgb_serverless_predictor.predict(test_array)\n", " results.append((time.time() - start) * 1000)\n", "print(\"\\nPredictions for model latency: \\n\")\n", "print(\"\\nP95: \" + str(np.percentile(results, 95)) + \" ms\\n\")\n", "print(\"P90: \" + str(np.percentile(results, 90)) + \" ms\\n\")\n", "print(\"Average: \" + str(np.average(results)) + \" ms\\n\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start with the Asynchronous inference endpoint for deployment\n", "\n", "Upload the test data set to multiple S3 locations. The idea behind this is to leverage Asynchronous inference by splitting \n", "the data into smaller data sets, which can then be fed into the system for predictions. This reduces the blast radius and makes it easier to debug compared to one large batch workload. The size of each payload should be ~1 GB maximum.\n", "We can simulate loading the Asynchronous inference queue using these locations\n", "\n", "\n", "Once you have a model, create an Asynchronous inference configuration. Amazon SageMaker hosting services uses this configuration to deploy models. In the configuration, you identify one or more models that were created, to deploy the resources that you want Amazon SageMaker to provision. Specify the AsyncInferenceConfig object and provide an output Amazon S3 location for OutputConfig. You can optionally specify Amazon SNS topics on which to send notifications about prediction results.\n"
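, "\n", "A hedged sketch of wiring in those SNS notifications via the `notification_config` argument (the topic ARNs are placeholders you would replace with topics that exist in your account; this notebook does not create them):\n", "\n", "```python\n", "# Sketch: async config that also publishes success/error notifications to SNS.\n", "async_config_with_sns = sagemaker.async_inference.AsyncInferenceConfig(\n", "    output_path=\"s3://<your-bucket>/<prefix>/async/output\",\n", "    max_concurrent_invocations_per_instance=2,\n", "    notification_config={\n", "        \"SuccessTopic\": \"arn:aws:sns:<region>:<account-id>:async-success\",  # placeholder ARN\n", "        \"ErrorTopic\": \"arn:aws:sns:<region>:<account-id>:async-error\",  # placeholder ARN\n", "    },\n", ")\n", "```"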
, "\n", "Asynchronous inference endpoints can also scale all the way down to 0 instances to save cost" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 29\n", "import os\n", "\n", "# -- upload a set of 10 test data into S3 for A-SYNC\n", "s3_async_path_list = []\n", "for index in range(10):\n", " s3_async_path = sess.upload_data(\n", " \"./data_xgb/test_x.csv\",\n", " def_bucket,\n", " f\"sagemaker/DEMO-xgboost-dm/output/xgboost-2023-01-20-01-45-52-042/async/data{index}\",\n", " extra_args={\"ContentType\": \"text/csv\"},\n", " )\n", " s3_async_path_list.append(s3_async_path)\n", "\n", "print(len(s3_async_path_list))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Define where the output will go - there will be one output object for each request we run through the system for prediction" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 30\n", "async_output_path = f\"s3://{def_bucket}/sagemaker/DEMO-xgboost-dm/output/xgboost-2023-01-20-01-45-52-042/async/output\"\n", "print(async_output_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Now create the Asynchronous inference endpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 31\n", "async_inf_config = sagemaker.async_inference.AsyncInferenceConfig(\n", " output_path=async_output_path,\n", " max_concurrent_invocations_per_instance=2, # max invocations concurrently\n", ")\n", "async_inf_config" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 32\n", "\n", "xgb_async_predictor = xgboost_model.deploy(\n", " instance_type=\"ml.m5.xlarge\",\n", " initial_instance_count=1,\n", " async_inference_config=async_inf_config,\n", ")\n", "\n", "xgb_async_predictor" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 33\n", "xgb_async_predictor.endpoint_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 34\n", "print(xgb_async_predictor.serializer)\n", "print(xgb_async_predictor.deserializer)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Change the serializers to match the model requirements" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# cell 35\n", "xgb_async_predictor.serializer = (\n", " sagemaker.serializers.CSVSerializer()\n", ") # sagemaker.serializers.LibSVMSerializer\n", "xgb_async_predictor.deserializer = sagemaker.deserializers.CSVDeserializer()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Input data for which you want the model to provide inference\n", "\n", "Asynchronous inference needs the data to be in S3; below, the predict_async helper method takes the payload from memory, saves it to S3, and then runs the prediction on the Asynchronous inference endpoint\n", "\n", "If a serializer was specified in the encapsulating Predictor object, the result of the serializer is sent as input data. 
Otherwise the data must be a sequence of bytes, and the predict_async method uploads the data to the S3 location\n", "\n", "The predictions will be stored in the S3 location at async_output_path" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%time\n", "# Cell 36\n", "\n", "input_payload_location = s3_async_path_list[0]\n", "predictions_async = xgb_async_predictor.predict_async(\n", " data=test_array,\n", ")\n", "predictions_async" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 37\n", "print(predictions_async.output_path)\n", "!aws s3 ls $predictions_async.output_path\n", "!echo \"The Output Path files in S3\"\n", "!aws s3 ls $async_output_path/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### We can use the SDK APIs to get the result, or check the S3 location directly\nRun the cell below only once you see output from the aws s3 ls of the async_output_path location" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 38\n", "print(len(predictions_async.get_result()[0]))\n", "print(\"\\n\")\n", "print(predictions_async.output_path)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 39\n", "# -- now view the outputs and copy to the local location for viewing results\n", "\n", "!aws s3 ls $predictions_async.output_path\n", "!aws s3 cp $predictions_async.output_path ./data_xgb" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### View the results using Pandas" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 40\n", "\n", "file_name = predictions_async.output_path.split(\"/\")[-1]\n", "file_name = f\"./data_xgb/{file_name}\"\n", "\n", "pred_np = np.loadtxt(file_name, delimiter=\",\", dtype=np.float32).reshape(-1, 1)\n", "print(pred_np.shape)\n", "pred_np" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Run Predictions using the saved data sets\n", "\n", "This sends in the requests - the endpoint will process them from the queue. It is not guaranteed that the results will be available right away\n", "\n", "The saved data sets were the full 4119 rows of data, and hence the length of predictions will not match the batch size we had specified earlier" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%time\n", "# Cell 41\n", "\n", "output_path_list = []\n", "for index in range(len(s3_async_path_list)):\n", " input_payload_location = s3_async_path_list[index]\n", " predictions_async = xgb_async_predictor.predict_async(\n", " # data=test_array,\n", " input_path=input_payload_location,\n", " )\n", " # print(len(predictions_async.get_result()[0]))\n", " print(predictions_async.output_path)\n", " output_path_list.append(predictions_async.output_path)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 42\n", "!echo \"The Output Path files in S3 $async_output_path\"\n", "!aws s3 ls $async_output_path/" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### Check Output Location\n", "\n", "Check the output location to see if the inference has been processed. 
We make repeated requests (the while loop in the get_output function) every two seconds until the output of the inference request is available:\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 43\n", "\n", "import urllib, time\n", "from botocore.exceptions import ClientError\n", "\n", "\n", "def get_output(output_location):\n", " start_time = time.time()\n", " output_url = urllib.parse.urlparse(output_location)\n", " bucket = output_url.netloc\n", " key = output_url.path[1:]\n", " while (\n", " time.time() - start_time\n", " ) < 30: # 30 seconds max wait time - for now - ideally we have a 15 min SLA\n", " try:\n", " return sess.read_s3_file(bucket=output_url.netloc, key_prefix=output_url.path[1:])\n", " except ClientError as e:\n", " if e.response[\"Error\"][\"Code\"] == \"NoSuchKey\":\n", " print(f\"waiting for output...key={key}:\")\n", " time.sleep(2)\n", " continue\n", " print(f\"Timeout or finished method for key={key}:\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 44\n", "# the length of predictions will be 1 per row of input data so it should total to 4119\n", "for output_location in output_path_list:\n", " output = get_output(output_location)\n", " print(f\"Output: {len(output.split(',')) }\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 45\n", "# - check the shape of the output -- to match input size of 4119\n", "np.array(output.split(\",\")).shape" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Cross tab for the predictions" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 46 (a)\n", "# - we can use the data and predictions we generated from the in-memory run\n", "pd.crosstab(\n", " index=y_array,\n", " columns=np.round(pred_np.reshape(-1)),\n", " rownames=[\"actuals\"],\n", " colnames=[\"predictions\"],\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 46 (b)\n", "# - we can use the data from the stored results after Async finished the predictions.\n", "pred_np_array = np.array(output.split(\",\"), dtype=np.float32)[: len(y_array)]\n", "pd.crosstab(\n", " index=y_array,\n", " columns=np.round(pred_np_array),\n", " rownames=[\"actuals\"],\n", " colnames=[\"predictions\"],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Congratulations! End of lab for deployment options" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### The below section is optional and depends on the service limits in your account" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Now we create an autoscaling policy for the endpoint. For this we will leverage the boto3 API" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This section describes how to configure autoscaling on your asynchronous endpoint using Application Auto Scaling. You need to first register your endpoint variant with Application Auto Scaling, define a scaling policy, and then apply the scaling policy. In this configuration, we use a custom metric, CustomizedMetricSpecification, called ApproximateBacklogSizePerInstance. Please refer to the SageMaker Developer Guide for a detailed list of metrics available with your asynchronous inference endpoint."
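, "\n", "The target-tracking policy below keeps MinCapacity at 1. As a hedged sketch (not executed in this lab; the alarm name is an illustrative assumption), scaling from zero is documented to require registering the target with MinCapacity=0 and pairing a step-scaling policy with a CloudWatch alarm on the `HasBacklogWithoutCapacity` metric, roughly like this:\n", "\n", "```python\n", "# Sketch: wake an async endpoint from 0 instances when requests are queued.\n", "step_policy = client.put_scaling_policy(\n", "    PolicyName=\"HasBacklogWithoutCapacity-ScalingPolicy\",\n", "    ServiceNamespace=\"sagemaker\",\n", "    ResourceId=resource_id,\n", "    ScalableDimension=\"sagemaker:variant:DesiredInstanceCount\",\n", "    PolicyType=\"StepScaling\",\n", "    StepScalingPolicyConfiguration={\n", "        \"AdjustmentType\": \"ChangeInCapacity\",\n", "        \"MetricAggregationType\": \"Average\",\n", "        \"Cooldown\": 300,\n", "        \"StepAdjustments\": [{\"MetricIntervalLowerBound\": 0, \"ScalingAdjustment\": 1}],\n", "    },\n", ")\n", "\n", "boto3.client(\"cloudwatch\").put_metric_alarm(\n", "    AlarmName=\"async-backlog-without-capacity\",  # hypothetical name\n", "    Namespace=\"AWS/SageMaker\",\n", "    MetricName=\"HasBacklogWithoutCapacity\",\n", "    Dimensions=[{\"Name\": \"EndpointName\", \"Value\": xgb_async_predictor.endpoint_name}],\n", "    Statistic=\"Average\",\n", "    Period=60,\n", "    EvaluationPeriods=2,\n", "    Threshold=1,\n", "    ComparisonOperator=\"GreaterThanOrEqualToThreshold\",\n", "    AlarmActions=[step_policy[\"PolicyARN\"]],  # fire the step policy on backlog\n", ")\n", "```"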
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 47\n", "sm_client = boto3.client(\"sagemaker\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 48\n", "sm_client.describe_endpoint(EndpointName=xgb_async_predictor.endpoint_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 49\n", "client = boto3.client(\n", " \"application-autoscaling\"\n", ") # Common class representing Application Auto Scaling for SageMaker amongst other services\n", "\n", "resource_id = (\n", " \"endpoint/\" + xgb_async_predictor.endpoint_name + \"/variant/\" + \"AllTraffic\"\n", ") # This is the format in which application autoscaling references the endpoint\n", "\n", "# Configure Autoscaling on asynchronous endpoint down to zero instances\n", "response = client.register_scalable_target(\n", " ServiceNamespace=\"sagemaker\",\n", " ResourceId=resource_id,\n", " ScalableDimension=\"sagemaker:variant:DesiredInstanceCount\",\n", " MinCapacity=1,\n", " MaxCapacity=3,\n", ")\n", "\n", "# - use a lower value to simulate the autoscaling to kick in\n", "response = client.put_scaling_policy(\n", " PolicyName=\"Invocations-ScalingPolicy\",\n", " ServiceNamespace=\"sagemaker\", # The namespace of the AWS service that provides the resource.\n", " ResourceId=resource_id, # Endpoint name\n", " ScalableDimension=\"sagemaker:variant:DesiredInstanceCount\", # SageMaker supports only Instance Count\n", " PolicyType=\"TargetTrackingScaling\", # 'StepScaling'|'TargetTrackingScaling'\n", " TargetTrackingScalingPolicyConfiguration={\n", " \"TargetValue\": 2.0, # The target value for the metric. - here the metric is - ApproximateBacklogSizePerInstance\n", " \"CustomizedMetricSpecification\": {\n", " \"MetricName\": \"ApproximateBacklogSizePerInstance\",\n", " \"Namespace\": \"AWS/SageMaker\",\n", " \"Dimensions\": [{\"Name\": \"EndpointName\", \"Value\": xgb_async_predictor.endpoint_name}],\n", " \"Statistic\": \"Average\",\n", " },\n", " \"ScaleInCooldown\": 600, # The cooldown period helps you prevent your Auto Scaling group from launching or terminating\n", " # additional instances before the effects of previous activities are visible.\n", " # You can configure the length of time based on your instance startup time or other application needs.\n", " # ScaleInCooldown - The amount of time, in seconds, after a scale in activity completes before another scale in activity can start.\n", " \"ScaleOutCooldown\": 1 # ScaleOutCooldown - The amount of time, in seconds, after a scale out activity completes before another scale out activity can start.\n", " # 'DisableScaleIn': True|False - ndicates whether scale in by the target tracking policy is disabled.\n", " # If the value is true , scale in is disabled and the target tracking policy won't remove capacity from the scalable resource.\n", " },\n", ")\n", "response" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### The end point is now ready for invocations with burst capacity " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 50\n", "# - clean the output path so we can be sure of the response after scaling\n", "!aws s3 rm --recursive $async_output_path/ --quiet\n", "!aws s3 ls $async_output_path/ | wc -l" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ 
"%%time\n", "# Cell 51\n", "# - invoke n times again\n", "\n", "output_path_list = []\n", "\n", "for index in range(len(s3_async_path_list)):\n", " for top_i in range(50):\n", " # for index in range( len (s3_async_path_list) ):\n", " input_payload_location = s3_async_path_list[index]\n", " predictions_async = xgb_async_predictor.predict_async(\n", " # data=test_array,\n", " input_path=input_payload_location,\n", " )\n", " # print(len(predictions_async.get_result()[0]))\n", " # print(predictions_async.output_path)\n", " output_path_list.append(predictions_async.output_path)\n", " time.sleep(0.5)\n", "\n", "print(f\"No of requests sent to the ASync endpoint are {len(output_path_list)} \")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 52\n", "!echo \"The Output Path files in S3 $async_output_path\"\n", "!aws s3 ls $async_output_path/ | wc -l\n", "# !aws s3 ls --summarize --human-readable --recursive $async_output_path/" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Cell 53 (a)\n", "# Check the Scaling conditions\n", "scale_response = sm_client.describe_endpoint(EndpointName=xgb_async_predictor.endpoint_name)\n", "scale_response[\"EndpointName\"]" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 53 (b)\n", "\n", "print(f\"Scaling endpoint status --- > {scale_response['EndpointStatus']}\")\n", "print(\n", " f\"Scaling endpoint instance count --- > {scale_response['ProductionVariants'][0]['CurrentInstanceCount']}\"\n", ")\n", "print(\n", " f\"Scaling endpoint Desired instance count --- > {scale_response['ProductionVariants'][0]['DesiredInstanceCount']}\"\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Run the below only in case you see scaling activities have finished - this can takea couple of minutes\n", "\n", "endpoint status should show as Inservice" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 53\n", "# check one of the output files\n", "for output_location in output_path_list:\n", " output = get_output(output_location)\n", " print(f\"Output: {len(output.split(',')) }\")\n", " break" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### (Optional) Clean-up\n", "\n", "If you are done with this notebook, please run the cell below. This will remove the hosted endpoint you created and avoid any charges from a stray instance being left on." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 54\n", "# - run this cell only in case you had set up the Scaling options\n", "response = client.deregister_scalable_target(\n", " ServiceNamespace=\"sagemaker\",\n", " ResourceId=resource_id,\n", " ScalableDimension=\"sagemaker:variant:DesiredInstanceCount\",\n", ")\n", "response" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Cell 55\n", "\n", "try:\n", " xgb_predictor.delete_endpoint(delete_endpoint_config=True)\n", " xgb_serverless_predictor.delete_endpoint(delete_endpoint_config=True)\n", " xgb_async_predictor.delete_endpoint(delete_endpoint_config=True)\n", "except:\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Clean the bucket and delete contents" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 56\n", "!aws s3 rm --recursive $async_output_path/ --quiet" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Cell 57\n", "!aws s3 ls $async_output_path/" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Notebook CI Test Results\n", "\n", "This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.\n", "\n", "![This us-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-1/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This us-east-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-2/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This us-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-1/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This ca-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ca-central-1/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This sa-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/sa-east-1/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This eu-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-1/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This eu-west-2 badge failed to load. 
Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-2/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This eu-west-3 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-3/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This eu-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-central-1/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This eu-north-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-north-1/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This ap-southeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-1/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This ap-southeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-2/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This ap-northeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-1/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This ap-northeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-2/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n", "\n", "![This ap-south-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-south-1/inference|structured|async|default_server|single_model|deploy_all_options_xgb.ipynb)\n" ] } ], "metadata": { "celltoolbar": "Tags", "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science 2.0)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-data-science-38" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.13" }, "notice": "Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. 
A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License." }, "nbformat": 4, "nbformat_minor": 4 }