{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"# Bringing your own data processing code to SageMaker Autopilot\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"In a typical machine learning model building process, data scientists are required to manually prepare the features, select the algorithm, and optimize model parameters. It takes lots of effort and expertise. SageMaker Autopilot (https://aws.amazon.com/sagemaker/) removes the heavy lifting. It inspects your data set, and runs a number of candidates to figure out the optimal combination of data preprocessing steps, machine learning algorithms and hyperparameters. You can easily deploy either on a real-time endpoint or for batch processing. \n",
"\n",
"In some cases, customer wants to have the flexibility to bring custom data processing code to SageMaker Autopilot. For example, customer might have datasets with large number of independent variables. Customer would like to have a custom feature selection step to remove irrelevant variables first. The resulted smaller dataset is then used to launch SageMaker Autopilot job. Customer would also like to include both the custom processing code and models from SageMaker Autopilot for easy deployment—either on a real-time endpoint or for batch processing. We will demonstrate how to achieve this in this notebook. \n",
"\n",
"\n",
"## Table of contents\n",
"* [Setup](#setup)\n",
" * [Generate dataset](#data_gene)\n",
" * [Upload data to S3](#upload)\n",
"* [Feature Selection](#feature_selection)\n",
" * [Prepare Feature Selection Script](#feature_script)\n",
" * [Create SageMaker Scikit Estimator](#create_sklearn_estimator)\n",
" * [Batch transform our training data](#preprocess_train_data)\n",
"* [Launch SageMaker Autopilot job with the preprocessed data](#autopilot)\n",
"* [Serial Inference Pipeline that combines feature selection and autopilot](#inference_pipeline)\n",
" * [Set up the inference pipeline](#pipeline_setup)\n",
" * [Make a request to our pipeline endpoint](#pipeline_inference_request)\n",
" * [Delete Endpoint](#delete_endpoint)"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Setup "
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"Let's first create our Sagemaker session and role, and create a S3 prefix to use for the notebook example."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"!pip install -U sagemaker"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"import sagemaker\n",
"import os\n",
"from sagemaker import get_execution_role\n",
"\n",
"sagemaker_session = sagemaker.Session()\n",
"\n",
"# S3 prefix\n",
"bucket = sagemaker_session.default_bucket()\n",
"prefix = \"reuse-autopilot-blog\"\n",
"\n",
"# Get a SageMaker-compatible role used by this Notebook Instance.\n",
"role = get_execution_role()"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Generate dataset \n",
"\n",
"\n",
"We use `sklearn.datasets.make_regression` to generate data with 100 features. 5 of these features are informative."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
},
"scrolled": true
},
"outputs": [],
"source": [
"from sklearn.datasets import make_regression\n",
"import pandas as pd\n",
"from sklearn.model_selection import train_test_split\n",
"\n",
"X, y = make_regression(n_features=100, n_samples=1500, n_informative=5, random_state=0)\n",
"df_X = pd.DataFrame(X).rename(columns=lambda x: \"x_\" + str(x))\n",
"df_y = pd.DataFrame(y).rename(columns=lambda x: \"y\")\n",
"df = pd.concat([df_X, df_y], axis=1)\n",
"pd.options.display.max_columns = 14\n",
"df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Upload the data for training \n",
"\n",
"When training large models with huge amounts of data, you'll typically use big data tools, like Amazon Athena, AWS Glue, or Amazon EMR, to create your data in S3. In this notebook, we use the tools provided by the SageMaker Python SDK to upload the data to `S3`. \n",
"\n",
"We first create a folder `data` to store our dataset locally. Then we save our data as `train.csv` and upload it to the `S3` bucket specified earlier."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"%%sh\n",
"\n",
"if [ ! -d ./data ]\n",
"then\n",
" mkdir data\n",
"fi"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"df.to_csv(\"./data/train.csv\", index=False)\n",
"\n",
"WORK_DIRECTORY = \"data\"\n",
"\n",
"train_input = sagemaker_session.upload_data(\n",
" path=\"{}/{}\".format(WORK_DIRECTORY, \"train.csv\"),\n",
" bucket=bucket,\n",
" key_prefix=\"{}/{}\".format(prefix, \"training_data\"),\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Feature Selection \n",
"\n",
"We use Scikit-learn on Sagemaker `SKLearn` Estimator with a feature selection script as an entry point. The script is very similar to a training script you might run outside of SageMaker, but you can access useful properties about the training environment through various environment variables, such as:\n",
"\n",
"* SM_MODEL_DIR: A string representing the path to the directory to write model artifacts to. These artifacts are uploaded to S3 for model hosting.\n",
"* SM_OUTPUT_DIR: A string representing the filesystem path to write output artifacts to. Output artifacts may include checkpoints, graphs, and other files to save, not including model artifacts. These artifacts are compressed and uploaded to S3 to the same S3 prefix as the model artifacts.\n",
"\n",
"A typical training script loads data from the input channels, trains a model, and saves a model to model_dir so that it can be hosted later. \n"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Prepare Feature Selection Script \n",
"\n",
"Inside `SKLearn` container, `sklearn.feature_selection` module contains several feature selection algorithms. We choose the following feature selection algorithms in our training script. \n",
"\n",
"* Recursive feature elimination using `sklearn.feature_selection.RFE`: the goal of recursive feature elimination (RFE) is to select features by recursively considering smaller and smaller sets of features. First, the estimator is trained on the initial set of features and the importance of each feature is obtained. Then, the least important features are pruned from current set of features. That procedure is recursively repeated on the pruned set until the desired number of features to select is eventually reached. We use Epsilon-Support Vector Regression (`sklearn.svm.SVR`) as our learning estimator for RFE. \n",
"*Â Univariate linear regression test using `sklearn.feature_selection.f_regression`: Linear model for testing the individual effect of each of many regressors. This is done in 2 steps. First the correlation between each regressor and the target is computed. Then the correction is converted to an F score then to a p-value. Features with low p-values are selected. \n",
"* Select features according to the k highest scores using `sklearn.feature_selection.SelectKBest`. We use mutual information as the score function. Mutual information between two random variables is a non-negative value, which measures the dependency between the variables. It is equal to zero if and only if two random variables are independent, and higher values mean higher dependency.\n",
"\n",
"We stack the three feature selection algorithms into one `sklearn.pipeline.Pipeline`. After training is done, we save model artifacts to `SM_MODEL_DIR`. We also saved the selected column names for later use. The complete Python script is shown below:\n",
"\n",
"**Note that the feature selection algorithms used here are for demonstration purposes. You can update the script based on the feature selection algorithm of your choice.**\n",
"\n",
"```python\n",
"from __future__ import print_function\n",
"\n",
"import time\n",
"import sys\n",
"from io import StringIO\n",
"import os\n",
"import shutil\n",
"\n",
"import argparse\n",
"import csv\n",
"import joblib\n",
"import json\n",
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"from sklearn.impute import SimpleImputer\n",
"from sklearn.pipeline import Pipeline\n",
"from sklearn.svm import SVR\n",
"from sklearn.feature_selection import f_regression, mutual_info_regression, SelectKBest, RFE\n",
"\n",
"from sagemaker_containers.beta.framework import (\n",
" content_types, encoders, env, modules, transformer, worker)\n",
"\n",
"label_column = 'y'\n",
"INPUT_FEATURES_SIZE = 100\n",
"\n",
"if __name__ == '__main__':\n",
"\n",
" parser = argparse.ArgumentParser()\n",
"\n",
" # Sagemaker specific arguments. Defaults are set in the environment variables.\n",
" parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])\n",
" parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])\n",
" parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])\n",
"\n",
" args = parser.parse_args()\n",
"\n",
" # Take the set of files and read them all into a single pandas dataframe\n",
" input_files = [ os.path.join(args.train, file) for file in os.listdir(args.train) ]\n",
" if len(input_files) == 0:\n",
" raise ValueError(('There are no files in {}.\\n' +\n",
" 'This usually indicates that the channel ({}) was incorrectly specified,\\n' +\n",
" 'the data specification in S3 was incorrectly specified or the role specified\\n' +\n",
" 'does not have permission to access the data.').format(args.train, \"train\"))\n",
" \n",
" raw_data = [ pd.read_csv(file) for file in input_files ]\n",
" concat_data = pd.concat(raw_data)\n",
" \n",
" number_of_columns_x = concat_data.shape[1]\n",
" y_train = concat_data.iloc[:,number_of_columns_x-1].values\n",
" X_train = concat_data.iloc[:,:number_of_columns_x-1].values\n",
" \n",
" '''Feature selection pipeline'''\n",
" feature_selection_pipe = Pipeline([\n",
" ('svr', RFE(SVR(kernel=\"linear\"))),# default: eliminate 50%\n",
" ('f_reg',SelectKBest(f_regression, k=30)),\n",
" ('mut_info',SelectKBest(mutual_info_regression, k=10))\n",
" ])\n",
" \n",
" \n",
" feature_selection_pipe.fit(X_train,y_train)\n",
"\n",
" joblib.dump(feature_selection_pipe, os.path.join(args.model_dir, \"model.joblib\"))\n",
"\n",
" print(\"saved model!\")\n",
" \n",
" \n",
" '''Save selected feature names'''\n",
" feature_names = concat_data.columns[:-1]\n",
" feature_names = feature_names[feature_selection_pipe.named_steps['svr'].get_support()]\n",
" feature_names = feature_names[feature_selection_pipe.named_steps['f_reg'].get_support()]\n",
" feature_names = feature_names[feature_selection_pipe.named_steps['mut_info'].get_support()]\n",
" joblib.dump(feature_names, os.path.join(args.model_dir, \"selected_feature_names.joblib\"))\n",
" \n",
" print(\"Selected features are: {}\".format(feature_names))\n",
" \n",
" \n",
"def input_fn(input_data, content_type):\n",
" \"\"\"Parse input data payload\n",
" \n",
" We currently only take csv input. Since we need to process both labelled\n",
" and unlabelled data we first determine whether the label column is present\n",
" by looking at how many columns were provided.\n",
" \"\"\"\n",
" \n",
" if content_type == 'text/csv':\n",
" # Read the raw input data as CSV.\n",
" df = pd.read_csv(StringIO(input_data)) \n",
" return df\n",
" else:\n",
" raise ValueError(\"{} not supported by script!\".format(content_type))\n",
" \n",
"\n",
"def output_fn(prediction, accept):\n",
" \"\"\"Format prediction output\n",
" \n",
" The default accept/content-type between containers for serial inference is JSON.\n",
" We also want to set the ContentType or mimetype as the same value as accept so the next\n",
" container can read the response payload correctly.\n",
" \"\"\"\n",
" if accept == \"application/json\":\n",
" instances = []\n",
" for row in prediction.tolist():\n",
" instances.append({\"features\": row})\n",
"\n",
" json_output = {\"instances\": instances}\n",
"\n",
" return worker.Response(json.dumps(json_output), mimetype=accept)\n",
" elif accept == 'text/csv':\n",
" return worker.Response(encoders.encode(prediction, accept), mimetype=accept)\n",
" else:\n",
" raise RuntimeException(\"{} accept type is not supported by this script.\".format(accept))\n",
"\n",
"\n",
"def predict_fn(input_data, model):\n",
" \"\"\"Preprocess input data\n",
" \n",
" We implement this because the default predict_fn uses .predict(), but our model is a preprocessor\n",
" so we want to use .transform().\n",
"\n",
" The output is returned in the following order:\n",
" \n",
" rest of features either one hot encoded or standardized\n",
" \"\"\"\n",
" print(\"Input data shape at predict_fn: {}\".format(input_data.shape))\n",
" if input_data.shape[1] == INPUT_FEATURES_SIZE:\n",
" # This is a unlabelled example, return only the features\n",
" features = model.transform(input_data)\n",
" return features\n",
" \n",
" elif input_data.shape[1] == INPUT_FEATURES_SIZE + 1:\n",
" # Labeled data. Return label and features\n",
" features = model.transform(input_data.iloc[:,:INPUT_FEATURES_SIZE])\n",
" return np.insert(features, 0, input_data[label_column], axis=1)\n",
"\n",
"def model_fn(model_dir):\n",
" \"\"\"Deserialize fitted model\n",
" \"\"\"\n",
" preprocessor = joblib.load(os.path.join(model_dir, \"model.joblib\"))\n",
" return preprocessor\n",
"```"
]
},
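{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"The way the script recovers the selected column names can be sketched in isolation: each step's `get_support()` mask indexes only the features that survived the previous step, so the masks must be applied in pipeline order. The snippet below is a minimal illustration in plain Python; `chain_support_masks` and the toy masks are hypothetical and not part of the script.\n",
"\n",
"```python\n",
"def chain_support_masks(names, masks):\n",
"    \"\"\"Apply boolean masks in order; each mask indexes the survivors of the previous one.\"\"\"\n",
"    selected = list(names)\n",
"    for mask in masks:\n",
"        if len(mask) != len(selected):\n",
"            raise ValueError(\"mask length does not match the remaining features\")\n",
"        selected = [name for name, keep in zip(selected, mask) if keep]\n",
"    return selected\n",
"\n",
"\n",
"# Hypothetical toy example: 6 features and two selection steps.\n",
"names = [\"x_0\", \"x_1\", \"x_2\", \"x_3\", \"x_4\", \"x_5\"]\n",
"step_1 = [True, False, True, True, False, True]  # keeps x_0, x_2, x_3, x_5\n",
"step_2 = [False, True, True, False]  # indexes the 4 survivors of step_1\n",
"print(chain_support_masks(names, [step_1, step_2]))\n",
"```\n",
"\n",
"With these toy masks the first step keeps four names and the second keeps two of those four, which mirrors how the script narrows 100 columns to 50, then 30, then 10."
]
},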
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Create SageMaker Scikit Estimator for feature selection \n",
"\n",
"To run our Scikit-learn training script on SageMaker, we construct a `sagemaker.sklearn.estimator.sklearn` estimator, which accepts several constructor arguments:\n",
"\n",
"* __entry_point__: The path to the Python script SageMaker runs for training and prediction.\n",
"* __role__: Role ARN\n",
"* __instance_type__ *(optional)*: The type of SageMaker instances for training. __Note__: Because Scikit-learn does not natively support GPU training, Sagemaker Scikit-learn does not currently support training on GPU instance types.\n",
"* __sagemaker_session__ *(optional)*: The session used to train on Sagemaker.\n",
"\n",
"To see the code for the SKLearn Estimator, see here: https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/sklearn"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
},
"scrolled": true
},
"outputs": [],
"source": [
"from sagemaker.sklearn.estimator import SKLearn\n",
"\n",
"script_path = \"sklearn_feature_selection.py\"\n",
"model_output_path = os.path.join(\"s3://\", bucket, prefix, \"Feature_selection_model/\")\n",
"\n",
"sklearn_preprocessor = SKLearn(\n",
" entry_point=script_path,\n",
" role=role,\n",
" output_path=model_output_path,\n",
" instance_type=\"ml.c4.xlarge\",\n",
" sagemaker_session=None,\n",
" framework_version=\"1.2-1\",\n",
" py_version=\"py3\",\n",
")\n",
"\n",
"sklearn_preprocessor.fit({\"train\": train_input})"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"The trained model contains model.joblib, which is our feature selection pipeline. In additon to that, we also saved selected features. It can be retrived from `model_output_path` as show below. We retrive the selected feature names for later use."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"key_prefix = os.path.join(\n",
" prefix,\n",
" \"Feature_selection_model\",\n",
" sklearn_preprocessor.latest_training_job.job_name,\n",
" \"output\",\n",
" \"model.tar.gz\",\n",
")\n",
"sagemaker_session.download_data(path=\"./\", bucket=bucket, key_prefix=key_prefix)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"!tar xvzf model.tar.gz"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"import joblib\n",
"\n",
"feature_list = list(joblib.load(\"selected_feature_names.joblib\"))\n",
"print(feature_list)"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Batch transform our training data \n",
"Now that our feature selection model is properly fitted, let's go ahead and transform our training data. Let's use batch transform to directly process the raw data and store right back into s3."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"# Define a SKLearn Transformer from the trained SKLearn Estimator\n",
"transformer_output = os.path.join(\"s3://\", bucket, prefix, \"Feature_selection_output/\")\n",
"transformer = sklearn_preprocessor.transformer(\n",
" instance_count=1,\n",
" instance_type=\"ml.m4.xlarge\",\n",
" output_path=transformer_output,\n",
" assemble_with=\"Line\",\n",
" accept=\"text/csv\",\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
},
"scrolled": true
},
"outputs": [],
"source": [
"# Preprocess training input\n",
"transformer.transform(train_input, content_type=\"text/csv\")\n",
"print(\"Waiting for transform job: \" + transformer.latest_transform_job.job_name)\n",
"transformer.wait()\n",
"preprocessed_train = transformer.output_path"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Autopilot "
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"### First we add column names to transferred data\n",
"\n",
"`selected_feature_names.joblib` downloaded from previous step contains the list of variables selected. For this demonstration, we download the batch transform output file from S3 and add column name on this notebook instance. When dealing with big dataset, you can use SageMaker Processing or Amazon Glue to add column names. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"transformer_output_path = os.path.join(transformer.output_path)\n",
"\n",
"key_prefix = (\n",
" transformer_output_path[transformer_output_path.find(bucket) + len(bucket) + 1 :]\n",
" + \"train.csv.out\"\n",
")\n",
"print(transformer_output_path)\n",
"\n",
"sagemaker_session.download_data(path=\"./\", bucket=bucket, key_prefix=key_prefix)\n",
"df_new = pd.read_csv(\"train.csv.out\", header=None)\n",
"\n",
"# first column is the target variable\n",
"df_new.columns = [\"y\"] + feature_list"
]
},
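{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"The cell above derives the S3 key by slicing the output path at the bucket name. As an alternative sketch, the bucket and key can be separated with the standard library's `urllib.parse.urlparse`. The helper `split_s3_uri` and the bucket name `example-bucket` are hypothetical and used only for illustration.\n",
"\n",
"```python\n",
"from urllib.parse import urlparse\n",
"\n",
"\n",
"def split_s3_uri(uri):\n",
"    \"\"\"Return (bucket, key) for an s3://bucket/key URI.\"\"\"\n",
"    parsed = urlparse(uri)\n",
"    if parsed.scheme != \"s3\" or not parsed.netloc:\n",
"        raise ValueError(\"not an S3 URI: {}\".format(uri))\n",
"    return parsed.netloc, parsed.path.lstrip(\"/\")\n",
"\n",
"\n",
"# Hypothetical bucket name, used only for illustration.\n",
"example_uri = \"s3://example-bucket/reuse-autopilot-blog/Feature_selection_output/\"\n",
"example_bucket, example_prefix = split_s3_uri(example_uri)\n",
"print(example_bucket, example_prefix + \"train.csv.out\")\n",
"```\n",
"\n",
"Parsing the URI this way does not depend on where the bucket name happens to occur in the string, which may be easier to reason about than index arithmetic."
]
},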
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"df_new.to_csv(\"./data/train_new.csv\", index=False)\n",
"\n",
"WORK_DIRECTORY = \"data\"\n",
"\n",
"train_new_input = sagemaker_session.upload_data(\n",
" path=\"{}/{}\".format(WORK_DIRECTORY, \"train_new.csv\"),\n",
" bucket=bucket,\n",
" key_prefix=\"{}/{}\".format(prefix, \"training_data_new\"),\n",
")\n",
"\n",
"df_new.head()"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"### Set up and kick off autopilot job"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"input_data_config = [\n",
" {\n",
" \"DataSource\": {\n",
" \"S3DataSource\": {\n",
" \"S3DataType\": \"S3Prefix\",\n",
" \"S3Uri\": \"s3://{}/{}/training_data_new\".format(bucket, prefix),\n",
" }\n",
" },\n",
" \"TargetAttributeName\": \"y\",\n",
" }\n",
"]\n",
"\n",
"output_data_config = {\"S3OutputPath\": \"s3://{}/{}/autopilot_job_output\".format(bucket, prefix)}\n",
"\n",
"AutoML_Job_Config = {\n",
" \"CompletionCriteria\": {\n",
" # we set MaxCandidate to 50 to have shorter run time. Please adjust this for your use case.\n",
" \"MaxCandidates\": 50,\n",
" \"MaxAutoMLJobRuntimeInSeconds\": 1800,\n",
" }\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"You can now launch the Autopilot job by calling the create_auto_ml_job API."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"from time import gmtime, strftime, sleep\n",
"import boto3\n",
"\n",
"region = boto3.Session().region_name\n",
"\n",
"sm = boto3.Session().client(service_name=\"sagemaker\", region_name=region)\n",
"timestamp_suffix = strftime(\"%d-%H-%M-%S\", gmtime())\n",
"\n",
"auto_ml_job_name = \"automl-blog\" + timestamp_suffix\n",
"print(\"AutoMLJobName: \" + auto_ml_job_name)\n",
"\n",
"sm.create_auto_ml_job(\n",
" AutoMLJobName=auto_ml_job_name,\n",
" InputDataConfig=input_data_config,\n",
" OutputDataConfig=output_data_config,\n",
" AutoMLJobConfig=AutoML_Job_Config,\n",
" RoleArn=role,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"### Tracking SageMaker Autopilot job progress\n",
"\n",
"SageMaker Autopilot job consists of the following high-level steps :\n",
"\n",
"* Analyzing Data, where the dataset is analyzed and Autopilot comes up with a list of ML pipelines that should be tried out on the dataset. The dataset is also split into train and validation sets.\n",
"* Feature Engineering, where Autopilot performs feature transformation on individual features of the dataset as well as at an aggregate level.\n",
"* Model Tuning, where the top performing pipeline is selected along with the optimal hyperparameters for the training algorithm (the last stage of the pipeline).\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
},
"scrolled": true
},
"outputs": [],
"source": [
"print(\"JobStatus - Secondary Status\")\n",
"print(\"------------------------------\")\n",
"\n",
"\n",
"describe_response = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)\n",
"print(describe_response[\"AutoMLJobStatus\"] + \" - \" + describe_response[\"AutoMLJobSecondaryStatus\"])\n",
"job_run_status = describe_response[\"AutoMLJobStatus\"]\n",
"\n",
"while job_run_status not in (\"Failed\", \"Completed\", \"Stopped\"):\n",
" describe_response = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)\n",
" job_run_status = describe_response[\"AutoMLJobStatus\"]\n",
"\n",
" print(\n",
" describe_response[\"AutoMLJobStatus\"] + \" - \" + describe_response[\"AutoMLJobSecondaryStatus\"]\n",
" )\n",
" sleep(30)"
]
},
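{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"The polling loop above runs until the job reaches a terminal state. A possible variation, sketched below, adds an overall timeout so that a stuck job does not block the notebook indefinitely. The helper `wait_for_status` is hypothetical, and a stand-in status function replaces the AWS call so the sketch runs without AWS access.\n",
"\n",
"```python\n",
"import time\n",
"\n",
"\n",
"def wait_for_status(get_status, terminal_states, poll_seconds=30, timeout_seconds=3600):\n",
"    \"\"\"Poll get_status() until it returns a terminal state or the timeout elapses.\"\"\"\n",
"    deadline = time.monotonic() + timeout_seconds\n",
"    while True:\n",
"        status = get_status()\n",
"        if status in terminal_states:\n",
"            return status\n",
"        if time.monotonic() >= deadline:\n",
"            raise TimeoutError(\"still {} after {} seconds\".format(status, timeout_seconds))\n",
"        time.sleep(poll_seconds)\n",
"\n",
"\n",
"# Stand-in for describe_auto_ml_job so the sketch runs without AWS access.\n",
"fake_statuses = iter([\"InProgress\", \"InProgress\", \"Completed\"])\n",
"final_status = wait_for_status(\n",
"    lambda: next(fake_statuses),\n",
"    terminal_states=(\"Failed\", \"Completed\", \"Stopped\"),\n",
"    poll_seconds=0,\n",
")\n",
"print(final_status)\n",
"```\n",
"\n",
"With the real client, the status function could be `lambda: sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)[\"AutoMLJobStatus\"]`, keeping the same terminal states as the loop above."
]
},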
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"### Results\n",
"Now use the describe_auto_ml_job API to look up the best candidate selected by the SageMaker Autopilot job.\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"from IPython.display import JSON\n",
"\n",
"best_candidate = sm.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)[\"BestCandidate\"]\n",
"best_candidate_name = best_candidate[\"CandidateName\"]\n",
"\n",
"print(\"\\n\")\n",
"print(\"CandidateName: \" + best_candidate_name)\n",
"print(\"CandidateName Steps: \" + best_candidate[\"FinalAutoMLJobObjectiveMetric\"][\"MetricName\"])\n",
"print(\n",
" \"FinalAutoMLJobObjectiveMetricName: \"\n",
" + best_candidate[\"FinalAutoMLJobObjectiveMetric\"][\"MetricName\"]\n",
")\n",
"print(\n",
" \"FinalAutoMLJobObjectiveMetricValue: \"\n",
" + str(best_candidate[\"FinalAutoMLJobObjectiveMetric\"][\"Value\"])\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"Autopilot generates 2 containers, one for data processing, and the other for machine learning model. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"best_candidate[\"InferenceContainers\"]"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Serial Inference Pipeline that combines feature selection and autopilot \n"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Set up the inference pipeline \n",
"Setting up a Machine Learning pipeline can be done with the Pipeline Model. This sets up a list of models in a single endpoint; in this example, we configure our pipeline model with the fitted Scikit-learn inference model and Autopilot models. Deploying the model follows the same ```deploy``` pattern in the SDK."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"sklearn_preprocessor.latest_training_job.describe()[\"HyperParameters\"][\n",
" \"sagemaker_submit_directory\"\n",
"][1:-1]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
},
"scrolled": true
},
"outputs": [],
"source": [
"from botocore.exceptions import ClientError\n",
"\n",
"sagemaker = boto3.client(\"sagemaker\")\n",
"import time\n",
"from datetime import datetime\n",
"\n",
"time_stamp = datetime.now().strftime(\"%m-%d-%Y-%I-%M-%S-%p\")\n",
"\n",
"pipeline_name = \"pipeline-blog-\" + time_stamp\n",
"pipeline_endpoint_config_name = \"pipeline-blog-endpoint-config-\" + time_stamp\n",
"pipeline_endpoint_name = \"pipeline-blog-endpoint-\" + time_stamp\n",
"\n",
"sklearn_image = sklearn_preprocessor.image_uri\n",
"container_1_source = sklearn_preprocessor.latest_training_job.describe()[\"HyperParameters\"][\n",
" \"sagemaker_submit_directory\"\n",
"][1:-1]\n",
"inference_containers = [\n",
" {\n",
" \"Image\": sklearn_image,\n",
" \"ModelDataUrl\": sklearn_preprocessor.model_data,\n",
" \"Environment\": {\n",
" \"SAGEMAKER_SUBMIT_DIRECTORY\": container_1_source,\n",
" \"SAGEMAKER_DEFAULT_INVOCATIONS_ACCEPT\": \"text/csv\",\n",
" \"SAGEMAKER_PROGRAM\": \"sklearn_feature_selection.py\",\n",
" },\n",
" }\n",
"]\n",
"\n",
"inference_containers.extend(best_candidate[\"InferenceContainers\"])\n",
"\n",
"response = sagemaker.create_model(\n",
" ModelName=pipeline_name, Containers=inference_containers, ExecutionRoleArn=role\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"Now that we've created our pipeline and let us deploy it to a hosted endpoint."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"try:\n",
" response = sagemaker.create_endpoint_config(\n",
" EndpointConfigName=pipeline_endpoint_config_name,\n",
" ProductionVariants=[\n",
" {\n",
" \"VariantName\": \"DefaultVariant\",\n",
" \"ModelName\": pipeline_name,\n",
" \"InitialInstanceCount\": 1,\n",
" \"InstanceType\": \"ml.m4.xlarge\",\n",
" },\n",
" ],\n",
" )\n",
" print(\"{}\\n\".format(response))\n",
"\n",
"except ClientError:\n",
" print(\"Endpoint config already exists, continuing...\")\n",
"\n",
"\n",
"try:\n",
" response = sagemaker.create_endpoint(\n",
" EndpointName=pipeline_endpoint_name,\n",
" EndpointConfigName=pipeline_endpoint_config_name,\n",
" )\n",
" print(\"{}\\n\".format(response))\n",
"\n",
"except ClientError:\n",
" print(\"Endpoint already exists, continuing...\")\n",
"\n",
"\n",
"# Monitor the status until completed\n",
"endpoint_status = sagemaker.describe_endpoint(EndpointName=pipeline_endpoint_name)[\"EndpointStatus\"]\n",
"while endpoint_status not in (\"OutOfService\", \"InService\", \"Failed\"):\n",
" endpoint_status = sagemaker.describe_endpoint(EndpointName=pipeline_endpoint_name)[\n",
" \"EndpointStatus\"\n",
" ]\n",
" print(endpoint_status)\n",
" time.sleep(30)"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Make a request to our pipeline endpoint\n",
"\n",
"\n",
"Here we just grab the first line from the training data for demonstration purpose. The ```ContentType``` field configures the first container, while the ```Accept``` field configures the last container. You can also specify each container's ```Accept``` and ```ContentType``` values using environment variables.\n",
"\n",
"We make our request with the payload in ```'text/csv'``` format, since that is what our script currently supports. If other formats need to be supported, this would have to be added to the ```input_fn()``` method in our entry point. "
]
},
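{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"Because `predict_fn` in the entry-point script only handles inputs with 100 columns (or 101 when the label is included), it can help to check the payload shape before sending it. The sketch below builds a CSV payload with a header row using only the standard library. `build_csv_payload` and the sample values are hypothetical.\n",
"\n",
"```python\n",
"import csv\n",
"import io\n",
"\n",
"EXPECTED_FEATURES = 100  # mirrors INPUT_FEATURES_SIZE in the entry-point script\n",
"\n",
"\n",
"def build_csv_payload(header, rows):\n",
"    \"\"\"Serialize a header and rows to CSV text, checking the column count first.\"\"\"\n",
"    if len(header) != EXPECTED_FEATURES:\n",
"        raise ValueError(\"expected {} columns, got {}\".format(EXPECTED_FEATURES, len(header)))\n",
"    buffer = io.StringIO()\n",
"    writer = csv.writer(buffer, lineterminator=\"\\n\")\n",
"    writer.writerow(header)\n",
"    for row in rows:\n",
"        if len(row) != len(header):\n",
"            raise ValueError(\"row length does not match the header\")\n",
"        writer.writerow(row)\n",
"    return buffer.getvalue()\n",
"\n",
"\n",
"# Hypothetical two-row payload with made-up values.\n",
"header = [\"x_{}\".format(i) for i in range(EXPECTED_FEATURES)]\n",
"rows = [[0.1] * EXPECTED_FEATURES, [0.2] * EXPECTED_FEATURES]\n",
"payload = build_csv_payload(header, rows)\n",
"print(payload.count(\"\\n\"))\n",
"```\n",
"\n",
"The header row is kept because `input_fn` reads the payload with `pd.read_csv`, which treats the first line as column names by default."
]
},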
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"test_data = df.iloc[0:5, :-1]\n",
"print(test_data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"from sagemaker.predictor import Predictor\n",
"from sagemaker.serializers import IdentitySerializer\n",
"from sagemaker.deserializers import CSVDeserializer\n",
"\n",
"predictor = Predictor(\n",
" endpoint_name=pipeline_endpoint_name,\n",
" sagemaker_session=sagemaker_session,\n",
" serializer=IdentitySerializer(content_type=\"text/csv\"),\n",
" deserializer=CSVDeserializer(),\n",
")\n",
"\n",
"predictor.predict(test_data.to_csv(sep=\",\", header=True, index=False))"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Delete Endpoint \n",
"Once we are finished with the endpoint, we clean up the resources!"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"sm_client = sagemaker_session.boto_session.client(\"sagemaker\")\n",
"sm_client.delete_endpoint(EndpointName=pipeline_endpoint_name)"
]
}
],
"metadata": {
"celltoolbar": "Tags",
"kernelspec": {
"display_name": "Python 3 (Data Science 3.0)",
"language": "python",
"name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-data-science-310-v1"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}