# Model training

This notebook shows how to train an XGBoost model to predict the likelihood of a bid on an ad request. It is a binary classification problem: given the details of an ad request, the model predicts bid / no_bid.


This notebook is designed to be run with the `Python 3 (Data Science)` kernel.

In [None]:
import boto3
import os
import sagemaker
import re
from sagemaker import get_execution_role

sagemaker_sess = sagemaker.Session()
role = get_execution_role()
role

## Define input and output paths

In [None]:
boto_session = boto3.Session()
ssm = boto_session.client('ssm')
s3_client = boto3.client("s3")
bucket = ssm.get_parameter(Name="/aik/data-bucket")["Parameter"]["Value"]

In [None]:
INPUT_BUCKET_NAME = bucket
OUTPUT_BUCKET_NAME = bucket
DATA_PREFIX = 'processed/sample'

In [None]:
def _list_split_files(split, file_type, bucket_name, prefix):
    """Return the S3 keys under {prefix}/{split}/ that end with file_type."""
    # Paginate: a single list_objects_v2 call returns at most 1000 keys
    paginator = s3_client.get_paginator("list_objects_v2")
    files = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=f"{prefix}/{split}/"):
        # 'Contents' is missing when a page has no objects
        for itm in page.get("Contents", []):
            if itm["Key"].endswith(file_type):
                files.append(itm["Key"])
    return files

def getTrainFiles(file_type='libsvm', bucket_name=INPUT_BUCKET_NAME, prefix=DATA_PREFIX):
    return _list_split_files("train", file_type, bucket_name, prefix)

def getValidationFiles(file_type='libsvm', bucket_name=INPUT_BUCKET_NAME, prefix=DATA_PREFIX):
    return _list_split_files("valid", file_type, bucket_name, prefix)

def getTestFiles(file_type='libsvm', bucket_name=INPUT_BUCKET_NAME, prefix=DATA_PREFIX):
    return _list_split_files("test", file_type, bucket_name, prefix)

In [None]:
file_type = 'parquet'
train_files = getTrainFiles(file_type=file_type)
test_files = getTestFiles(file_type=file_type)
valid_files = getValidationFiles(file_type=file_type)
print(f"Train files: {len(train_files)}")
print(f"Test files: {len(test_files)}")
print(f"Valid files: {len(valid_files)}")

## Overview of sample data

Let's have a look at the data by downloading one sample train, validation, and test file from S3 and storing them locally in a `temp` folder.

In [None]:
import os
from sklearn.datasets import load_svmlight_file
import pandas as pd

os.makedirs("./temp", exist_ok=True)

### Download single train/valid/test file

In [None]:
file_type = 'parquet'
s3_client.download_file(INPUT_BUCKET_NAME, train_files[0], f"./temp/training_set.{file_type}")
s3_client.download_file(INPUT_BUCKET_NAME, valid_files[0], f"temp/validation_set.{file_type}")
s3_client.download_file(INPUT_BUCKET_NAME, test_files[0], f"temp/test_set.{file_type}")

Let's look only at the first 100k rows.

In [None]:
X_train_pd = pd.read_parquet(f"temp/training_set.{file_type}")[:100000]
X_train_pd.describe()

In [None]:
X_train_pd.head()

Check that all columns have numeric types:

In [None]:
X_train_pd.dtypes

### Check the ratio of bid/no bid

In [None]:
X_train_pd["label"].value_counts()

### Visualize correlation matrix

In [None]:
train_corrs = X_train_pd.corr()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

plt.figure(figsize=(40, 20))
sns.set(font_scale=1.5)
sns.heatmap(train_corrs, annot=True, vmin=-1, vmax=1, center=0, cmap='coolwarm')
plt.show()

At this stage, `device_type_id` is highly correlated with our target `label` (bid/no_bid). This is quite normal, as we are only using a few features from the dataset. The features `dow` and `IndexAdvertiserID` show no correlation values because the sample data contains only one value for each of those columns; a constant column has zero variance, so its correlation is undefined (NaN).
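The NaN entries can be reproduced on toy data (not from the bid dataset) where `dow` is constant. The sketch also lists the columns with a single distinct value, which carry no information for the model; the `constant_columns` name is illustrative.

```python
import pandas as pd

# Toy data: "dow" is constant, like in the sample file
df = pd.DataFrame({
    "dow": [3, 3, 3, 3],
    "device_type_id": [1, 2, 1, 2],
    "label": [0, 1, 0, 1],
})
corr = df.corr()
print(corr.loc["dow", "label"])  # NaN: zero variance makes Pearson correlation undefined

# Columns with a single distinct value (NaN counted as a value)
constant_columns = [c for c in df.columns if df[c].nunique(dropna=False) <= 1]
print(constant_columns)
```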

### Distribution of feature `device_type_id` with respect to `label`

In [None]:
sns.set(font_scale=1.1)
ax = X_train_pd[['device_type_id', 'label']].hist(by='label', sharey=True)
ax[0].set_title("0 (no_bid)")
ax[1].set_title("1 (bid)")
plt.xticks(X_train_pd['device_type_id'].unique())
plt.show()

Here are the counts of `device_type_id` values when `label` is `0` (left chart):

In [None]:
X_train_pd[X_train_pd["label"]==0]["device_type_id"].value_counts()

When `label` is `1` (right chart), `device_type_id` has a different distribution, as shown below:

In [None]:
X_train_pd[X_train_pd["label"]==1]["device_type_id"].value_counts()

### Visualize the distribution of other features with respect to the label

In [None]:
sns.set(font_scale=1.)
# Categorical (object) features: share of each value within each label
for column in X_train_pd.select_dtypes(include=['object']).columns:
    if column != 'label':
        display(pd.crosstab(index=X_train_pd[column], columns=X_train_pd['label'], normalize='columns'))

# Numeric features: one histogram per label value
for column in X_train_pd.select_dtypes(exclude=['object']).columns:
    if column != 'label':
        print(column)
        X_train_pd[[column, 'label']].hist(by='label', bins=30, sharey=True)
        plt.show()

## XGBoost Training
We are now ready to train a first, simple XGBoost model using the feature data prepared in EMR.

In [None]:
training_path = f"{DATA_PREFIX}/train/"
validation_path = f"{DATA_PREFIX}/valid/"
test_path = f"{DATA_PREFIX}/test/"
training_path, validation_path, test_path 

### Define train input and validation input

`Pipe` mode is available for the Parquet format, so we use it to stream the data from S3 directly to the training instances.

We also shard the training data across instances by setting the `ShardedByS3Key` distribution option. This allows training on multiple instances, where each instance receives a subset of the S3 objects. Sharding is a must when training on tens of GB of data with multiple instances. Note that this notebook uses `instance_count=1`, so sharding only takes effect once more instances are added.
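`ShardedByS3Key` splits whole S3 objects (not rows) between instances, so the training prefix should contain at least as many files as there are instances. SageMaker performs the real assignment; the hypothetical `shard_keys` helper below assumes a round-robin split purely to illustrate that every file goes to exactly one instance. The key names are made up.

```python
def shard_keys(keys, instance_count):
    """Hypothetical round-robin split of S3 keys across training instances.

    Illustration only: this is not SageMaker's actual assignment algorithm.
    """
    if instance_count < 1:
        raise ValueError("instance_count must be >= 1")
    shards = [[] for _ in range(instance_count)]
    for i, key in enumerate(sorted(keys)):
        shards[i % instance_count].append(key)
    return shards


# Hypothetical key names; in the notebook these would come from getTrainFiles()
keys = [f"processed/sample/train/part-{i:05d}.parquet" for i in range(5)]
shards = shard_keys(keys, instance_count=2)
for idx, shard in enumerate(shards):
    print(idx, len(shard))
```

With fewer files than instances, some shard in this sketch would be empty, which is why the number of files matters when scaling out.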

In [None]:
from sagemaker.inputs import TrainingInput
s3_input_train = TrainingInput(s3_data=f"s3://{INPUT_BUCKET_NAME}/{training_path}", content_type='application/x-parquet',
                               distribution="ShardedByS3Key", input_mode='Pipe')
s3_input_validation = TrainingInput(s3_data=f"s3://{INPUT_BUCKET_NAME}/{validation_path}", content_type='application/x-parquet',
                                    input_mode='Pipe')

### Selecting XGBoost version
We use the SageMaker built-in XGBoost algorithm, version 1.2-1.

In [None]:
container = sagemaker.image_uris.retrieve('xgboost', boto_session.region_name, '1.2-1')
display(container)

In [None]:
import time
from time import gmtime, strftime

prefix = 'sagemaker/xgb_bid_filtering'

# JOB_TS = time.strftime('%Y-%m-%d-%H-%M-%S', time.gmtime())
base_job_name = 'sample-single-cpu-parquet-7-features-training1st'
base_job_name

### On-demand instances vs Spot instances for training

We use on-demand instances for training. Spot instances could reduce costs, but they can add delays to the training time, and Spot capacity might not be available at certain times.


In [None]:
use_spot_instances = False
max_run = 5400 # max 90 mins run
max_wait = 7200 if use_spot_instances else None
checkpoint_s3_uri = (
 "s3://{}/{}/checkpoints/{}".format(OUTPUT_BUCKET_NAME, prefix, base_job_name) if use_spot_instances else None
)
print("Checkpoint path:", checkpoint_s3_uri)

In [None]:
output_path='s3://{}/{}/output'.format(OUTPUT_BUCKET_NAME, prefix)
output_path

### Create XGBoost model and define hyperparameters

In [None]:
xgb = sagemaker.estimator.Estimator(container,
                                    role,
                                    # volume_size=150,  # default 30GB
                                    base_job_name=base_job_name,
                                    instance_count=1,
                                    instance_type='ml.m5.2xlarge',  # Other CPU alternative: ml.m5.12xlarge. For GPU: ml.g4dn.4xlarge, ml.g4dn.xlarge, ml.p3.2xlarge
                                    output_path=output_path,
                                    sagemaker_session=sagemaker_sess,
                                    enable_sagemaker_metrics=True,
                                    use_spot_instances=use_spot_instances,  # used for managed spot training
                                    max_run=max_run,  # maximum runtime in seconds (on-demand and spot)
                                    max_wait=max_wait,  # used for managed spot training
                                    checkpoint_s3_uri=checkpoint_s3_uri,  # used for managed spot training
                                    )
xgb.set_hyperparameters(max_depth=5,
                        # tree_method='gpu_hist',  # Required when a GPU instance is chosen
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        # silent=0,
                        objective='binary:logistic',
                        early_stopping_rounds=20,  # stop if the validation metric does not improve for 20 rounds
                        num_round=50)
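With `early_stopping_rounds=20` and `num_round=50`, training stops early only if the validation metric fails to improve for 20 consecutive rounds. A minimal pure-Python sketch of that rule for a lower-is-better metric such as log loss (which metric is tracked depends on the `eval_metric` setting); XGBoost's internal bookkeeping may differ in details, and `early_stopping_round` is a hypothetical name.

```python
def early_stopping_round(val_losses, patience):
    """Return (best_round, stop_round) for a lower-is-better validation metric."""
    best_round = 0
    best_loss = float("inf")
    for i, loss in enumerate(val_losses):
        if loss < best_loss:
            best_loss, best_round = loss, i
        elif i - best_round >= patience:
            # No improvement for `patience` rounds since the best one
            return best_round, i
    # Ran out of rounds before the patience was exhausted
    return best_round, len(val_losses) - 1


print(early_stopping_round([0.5, 0.4, 0.41, 0.42, 0.43], patience=2))
print(early_stopping_round([0.5, 0.4, 0.3], patience=2))
```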

### Start the training asynchronously

In [None]:
xgb.fit(inputs={'train': s3_input_train, 'validation': s3_input_validation},
 wait=False) 

Wait until the training job completes.

In [None]:
status = sagemaker_sess.describe_training_job(xgb._current_job_name)['TrainingJobStatus']
while status not in ["Completed", "Failed", "Stopped"]:
    time.sleep(30)
    status = sagemaker_sess.describe_training_job(xgb._current_job_name)['TrainingJobStatus']
    print(status)
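The loop above has no upper bound. A minimal sketch of a hypothetical polling helper with a timeout; the default of 5400 s mirrors `max_run`, and `sleep` is injectable so the logic can be tested without waiting. In the notebook, `get_status` would be `lambda: sagemaker_sess.describe_training_job(xgb._current_job_name)['TrainingJobStatus']`.

```python
import time


def wait_for_terminal_status(get_status, terminal=("Completed", "Failed", "Stopped"),
                             poll_seconds=30, timeout_seconds=5400, sleep=time.sleep):
    """Poll get_status() until it returns a terminal status.

    Raises TimeoutError if no terminal status is seen within timeout_seconds.
    """
    waited = 0
    status = get_status()
    while status not in terminal:
        if waited >= timeout_seconds:
            raise TimeoutError(f"status still {status!r} after {waited} s")
        sleep(poll_seconds)
        waited += poll_seconds
        status = get_status()
        print(status)
    return status
```

Checking that the returned status is `Completed` before loading the model avoids confusing errors in the cells that follow.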

## Load trained model
Once the training job has completed, let's download the model and evaluate it locally on the test set.

In [None]:
TRAINING_JOB_NAME = xgb._current_job_name
TRAINING_JOB_NAME

In [None]:
s3_model_path = xgb.model_data
s3_model_path

### Download model from S3

In [None]:
import os
model_tar_file_local_path = f"./trained_model/{TRAINING_JOB_NAME}/model.tar.gz"

# download trained model locally
os.makedirs(os.path.dirname(model_tar_file_local_path), exist_ok=True)

s3_client.download_file(Bucket=OUTPUT_BUCKET_NAME,
                        Key=s3_model_path.replace(f"s3://{OUTPUT_BUCKET_NAME}/", ""),
                        Filename=model_tar_file_local_path)
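Stripping the `s3://bucket/` prefix with `replace` assumes the model lives in `OUTPUT_BUCKET_NAME`. A sketch of a hypothetical `split_s3_uri` helper that parses the bucket and key from the URI itself, using only the standard library; it could then be used as `bucket_name, key = split_s3_uri(s3_model_path)`.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/key/parts' into ('bucket', 'key/parts').

    Keys containing '?' or '#' would be cut off by urlparse; SageMaker model
    artifact paths normally contain neither.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


print(split_s3_uri("s3://my-bucket/sagemaker/xgb_bid_filtering/output/job/output/model.tar.gz"))
```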

### Extract model locally

In [None]:
import tarfile
# Extract the model tar file to get the pickled model file (xgboost-model)
with tarfile.open(model_tar_file_local_path, "r:gz") as tar:
    tar.extractall(path=f"./trained_model/{TRAINING_JOB_NAME}")

In [None]:
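# The model was pickled by the SageMaker XGBoost 1.2-1 container; unpickling may require a compatible xgboost version
!pip install -q xgboost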

In [None]:
import pickle as pkl
import xgboost as xgblib
import os
import numpy as np
import pandas as pd
from sklearn.datasets import load_svmlight_file

def model_fn(model_dir):
    """Load the pickled Booster produced by the SageMaker XGBoost container."""
    with open(os.path.join(model_dir, "xgboost-model"), "rb") as f:
        booster = pkl.load(f)
    return booster

def local_predict(xgb_local, test_libsvm_file):
    """Predict from a LIBSVM file path (newer xgboost versions may require a '?format=libsvm' suffix)."""
    t_mat = xgblib.DMatrix(test_libsvm_file)
    preds = xgb_local.predict(t_mat)
    return preds

def local_predict_nparray(xgb_local, np_array):
    """Predict from an in-memory array or DataFrame of features."""
    t_mat = xgblib.DMatrix(np_array)
    preds = xgb_local.predict(t_mat)
    return preds

### Download test dataset

In [None]:
file_type = 'parquet'
# Download test file matching the model
model_folder = f"./trained_model/{TRAINING_JOB_NAME}"
test_file_s3_path = test_files[0]
test_file_local_path = f"./trained_model/{TRAINING_JOB_NAME}/test_set.{file_type}"

s3_client.download_file(INPUT_BUCKET_NAME, test_file_s3_path, test_file_local_path)

In [None]:
xgb_local = model_fn(model_folder)

In [None]:
if file_type == 'parquet':
    X_test_pd = pd.read_parquet(test_file_local_path)
    X_test = X_test_pd.drop(["label"], axis=1)  # features only
    y_test = X_test_pd["label"].to_numpy()
elif file_type == 'libsvm':
    X_test, y_test = load_svmlight_file(test_file_local_path, zero_based=True)

## Inference on test set

In [None]:
%%time
if file_type == 'parquet':
    local_preds = local_predict_nparray(xgb_local, X_test)
elif file_type == 'libsvm':
    local_preds = local_predict(xgb_local, test_file_local_path)
# Round probabilities to class labels (0.5 cut-off)
local_y_vals = np.round(local_preds)
local_preds, local_y_vals

### Confusion matrix

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

def show_confusion_matrix(y_true, y_preds, threshold=0.5):
    # Convert probabilities to class labels using the cut-off threshold
    y_vals = np.where(y_preds > threshold, 1, 0)
    # labels=[0, 1] keeps the matrix 2x2 even if one class is never predicted
    cf_matrix = confusion_matrix(y_true, y_vals, labels=[0, 1])
    group_names = ['True Neg', 'False Pos', 'False Neg', 'True Pos']
    group_counts = ["{0:0.0f}".format(value) for value in cf_matrix.flatten()]
    labels = [f"{v1}\n{v2}" for v1, v2 in zip(group_names, group_counts)]
    labels = np.asarray(labels).reshape(2, 2)

    # plt.figure(figsize=(9,7))
    ax = sns.heatmap(cf_matrix, annot=labels, fmt='', cmap='Blues')

    ax.set_title('Confusion Matrix \n\n')
    ax.set_xlabel('\nPredicted Values')
    ax.set_ylabel('Actual Values ')

    ax.xaxis.set_ticklabels(['No Bid', 'Bid'])
    ax.yaxis.set_ticklabels(['No Bid', 'Bid'])

    plt.show()

In [None]:
show_confusion_matrix(y_test, local_preds)
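scikit-learn's `confusion_matrix` puts actual classes on rows and predicted classes on columns, ordered by label, so for 0/1 labels `ravel()` yields `(tn, fp, fn, tp)`. That is the order the `group_names` list in `show_confusion_matrix` relies on. A toy check with made-up labels:

```python
from sklearn.metrics import confusion_matrix

# Toy labels, not from the bid dataset
y_true = [0, 0, 0, 1, 1]
y_pred = [0, 0, 1, 1, 0]
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
tn, fp, fn, tp = cm.ravel()
print(cm)
print(f"TN={tn} FP={fp} FN={fn} TP={tp}")  # TN=2 FP=1 FN=1 TP=1
```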

### Evaluation metrics

Compute several evaluation metrics to assess the model performance.

In [None]:
from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, recall_score, f1_score, matthews_corrcoef

def collect_eval_metrics(true_values, predicted_values, threshold=0.5):
    # Class labels for the threshold-dependent metrics
    y_vals = predicted_values > threshold
    metric_df = pd.DataFrame({
        # ROC AUC uses the raw probabilities, so it does not depend on the threshold
        "test_roc_auc_score": [round(roc_auc_score(true_values, predicted_values), 5)],
        "test_accuracy": [round(accuracy_score(true_values, y_vals), 5)],
        "test_recall": [round(recall_score(true_values, y_vals), 5)],
        "test_precision": [round(precision_score(true_values, y_vals), 5)],
        "test_f1_score": [round(f1_score(true_values, y_vals), 5)],
        "test_matthews_corrcoef": [round(matthews_corrcoef(true_values, y_vals), 5)]})
    return metric_df

In [None]:
evaluation_metrics_df = collect_eval_metrics(y_test, local_preds)
evaluation_metrics_df
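ROC AUC summarizes how well the scores rank bids above no-bids across all thresholds, so it should be computed on the predicted probabilities. Passing `predicted_values > threshold` instead collapses the curve to a single operating point. A toy comparison with made-up scores:

```python
import numpy as np
from sklearn.metrics import roc_auc_score

# Toy data, not from the bid dataset
y_true = np.array([0, 0, 1, 1])
scores = np.array([0.1, 0.4, 0.35, 0.45])

auc_from_scores = roc_auc_score(y_true, scores)                      # uses the full ranking
auc_from_labels = roc_auc_score(y_true, (scores > 0.5).astype(int))  # all predictions are 0 here
print(auc_from_scores, auc_from_labels)
```

Here every score is below 0.5, so the thresholded version sees a constant prediction and reports 0.5, even though the scores rank three of the four bid/no-bid pairs correctly (AUC 0.75).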

### Histograms of probabilities

In [None]:
import matplotlib.pyplot as plt
plt.hist(local_preds)
plt.show()

The chart above shows the distribution of predicted probabilities.

We use `np.round()` to convert the probability into class 0 (no_bid) or 1 (bid), which corresponds to a cut-off threshold of 0.5. For the bid prediction use case, however, false negatives are much more important than false positives: a false negative means discarding an ad request that would have received a bid. Lowering the cut-off threshold reduces false negatives, typically at the cost of more false positives.
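One way to choose the cut-off is to fix a minimum recall for the bid class (which caps the share of false negatives) and take the highest threshold that still meets it. A minimal grid-search sketch; `highest_threshold_for_recall` is a hypothetical helper that uses the same `score > threshold` rule as `show_confusion_matrix`. In the notebook it could be called as `highest_threshold_for_recall(y_test, local_preds, target_recall=0.95)`, where 0.95 is only an example target.

```python
import numpy as np


def highest_threshold_for_recall(y_true, y_scores, target_recall, grid=None):
    """Largest threshold t (on a grid) with recall(score > t) >= target_recall, else None."""
    y_true = np.asarray(y_true).astype(int)
    y_scores = np.asarray(y_scores, dtype=float)
    n_pos = int((y_true == 1).sum())
    if n_pos == 0:
        raise ValueError("y_true contains no positive examples")
    if grid is None:
        grid = np.linspace(0.0, 1.0, 101)
    best = None
    for t in grid:
        y_pred = y_scores > t
        recall = (y_pred & (y_true == 1)).sum() / n_pos
        # Recall never increases as t grows, so the last match is the largest t
        if recall >= target_recall:
            best = float(t)
    return best


# Toy data, not from the bid dataset
y_true = [0, 0, 1, 1, 1]
y_scores = [0.05, 0.3, 0.255, 0.605, 0.905]
print(highest_threshold_for_recall(y_true, y_scores, target_recall=1.0))
print(highest_threshold_for_recall(y_true, y_scores, target_recall=0.6))
```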

### Change cut off value

We now change the default cut-off value (0.5) to a lower value and look at the impact on the confusion matrix and on evaluation metrics such as precision and recall.

In [None]:
threshold = 0.1

In [None]:
show_confusion_matrix(y_test, local_preds, threshold)

In [None]:
evaluation_metrics_df = collect_eval_metrics(y_test, local_preds, threshold)
evaluation_metrics_df

### Visualize ROC curve

In [None]:
from sklearn import metrics

def show_roc_curve(y_test, y_preds):
    fpr1, tpr1, _ = metrics.roc_curve(y_test, y_preds)
    plt.title("ROC Curve")
    plt.plot(fpr1, tpr1, color='blue', label="full model")
    plt.plot([0, 1], [0, 1], 'r--')  # random guess baseline
    plt.legend(loc='lower right')
    plt.xlim([-0.1, 1.1])
    plt.ylim([-0.1, 1.1])
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')
    plt.show()

In [None]:
show_roc_curve(y_test, local_preds)

## Save model in binary format for inference in ECS

Save the model locally in the notebook environment.

In [None]:
xgb_model = xgb_local
xgb_binary_model_path = "xgboost.bin"

xgb_model.save_model(xgb_binary_model_path)

Upload to S3 so the inference application can use it in ECS

In [None]:
binary_model_dir = ssm.get_parameter(Name="/aik/xgboost/path")["Parameter"]["Value"]

In [None]:
binary_model_path = ssm.get_parameter(Name="/aik/xgboost/path")["Parameter"]["Value"]
binary_model_path = binary_model_path.replace("s3://" + bucket + "/", "")
s3_client.upload_file(xgb_binary_model_path, bucket, binary_model_path)

## Save the schema as JSON for inference in ECS

In [None]:
%%writefile schema.json
{
 "BidID": "StringType",
 "dow": "IntegerType",
 "hour": "StringType",
 "RegionID": "StringType",
 "CityID": "StringType",
 "Domain": "StringType",
 "AdvertiserID": "StringType",
 "BiddingPrice": "LongType",
 "PayingPrice": "LongType",
 "UserAgent": "StringType"
}
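A quick sanity check before uploading can catch typos in the type names. This sketch assumes the ECS inference application expects Spark SQL style type names such as `StringType`; the `EXPECTED_TYPES` set and `check_schema` helper are hypothetical and should be aligned with whatever the consumer actually accepts. In the notebook it could be run as `check_schema(open("schema.json").read())`.

```python
import json

# Assumption: the consumer understands these Spark SQL style type names
EXPECTED_TYPES = {"StringType", "IntegerType", "LongType", "DoubleType", "FloatType", "BooleanType"}


def check_schema(text):
    """Parse a column -> type-name JSON object and reject unknown type names."""
    schema = json.loads(text)
    unknown = {col: t for col, t in schema.items() if t not in EXPECTED_TYPES}
    if unknown:
        raise ValueError(f"unexpected type names: {unknown}")
    return list(schema)


print(check_schema('{"BidID": "StringType", "dow": "IntegerType", "BiddingPrice": "LongType"}'))
```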

In [None]:
inference_schema = ssm.get_parameter(Name="/aik/pipelineModelArtifactSchemaPath")["Parameter"]["Value"]
inference_schema_path = inference_schema.replace("s3://" + bucket + "/", "")

In [None]:
s3_client.upload_file("schema.json", bucket, inference_schema_path)
