# Train Autopilot models using a subset of data, and reuse its recipes on the whole dataset

This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook.

![This us-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-2/ap-batch-transform.ipynb)

---

Amazon SageMaker Autopilot is an automated machine learning (AutoML) solution for tabular datasets. You can use SageMaker Autopilot in different ways: on autopilot (without any human input) or with human guidance, without code through SageMaker Studio or scripted using the AWS SDKs. This notebook uses the AWS SDKs to run an Autopilot job on a subset of the data, then reuses the resulting recipes (the data-processing model and the hyperparameters of the best candidate) to fit a model on the whole dataset. Compared with running an Autopilot job directly on the whole dataset, this approach can reduce cost and training time.

## Setup Environment

In [None]:
import boto3
import botocore
import time
import json
import sagemaker
from sagemaker import AutoML, get_execution_role
from datetime import datetime, timezone
import pandas as pd

sess = sagemaker.session.Session()
role = sagemaker.get_execution_role()
region = boto3.session.Session().region_name

bucket = sess.default_bucket()
prefix = "autopilot-subset-0708" # change this to your own prefix
sagemaker_client = boto3.client("sagemaker")
session = sagemaker.Session()

In [None]:
from time import gmtime, strftime, sleep

timestamp_suffix = strftime("%Y%m%d-%H-%M", gmtime())

## Prepare Data

In [None]:
# download data from s3
s3 = boto3.client("s3")
s3.download_file(
 "sagemaker-example-files-prod-{}".format(region),
 "datasets/tabular/uci_bank_marketing/bank-additional-full.csv",
 "bank-data.csv",
)

In [None]:
# read the dataset
bank_df = pd.read_csv("bank-data.csv", sep=",")
pd.set_option("display.max_columns", 500)
bank_df

In [None]:
# Split the whole dataset into training and testing sets (80/20 split)
target = "y"
train_data = bank_df.sample(frac=0.8, random_state=200)

test_data = bank_df.drop(train_data.index)

test_data_no_target = test_data.drop(columns=[target])

# shift the target column from the last to the first
cols = list(train_data.columns)
cols = [cols[-1]] + cols[:-1]
train_data = train_data[cols]

### Upload the datasets to S3
train_val_file = "train_val_whole_data.csv"
train_data.to_csv(train_val_file, index=False, header=True)
train_val_data_s3_path = sess.upload_data(path=train_val_file, key_prefix=prefix + "/train")
print("Train-validation data uploaded to: " + train_val_data_s3_path)

test_file = "test_data_no_target.csv"
test_data_no_target.to_csv(test_file, index=False, header=False)
test_data_s3_path = sess.upload_data(path=test_file, key_prefix=prefix + "/test")
print("Test data uploaded to: " + test_data_s3_path)

# further split the whole training dataset into training and validation datasets (75/25 split)
# these will be used to re-fit model using automl's recipes
val_data = train_data.sample(frac=0.25, random_state=200)
train_data = train_data.drop(val_data.index)

### Upload the datasets to S3
train_file = "train_whole_data.csv"
train_data.to_csv(train_file, index=False, header=False)
train_data_s3_path = sess.upload_data(path=train_file, key_prefix=prefix + "/train")
print("Train data uploaded to: " + train_data_s3_path)

val_file = "val_whole_data.csv"
val_data.to_csv(val_file, index=False, header=False)
val_data_s3_path = sess.upload_data(path=val_file, key_prefix=prefix + "/val")
print("validation data uploaded to: " + val_data_s3_path)

# take a subset of the training data to run autopilot job, e.g. 20%
# TODO: stratified sampling

train_subset = train_data.sample(frac=0.2, random_state=100)

# keep the header for the subset data, since Autopilot needs it to locate the target column
train_file = "train_subset_data.csv"
train_subset.to_csv(train_file, index=False, header=True)
train_subset_s3_path = sess.upload_data(path=train_file, key_prefix=prefix + "/train")
print("Train subset uploaded to: " + train_subset_s3_path)
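The TODO in the cell above mentions stratified sampling. A minimal sketch of one way to do it follows, written in plain Python so it runs without the notebook's data. `stratified_indices` is a hypothetical helper (not part of the original notebook) that draws the same fraction from every class, so the subset keeps roughly the same target distribution as the full training data.

```python
import random
from collections import defaultdict


def stratified_indices(labels, frac, seed=100):
    """Return sorted row positions that sample `frac` of each class in `labels`."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for position, label in enumerate(labels):
        by_class[label].append(position)

    chosen = []
    for positions in by_class.values():
        # Keep at least one row per class so rare classes are not dropped.
        n_keep = max(1, round(len(positions) * frac))
        chosen.extend(rng.sample(positions, n_keep))
    return sorted(chosen)


# Toy labels standing in for the target column: 90 "no" and 10 "yes".
toy_labels = ["no"] * 90 + ["yes"] * 10
subset_positions = stratified_indices(toy_labels, frac=0.2)
print(len(subset_positions), "rows selected")
```

With the notebook's dataframe this could be applied as `train_data.iloc[stratified_indices(train_data[target].tolist(), 0.2)]`. On recent pandas versions, `train_data.groupby(target).sample(frac=0.2, random_state=100)` is an alternative.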

## Setup Autopilot Jobs

Kick off an Autopilot job using only a subset of the training data.

For baselining purposes, we also kick off an Autopilot job on the whole training data. Note that the solution itself only needs the first Autopilot job.

In [None]:
def run_auto_ml_job(input_s3_path, target, auto_ml_job_name, job_config, role, bucket, prefix):
    input_data_config = [
        {
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": input_s3_path,
                }
            },
            "TargetAttributeName": target,
        }
    ]

    output_data_config = {"S3OutputPath": "s3://{}/{}/full-output".format(bucket, prefix)}

    print("AutoMLJobName: " + auto_ml_job_name)

    sagemaker_client.create_auto_ml_job(
        AutoMLJobName=auto_ml_job_name,
        InputDataConfig=input_data_config,
        OutputDataConfig=output_data_config,
        AutoMLJobConfig=job_config,
        RoleArn=role,
    )

    print("JobStatus - Secondary Status")
    print("------------------------------")

    describe_response = sagemaker_client.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)
    print(
        describe_response["AutoMLJobStatus"] + " - " + describe_response["AutoMLJobSecondaryStatus"]
    )
    job_run_status = describe_response["AutoMLJobStatus"]

    # poll once a minute until the job reaches a terminal status
    while job_run_status not in ("Failed", "Completed", "Stopped"):
        describe_response = sagemaker_client.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name)
        job_run_status = describe_response["AutoMLJobStatus"]

        print(
            describe_response["AutoMLJobStatus"]
            + " - "
            + describe_response["AutoMLJobSecondaryStatus"]
        )
        sleep(60)

    return job_run_status
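The same "describe, print, sleep" loop appears several times in this notebook (for the Autopilot job, the batch transform jobs, and the training job). A minimal sketch of a shared helper is shown below. `wait_for_job` and its parameters are hypothetical names, not part of the original notebook; the `describe` argument is any zero-argument callable that returns a describe-style response, and the optional timeout is an addition the original loops do not have.

```python
import time


def wait_for_job(describe, status_key, poll_seconds=60, timeout_seconds=None, sleep=time.sleep):
    """Poll `describe()` until the job reaches a terminal status and return that status."""
    terminal = ("Failed", "Completed", "Stopped")
    waited = 0
    while True:
        status = describe()[status_key]
        print(status)
        if status in terminal:
            return status
        if timeout_seconds is not None and waited >= timeout_seconds:
            raise TimeoutError("job still {} after {} seconds".format(status, waited))
        sleep(poll_seconds)
        waited += poll_seconds


# Stand-in for a describe_* call: returns a canned sequence of responses.
fake_responses = iter(
    [{"Status": "InProgress"}, {"Status": "InProgress"}, {"Status": "Completed"}]
)
final_status = wait_for_job(lambda: next(fake_responses), "Status", sleep=lambda seconds: None)
```

In the notebook, a call could look like `wait_for_job(lambda: sagemaker_client.describe_auto_ml_job(AutoMLJobName=auto_ml_job_name), "AutoMLJobStatus")`.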

In [None]:
input_s3_path = train_subset_s3_path
auto_ml_job_name = "auto-bank-sub-" + timestamp_suffix
job_config = {"CompletionCriteria": {"MaxCandidates": 10}}

response = run_auto_ml_job(
 input_s3_path, target, auto_ml_job_name, job_config, role, bucket, prefix
)
response
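The `job_config` above limits the job by candidate count only. As a hedged sketch, the helper below also caps wall-clock time, assuming the `MaxAutoMLJobRuntimeInSeconds` and `MaxRuntimePerTrainingJobInSeconds` fields of `CompletionCriteria` in the CreateAutoMLJob API; check the API reference for your SDK version before relying on them. `build_job_config` is a hypothetical name introduced for illustration.

```python
def build_job_config(
    max_candidates,
    max_job_runtime_seconds=None,
    max_training_job_runtime_seconds=None,
):
    """Build an AutoMLJobConfig dict with optional runtime limits."""
    criteria = {"MaxCandidates": max_candidates}
    if max_job_runtime_seconds is not None:
        # Upper bound on the whole Autopilot job.
        criteria["MaxAutoMLJobRuntimeInSeconds"] = max_job_runtime_seconds
    if max_training_job_runtime_seconds is not None:
        # Upper bound on each individual training job.
        criteria["MaxRuntimePerTrainingJobInSeconds"] = max_training_job_runtime_seconds
    return {"CompletionCriteria": criteria}


# Same candidate limit as the notebook, plus an illustrative two-hour cap.
example_job_config = build_job_config(10, max_job_runtime_seconds=2 * 60 * 60)
print(example_job_config)
```

Calling `build_job_config(10)` with no runtime arguments reproduces the `job_config` used in the notebook.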

In [None]:
# Autopilot job for baselining purpose
input_s3_path = train_val_data_s3_path
auto_ml_baseline_job_name = "auto-bank-all-" + timestamp_suffix

response = run_auto_ml_job(
 input_s3_path, target, auto_ml_baseline_job_name, job_config, role, bucket, prefix
)
response

## Retrieve best models from the Autopilot job

In [None]:
# get best candidate of the job

automl = AutoML.attach(auto_ml_job_name=auto_ml_job_name)
best_candidate = automl.describe_auto_ml_job()["BestCandidate"]
best_candidate_name = best_candidate["CandidateName"]

In [None]:
best_candidate["CandidateSteps"]

In [None]:
# you will need the transform job for data processing/feature engineering
# and the tuning job for model tuning/fitting
transform_job = ""
tuning_job = ""
for step in best_candidate["CandidateSteps"]:
    if step["CandidateStepType"] == "AWS::SageMaker::TransformJob":
        transform_job = step["CandidateStepName"]
    if step["CandidateStepType"] == "AWS::SageMaker::TrainingJob":
        # if several TrainingJob steps exist, the last one is kept
        tuning_job = step["CandidateStepName"]

print(f"transform_job:{transform_job}, tuning_job:{tuning_job}")

In [None]:
# If you describe the transform job, you will see the recipe of data processing
response = sagemaker_client.describe_transform_job(TransformJobName=transform_job)
response

In [None]:
# The recipe of data processing we will use in later steps
transform_model = response["ModelName"]
input_content_type = response["TransformInput"]["ContentType"]
output_accept = response["TransformOutput"]["Accept"]

In [None]:
# If you describe the training job, you will see the recipe of model tuning
response = sagemaker_client.describe_training_job(TrainingJobName=tuning_job)
# training_model = response['ModelName']
response

In [None]:
# The recipe of model tuning we will use in later steps
hyperparameters = response["HyperParameters"]
TrainingImage = response["AlgorithmSpecification"]["TrainingImage"]

## Transform the whole training dataset with the best transform model

In [None]:
timestamp_suffix = strftime("%Y%m%d-%H-%M", gmtime())
train_transform_job = "ap-transform-whole-train-" + timestamp_suffix
val_transform_job = "ap-transform-whole-val-" + timestamp_suffix
train_transform_input = train_data_s3_path
val_transform_input = val_data_s3_path


train_transform_output = "s3://{}/{}/output/whole/train".format(bucket, prefix)
val_transform_output = "s3://{}/{}/output/whole/val".format(bucket, prefix)

In [None]:
# kick off a new batch transform job for training and validation dataset
new_transform_job = train_transform_job
new_transform_input = train_transform_input
new_transform_output = train_transform_output

sagemaker_client.create_transform_job(
 TransformJobName=new_transform_job,
 ModelName=transform_model,
 MaxConcurrentTransforms=4,
 BatchStrategy="MultiRecord",
 TransformInput={
 "DataSource": {"S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": new_transform_input}},
 "ContentType": input_content_type,
 "SplitType": "Line",
 },
 TransformResources={"InstanceType": "ml.m5.4xlarge", "InstanceCount": 1},
 TransformOutput={"S3OutputPath": new_transform_output, "Accept": output_accept},
)

In [None]:
new_transform_job = val_transform_job
new_transform_input = val_transform_input
new_transform_output = val_transform_output

sagemaker_client.create_transform_job(
 TransformJobName=new_transform_job,
 ModelName=transform_model,
 MaxConcurrentTransforms=4,
 BatchStrategy="MultiRecord",
 TransformInput={
 "DataSource": {"S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": new_transform_input}},
 "ContentType": input_content_type,
 "SplitType": "Line",
 },
 TransformResources={"InstanceType": "ml.m5.2xlarge", "InstanceCount": 1},
 TransformOutput={"S3OutputPath": new_transform_output, "Accept": output_accept},
)
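The two `create_transform_job` calls above differ only in job name, input path, output path, and instance type. A minimal sketch of a request builder that removes the duplication is shown below; `build_transform_request` is a hypothetical helper, and the bucket and job names in the example are placeholders rather than values from this notebook.

```python
def build_transform_request(
    job_name, model_name, input_uri, output_uri, content_type, accept, instance_type
):
    """Return keyword arguments for create_transform_job, mirroring the cells above."""
    return {
        "TransformJobName": job_name,
        "ModelName": model_name,
        "MaxConcurrentTransforms": 4,
        "BatchStrategy": "MultiRecord",
        "TransformInput": {
            "DataSource": {"S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": input_uri}},
            "ContentType": content_type,
            "SplitType": "Line",
        },
        "TransformResources": {"InstanceType": instance_type, "InstanceCount": 1},
        "TransformOutput": {"S3OutputPath": output_uri, "Accept": accept},
    }


# Placeholder values for illustration only.
request = build_transform_request(
    job_name="example-transform-job",
    model_name="example-model",
    input_uri="s3://example-bucket/input",
    output_uri="s3://example-bucket/output",
    content_type="text/csv",
    accept="text/csv",
    instance_type="ml.m5.2xlarge",
)
print(request["TransformJobName"])
```

In the notebook, the request would be submitted with `sagemaker_client.create_transform_job(**request)`, once for the training data and once for the validation data.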

In [None]:
# track batch transform progress for both the training and validation jobs,
# since the training job below reads the output of both
print("JobStatus")
print("------------------------------")

for name in (train_transform_job, val_transform_job):
    info = sagemaker_client.describe_transform_job(TransformJobName=name)
    job_run_status = info["TransformJobStatus"]
    print(name + " - " + job_run_status)

    while job_run_status not in ("Failed", "Completed", "Stopped"):
        sleep(60)
        info = sagemaker_client.describe_transform_job(TransformJobName=name)
        job_run_status = info["TransformJobStatus"]
        print(name + " - " + job_run_status)

## Create a new training job
Use the hyperparameters from the best candidate to fit a model on the larger (whole) training dataset.

In [None]:
# use the transformed training and validation data written by the batch transform jobs above
train_data_path = train_transform_output
val_data_path = val_transform_output

model_s3_path = "s3://{}/{}/output/training".format(bucket, prefix)
train_instance = "ml.m5.4xlarge"
instance_count = 1
training_job_name = "ap-training-api-" + timestamp_suffix

In [None]:
# kick off a new training job
sagemaker_client.create_training_job(
 TrainingJobName=training_job_name,
 RoleArn=role,
 AlgorithmSpecification={
 "TrainingImage": response["AlgorithmSpecification"]["TrainingImage"],
 "TrainingInputMode": "File",
 },
 HyperParameters=response["HyperParameters"],
 InputDataConfig=[
 {
 "ChannelName": "train",
 "DataSource": {
 "S3DataSource": {
 "S3DataType": "S3Prefix",
 "S3Uri": train_data_path,
 "S3DataDistributionType": "ShardedByS3Key",
 }
 },
 "ContentType": output_accept,
 "CompressionType": "None",
 "RecordWrapperType": "None",
 "InputMode": "Pipe",
 # "EnableFFM": False,
 },
 {
 "ChannelName": "validation",
 "DataSource": {
 "S3DataSource": {
 "S3DataType": "S3Prefix",
 "S3Uri": val_data_path,
 "S3DataDistributionType": "FullyReplicated",
 }
 },
 "ContentType": output_accept,
 "CompressionType": "None",
 "RecordWrapperType": "None",
 "InputMode": "Pipe",
 # "EnableFFM": False,
 },
 ],
 OutputDataConfig={
 "KmsKeyId": "",
 "S3OutputPath": model_s3_path,
 },
 ResourceConfig={
 "InstanceType": train_instance,
 "InstanceCount": instance_count,
 "VolumeSizeInGB": 50,
 },
 StoppingCondition={"MaxRuntimeInSeconds": 86400},
)

In [None]:
model_name = training_job_name

# track model training progress
print("JobStatus - Secondary Status")
print("------------------------------")


info = sagemaker_client.describe_training_job(TrainingJobName=model_name)
job_run_status = info["TrainingJobStatus"]
print(info["TrainingJobStatus"] + " - " + info["SecondaryStatus"])

while job_run_status not in ("Failed", "Completed", "Stopped"):
    info = sagemaker_client.describe_training_job(TrainingJobName=model_name)
    job_run_status = info["TrainingJobStatus"]
    print(info["TrainingJobStatus"] + " - " + info["SecondaryStatus"])
    sleep(60)

## Create a model using SageMaker AutoML API & Batch Transform Predict test dataset
We will replace the original tuning model in the Autopilot job with the newly fitted model.

In [None]:
# original recipe
print(best_candidate["CandidateSteps"])
# replace tuning candidate with re-fitted model
best_candidate["CandidateSteps"][3]["CandidateStepName"] = training_job_name
best_candidate["CandidateSteps"][3]["CandidateStepArn"] = info["TrainingJobArn"]
# validate
print(best_candidate["CandidateSteps"])
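The cell above hard-codes index `3` for the tuning step. A minimal sketch of looking the index up instead is shown below, assuming the tuning job is the last `AWS::SageMaker::TrainingJob` entry in `CandidateSteps` (the same assumption the earlier loop makes when it picks `tuning_job`). `last_step_index` is a hypothetical helper, and the step names in `example_steps` are made up for illustration.

```python
def last_step_index(candidate_steps, step_type):
    """Return the position of the last step whose CandidateStepType equals `step_type`."""
    matches = [
        index
        for index, step in enumerate(candidate_steps)
        if step["CandidateStepType"] == step_type
    ]
    if not matches:
        raise ValueError("no step of type {}".format(step_type))
    return matches[-1]


# Made-up steps that only imitate the shape of best_candidate["CandidateSteps"].
example_steps = [
    {"CandidateStepType": "AWS::SageMaker::ProcessingJob", "CandidateStepName": "example-db"},
    {"CandidateStepType": "AWS::SageMaker::TrainingJob", "CandidateStepName": "example-dpp"},
    {"CandidateStepType": "AWS::SageMaker::TransformJob", "CandidateStepName": "example-transform"},
    {"CandidateStepType": "AWS::SageMaker::TrainingJob", "CandidateStepName": "example-tuning"},
]
tuning_index = last_step_index(example_steps, "AWS::SageMaker::TrainingJob")
print(example_steps[tuning_index]["CandidateStepName"])
```

In the notebook, `tuning_index = last_step_index(best_candidate["CandidateSteps"], "AWS::SageMaker::TrainingJob")` could then replace the literal `3`.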

In [None]:
# original recipe
print(best_candidate["InferenceContainers"][1])
# replace tuning container with re-fitted model
best_candidate["InferenceContainers"][1]["ModelDataUrl"] = info["ModelArtifacts"][
 "S3ModelArtifacts"
]
# validate
print(best_candidate["InferenceContainers"][1])

In [None]:
# create a model by replacing the recipes of the best candidate
inference_response_keys = ["predicted_label", "probability"]

hack_model = automl.create_model(
 name=best_candidate_name + "-hack",
 candidate=best_candidate,
 inference_response_keys=inference_response_keys,
)

In [None]:
def batch_predict(model, job_name, output_path, instance_count, instance_type, content_type):
    transformer = model.transformer(
        instance_count=instance_count,
        instance_type=instance_type,
        assemble_with="Line",
        output_path=output_path,
    )

    # start the batch transform on the test data without blocking
    transformer.transform(
        data=test_data_s3_path,
        split_type="Line",
        content_type=content_type,
        job_name=job_name,
        wait=False,
    )

    # track progress of the batch transform job that was just started
    info = sagemaker_client.describe_transform_job(TransformJobName=job_name)
    job_run_status = info["TransformJobStatus"]

    while job_run_status not in ("Failed", "Completed", "Stopped"):
        sleep(60)
        info = sagemaker_client.describe_transform_job(TransformJobName=job_name)
        job_run_status = info["TransformJobStatus"]
    return job_run_status

In [None]:
# Now you can batch transform the test dataset
batch_output = "s3://{}/{}/output/whole/batchTransformHack".format(bucket, prefix)
job_name = "hack-batch-predict" + timestamp_suffix
job_run_status = batch_predict(
    hack_model,
    job_name,
    batch_output,
    1,
    "ml.m5.xlarge",
    "text/csv",
)
job_run_status

In [None]:
# model_data = info["ModelArtifacts"]["S3ModelArtifacts"]

# primary_container = {"Image": TrainingImage, "ModelDataUrl": model_data}

# create_model_response = sagemaker_client.create_model(
# ModelName=model_name, ExecutionRoleArn=role, PrimaryContainer=primary_container
# )

# print(create_model_response["ModelArn"])

## Last Step! Evaluate the model and compare it with the baseline

In [None]:
# create a model using the baseline Autopilot job

automl = AutoML.attach(auto_ml_job_name=auto_ml_baseline_job_name)
best_candidate = automl.describe_auto_ml_job()["BestCandidate"]
best_candidate_name = best_candidate["CandidateName"]

baseline_model = automl.create_model(
 name=best_candidate_name,
 candidate=best_candidate,
 inference_response_keys=inference_response_keys,
)

batch_baseline_output = "s3://{}/{}/output/whole/batchTransform".format(bucket, prefix)
job_name = "baseline-batch-predict" + timestamp_suffix
job_run_status = batch_predict(
 baseline_model, job_name, batch_baseline_output, 1, "ml.m5.xlarge", "text/csv"
)
job_run_status

In [None]:
# create pandas dataframe from s3 files
import re


def get_csv_output_from_s3(s3uri, batch_file):
    # batch transform writes one "<input file name>.out" object per input file
    file_name = "{}.out".format(batch_file)
    match = re.match("s3://([^/]+)/(.*)", "{}/{}".format(s3uri, file_name))
    output_bucket, output_prefix = match.group(1), match.group(2)
    s3.download_file(output_bucket, output_prefix, file_name)
    return pd.read_csv(file_name, sep=",", header=None)
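The regular expression in the cell above splits an S3 URI into a bucket and a key. As an alternative sketch, the same split can be done with the standard library's `urllib.parse`, which also makes it easy to reject malformed input. `split_s3_uri` is a hypothetical helper, and the URI in the example is a placeholder.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("not an S3 URI: {}".format(s3_uri))
    return parsed.netloc, parsed.path.lstrip("/")


# Placeholder URI shaped like a batch transform output object.
bucket_name, key = split_s3_uri(
    "s3://example-bucket/autopilot/output/test_data_no_target.csv.out"
)
print(bucket_name, key)
```

The returned pair can be passed straight to `s3.download_file(bucket_name, key, local_file_name)`.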

In [None]:
baseline_df = get_csv_output_from_s3(batch_baseline_output, test_file)

In [None]:
hackmodel_df = get_csv_output_from_s3(batch_output, test_file)

In [None]:
# groundtruth
test_groundtruth = test_data["y"]

In [None]:
# compare this evaluation result with the autopilot model trained on whole dataset
from sklearn.metrics import accuracy_score

baseline_score = accuracy_score(test_groundtruth, baseline_df.iloc[:, 0])
hackmodel_score = accuracy_score(test_groundtruth, hackmodel_df.iloc[:, 0])
print(
 "Baseline model (autopilot on whole training data) score: {}, our model score: {}".format(
 baseline_score, hackmodel_score
 )
)
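Accuracy alone can hide differences between two models when one class dominates the target. As a sketch, the plain-Python functions below compute accuracy together with the F1 score for a chosen positive label, so both models can be compared on the minority class as well. The function names and the toy labels are illustrative assumptions, including the assumption that the labels are the strings `"yes"` and `"no"`.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the ground truth."""
    pairs = list(zip(y_true, y_pred))
    return sum(truth == pred for truth, pred in pairs) / len(pairs)


def f1_for_label(y_true, y_pred, positive):
    """F1 score treating `positive` as the positive class."""
    pairs = list(zip(y_true, y_pred))
    true_pos = sum(truth == positive and pred == positive for truth, pred in pairs)
    false_pos = sum(truth != positive and pred == positive for truth, pred in pairs)
    false_neg = sum(truth == positive and pred != positive for truth, pred in pairs)
    if true_pos == 0:
        return 0.0
    precision = true_pos / (true_pos + false_pos)
    recall = true_pos / (true_pos + false_neg)
    return 2 * precision * recall / (precision + recall)


# Toy data: the model misses one of the two positive rows.
toy_truth = ["no", "no", "no", "no", "yes", "yes"]
toy_pred = ["no", "no", "no", "no", "no", "yes"]
toy_accuracy = accuracy(toy_truth, toy_pred)
toy_f1 = f1_for_label(toy_truth, toy_pred, positive="yes")
print(toy_accuracy, toy_f1)
```

In the notebook, `sklearn.metrics.f1_score(test_groundtruth, hackmodel_df.iloc[:, 0], pos_label="yes")` would give the same kind of measurement alongside `accuracy_score`.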

## The end

## Notebook CI Test Results

This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.


![This us-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-1/ap-batch-transform.ipynb)

![This us-east-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-2/ap-batch-transform.ipynb)

![This us-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-1/ap-batch-transform.ipynb)

![This ca-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ca-central-1/ap-batch-transform.ipynb)

![This sa-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/sa-east-1/ap-batch-transform.ipynb)

![This eu-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-1/ap-batch-transform.ipynb)

![This eu-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-2/ap-batch-transform.ipynb)

![This eu-west-3 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-3/ap-batch-transform.ipynb)

![This eu-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-central-1/ap-batch-transform.ipynb)

![This eu-north-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-north-1/ap-batch-transform.ipynb)

![This ap-southeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-1/ap-batch-transform.ipynb)

![This ap-southeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-2/ap-batch-transform.ipynb)

![This ap-northeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-1/ap-batch-transform.ipynb)

![This ap-northeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-2/ap-batch-transform.ipynb)

![This ap-south-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-south-1/ap-batch-transform.ipynb)
