{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Train, host, and optimize 50+ XGBoost models in a multi-model endpoint for millisecond latency\n", "This example demonstrates hosting 51 State-wise ML models in a SageMaker multi-model endpoint to predict customer churn based on account usage. The models are trained on a synthetic telecommunication customer churn dataset with SageMaker's built-in XGBoost algorithm. We host this multi-model endpoint on two instance types, `ml.c5.xlarge` and `ml.c5.2xlarge`, and compare their performance with a load test to find an optimal hosting configuration. We analyze the load testing results in Amazon CloudWatch.\n", "\n", "Instead of hosting 51 models in 51 endpoints as illustrated below,\n", "\n", "<img src=\"./images/sagemaker-model-deployment-multi-endpoints.png\" width=\"700\" />\n", "\n", "we can host 51 models in one endpoint and load models dynamically from S3.\n", "\n", "<img src=\"./images/sagemaker-model-deployment-multimodel.png\" width=\"700\" />\n", "\n", "The Amazon CloudWatch dashboard below shows endpoint performance.\n", "\n", "<img src=\"./images/mme-load-testing-combined-2.png\" width=\"700\" />" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook was developed in SageMaker Studio using the `Python 3 (Data Science)` kernel on an ml.t3.medium instance.\n", "\n", "First we install the `sagemaker-experiments` library to manage the training jobs." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install -q sagemaker-experiments" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Import the libraries and set up the SageMaker resources." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "import os, sys\n", "import json\n", "import boto3\n", "import numpy as np\n", "import pandas as pd\n", "\n", "role = sagemaker.get_execution_role()\n", "sess = sagemaker.Session()\n", "region = sess.boto_region_name\n", "bucket = sess.default_bucket()\n", "prefix = 'sagemaker/reinvent21-aim408/churn-mme'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The dataset is a customer churn dataset from a synthetic telecommunication use case. We download it from the `sagemaker-sample-files` S3 bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker.s3.S3Downloader.download('s3://sagemaker-sample-files/datasets/tabular/synthetic/churn.txt', './')\n", "df=pd.read_csv('churn.txt')\n", "df['CustomerID']=df.index\n", "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We perform minimal data preprocessing:\n", "1. Convert the binary columns from strings to integers (0 and 1).\n", "2. Set `CustomerID` as the dataframe index and move the target column to the first position for XGBoost training." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "binary_columns=[\"Int'l Plan\", \"VMail Plan\"]\n", "df[binary_columns] = df[binary_columns].replace(to_replace=['yes', 'no'], \n", "                                                value=[1, 0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df['Churn?'] = df['Churn?'].replace(to_replace=['True.', 'False.'], \n", "                                    value=[1, 0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "columns=['Churn?', 'State', 'Account Length', \"Int'l Plan\",\n", "         'VMail Plan', 'VMail Message', 'Day Mins', 'Day Calls', 'Day Charge',\n", "         'Eve Mins', 'Eve Calls', 'Eve Charge', 'Night Mins', 'Night Calls',\n", "         'Night Charge', 'Intl Mins', 'Intl Calls', 'Intl Charge',\n", "         'CustServ Calls']\n", "df.index = df['CustomerID']\n", "df_processed = df[columns]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The processed data is shown below." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_processed.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We hold out 10% of the data as a test set, stratified by `State`. The remaining data is split into train and validation sets later, right before training." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.model_selection import train_test_split\n", "df_train, df_test = train_test_split(df_processed, test_size=0.1, random_state=42, \n", "                                     shuffle=True, stratify=df_processed['State'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Save the test data to the S3 bucket. Two versions of the test data are saved: one with the complete data, and one without the target, `State`, and index columns for inference." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "columns_no_target=['Account Length', \"Int'l Plan\", 'VMail Plan', 'VMail Message', \n", "                   'Day Mins', 'Day Calls', 'Day Charge', 'Eve Mins', 'Eve Calls', \n", "                   'Eve Charge', 'Night Mins', 'Night Calls', 'Night Charge', \n", "                   'Intl Mins', 'Intl Calls', 'Intl Charge', 'CustServ Calls']\n", "\n", "df_test.to_csv('churn_test.csv')\n", "df_test[columns_no_target].to_csv('churn_test_no_target.csv', \n", "                                  index=False)\n", "\n", "sagemaker.s3.S3Uploader.upload('churn_test.csv', \n", "                               f's3://{bucket}/{prefix}/churn_data')\n", "sagemaker.s3.S3Uploader.upload('churn_test_no_target.csv', \n", "                               f's3://{bucket}/{prefix}/churn_data')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We set up an experiment in SageMaker to hold all the training job information." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker import image_uris\n", "from smexperiments.experiment import Experiment\n", "from smexperiments.trial import Trial\n", "from botocore.exceptions import ClientError\n", "import time\n", "from time import gmtime, strftime\n", "\n", "dict_estimator = {}\n", "\n", "experiment_name = 'churn-prediction'\n", "\n", "try:\n", "    experiment = Experiment.create(\n", "        experiment_name=experiment_name, \n", "        description='Training churn prediction models based on telco churn dataset.')\n", "except ClientError as e:\n", "    # Creation fails if the experiment already exists, so load it instead\n", "    experiment = Experiment.load(experiment_name)\n", "    print(f'{experiment_name} experiment already exists! Reusing the existing experiment.')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For convenience, we create a function `launch_training_job()` so that we can reuse it later in a loop over the States. The training algorithm is SageMaker's built-in XGBoost algorithm. We specify only two hyperparameters: the `binary:logistic` objective and 20 rounds of training. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "image = image_uris.retrieve(region=region, framework='xgboost', version='1.3-1')\n", "train_instance_type = 'ml.m5.xlarge'\n", "train_instance_count = 1\n", "s3_output = f's3://{bucket}/{prefix}/churn_data/training'\n", "\n", "def launch_training_job(state, train_data_s3, val_data_s3):\n", " exp_datetime = strftime('%Y-%m-%d-%H-%M-%S', gmtime())\n", " jobname = f'churn-xgb-{state}-{exp_datetime}'\n", "\n", " # Creating a new trial for the experiment\n", " exp_trial = Trial.create(experiment_name=experiment_name, \n", " trial_name=jobname)\n", "\n", " experiment_config={'ExperimentName': experiment_name,\n", " 'TrialName': exp_trial.trial_name,\n", " 'TrialComponentDisplayName': 'Training'}\n", "\n", " xgb = sagemaker.estimator.Estimator(image,\n", " role,\n", " instance_count=train_instance_count,\n", " instance_type=train_instance_type,\n", " output_path=s3_output,\n", " enable_sagemaker_metrics=True,\n", " sagemaker_session=sess)\n", " xgb.set_hyperparameters(objective='binary:logistic', \n", " num_round=20)\n", " \n", " train_input = sagemaker.inputs.TrainingInput(s3_data=train_data_s3, \n", " content_type='csv')\n", " val_input = sagemaker.inputs.TrainingInput(s3_data=val_data_s3, \n", " content_type='csv')\n", " data_channels={'train': train_input, 'validation': val_input}\n", " \n", " xgb.fit(inputs=data_channels, \n", " job_name=jobname, \n", " experiment_config=experiment_config, \n", " wait=False)\n", "\n", " return xgb" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We isolate the data points by `State`, create train and validation sets for each `State` and train models by `State` using `launch_training_job()`. Again we hold out 10% as validation set in each `State`. We save the estimators in a dictionary `dict_estimator`. \n", "\n", "Execute the next four cells to launch the training jobs if this is the first time running the demo. 
There will be 51 training jobs submitted. We implement a function `wait_for_training_quota()` that checks the current number of running jobs and limits the concurrent training jobs in this experiment to `job_limit`. If the job count is at the limit, the function waits for the number of seconds specified in the `wait` argument and checks the job count again. This accounts for account-level SageMaker quotas that could otherwise cause errors in the for loop. The default service quotas for *Number of instances across training jobs* and *number of ml.m5.xlarge instances* are both 4, as documented on the [Service Quota page](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html#limits_sagemaker). If your account has a higher limit, you may raise `job_limit` to allow more simultaneous training jobs (and therefore faster completion). You can also [request a quota increase](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html).\n", "\n", "If you have already run the training jobs from this notebook and have completed trials in SageMaker Experiments, you can proceed to [loading the existing estimators](#loading-estimators)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def wait_for_training_quota(dict_estimator, job_limit = 4, wait = 30):\n", "    def query_jobs(dict_estimator):\n", "        # Count the training jobs that are still running\n", "        counter=0\n", "        for key, estimator in dict_estimator.items():\n", "            status = estimator.latest_training_job.describe()[\"TrainingJobStatus\"]\n", "            time.sleep(2)\n", "            if status == \"InProgress\":\n", "                counter+=1\n", "        return counter\n", "    \n", "    job_count = query_jobs(dict_estimator)\n", "    if job_count < job_limit:\n", "        print(f'Current total running jobs {job_count} is below {job_limit}. Proceeding...')\n", "        return \n", "    \n", "    while job_count >= job_limit:\n", "        print(f'Current total running jobs {job_count} is reaching the limit {job_limit}. Waiting {wait} seconds...')\n", "        time.sleep(wait)\n", "        job_count = query_jobs(dict_estimator)\n", "\n", "    print(f'Current total running jobs {job_count} is below {job_limit}. Proceeding...')" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "os.makedirs('churn_data_by_state', exist_ok=True)\n", "\n", "for state in df_processed.State.unique():\n", "    print(state)\n", "    output_dir = f's3://{bucket}/{prefix}/churn_data/by_state'\n", "    out_train_csv_s3 = f's3://{bucket}/{prefix}/churn_data/by_state/churn_{state}_train.csv'\n", "    out_val_csv_s3 = f's3://{bucket}/{prefix}/churn_data/by_state/churn_{state}_val.csv'\n", "    \n", "    # create train/val split for each State\n", "    df_state = df_train[df_train['State']==state].drop(labels='State', axis=1)\n", "    df_state_train, df_state_val = train_test_split(df_state, \n", "                                                    test_size=0.1, \n", "                                                    random_state=42, \n", "                                                    shuffle=True, \n", "                                                    stratify=df_state['Churn?'])\n", "    \n", "    df_state_train.to_csv(f'churn_data_by_state/churn_{state}_train.csv', index=False)\n", "    df_state_val.to_csv(f'churn_data_by_state/churn_{state}_val.csv', index=False)\n", "    sagemaker.s3.S3Uploader.upload(f'churn_data_by_state/churn_{state}_train.csv', output_dir)\n", "    sagemaker.s3.S3Uploader.upload(f'churn_data_by_state/churn_{state}_val.csv', output_dir)\n", "    \n", "    # Block until the number of running jobs is below the quota\n", "    wait_for_training_quota(dict_estimator, job_limit=4, wait=30)\n", "    \n", "    dict_estimator[state] = launch_training_job(state, out_train_csv_s3, out_val_csv_s3)\n", "    \n", "    time.sleep(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Wait for all jobs to complete." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def wait_for_training_job_to_complete(estimator):\n", "    job = estimator.latest_training_job.job_name\n", "    print(f\"Waiting for job: {job}\")\n", "    status = estimator.latest_training_job.describe()[\"TrainingJobStatus\"]\n", "    # Poll every 45 seconds until the job leaves the InProgress state\n", "    while status == \"InProgress\":\n", "        time.sleep(45)\n", "        status = estimator.latest_training_job.describe()[\"TrainingJobStatus\"]\n", "        if status == \"InProgress\":\n", "            print(f\"{job} job status: {status}\")\n", "    print(f\"DONE. Status for {job} is {status}\\n\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "for est in list(dict_estimator.values()):\n", "    wait_for_training_job_to_complete(est)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "<a name=\"loading-estimators\"></a>The code snippet below retrieves the estimators from the experiment trials. It is useful when you have already trained the models but no longer have the dictionary `dict_estimator` (for example, after a kernel restart) and want to resume the work.\n", "\n", "```python\n", "dict_estimator={}\n", "experiment = Experiment.load(experiment_name)\n", "for i, j in enumerate(experiment.list_trials()):\n", "    print(i, j.trial_name)\n", "    jobname=j.trial_name\n", "    # Job names follow 'churn-xgb-{state}-{datetime}'\n", "    state=jobname.split('-')[2]\n", "    print(state)\n", "    try:\n", "        dict_estimator[state]=sagemaker.estimator.Estimator.attach(jobname)\n", "    except Exception:\n", "        # Skip trials whose training job cannot be attached\n", "        pass\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "## Uncomment this part to load the estimators if you already have trained them.\n", "# dict_estimator={}\n", "# experiment = Experiment.load(experiment_name)\n", "# for i, j in enumerate(experiment.list_trials()):\n", "#     print(i, j.trial_name)\n", "#     jobname=j.trial_name\n", "#     state=jobname.split('-')[2]\n", "#     print(state)\n", "#     try:\n", "#         dict_estimator[state]=sagemaker.estimator.Estimator.attach(jobname)\n", "#     except Exception:\n", "#         pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the training jobs are complete, we can start hosting our multi-model endpoint. We host our State-wise multi-model endpoint on two different instance types, `ml.c5.xlarge` and `ml.c5.2xlarge`, and then conduct load testing to profile the performance." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(len(dict_estimator))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(dict_estimator.keys())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we designate an S3 location to hold all the model artifacts we would like to host. Models can be added to this location at any time, before or after the endpoint is created, which makes a multi-model endpoint a flexible way to serve models at scale." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_data_prefix = f's3://{bucket}/{prefix}/churn_data/multi_model_artifacts/'" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "for state, est in dict_estimator.items():\n", "    artifact_path = est.model_data\n", "    state_model_name = f'churn-xgb-{state}.tar.gz'\n", "    print(f'Copying {state_model_name} to multi_model_artifacts folder')\n", "    # Copy the model artifact to the S3 location used by the multi-model endpoint.\n", "    !aws s3 cp --quiet {artifact_path} {model_data_prefix}{state_model_name}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Endpoint creation is a three-step process with the API: `create_model()`, then `create_endpoint_config()`, then `create_endpoint()`.\n", "\n", "We create our first endpoint on an `ml.c5.xlarge` instance, which has 4 vCPUs and 8 GB of RAM." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "exp_datetime = strftime('%Y-%m-%d-%H-%M-%S', gmtime())\n", "model_name = f'churn-xgb-mme-{exp_datetime}'\n", "\n", "hosting_instance_type = 'ml.c5.xlarge'\n", "hosting_instance_count = 1\n", "\n", "endpoint_name = f'{model_name}-c5-xl'\n", "\n", "# image = image_uris.retrieve(region=region, framework='xgboost', version='1.3-1')\n", "container = {'Image': image, \n", " 'ModelDataUrl': model_data_prefix, \n", " 'Mode': 'MultiModel'}\n", "\n", "response1 = sess.sagemaker_client.create_model(ModelName = model_name,\n", " ExecutionRoleArn = role,\n", " Containers = [container])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response2 = sess.sagemaker_client.create_endpoint_config(\n", " EndpointConfigName = endpoint_name,\n", " ProductionVariants = [{'InstanceType': hosting_instance_type,\n", " 'InitialInstanceCount': hosting_instance_count,\n", " 'InitialVariantWeight': 1,\n", " 'ModelName': model_name,\n", " 'VariantName': 'AllTraffic'}])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response3 = sess.sagemaker_client.create_endpoint(EndpointName = endpoint_name,\n", " EndpointConfigName = endpoint_name)\n", "print(endpoint_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We create another endpoint with `ml.c5.2xlarge` which has 8 vCPU and 16 GB RAM." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hosting_instance_type = 'ml.c5.2xlarge'\n", "hosting_instance_count = 1\n", "\n", "endpoint_name_2 = f'{model_name}-c5-2xl'\n", "\n", "response4 = sess.sagemaker_client.create_endpoint_config(\n", "    EndpointConfigName = endpoint_name_2,\n", "    ProductionVariants = [{'InstanceType': hosting_instance_type,\n", "                           'InitialInstanceCount': hosting_instance_count,\n", "                           'InitialVariantWeight': 1,\n", "                           'ModelName': model_name, # re-using the model\n", "                           'VariantName': 'AllTraffic'}])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response5 = sess.sagemaker_client.create_endpoint(EndpointName = endpoint_name_2,\n", "                                                  EndpointConfigName = endpoint_name_2)\n", "print(endpoint_name_2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "waiter = sess.sagemaker_client.get_waiter('endpoint_in_service')\n", "print(f'Waiting for endpoint {endpoint_name} to create...')\n", "waiter.wait(EndpointName=endpoint_name)\n", "print(f'Waiting for endpoint {endpoint_name_2} to create...')\n", "waiter.wait(EndpointName=endpoint_name_2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We run the load test from [AWS Cloud9](https://console.aws.amazon.com/cloud9/home?region=us-east-1). You can also run it from your local computer." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### (Optional) Enable autoscaling\n", "Having verified the baseline single-instance performance, let's apply an autoscaling policy to the `ml.c5.2xlarge` endpoint so that it scales in and out between 2 and 5 instances to maintain performance under variable traffic. Here we use the predefined metric `SageMakerVariantInvocationsPerInstance` with a `TargetValue` of 4,000, which targets an average of 4,000 invocations per minute per instance." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Application Auto Scaling client; this service handles scaling for SageMaker and other AWS services\n", "client = boto3.client('application-autoscaling')\n", "\n", "# This is the format in which Application Auto Scaling references the endpoint variant\n", "resource_id=f'endpoint/{endpoint_name_2}/variant/AllTraffic' \n", "\n", "response = client.register_scalable_target(\n", "    ServiceNamespace='sagemaker', \n", "    ResourceId=resource_id,\n", "    ScalableDimension='sagemaker:variant:DesiredInstanceCount',\n", "    MinCapacity=2, \n", "    MaxCapacity=5\n", ")\n", "\n", "response = client.put_scaling_policy(\n", "    PolicyName='Invocations-ScalingPolicy',\n", "    ServiceNamespace='sagemaker', # The namespace of the AWS service that provides the resource.\n", "    ResourceId=resource_id, # The endpoint variant to scale\n", "    ScalableDimension='sagemaker:variant:DesiredInstanceCount', # SageMaker supports only Instance Count\n", "    PolicyType='TargetTrackingScaling', # 'StepScaling'|'TargetTrackingScaling'\n", "    TargetTrackingScalingPolicyConfiguration={\n", "        'TargetValue': 4000, # The target value for the metric SageMakerVariantInvocationsPerInstance.\n", "        'PredefinedMetricSpecification': {\n", "            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance', \n", "        },\n", "        # Cooldown periods prevent further scaling before the effects of the previous scaling activity are visible.\n", "        # Choose them based on your instance startup time or other application needs.\n", "        'ScaleInCooldown': 600, # Seconds after a scale-in activity completes before another scale-in activity can start.\n", "        'ScaleOutCooldown': 300, # Seconds after a scale-out activity completes before another scale-out activity can start.\n", "        \n", "        'DisableScaleIn': False, # If True, scale-in is disabled and the target tracking policy won't\n", "                                 # remove capacity from the scalable resource.\n", "    }\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After you are done with the load testing, uncomment and run the next cell to delete the endpoints and stop incurring costs." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# sess.sagemaker_client.delete_endpoint(EndpointName=endpoint_name)\n", "# sess.sagemaker_client.delete_endpoint(EndpointName=endpoint_name_2)" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }