{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Amazon SageMaker Batch Transform: Associate prediction results with their corresponding input records\n", "_**Use SageMaker's XGBoost to train a binary classification model and, for a list of tumors in a batch file, predict whether each is malignant**_\n", "\n", "_**It also shows in detail how to use the input/output joining and filtering features of Batch Transform**_\n", "\n", "---\n", "\n", "## Setup\n", "\n", "Let's start by specifying:\n", "\n", "* The SageMaker role ARN used to give training and batch transform access to your data. The snippet below uses the same role as your SageMaker notebook instance. Otherwise, specify the full ARN of a role with the AmazonSageMakerFullAccess policy attached.\n", "* The S3 bucket that you want to use for training and storing model objects." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import boto3\n", "import sagemaker\n", "from sagemaker import get_execution_role\n", "from time import gmtime, strftime\n", "\n", "role = get_execution_role()\n", "\n", "region = boto3.Session().region_name\n", "\n", "sagemaker_session = sagemaker.Session()\n", "\n", "bucket = sagemaker_session.default_bucket()\n", "prefix = 'sagemaker/DEMO-xgboost-tripfare'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# List the stored variables, then restore them into this notebook\n", "%store\n", "%store -r" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## XGBoost Bring Your Own Model\n", "\n", "Amazon SageMaker includes functionality to support a hosted notebook environment, distributed, serverless training, and real-time hosting. We think it works best when all three of these services are used together, but they can also be used independently. Some use cases may only require hosting. 
For example, the model may have been trained in a different service before Amazon SageMaker existed.\n", "\n", "This section shows how to use a pre-existing trained XGBoost model with the Amazon SageMaker XGBoost Algorithm container to quickly create a hosted endpoint for that model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "container = sagemaker.image_uris.retrieve(region=boto3.Session().region_name, framework='xgboost', version='1.2-2')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "model_file_name = \"DEMO-byo-xgboost-model\"\n", "model_name = model_file_name + strftime(\"-%Y-%m-%d-%H-%M-%S\", gmtime())\n", "# model_url is not defined in this notebook; it is assumed to be restored by `%store -r` above\n", "model_data = model_url\n", "print(model_data)\n", "sm_client = boto3.client(\"sagemaker\")\n", "\n", "# Pair the XGBoost container image with the existing model artifact\n", "primary_container = {\n", "    \"Image\": container,\n", "    \"ModelDataUrl\": model_data,\n", "}\n", "\n", "create_model_response = sm_client.create_model(\n", "    ModelName=model_name, ExecutionRoleArn=role, PrimaryContainer=primary_container\n", ")\n", "\n", "print(create_model_response[\"ModelArn\"])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint_config_name = \"DEMO-XGBoostEndpointConfig-\" + strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "print(endpoint_config_name)\n", "create_endpoint_config_response = sm_client.create_endpoint_config(\n", "    EndpointConfigName=endpoint_config_name,\n", "    ProductionVariants=[\n", "        {\n", "            \"InstanceType\": \"ml.m4.xlarge\",\n", "            \"InitialInstanceCount\": 1,\n", "            \"InitialVariantWeight\": 1,\n", "            \"ModelName\": model_name,\n", "            \"VariantName\": \"AllTraffic\",\n", "        }\n", "    ],\n", ")\n", "\n", "print(\"Endpoint Config Arn: \" + create_endpoint_config_response[\"EndpointConfigArn\"])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "import time\n", "\n", "endpoint_name = 
\"BYOM-XGBoostEndpoint-\" + strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "print(endpoint_name)\n", "create_endpoint_response = sm_client.create_endpoint(\n", "    EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name\n", ")\n", "print(create_endpoint_response[\"EndpointArn\"])\n", "\n", "resp = sm_client.describe_endpoint(EndpointName=endpoint_name)\n", "status = resp[\"EndpointStatus\"]\n", "print(\"Status: \" + status)\n", "\n", "# Poll every 60 seconds until the endpoint leaves the \"Creating\" state\n", "while status == \"Creating\":\n", "    time.sleep(60)\n", "    resp = sm_client.describe_endpoint(EndpointName=endpoint_name)\n", "    status = resp[\"EndpointStatus\"]\n", "    print(\"Status: \" + status)\n", "\n", "print(\"Arn: \" + resp[\"EndpointArn\"])\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before you run the next cell, wait until the endpoint has been created. The code above checks the endpoint status every 60 seconds.\n", "Move on to the next cell only once the endpoint status is \"InService\"."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import awswrangler as wr\n", "\n", "# Load the first five rows of the test set from S3\n", "test_df = wr.s3.read_csv(path=test_path, dataset=True, nrows=5, header=None)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import io\n", "import csv\n", "\n", "runtime_client = boto3.client(\"runtime.sagemaker\")\n", "\n", "# Drop column 0 and keep the remaining columns as the model input\n", "data = test_df.iloc[:, 1:].to_numpy()\n", "\n", "# Serialize the records as CSV, one row per record\n", "csv_buffer = io.StringIO()\n", "csv_writer = csv.writer(csv_buffer, delimiter=\",\")\n", "for record in data:\n", "    csv_writer.writerow(record)\n", "\n", "response = runtime_client.invoke_endpoint(\n", "    EndpointName=endpoint_name, ContentType=\"text/csv\", Body=csv_buffer.getvalue()\n", ")\n", "print(\"Predicted Class Probabilities: {}.\".format(response[\"Body\"].read().decode(\"ascii\")))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Batch Transform\n", "\n", "\n", "In SageMaker Batch Transform, we introduced 3 new attributes: __input_filter__, __join_source__ and __output_filter__. In the cells below, we use the [SageMaker Python SDK](https://github.com/aws/sagemaker-python-sdk) to kick off Batch Transform jobs using different configurations of these 3 new attributes. Please refer to [this page](https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform-data-processing.html) to learn more about how to use them.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 1. Create a transform job with the default configurations\n", "Let's first skip these 3 new attributes and inspect the inference results. We'll use them as a baseline to compare against the results with data processing."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.model import Model\n", "\n", "# Wrap the existing model artifact and container image in a SageMaker Model\n", "xgb_model = Model(\n", "    image_uri=container,\n", "    model_data=model_url,\n", "    role=role,\n", "    name=model_file_name + strftime(\"-%Y-%m-%d-%H-%M-%S\", gmtime()),\n", "    sagemaker_session=sagemaker_session,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 2. Join the input and the prediction results\n", "Now, let's associate the prediction results with their corresponding input records. We can also use __input_filter__ to exclude the ID column, so there's no need to keep a separate file in S3.\n", "\n", "* Set __input_filter__ to \"$[1:]\": this excludes column 0 (the 'ID') before inference and keeps everything from column 1 to the last column (all the features or predictors)\n", "\n", "* Set __join_source__ to \"Input\": this joins the input data with the inference results\n", "\n", "* Leave __output_filter__ at its default ('$'): the joined input and inference results will be saved as the output."
] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "xgb_transformer = xgb_model.transformer(instance_count=2, instance_type=\"ml.m4.xlarge\")\n", "\n", "# content_type / accept and split_type / assemble_with are required to use the input/output joining feature\n", "xgb_transformer.assemble_with = \"Line\"\n", "xgb_transformer.accept = \"text/csv\"\n", "\n", "# Start a transform job that drops column 0 before inference and joins each prediction to its input record\n", "xgb_transformer.transform(\n", "    test_path,\n", "    content_type=\"text/csv\",\n", "    split_type=\"Line\",\n", "    input_filter=\"$[1:]\",\n", "    join_source=\"Input\",\n", ")\n", "xgb_transformer.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Please wait until the batch transform job finishes before executing the following code.\n", "\n", "Let's inspect the output of the Batch Transform job in S3. It should show each trip's original input columns followed by its predicted trip fare." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from urllib.parse import urlparse\n", "\n", "\n", "def get_csv_output_from_s3(s3uri):\n", "    \"\"\"Return the contents of the first object under the given S3 URI as text.\"\"\"\n", "    parsed_url = urlparse(s3uri)\n", "    bucket_name = parsed_url.netloc\n", "    prefix = parsed_url.path[1:]\n", "    s3 = boto3.client(\"s3\")\n", "    obj_key = s3.list_objects(Bucket=bucket_name, Prefix=prefix)[\"Contents\"][0][\"Key\"]\n", "    return s3.get_object(Bucket=bucket_name, Key=obj_key)[\"Body\"].read().decode('utf-8')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 ls $xgb_transformer.output_path/" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The helper returns the CSV as a string; show the first joined record\n", "output_csv = get_csv_output_from_s3(xgb_transformer.output_path)\n", "output_csv.split('\\n')[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## (Optional) Cleanup" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, 
"outputs": [], "source": [ "sagemaker_session.delete_endpoint(endpoint_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "## End of Lab 3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 4 }