{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Regression with Amazon SageMaker XGBoost algorithm\n", "_**Single machine training for regression with Amazon SageMaker XGBoost algorithm**_\n", "\n", "---\n", "\n", "---\n", "## Contents\n", "1. [Introduction](#Introduction)\n", "2. [Setup](#Setup)\n", " 1. [Fetching the dataset](#Fetching-the-dataset)\n", " 2. [Data Ingestion](#Data-ingestion)\n", "3. [Training the XGBoost model](#Training-the-XGBoost-model)\n", " 1. [Plotting evaluation metrics](#Plotting-evaluation-metrics)\n", "4. [Set up hosting for the model](#Set-up-hosting-for-the-model)\n", " 1. [Import model into hosting](#Import-model-into-hosting)\n", " 2. [Create endpoint configuration](#Create-endpoint-configuration)\n", " 3. [Create endpoint](#Create-endpoint)\n", "5. [Validate the model for use](#Validate-the-model-for-use)\n", "\n", "---\n", "## Introduction\n", "\n", "This notebook demonstrates the use of Amazon SageMaker’s implementation of the XGBoost algorithm to train and host a regression model. We use the [Abalone data](https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression.html) originally from the [UCI data repository](https://archive.ics.uci.edu/ml/datasets/abalone). More details about the original dataset can be found [here](https://archive.ics.uci.edu/ml/machine-learning-databases/abalone/abalone.names). In the libsvm converted [version](https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression.html), the nominal feature (Male/Female/Infant) has been converted into a real valued feature. Age of abalone is to be predicted from eight physical measurements. \n", "\n", "---\n", "## Setup\n", "\n", "Let's start by specifying:\n", "1. The S3 bucket and prefix that you want to use for training and model data. This should be within the same region as the Notebook Instance, training, and hosting.\n", "\n", "Note : Replace REPLACE-BUCKET-NAME with the BucketName from the product provisioned using Service Catalog." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "isConfigCell": true, "tags": [ "parameters" ] }, "outputs": [], "source": [ "%%time\n", "\n", "import os\n", "import boto3\n", "import re\n", "from sagemaker import get_execution_role\n", "\n", "role = get_execution_role()\n", "region = boto3.Session().region_name\n", "accountID = boto3.client('sts').get_caller_identity()['Account']\n", "\n", "# Bucket to use for training data and model\n", "#Replace REPLACE-BUCKET-NAME with the BucketName from the product provisioned using Service Catalog.\n", "bucket='sc-324793933254-pp-fxgx3552-datascientists3bucket-1g1i8ohdfp832'\n", "\n", "print('Datascience project bucket is : ', bucket)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "prefix = 'sagemaker/DEMO-xgboost-regression'\n", "# customize to your bucket where you have stored the data\n", "bucket_path = 'https://s3-{}.amazonaws.com/{}'.format(region,bucket)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Fetching the dataset\n", "\n", "Following methods split the data into train/test/validation datasets and upload files to S3." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "import io\n", "import boto3\n", "import random\n", "\n", "def data_split(FILE_DATA, FILE_TRAIN, FILE_VALIDATION, FILE_TEST, PERCENT_TRAIN, PERCENT_VALIDATION, PERCENT_TEST):\n", " data = [l for l in open(FILE_DATA, 'r')]\n", " train_file = open(FILE_TRAIN, 'w')\n", " valid_file = open(FILE_VALIDATION, 'w')\n", " tests_file = open(FILE_TEST, 'w')\n", "\n", " num_of_data = len(data)\n", " num_train = int((PERCENT_TRAIN/100.0)*num_of_data)\n", " num_valid = int((PERCENT_VALIDATION/100.0)*num_of_data)\n", " num_tests = int((PERCENT_TEST/100.0)*num_of_data)\n", "\n", " data_fractions = [num_train, num_valid, num_tests]\n", " split_data = [[],[],[]]\n", "\n", " rand_data_ind = 0\n", "\n", " for split_ind, fraction in enumerate(data_fractions):\n", " for i in range(fraction):\n", " rand_data_ind = random.randint(0, len(data)-1)\n", " split_data[split_ind].append(data[rand_data_ind])\n", " data.pop(rand_data_ind)\n", "\n", " for l in split_data[0]:\n", " train_file.write(l)\n", "\n", " for l in split_data[1]:\n", " valid_file.write(l)\n", "\n", " for l in split_data[2]:\n", " tests_file.write(l)\n", "\n", " train_file.close()\n", " valid_file.close()\n", " tests_file.close()\n", "\n", "def write_to_s3(fobj, bucket, key):\n", " return boto3.Session(region_name=region).resource('s3').Bucket(bucket).Object(key).upload_fileobj(fobj)\n", "\n", "def upload_to_s3(bucket, channel, filename):\n", " fobj=open(filename, 'rb')\n", " key = prefix+'/'+channel\n", " url = 's3://{}/{}/{}'.format(bucket, key, filename)\n", " print('Writing to {}'.format(url))\n", " write_to_s3(fobj, bucket, key)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Data ingestion\n", "\n", "Next, we read the dataset from the existing repository into memory, for preprocessing prior to training. This processing could be done *in situ* by Amazon Athena, Apache Spark in Amazon EMR, Amazon Redshift, etc., assuming the dataset is present in the appropriate location. Then, the next step would be to transfer the data to S3 for use in training. For small datasets, such as this one, reading into memory isn't onerous, though it would be for larger datasets." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "import urllib.request\n", "\n", "# Load the dataset\n", "FILE_DATA = 'abalone'\n", "urllib.request.urlretrieve(\"https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression/abalone\", FILE_DATA)\n", "\n", "#split the downloaded data into train/test/validation files\n", "FILE_TRAIN = 'abalone.train'\n", "FILE_VALIDATION = 'abalone.validation'\n", "FILE_TEST = 'abalone.test'\n", "PERCENT_TRAIN = 70\n", "PERCENT_VALIDATION = 15\n", "PERCENT_TEST = 15\n", "data_split(FILE_DATA, FILE_TRAIN, FILE_VALIDATION, FILE_TEST, PERCENT_TRAIN, PERCENT_VALIDATION, PERCENT_TEST)\n", "\n", "#upload the files to the S3 bucket\n", "upload_to_s3(bucket, 'train', FILE_TRAIN)\n", "upload_to_s3(bucket, 'validation', FILE_VALIDATION)\n", "upload_to_s3(bucket, 'test', FILE_TEST)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Training the XGBoost model\n", "\n", "After setting training parameters, we kick off training, and poll for status until training is completed, which in this example, takes between 5 and 6 minutes." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.amazon.amazon_estimator import get_image_uri\n", "container = get_image_uri(region, 'xgboost')\n", "print(container)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "import boto3\n", "from time import gmtime, strftime\n", "\n", "job_name = 'DEMO-xgboost-regression-' + strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "print(\"Training job\", job_name)\n", "\n", "#Ensure that the training and validation data folders generated above are reflected in the \"InputDataConfig\" parameter below.\n", "\n", "create_training_params = \\\n", "{\n", " \"AlgorithmSpecification\": {\n", " \"TrainingImage\": container,\n", " \"TrainingInputMode\": \"File\"\n", " },\n", " \"RoleArn\": role,\n", " \"OutputDataConfig\": {\n", " \"S3OutputPath\": bucket_path + \"/\" + prefix + \"/single-xgboost\"\n", " },\n", " \"ResourceConfig\": {\n", " \"InstanceCount\": 1,\n", " \"InstanceType\": \"ml.m4.4xlarge\",\n", " \"VolumeSizeInGB\": 5\n", " },\n", " \"TrainingJobName\": job_name,\n", " \"HyperParameters\": {\n", " \"max_depth\":\"5\",\n", " \"eta\":\"0.2\",\n", " \"gamma\":\"4\",\n", " \"min_child_weight\":\"6\",\n", " \"subsample\":\"0.7\",\n", " \"silent\":\"0\",\n", " \"objective\":\"reg:linear\",\n", " \"num_round\":\"50\"\n", " },\n", " \"StoppingCondition\": {\n", " \"MaxRuntimeInSeconds\": 3600\n", " },\n", " \"InputDataConfig\": [\n", " {\n", " \"ChannelName\": \"train\",\n", " \"DataSource\": {\n", " \"S3DataSource\": {\n", " \"S3DataType\": \"S3Prefix\",\n", " \"S3Uri\": bucket_path + \"/\" + prefix + '/train',\n", " \"S3DataDistributionType\": \"FullyReplicated\"\n", " }\n", " },\n", " \"ContentType\": \"libsvm\",\n", " \"CompressionType\": \"None\"\n", " },\n", " {\n", " \"ChannelName\": \"validation\",\n", " \"DataSource\": {\n", " \"S3DataSource\": {\n", " \"S3DataType\": \"S3Prefix\",\n", " \"S3Uri\": bucket_path + \"/\" + prefix + '/validation',\n", " \"S3DataDistributionType\": \"FullyReplicated\"\n", " }\n", " },\n", " \"ContentType\": \"libsvm\",\n", " \"CompressionType\": \"None\"\n", " }\n", " ]\n", "}\n", "\n", "\n", "client = boto3.client('sagemaker', region_name=region)\n", "client.create_training_job(**create_training_params)\n", "\n", "import time\n", "\n", "status = client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']\n", "print(status)\n", "while status !='Completed' and status!='Failed':\n", " time.sleep(60)\n", " status = client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']\n", " print(status)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that the \"validation\" channel has been initialized too. The SageMaker XGBoost algorithm actually calculates RMSE and writes it to the CloudWatch logs on the data passed to the \"validation\" channel." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set up hosting for the model\n", "In order to set up hosting, we have to import the model from training to hosting. \n", "\n", "### Import model into hosting\n", "\n", "Register the model with hosting. This allows the flexibility of importing models trained elsewhere." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "import boto3\n", "from time import gmtime, strftime\n", "\n", "model_name=job_name + '-model'\n", "print(model_name)\n", "\n", "info = client.describe_training_job(TrainingJobName=job_name)\n", "model_data = info['ModelArtifacts']['S3ModelArtifacts']\n", "print(model_data)\n", "\n", "primary_container = {\n", " 'Image': container,\n", " 'ModelDataUrl': model_data\n", "}\n", "\n", "create_model_response = client.create_model(\n", " ModelName = model_name,\n", " ExecutionRoleArn = role,\n", " PrimaryContainer = primary_container)\n", "\n", "print(create_model_response['ModelArn'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create endpoint configuration\n", "\n", "SageMaker supports configuring REST endpoints in hosting with multiple models, e.g. for A/B testing purposes. In order to support this, customers create an endpoint configuration, that describes the distribution of traffic across the models, whether split, shadowed, or sampled in some way. In addition, the endpoint configuration describes the instance type required for model deployment." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from time import gmtime, strftime\n", "\n", "endpoint_config_name = 'DEMO-XGBoostEndpointConfig-' + strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "print(endpoint_config_name)\n", "create_endpoint_config_response = client.create_endpoint_config(\n", " EndpointConfigName = endpoint_config_name,\n", " ProductionVariants=[{\n", " 'InstanceType':'ml.t2.medium',\n", " 'InitialVariantWeight':1,\n", " 'InitialInstanceCount':1,\n", " 'ModelName':model_name,\n", " 'VariantName':'AllTraffic'}])\n", "\n", "print(\"Endpoint Config Arn: \" + create_endpoint_config_response['EndpointConfigArn'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create endpoint\n", "Lastly, the customer creates the endpoint that serves up the model, through specifying the name and configuration defined above. The end result is an endpoint that can be validated and incorporated into production applications. This takes 9-11 minutes to complete." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "%%time\n", "import time\n", "\n", "endpoint_name = 'DEMO-XGBoostEndpoint-' + strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "endpoint_name = 'DEMO-XGBoostEndpoint-2020-03-08-03-39-11'\n", "print(endpoint_name)\n", "create_endpoint_response = client.create_endpoint(\n", " EndpointName=endpoint_name,\n", " EndpointConfigName=endpoint_config_name)\n", "#print(create_endpoint_response['EndpointArn'])\n", "\n", "resp = client.describe_endpoint(EndpointName=endpoint_name)\n", "status = resp['EndpointStatus']\n", "print(\"Status: \" + status)\n", "\n", "while status=='Creating':\n", " time.sleep(60)\n", " resp = client.describe_endpoint(EndpointName=endpoint_name)\n", " status = resp['EndpointStatus']\n", " print(\"Status: \" + status)\n", "\n", "print(\"Arn: \" + resp['EndpointArn'])\n", "print(\"Status: \" + status)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Validate the model for use\n", "Finally, the customer can now validate the model for use. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Validate the model for use\n", "Finally, the customer can now validate the model for use. They can obtain the endpoint from the client library using the result from previous operations, and generate predictions from the trained model using that endpoint.\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "runtime_client = boto3.client('runtime.sagemaker', region_name=region)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Start with a single prediction." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!head -1 abalone.test > abalone.single.test" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "import json\n", "from itertools import islice\n", "import math\n", "import struct\n", "\n", "file_name = 'abalone.single.test'  # customize to your test file\n", "with open(file_name, 'r') as f:\n", "    payload = f.read().strip()\n", "response = runtime_client.invoke_endpoint(EndpointName=endpoint_name,\n", "                                          ContentType='text/x-libsvm',\n", "                                          Body=payload)\n", "result = response['Body'].read()\n", "result = result.decode(\"utf-8\")\n", "result = result.split(',')\n", "result = [math.ceil(float(i)) for i in result]\n", "label = payload.strip(' ').split()[0]\n", "print('Label: ', label, '\\nPrediction: ', result[0])" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "OK, a single prediction works. Let's run a whole batch and see how accurate the predictions are." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "import math\n", "\n", "def do_predict(data, endpoint_name, content_type):\n", "    payload = '\\n'.join(data)\n", "    response = runtime_client.invoke_endpoint(EndpointName=endpoint_name,\n", "                                              ContentType=content_type,\n", "                                              Body=payload)\n", "    result = response['Body'].read()\n", "    result = result.decode(\"utf-8\")\n", "    result = result.split(',')\n", "    preds = [float(num) for num in result]\n", "    preds = [math.ceil(num) for num in preds]\n", "    return preds\n", "\n", "def batch_predict(data, batch_size, endpoint_name, content_type):\n", "    items = len(data)\n", "    arrs = []\n", "\n", "    for offset in range(0, items, batch_size):\n", "        if offset + batch_size < items:\n", "            results = do_predict(data[offset:(offset + batch_size)], endpoint_name, content_type)\n", "            arrs.extend(results)\n", "        else:\n", "            arrs.extend(do_predict(data[offset:items], endpoint_name, content_type))\n", "        sys.stdout.write('.')\n", "    return arrs" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "The following helps us calculate the Median Absolute Percent Error (MdAPE) on the test dataset." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "%%time\n", "import json\n", "import numpy as np\n", "\n", "with open(FILE_TEST, 'r') as f:\n", "    payload = f.read().strip()\n", "\n", "labels = [int(line.split(' ')[0]) for line in payload.split('\\n')]\n", "test_data = [line for line in payload.split('\\n')]\n", "preds = batch_predict(test_data, 100, endpoint_name, 'text/x-libsvm')\n", "\n", "print('\\n Median Absolute Percent Error (MdAPE) = ', np.median(np.abs(np.array(labels) - np.array(preds)) / np.array(labels)))" ] },
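{ "cell_type": "markdown", "metadata": {}, "source": [ "For comparison with the validation RMSE that the training job logged, the optional cell below computes RMSE on the held-out test set (a small sketch reusing the `labels` and `preds` arrays built above)." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional: RMSE on the held-out test set, comparable to the validation:rmse\n", "# reported during training (uses the labels/preds computed in the MdAPE cell).\n", "import numpy as np\n", "rmse = np.sqrt(np.mean((np.array(labels) - np.array(preds)) ** 2))\n", "print('Test RMSE =', rmse)" ] },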
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "##client.delete_endpoint(EndpointName=endpoint_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Copy the trained model to the IT tools account\n", "\n", "When ready to hand over the trained model and test data to IT, execute the cells below.\n", "It will kickoff the MLOps pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#First download the model\n", "model_data = info['ModelArtifacts']['S3ModelArtifacts']\n", "print(model_data)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bucket_and_key_list= model_data.split('/')\n", "print(bucket_and_key_list)\n", "source_bucket=bucket_and_key_list[3]\n", "print(source_bucket)\n", "key=\"\"\n", "for x in range(4, len(bucket_and_key_list)):\n", " key+=bucket_and_key_list[x]\n", " if(x != len(bucket_and_key_list) - 1):\n", " key+=\"/\"\n", "print(key)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "\n", "s3 = boto3.client('s3')\n", "\n", "trained_model_file = 'local-model.tar.gz'\n", "s3.download_file(source_bucket, key, trained_model_file)\n", "print(\"Model downloaded \", trained_model_file)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tools_account_access_arn=!echo $tools_account_access_role_arn\n", "tools_bucket=!echo $tools_bucket_name\n", "\n", "print(\"tools_account_access_arn : \", tools_account_access_arn)\n", "print(\"tools_bucket : \", tools_bucket)\n", "#print(type(tools_bucket))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tools_model = 'models/model.tar.gz'\n", "tools_test_data = 'data/abalone.test'\n", "tools_validation_data = 'data/abalone.validation'\n", "\n", "local_validation_data = 'abalone.validation'\n", "local_test_data = 'abalone.test'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sts_connection = boto3.client('sts')\n", "acct_b = sts_connection.assume_role(\n", " RoleArn=tools_account_access_arn[0],\n", " RoleSessionName=\"cross_acct_lambda\"\n", ")\n", "\n", "ACCESS_KEY = acct_b['Credentials']['AccessKeyId']\n", "SECRET_KEY = acct_b['Credentials']['SecretAccessKey']\n", "SESSION_TOKEN = acct_b['Credentials']['SessionToken']\n", "\n", "# create service client using the assumed role credentials, e.g. 
S3\n", "tools_s3_client = boto3.client(\n", " 's3',\n", " aws_access_key_id=ACCESS_KEY,\n", " aws_secret_access_key=SECRET_KEY,\n", " aws_session_token=SESSION_TOKEN,\n", ")\n", "\n", "print(\"tools_s3_client \", tools_s3_client)\n", "\n", "print(\"Copying trained model to IT account ...\")\n", "with open(trained_model_file, \"rb\") as f:\n", " tools_s3_client.upload_fileobj(f, tools_bucket[0], tools_model)\n", "print(\"Copying trained model to IT account - Done\")\n", "\n", "print(\"Copying validation data to IT account ...\")\n", "with open(local_validation_data, \"rb\") as f:\n", " tools_s3_client.upload_fileobj(f, tools_bucket[0], tools_validation_data)\n", "print(\"Copying validation data to IT account - Done\")\n", "\n", "print(\"Copying test data to IT account ...\")\n", "with open(local_test_data, \"rb\") as f:\n", " tools_s3_client.upload_fileobj(f, tools_bucket[0], tools_test_data)\n", "print(\"Copying test data to IT account - Done\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "anaconda-cloud": {}, "celltoolbar": "Tags", "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }