## Module 3: Training Custom Models Using Studio Notebook

You may choose to build a custom model when you need to tackle a unique problem or when no pre-built model meets your needs. Building a custom model typically involves selecting an appropriate algorithm, tuning its hyperparameters, and optimizing its performance through iterative experimentation. In this module, we will go through the following steps to build, track, deploy, and monitor custom models using Amazon SageMaker Studio notebooks.

- [Step 1: Pull Data from Offline Feature Store](#Pull-data-from-offline-feature-store)
- [Step 2: Train, Track, and Deploy an XGBoost Model](#Train-XGBoost-Model)
- [Step 3: Train, Track, and Deploy an Isolation Forest Model](#Train-Isolation-Forest-Model)
- [Step 4: Model Monitoring](#Model-Monitoring)
- [Step 5: Clean Up](#Clean-up)

**If you DID NOT run the previous modules, please run the [0_setup.ipynb notebook](0_setup.ipynb) before running this notebook.**

**This demo is optimized for SageMaker Studio notebooks running the Data Science kernel.**



### Setup

Install and/or update the required libraries.

In [None]:
!pip install -Uq pip --quiet

!pip install -Uq awswrangler sagemaker boto3 --quiet

### Import & Global Parameters

In [None]:
import boto3
import sagemaker
import pandas as pd

sagemaker_session = sagemaker.Session()

region = sagemaker_session.boto_region_name
sagemaker_role = sagemaker.get_execution_role()

bucket = sagemaker_session.default_bucket()

s3_client = boto3.client("s3", region_name=region)
sagemaker_client = boto3.client("sagemaker")

prefix = "telco-5g-observability"

%store region
%store bucket
%store sagemaker_role
%store prefix

### Pull data from offline feature store
----
In Module 1 of this workshop, we prepared the raw data and uploaded the final dataset into an offline feature store, where it is cataloged in a central location for management and discovery. Now we want to extract that data and build our observability models. SageMaker Feature Store uses Amazon Athena queries to pull the data from the offline store and can load the results directly into a pandas DataFrame for further processing.

In [None]:
from sagemaker.feature_store.feature_group import FeatureGroup

%store -r fg_name

anomaly_features = FeatureGroup(name=fg_name, sagemaker_session=sagemaker_session)

query = anomaly_features.athena_query()

table_name = query.table_name
 
query_string = f"""
SELECT * FROM "{table_name}"
"""

query.run(query_string=query_string, output_location=f"s3://{bucket}/{prefix}/data/query_results")
query.wait()

dataset = query.as_dataframe()

dataset

### Train XGBoost Model
----

In the real world, data scientists go through hundreds of iterations, experimenting with different algorithms to find the best model for an ML use case. Here you start with a supervised learning approach and use an XGBoost model for our problem.

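To get your features ready for XGBoost, move the target variable (`anomaly`) to the first column. Drop the identifier and timestamp columns (`location_id`, `eventtime`) and the feature store metadata columns (`write_time`, `api_invocation_time`, `is_deleted`). You also split the data into a training set (80%) and a test set (20%), so that a holdout set is available to validate model performance.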

In [None]:
col_order = ["anomaly"] + list(dataset.drop(["location_id", "anomaly", "eventtime", "write_time","api_invocation_time",'is_deleted'], axis=1).columns)

train = dataset.sample(frac=0.80, random_state=0)[col_order]
test = dataset.drop(train.index)[col_order]

Upload the training data to S3

In [None]:
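import os

# Make sure the local data directory exists before writing the CSV
os.makedirs("data", exist_ok=True)
train.to_csv("data/train.csv", index=False)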
key = f"{prefix}/data/xgboost/train.csv"

s3_client.upload_file(
 Filename="data/train.csv",
 Bucket=bucket,
 Key=key,
)

train_s3_path = f"s3://{bucket}/{key}"
print(f"training data is uploaded to {train_s3_path}")

#### Set the hyperparameters
These are the parameters which will be sent to our training script in order to train the model. Although they are all defined as "hyperparameters" here, they can encompass XGBoost's [Learning Task Parameters](https://xgboost.readthedocs.io/en/latest/parameter.html#learning-task-parameters), [Tree Booster Parameters](https://xgboost.readthedocs.io/en/latest/parameter.html#parameters-for-tree-booster), or any other parameters you'd like to configure for XGBoost.
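
The training script `code/xgboost_starter_script.py` is not shown in this module. In SageMaker script mode, the `hyperparameters` dictionary is passed to the entry point as command-line arguments (`--max_depth 3`, and so on), so scripts commonly parse them with `argparse`. The following is a minimal sketch of such a parser. It assumes the script reads the same keys used in the training cell below, and the function `parse_hyperparameters` is hypothetical:

```python
import argparse


def parse_hyperparameters(argv=None):
    """Parse hyperparameters that SageMaker passes as --key value pairs.

    Hypothetical parser: the argument names mirror the keys of the
    `hyperparameters` dict used in this notebook.
    """
    parser = argparse.ArgumentParser()
    # Tree booster parameters
    parser.add_argument("--max_depth", type=int, default=3)
    parser.add_argument("--eta", type=float, default=0.2)
    # Learning task parameter
    parser.add_argument("--objective", type=str, default="binary:logistic")
    # Number of boosting rounds (typically passed to xgboost.train separately)
    parser.add_argument("--num_round", type=int, default=100)
    # Custom, non-XGBoost parameter used by the script itself
    parser.add_argument("--region", type=str, default=None)
    # parse_known_args ignores unexpected extra arguments instead of failing
    args, _ = parser.parse_known_args(argv)
    return args


# Example: the argument list built from the dict in the training cell
args = parse_hyperparameters(
    ["--max_depth", "3", "--eta", "0.2", "--objective", "binary:logistic",
     "--num_round", "100", "--region", "us-west-2"]
)
print(args.max_depth, args.eta, args.num_round)  # 3 0.2 100
```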

#### Set Up the Experiment Run Context
Amazon SageMaker Experiments lets you organize, track, compare, and evaluate experiments during model building and training. Experiment tracking matters because it records how model performance changes over time, which makes the model easier to debug and optimize. It also helps you reproduce and share results with others, leading to better collaboration and faster iteration. For more details, see the [SageMaker documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/experiments-create.html).

#### Train a custom model on SageMaker
To train a model on SageMaker, you specify the instance type, the framework container, and any hyperparameters you want to use. When you call `estimator.fit()`, you supply the location of your training data. SageMaker then launches the specified instance and downloads your training data onto it. In the example below, we also supply a custom training script (`entry_point` inside `source_dir`). SageMaker copies the script into the container and runs it, which makes it easy for you to iterate on your code.
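
Inside the training container, SageMaker describes the job through environment variables. Each channel passed to `fit()` is downloaded to the directory named in `SM_CHANNEL_<NAME>` (here `SM_CHANNEL_TRAIN` for the `train` channel). Files written to `SM_MODEL_DIR` are packaged as the model artifact. The sketch below shows how a training script might resolve these paths. The helper `resolve_paths` is hypothetical. The fallback defaults are the standard `/opt/ml/...` container paths and are assumed here for running the script locally:

```python
import os


def resolve_paths(channel="train"):
    """Return (input_dir, model_dir) for a SageMaker script-mode training job.

    SM_CHANNEL_<NAME> points at the local copy of each input channel and
    SM_MODEL_DIR at the directory whose contents become model.tar.gz.
    The defaults apply when the variables are not set (e.g. local testing).
    """
    input_dir = os.environ.get(
        f"SM_CHANNEL_{channel.upper()}", f"/opt/ml/input/data/{channel}"
    )
    model_dir = os.environ.get("SM_MODEL_DIR", "/opt/ml/model")
    return input_dir, model_dir


# With fit(inputs={"train": train_s3_path}), train.csv lands under input_dir
input_dir, model_dir = resolve_paths("train")
train_csv = os.path.join(input_dir, "train.csv")
print(train_csv, model_dir)
```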

In [None]:
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.experiments.run import Run, load_run
from sagemaker.utils import unique_name_from_base

train_instance_count=1
train_instance_type="ml.m5.xlarge" 

experiment_name = unique_name_from_base(prefix)

run_name = unique_name_from_base("xgboost-experiment")

with Run(experiment_name=experiment_name, run_name=run_name,
         sagemaker_session=sagemaker_session) as run:

    # Log the training data file as an input of this run
    run.log_file("data/train.csv", is_output=False)

    hyperparameters = {
        "max_depth": "3",
        "eta": "0.2",
        "objective": "binary:logistic",
        "num_round": "100",
        "region": region,
    }

    xgb_estimator = XGBoost(
        entry_point="xgboost_starter_script.py",
        source_dir="code",
        hyperparameters=hyperparameters,
        role=sagemaker_role,
        instance_count=train_instance_count,
        instance_type=train_instance_type,
        framework_version="1.5-1",
    )

    # The training job started inside the Run context is tracked in this run
    xgb_estimator.fit(inputs={"train": train_s3_path})

#### Deploy model to an endpoint
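We enable data capture so that the requests sent to the endpoint and the endpoint's responses are saved to S3, where Model Monitor can analyze them later. With `sampling_percentage=100`, every request is captured.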
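
Captured data is written under `destination_s3_uri` as JSON Lines files, one record per request. The request is stored under `captureData.endpointInput` and the response under `captureData.endpointOutput`. The sketch below parses one such line. It follows the data capture layout described in the Model Monitor documentation as we understand it. The helper `parse_capture_line` is hypothetical, and the sample record is made up for illustration:

```python
import base64
import json


def parse_capture_line(line):
    """Extract the request and response payloads from one data-capture JSON line.

    Assumes the layout captureData.endpointInput / endpointOutput, each with
    "data" and "encoding" fields.
    """
    capture = json.loads(line)["captureData"]

    def decode(section):
        data = section["data"]
        # Non-text payloads may be stored base64-encoded
        if section.get("encoding") == "BASE64":
            data = base64.b64decode(data).decode("utf-8")
        return data

    return decode(capture["endpointInput"]), decode(capture["endpointOutput"])


# Illustrative record only; real capture files hold one JSON object per line
sample = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "0.5,1.2,3.4", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv", "mode": "OUTPUT",
                           "data": "0.07", "encoding": "CSV"},
    },
    "eventMetadata": {"eventId": "example-id"},
    "eventVersion": "0",
})
print(parse_capture_line(sample))  # ('0.5,1.2,3.4', '0.07')
```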

In [None]:
from sagemaker.serializers import CSVSerializer
from sagemaker.model_monitor import DataCaptureConfig

data_capture_config = DataCaptureConfig(
 enable_capture=True,
 sampling_percentage=100,
 destination_s3_uri=f"s3://{bucket}/{prefix}/monitoring/datacapture"
)


predictor = xgb_estimator.deploy(
 initial_instance_count=1, instance_type="ml.m5.xlarge", serializer=CSVSerializer(), data_capture_config=data_capture_config
)

#### Test inference on endpoint
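The functions below call the SageMaker endpoint in batches and collect the predicted probabilities (`predict`). They then convert the probabilities into class labels using a cutoff (`calibrate`) so that we can build a confusion matrix.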
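
`predict` asks `np.array_split` for `int(n / rows + 1)` chunks, so each request carries at most `rows` records. This helps keep each payload below the real-time endpoint request size limit (6 MB). The plain-Python sketch below uses a hypothetical helper, `chunk_sizes`. It reproduces how `np.array_split` sizes those chunks, so you can check the batch sizes without calling the endpoint:

```python
def chunk_sizes(n_rows, rows=500):
    """Sizes of the chunks np.array_split produces inside predict().

    predict() requests k = int(n_rows / rows + 1) chunks; np.array_split gives
    the first (n_rows % k) chunks one extra element and the rest n_rows // k.
    """
    k = int(n_rows / float(rows) + 1)
    base, extra = divmod(n_rows, k)
    return [base + 1 if i < extra else base for i in range(k)]


print(chunk_sizes(1000))  # [334, 333, 333]
print(chunk_sizes(499))   # [499]
```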

In [None]:
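import numpy as np


def predict(data, rows=500):
    """Invoke the endpoint in batches of at most `rows` records; return a flat list of probabilities."""
    split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1))
    predictions = []
    for array in split_array:
        # predictor.predict returns a list of rows (lists); flatten them into one list
        predictions = predictions + sum(predictor.predict(array), [])

    return [float(i) for i in predictions]


def calibrate(probabilities, cutoff=.2):
    """Convert probabilities to labels: 1 (anomaly) if p > cutoff, else 0."""
    predictions = []
    for p in probabilities:
        if p <= cutoff:
            predictions.append(0)
        else:
            predictions.append(1)
    return predictions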

You can load a SageMaker Experiments run at any time with `load_run` and track more information about it. Here you invoke the SageMaker endpoint for batch prediction and summarize the results in a simple [confusion matrix](https://www.dataschool.io/simple-guide-to-confusion-matrix-terminology/). We also log the chart to the experiment run.

After the cell below completes, you can open SageMaker Experiments in Studio to see all the information SageMaker has captured for this experiment run.

![EX01](statics/module_03_ex01.png)

In [None]:
from sklearn.metrics import f1_score

with load_run(experiment_name=experiment_name, run_name=run_name) as run:

    # Run batch prediction on the holdout features (every column except the label)
    probabilities = predict(test.to_numpy()[:, 1:])
    # Apply a 0.4 cutoff and log the resulting confusion matrix to the run
    predictions = np.asarray(calibrate(probabilities, 0.4))
    run.log_confusion_matrix(test["anomaly"], predictions, unique_name_from_base("Confusion-Matrix"))

print(f"Experiment Name: {experiment_name}\n")

print(f"Run Name: {run_name}\n")

f1 = f1_score(test["anomaly"], predictions, average='micro')

print(f"F1 Score: {f1}")
pd.crosstab(
 index=test.iloc[:, 0],
 columns=predictions,
 rownames=["actual"],
 colnames=["predictions"],
)
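
The cell above reports `f1_score(..., average='micro')`. For single-label binary classification, micro-averaging pools true positives, false positives, and false negatives across both classes, which makes micro F1 equal to accuracy. On imbalanced anomaly data, that number can look high even when anomalies are missed. In scikit-learn, `average='binary'` (the default) reports F1 for the positive class only. The plain-Python sketch below is an illustration, not part of the workshop code. It computes the confusion-matrix counts and shows both F1 variants on made-up labels, and `binary_metrics` is a hypothetical helper:

```python
def binary_metrics(y_true, y_pred):
    """Confusion-matrix counts, accuracy, F1 for class 1, and micro-averaged F1."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    accuracy = (tp + tn) / len(pairs)
    # F1 of the anomaly class only (what average="binary" reports)
    binary_f1 = 2 * tp / (2 * tp + fp + fn) if tp else 0.0
    # Micro F1 pools counts over both classes. For class 0, its true positives
    # are tn, its false positives are fn and its false negatives are fp, so
    # pooled FP and pooled FN are both fp + fn.
    pooled_tp = tp + tn
    pooled_fp = pooled_fn = fp + fn
    micro_f1 = 2 * pooled_tp / (2 * pooled_tp + pooled_fp + pooled_fn)
    return {"tp": tp, "tn": tn, "fp": fp, "fn": fn, "accuracy": accuracy,
            "binary_f1": binary_f1, "micro_f1": micro_f1}


# Made-up labels: 8 normal points, 2 anomalies, one anomaly missed
m = binary_metrics([0] * 8 + [1, 1], [0] * 8 + [0, 1])
print(round(m["accuracy"], 3), round(m["micro_f1"], 3), round(m["binary_f1"], 3))  # 0.9 0.9 0.667
```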

### Train Isolation Forest Model
----
Now let's experiment with a different approach. This time you use a common unsupervised learning algorithm called isolation forest. In the cells below, you use the full dataset (without the `anomaly` label) to build an isolation forest model. The model isolates anomalies based on how easily they can be separated from the rest of the data. More on the [isolation forest algorithm](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html#sklearn.ensemble.IsolationForest).
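
Isolation forest scores a point by how quickly random splits isolate it, and anomalies tend to be isolated at shallow depths. In the original formulation, the score is `s(x, n) = 2^(-E[h(x)] / c(n))`. Here `E[h(x)]` is the average path length of `x` over the trees. `c(n)` is the average path length of an unsuccessful binary-search-tree search over `n` samples, namely `c(n) = 2H(n-1) - 2(n-1)/n` with `H(i) ≈ ln(i) + 0.5772` (Euler's constant), `c(2) = 1`, and `0` for smaller `n`. In this module, `n` corresponds to the `max_samples=512` sub-sample size. Scores close to 1 suggest anomalies, and scores well below 0.5 suggest normal points. A sketch of the formula follows, with hypothetical helper names:

```python
import math

EULER_GAMMA = 0.5772156649015329


def average_path_length(n):
    """c(n): average path length of an unsuccessful BST search over n samples."""
    if n <= 1:
        return 0.0
    if n == 2:
        return 1.0
    harmonic = math.log(n - 1) + EULER_GAMMA  # approximation of H(n - 1)
    return 2.0 * harmonic - 2.0 * (n - 1) / n


def anomaly_score(mean_path_length, n):
    """s(x, n) = 2 ** (-E[h(x)] / c(n)); values near 1 mean 'easy to isolate'."""
    return 2.0 ** (-mean_path_length / average_path_length(n))


c = average_path_length(512)  # sub-sample size used in this module
print(round(c, 2))
print(anomaly_score(c, 512))  # 0.5: a point isolated at exactly the average depth
```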

Upload the isolation forest dataset to S3.

In [None]:
iso_input = dataset.drop(["location_id", "anomaly", "eventtime", 
 "write_time","api_invocation_time",'is_deleted'], axis=1)
iso_input.to_csv("data/iso_input.csv", index=False)
key = f"{prefix}/data/isoforest/iso_input.csv"

s3_client.upload_file(
 Filename="data/iso_input.csv",
 Bucket=bucket,
 Key=key,
)

input_s3_path = f"s3://{bucket}/{key}"
print(f"training data is uploaded to {input_s3_path}")

In [None]:
from sagemaker.sklearn.estimator import SKLearn

run_name = unique_name_from_base("isoforest-experiment")

with Run(experiment_name=experiment_name, run_name=run_name,
         sagemaker_session=sagemaker_session) as run:

    # Log the unlabeled input file as an input of this run
    run.log_file("data/iso_input.csv", is_output=False)
    FRAMEWORK_VERSION = "1.0-1"

    sklearn = SKLearn(
        entry_point="isolation_forest_script.py",
        source_dir="code",
        framework_version=FRAMEWORK_VERSION,
        instance_count=train_instance_count,
        instance_type=train_instance_type,
        role=sagemaker_role,
        sagemaker_session=sagemaker_session,
        hyperparameters={"max_samples": 512,
                         "random_state": 42,
                         "region": region},
    )
    sklearn.fit({"train": input_s3_path})

#### Deploy IsoForest Model to an endpoint

In [None]:
from sagemaker.deserializers import JSONDeserializer

isoforest_predictor = sklearn.deploy(
 initial_instance_count=1, instance_type="ml.m5.xlarge", serializer=CSVSerializer(), deserializer = JSONDeserializer()
)

#### Test inference on endpoint
Log the confusion matrix results to the experiment for historical reference. It turns out that isolation forest does not perform well in this situation, so we will use the XGBoost model for the rest of the module and lab.

In [None]:
with load_run(experiment_name=experiment_name, run_name=run_name) as run:

    results = isoforest_predictor.predict(test.to_numpy()[:, 1:])

    # Map -1 labels to 0; other labels are kept as-is
    predictions = []
    for x in results:
        if x <= 0:
            predictions.append(0)
        else:
            predictions.append(x)

    predictions = np.asarray(predictions)
    run.log_confusion_matrix(test["anomaly"], predictions, unique_name_from_base("IsoForest-Confusion-Matrix"))

print(f"Experiment Name: {experiment_name}\n")

print(f"Run Name: {run_name}\n")

f1 = f1_score(test["anomaly"], predictions, average='micro')

print(f"F1 Score: {f1}")

pd.crosstab(
 index=test.iloc[:, 0],
 columns=predictions,
 rownames=["actual"],
 colnames=["predictions"],
)
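
scikit-learn's `IsolationForest.predict` returns `-1` for outliers and `1` for inliers. The inference script `isolation_forest_script.py` is not shown, so we cannot confirm that the endpoint returns those raw labels. Suppose it does, and `anomaly == 1` marks an anomaly in this dataset. Then the mapping that lines up with the labels is `-1 -> 1` and `1 -> 0`. That is the reverse of the cell above, which turns `-1` into `0` and keeps `1`. A minimal sketch of that mapping, with a hypothetical helper name:

```python
def to_anomaly_flags(iso_labels):
    """Map IsolationForest labels (-1 = outlier, 1 = inlier) to 1 = anomaly, 0 = normal."""
    return [1 if label == -1 else 0 for label in iso_labels]


print(to_anomaly_flags([1, -1, 1, 1, -1]))  # [0, 1, 0, 0, 1]
```

If the endpoint does return raw `-1`/`1` labels, re-evaluating with this mapping may give a different confusion matrix from the one logged above.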

### Model Monitoring

![MON01](statics/module_03_monitor01.png)

Model monitoring is crucial to ensure that machine learning models continue to perform as expected after deployment. It involves tracking metrics such as accuracy, precision, recall, and F1 score, along with the properties of the incoming data. The goal is to detect and diagnose performance degradation, data drift, and other issues that may arise over time.

Amazon SageMaker Model Monitor automatically monitors your deployed models against rules derived from a baseline. It can alert you when the model's behavior deviates from what is expected, for example through Amazon CloudWatch metrics and alarms. In this example, you manually set up data drift detection for the XGBoost endpoint.

#### 1. Create a baselining job with training dataset
Now that the training data is in Amazon S3, start a job that suggests a baseline. `DefaultModelMonitor.suggest_baseline(...)` starts a processing job that uses a SageMaker-provided Model Monitor container to compute baseline statistics and suggest constraints from the dataset.

In [None]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

# this is our training dataset
baseline_data_uri = train_s3_path
baseline_results_prefix = f"{prefix}/monitoring/baselining/results"
baseline_results_uri = f"s3://{bucket}/{baseline_results_prefix}"


my_default_monitor = DefaultModelMonitor(
 role=sagemaker_role,
 instance_count=1,
 instance_type="ml.m5.xlarge",
 volume_size_in_gb=20,
 max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
 baseline_dataset=baseline_data_uri,
 dataset_format=DatasetFormat.csv(header=True),
 output_s3_uri=baseline_results_uri,
 wait=True,
)

The baselining job writes two JSON files, `statistics.json` and `constraints.json`, to the output location. The next cells list and preview them. Keep in mind that you can also supply your own constraints instead of the suggested ones.

In [None]:
result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get("Contents")]
print("Found Files:")
print("\n ".join(report_files))

**Statistics** describe the properties of the baseline (training) data, such as each feature's mean and standard deviation.
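
To make this concrete, the sketch below computes a few per-feature summaries of the kind `statistics.json` reports for numerical features. The field names are modeled loosely on that file as we understand it (`num_present`, `num_missing`, `mean`, `std_dev`, `min`, `max`). This is a plain-Python illustration on made-up values, not the Model Monitor implementation, whose exact formulas may differ. `feature_statistics` is a hypothetical helper:

```python
import statistics


def feature_statistics(values):
    """Summaries similar in spirit to a numerical feature entry in statistics.json.

    `values` may contain None for missing entries. std_dev is the population
    standard deviation here; Model Monitor's exact computation may differ.
    """
    present = [v for v in values if v is not None]
    summary = {"num_present": len(present), "num_missing": len(values) - len(present)}
    if present:
        summary.update(
            mean=statistics.fmean(present),
            std_dev=statistics.pstdev(present),
            min=min(present),
            max=max(present),
        )
    return summary


# Made-up values for one feature column, with one missing entry
print(feature_statistics([2.0, 4.0, None, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]))
```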

In [None]:
import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

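**Constraints** are rules derived from the baseline that incoming data is expected to satisfy. Examples include each feature's data type, its completeness (the fraction of non-missing values), and whether it may be negative. The monitoring job reports a violation when captured data breaks one of these rules.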
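
For example, a suggested constraint might require a feature to be fully populated (a `completeness` of 1.0 in `constraints.json`) and non-negative. The sketch below checks a batch of values for one feature against those two rules. The function `check_feature`, its parameters, and the values are illustrative assumptions. In practice, the checks and violation reports come from the Model Monitor container:

```python
def check_feature(values, completeness=1.0, is_non_negative=True):
    """Return human-readable violations for one feature column.

    completeness: minimum required fraction of non-missing values.
    is_non_negative: whether negative values are disallowed.
    """
    violations = []
    present = [v for v in values if v is not None]
    observed = len(present) / len(values) if values else 0.0
    if observed < completeness:
        violations.append(f"completeness {observed:.2f} < required {completeness:.2f}")
    if is_non_negative and any(v < 0 for v in present):
        violations.append("negative values found")
    return violations


print(check_feature([0.3, 1.2, 0.8]))         # []
print(check_feature([0.3, None, -0.5, 0.8]))  # ['completeness 0.75 < required 1.00', 'negative values found']
```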

In [None]:
constraints_df = pd.json_normalize(
 baseline_job.suggested_constraints().body_dict["features"]
)
constraints_df.head(10)

#### 2. Create a schedule to analyze collected data for data quality issues

In [None]:
from time import gmtime, strftime
from sagemaker.model_monitor import CronExpressionGenerator

mon_schedule_name = unique_name_from_base(f"{prefix}-monitoring-job")

s3_report_path = f"s3://{bucket}/{prefix}/monitoring/report"

my_default_monitor.create_monitoring_schedule(
 monitor_schedule_name=mon_schedule_name,
 endpoint_input=predictor.endpoint_name,  # predictor endpoint name
 output_s3_uri=s3_report_path,
 statistics=my_default_monitor.baseline_statistics(),
 constraints=my_default_monitor.suggested_constraints(),
 schedule_cron_expression=CronExpressionGenerator.hourly(), #Hourly
 enable_cloudwatch_metrics=True,
)

In [None]:
desc_schedule_result = my_default_monitor.describe_schedule()
print("Schedule status: {}".format(desc_schedule_result["MonitoringScheduleStatus"]))

#### Generate some artificial traffic

In [None]:
# Send the test set to the endpoint 100 times so data capture has traffic to record
for count in range(1, 101):
    predict(test.to_numpy()[:, 1:])
    if count % 10 == 0:
        print(f"predicting artificial traffic batch {count} ...")

Depending on the schedule you defined, you may need to wait a while for the monitoring job to run. Once it does, the execution will appear in the output of the cell below.
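
`CronExpressionGenerator.hourly()` produces an hourly cron expression. We assume it is `cron(0 * ? * * *)`, that is, minute 0 of every hour in UTC. Under that assumption, the first execution can start up to an hour after the schedule is created. A minimal sketch that estimates when the next top-of-the-hour run will be; `next_hourly_run` is a hypothetical helper:

```python
from datetime import datetime, timedelta, timezone


def next_hourly_run(now):
    """Return the next top-of-the-hour time strictly after `now`, in UTC."""
    now = now.astimezone(timezone.utc)
    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    return top_of_hour + timedelta(hours=1)


now = datetime.now(timezone.utc)
nxt = next_hourly_run(now)
print(f"Next scheduled execution around {nxt:%Y-%m-%d %H:%M} UTC, "
      f"in about {int((nxt - now).total_seconds() // 60)} minutes")
```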

In [None]:
mon_executions = my_default_monitor.list_executions()
mon_executions

### Clean up

Delete feature group

In [None]:
feature_group_name = fg_name
sagemaker_client.delete_feature_group(
 FeatureGroupName= feature_group_name
)

Remove experiments

In [None]:
import time
from botocore.exceptions import ClientError


def remove_experiment(experiment_name):
    trials = sagemaker_client.list_trials(ExperimentName=experiment_name)['TrialSummaries']
    print('TrialNames:')
    for trial in trials:
        trial_name = trial['TrialName']
        print(f"\n{trial_name}")

        components_in_trial = sagemaker_client.list_trial_components(TrialName=trial_name)
        print('\tTrialComponentNames:')
        for component in components_in_trial['TrialComponentSummaries']:
            component_name = component['TrialComponentName']
            print(f"\t{component_name}")
            sagemaker_client.disassociate_trial_component(TrialComponentName=component_name, TrialName=trial_name)
            try:
                # comment out to keep trial components
                sagemaker_client.delete_trial_component(TrialComponentName=component_name)
            except ClientError:
                # component is associated with another trial
                continue
            # to prevent throttling
            time.sleep(.5)
        sagemaker_client.delete_trial(TrialName=trial_name)
    sagemaker_client.delete_experiment(ExperimentName=experiment_name)
    print(f"\nExperiment {experiment_name} deleted")


remove_experiment(experiment_name)

Remove Model Monitor

In [None]:
my_default_monitor.delete_monitoring_schedule()

Remove endpoints

In [None]:
def remove_endpoint(endpoint_name):
    # Delete any monitoring schedules attached to the endpoint before deleting it
    monitor_schedules = sagemaker_client.list_monitoring_schedules(EndpointName=endpoint_name)['MonitoringScheduleSummaries']
    print('Monitoring Schedule:')
    for ms in monitor_schedules:
        ms_name = ms['MonitoringScheduleName']
        print(f"\n{ms_name}")

        sagemaker_client.delete_monitoring_schedule(MonitoringScheduleName=ms_name)

    sagemaker_client.delete_endpoint(EndpointName=endpoint_name)
    print(f"Endpoint {endpoint_name} deleted")


# xgboost
remove_endpoint(predictor.endpoint_name)
# isolation forest
remove_endpoint(isoforest_predictor.endpoint_name)