# Shadow Variant Experiments via API




## Introduction

In this notebook, we deploy a pre-trained model and then deploy a candidate replacement model alongside it in shadow mode, as an experiment to compare the two. We do this entirely in code, using the SageMaker API. Both models were trained on a tabular network-traffic dataset and classify network traffic into 15 classes.

## Contents

1) [Setup](#setup)
2) [Deploy Model](#deploy)
3) [Register the Models](#register)
4) [Create a Shadow Test](#shadow)
5) [Perform Inference](#infer)
6) [Evaluate](#eval)
7) [Clean up](#clean)

We trained our models with the CSE-CIC-IDS2018 dataset by CIC and ISCX, which is used for security testing and malware prevention.
The dataset includes a large volume of raw network traffic logs, plus pre-processed data in which network connections have been reconstructed and relevant features extracted using CICFlowMeter, a tool that outputs network connection features as CSV files. Each record is classified as benign or as one of fourteen types of malicious traffic.


The classes are encoded as follows (train + validation):


| Label | Encoded | 
|:-------------------------|:-------:|
| Benign | 0 | 
| Bot | 1 | 
| DoS attacks-GoldenEye | 2 | 
| DoS attacks-Slowloris | 3 | 
| DDoS attacks-LOIC-HTTP | 4 | 
| Infilteration | 5 | 
| DDOS attack-LOIC-UDP | 6 | 
| DDOS attack-HOIC | 7 | 
| Brute Force -Web | 8 | 
| Brute Force -XSS | 9 | 
| SQL Injection | 10 | 
| DoS attacks-SlowHTTPTest | 11 | 
| DoS attacks-Hulk | 12 | 
| FTP-BruteForce | 13 | 
| SSH-Bruteforce | 14 | 
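
As a minimal sketch of how the encoding in the table can be used, the snippet below maps an encoded value back to its label and flags whether it is malicious. The names `LABELS`, `decode_label`, and `is_malicious` are illustrative and are not part of the notebook's code.

```python
# Labels copied from the table above; the list index is the encoded value.
LABELS = [
    "Benign",
    "Bot",
    "DoS attacks-GoldenEye",
    "DoS attacks-Slowloris",
    "DDoS attacks-LOIC-HTTP",
    "Infilteration",
    "DDOS attack-LOIC-UDP",
    "DDOS attack-HOIC",
    "Brute Force -Web",
    "Brute Force -XSS",
    "SQL Injection",
    "DoS attacks-SlowHTTPTest",
    "DoS attacks-Hulk",
    "FTP-BruteForce",
    "SSH-Bruteforce",
]


def decode_label(encoded):
    """Return the label name for an encoded class value (hypothetical helper)."""
    value = int(encoded)
    if not 0 <= value < len(LABELS):
        raise ValueError(f"Unknown class encoding: {encoded}")
    return LABELS[value]


def is_malicious(encoded):
    """Every class except 0 (Benign) is a type of malicious traffic."""
    return int(encoded) != 0


print(decode_label(4), is_malicious(4))
```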

The trained models have been saved to a public Amazon S3 bucket for your convenience, and labeled data is included with this notebook.

### Let's get started!

First, we set some variables, including the AWS region we are working in, the IAM (Identity and Access Management) execution role of the notebook instance, and the Amazon S3 bucket where we will store data, models, and outputs. We use the Amazon SageMaker default bucket for the selected AWS region and define a key prefix so that all objects share the same prefix and are easier to find.
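
The shared-prefix idea can be sketched as a small helper that builds every S3 location from the same bucket and prefix. The function name `s3_uri` and the bucket name in the example are assumptions made for illustration; the notebook itself builds these strings inline with f-strings.

```python
def s3_uri(bucket, prefix, *parts, trailing_slash=False):
    """Join a bucket, a common key prefix, and further key parts into an S3 URI.

    Hypothetical helper: it only formats strings and does not call AWS.
    """
    # Drop empty parts and stray slashes so the key never contains "//".
    cleaned = [p.strip("/") for p in (prefix, *parts) if p and p.strip("/")]
    uri = f"s3://{bucket}/" + "/".join(cleaned)
    if trailing_slash and not uri.endswith("/"):
        uri += "/"
    return uri


# "my-default-bucket" is a placeholder for the SageMaker default bucket name.
print(s3_uri("my-default-bucket", "shadow-test", "datacapture_test", trailing_slash=True))
```

Keeping the prefix in one place means a later cleanup step can list or delete everything under that single prefix.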



## Set up

In [None]:
%pip install jsonlines --quiet
%pip install sagemaker --upgrade --quiet

In [None]:
# uncomment to reset kernel after installation

# import IPython
# IPython.Application.instance().kernel.do_shutdown(True) # automatically restarts kernel

In [None]:
import os
import time
import glob
import json
import jsonlines
import base64
import io
import datetime

import boto3
import sagemaker
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.sklearn.model import SKLearnModel

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from time import sleep
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from IPython.display import display, clear_output

pd.options.display.max_columns = 100

region = boto3.Session().region_name
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
bucket_name = sagemaker.Session().default_bucket()
prefix = "shadow-test"
os.environ["AWS_REGION"] = region
sm_client = boto3.Session().client("sagemaker")

print(f"REGION: {region}")
print(f"ROLE: {role}")
print(f"BUCKET: {bucket_name}")

model_bucket = f"s3://sagemaker-example-files-prod-{region}/models/shadow-test-models/"
model_source_uri = f"{model_bucket}sourcedir.tar.gz"
model1_uri = f"{model_bucket}hgb/model.tar.gz"
model2_uri = f"{model_bucket}rf/model.tar.gz"

# These are the classifications that have been encoded as ints; we'll use them for analysis
class_list = [
 "Benign",
 "Bot",
 "DoS attacks-GoldenEye",
 "DoS attacks-Slowloris",
 "DDoS attacks-LOIC-HTTP",
 "Infilteration",
 "DDOS attack-LOIC-UDP",
 "DDOS attack-HOIC",
 "Brute Force-Web",
 "Brute Force-XSS",
 "SQL Injection",
 "DoS attacks-SlowHTTPTest",
 "DoS attacks-Hulk",
 "FTP-BruteForce",
 "SSH-Bruteforce",
]


### Create and Deploy the production model


In [None]:
sklearn_model = SKLearnModel(
 model_data=model1_uri,
 role=role,
 entry_point="histgradientboost.py",
 source_dir="./code",
 framework_version="1.0-1",
)

In [None]:
data_capture_s3 = f"s3://{bucket_name}/{prefix}/datacapture_test/"

data_capture_config = DataCaptureConfig(
 enable_capture=True, sampling_percentage=100, destination_s3_uri=data_capture_s3
)

In [None]:
predictor = sklearn_model.deploy(
 initial_instance_count=3, instance_type="ml.m5.2xlarge", data_capture_config=data_capture_config
)

In [None]:
predictor.endpoint_name

## Predict
Here we verify our endpoint is working correctly by invoking the predictor.

In [None]:
# single prediction
# We expect 4 - DDoS attacks-LOIC-HTTP as the predicted class.
test_values = [
 80,
 1056736,
 3,
 4,
 20,
 964,
 20,
 0,
 6.666666667,
 11.54700538,
 964,
 0,
 241.0,
 482.0,
 931.1691850999999,
 6.6241710320000005,
 176122.6667,
 431204.4454,
 1056315,
 2,
 394,
 197.0,
 275.77164469999997,
 392,
 2,
 1056733,
 352244.3333,
 609743.1115,
 1056315,
 24,
 0,
 0,
 0,
 0,
 72,
 92,
 2.8389304419999997,
 3.78524059,
 0,
 964,
 123.0,
 339.8873763,
 115523.4286,
 0,
 0,
 1,
 1,
 0,
 0,
 0,
 1,
 1.0,
 140.5714286,
 6.666666667,
 241.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 3,
 20,
 4,
 964,
 8192,
 211,
 1,
 20,
 0.0,
 0.0,
 0,
 0,
 0.0,
 0.0,
 0,
 0,
 20,
 2,
 2018,
 1,
 0,
 1,
 0,
]
result = predictor.predict(np.array(test_values).reshape(1, -1))
print(result)


### Register the models

In [None]:
model1_script = "histgradientboost.py"
model2_script = "randomforest.py"

image_uri = sagemaker.image_uris.retrieve("sklearn", region, version="1.0-1")

model_name1 = "PROD-HGB-{}".format(datetime.datetime.now().strftime("%Y-%m-%d-%H%M%S"))
model_name2 = "SHADOW-RF-{}".format(datetime.datetime.now().strftime("%Y-%m-%d-%H%M%S"))

print(f"Prod model name: {model_name1}")
print(f"Shadow model name: {model_name2}")

resp = sm_client.create_model(
 ModelName=model_name1,
 ExecutionRoleArn=role,
 PrimaryContainer={
 "Image": image_uri,
 "Mode": "SingleModel",
 "ModelDataUrl": model1_uri,
 "Environment": {
 "SAGEMAKER_CONTAINER_LOG_LEVEL": "20",
 "SAGEMAKER_SUBMIT_DIRECTORY": model_source_uri,
 "SAGEMAKER_PROGRAM": model1_script,
 },
 },
)

resp = sm_client.create_model(
 ModelName=model_name2,
 ExecutionRoleArn=role,
 PrimaryContainer={
 "Image": image_uri,
 "Mode": "SingleModel",
 "ModelDataUrl": model2_uri,
 "Environment": {
 "SAGEMAKER_CONTAINER_LOG_LEVEL": "20",
 "SAGEMAKER_SUBMIT_DIRECTORY": model_source_uri,
 "SAGEMAKER_PROGRAM": model2_script,
 },
 },
)


# Create a Shadow Test 

## Create a Shadow Test using an Existing Endpoint

Now we will create a shadow test using the existing production endpoint. We will pass the holdout data we set aside earlier to the endpoint. This holdout dataset simulates production traffic. 

We can stop the shadow variant test using the API later in the notebook. Note that we could also specify the test start and stop times when we create the inference experiment. If we don't provide start and end times, the experiment starts immediately and concludes after 7 days. We are using an existing production endpoint for this test, so SageMaker will update that endpoint with the new model variants and will also update the instance type of the production variant if needed.
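
As a hedged sketch of the optional start and stop times: to our understanding, boto3's `create_inference_experiment` accepts a `Schedule` argument with `StartTime` and `EndTime` datetimes. The helper name `build_schedule` and the two-day duration below are illustrative assumptions, not part of this notebook.

```python
import datetime


def build_schedule(start, duration_days):
    """Build a Schedule dict with explicit start and end times (hypothetical helper)."""
    if duration_days <= 0:
        raise ValueError("duration_days must be positive")
    return {
        "StartTime": start,
        "EndTime": start + datetime.timedelta(days=duration_days),
    }


# Example: a test that starts at a fixed UTC time and runs for two days.
start = datetime.datetime(2024, 1, 1, 12, 0, tzinfo=datetime.timezone.utc)
schedule = build_schedule(start, duration_days=2)
print(schedule)
```

The resulting dict would be passed as `Schedule=schedule` alongside the other arguments of `create_inference_experiment`; check the API reference for any limits on experiment duration before relying on this.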

Below is an example of a SageMaker Endpoint with a shadow variant. 


A production variant consists of the ML model, Serving Container, and ML Instance. Since each variant is independent of others, you can have different models, containers, or instance types across variants. SageMaker lets you specify autoscaling policies on a per-variant basis so they can scale independently based on incoming load. SageMaker supports up to 10 production variants per endpoint. You can either configure a variant to receive a portion of the incoming traffic by setting variant weights or specify the target variant in the incoming request. The response from the production variant is forwarded back to the invoker.

A shadow variant has the same components as a production variant. A user-specified portion of the requests, known as the traffic sampling percentage (the VariantWeight parameter in the ShadowProductionVariants object), is forwarded to the shadow variant. You can choose to log the response of the shadow variant in S3 or discard it. An endpoint with a shadow variant can have at most one production variant.
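
For comparison with the inference experiment API used in this notebook, here is a minimal sketch of how a shadow variant could be declared directly in an endpoint configuration. It only builds the request arguments; the helper name `shadow_endpoint_config`, the model names, and the weights are assumptions, and the field name `InitialVariantWeight` reflects our understanding of the boto3 `create_endpoint_config` request shape, so verify it against the API reference.

```python
def shadow_endpoint_config(config_name, prod_variant, shadow_variant):
    """Build create_endpoint_config arguments with one production and one shadow variant.

    Hypothetical helper. Taking a single production variant as input enforces
    the one-production-variant limit described above.
    """

    def variant(spec):
        return {
            "VariantName": spec["name"],
            "ModelName": spec["model"],
            "InstanceType": spec["instance_type"],
            "InitialInstanceCount": spec["instance_count"],
            "InitialVariantWeight": spec["weight"],
        }

    return {
        "EndpointConfigName": config_name,
        "ProductionVariants": [variant(prod_variant)],
        "ShadowProductionVariants": [variant(shadow_variant)],
    }


# Placeholder model names and weights, for illustration only.
kwargs = shadow_endpoint_config(
    "shadow-demo-config",
    {"name": "AllTraffic", "model": "prod-model", "instance_type": "ml.m5.2xlarge",
     "instance_count": 3, "weight": 1.0},
    {"name": "Shadow-01", "model": "shadow-model", "instance_type": "ml.m5.2xlarge",
     "instance_count": 3, "weight": 1.0},
)
print(kwargs["ShadowProductionVariants"][0]["VariantName"])
```

With a SageMaker client, these arguments would be passed as `sm_client.create_endpoint_config(**kwargs)`. The inference experiment API used below expresses the same idea with an explicit `SamplingPercentage`.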

In [None]:
data_capture_s3 = f"s3://{bucket_name}/{prefix}/datacapture_test/"

In [None]:
shadowtestname = "ShadowInferenceTestExistingEP-{}".format(
 datetime.datetime.now().strftime("%Y-%m-%d-%H%M%S")
)
infexperimentarn = sm_client.create_inference_experiment(
 Name=shadowtestname,
 Type="ShadowMode",
 Description="Shadow inference test created via boto3 python API using an existing EP",
 RoleArn=role,
 EndpointName=predictor.endpoint_name,
 ModelVariants=[
 {
 "ModelName": model_name1,
 "VariantName": "AllTraffic",
 "InfrastructureConfig": {
 "InfrastructureType": "RealTimeInference",
 "RealTimeInferenceConfig": {"InstanceType": "ml.m5.2xlarge", "InstanceCount": 3},
 },
 },
 {
 "ModelName": model_name2,
 "VariantName": "Shadow-01",
 "InfrastructureConfig": {
 "InfrastructureType": "RealTimeInference",
 "RealTimeInferenceConfig": {"InstanceType": "ml.m5.2xlarge", "InstanceCount": 3},
 },
 },
 ],
 DataStorageConfig={
 "Destination": data_capture_s3,
 },
 ShadowModeConfig={
 "SourceModelVariantName": "AllTraffic",
 "ShadowModelVariants": [
 {"ShadowModelVariantName": "Shadow-01", "SamplingPercentage": 100},
 ],
 },
)

In [None]:
shadowtestdescribe = sm_client.describe_inference_experiment(Name=shadowtestname)
shadowtestdescribe

In [None]:
def wait_until_test_complete(test_name):
    """Poll the inference experiment until it is running or has failed."""
    print(f"Waiting on shadow test: {test_name}")
    while True:
        # Query the experiment that was passed in, not the global shadowtestname
        description = sm_client.describe_inference_experiment(Name=test_name)
        status = description["Status"].lower()
        print(f"Status: {status}")
        if status in ("failed", "cancelled"):
            print("Failure detected. Exiting Loop.")
            print(description)
            return
        if status == "running":
            print("Shadow test is running! Exiting Loop.")
            return
        sleep(60)

In [None]:
wait_until_test_complete(shadowtestname)

## Simulate Production Traffic

We will now simulate production traffic by looping over the holdout data. In a real production use case you would not need to do this, since actual production data would be flowing to the production endpoint. Because our shadow test is now active, both the production variant and the shadow variant receive each inference input. Only the production output is returned in the response; however, since we configured the test to capture data, both the production and shadow variant responses are recorded in S3.
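
Because the two variants' responses are captured separately, each request needs a key that lets us line them up later. The notebook does this by embedding the holdout row index in the inference ID. A minimal sketch of that round trip, with the hypothetical helper names `make_inference_id`, `parse_inference_id`, and `pair_by_id`, and made-up predictions:

```python
def make_inference_id(index):
    """Format the inference ID the same way the traffic loop in this notebook does."""
    return f"shadow test, index {index}"


def parse_inference_id(inference_id):
    """Recover the holdout row index from an inference ID."""
    return int(inference_id.split(" ")[-1])


def pair_by_id(prod, shadow):
    """Pair production and shadow predictions that share an inference ID."""
    return {
        parse_inference_id(key): (prod[key], shadow[key])
        for key in prod
        if key in shadow
    }


# Made-up captured outputs keyed by inference ID, for illustration only.
prod = {make_inference_id(0): 4, make_inference_id(1): 0}
shadow = {make_inference_id(1): 0, make_inference_id(0): 12}
print(pair_by_id(prod, shadow))
```

Keying on the ID rather than on arrival order matters because captured records are not guaranteed to be written in the order the requests were sent.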


In [None]:
holdout = pd.read_csv("./data/holdout.csv")

In [None]:
%%time
# this should take ~ 2 minutes to complete
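indexes = []
actuals = []
for count, (index, row) in enumerate(holdout.iterrows(), start=1):
    vals = row.to_numpy()
    # The first value is kept as the actual label; the rest are sent as features
    prediction = predictor.predict(
        vals[1:].reshape(1, -1), inference_id=f"shadow test, index {index}"
    )
    actuals.append(vals[0])
    indexes.append(index)

    # Report progress every 1000 requests
    if count % 1000 == 0:
        print(count)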


## Now we can compare our two models
You could use an experiment like this to evaluate any aspect of model performance. Here we look at accuracy, but you might also compare inference time or memory usage. First, let's grab the captured data.
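
For the inference-time comparison mentioned above, one option is to query Amazon CloudWatch for each variant's latency. The sketch below only builds the query arguments. It assumes the `ModelLatency` metric in the `AWS/SageMaker` namespace (reported in microseconds) is emitted per `VariantName` for both variants; the helper names and the example endpoint name are hypothetical.

```python
import datetime


def latency_query(endpoint_name, variant_name, start, end, period_seconds=60):
    """Build get_metric_statistics arguments for one variant's ModelLatency."""
    return {
        "Namespace": "AWS/SageMaker",
        "MetricName": "ModelLatency",
        "Dimensions": [
            {"Name": "EndpointName", "Value": endpoint_name},
            {"Name": "VariantName", "Value": variant_name},
        ],
        "StartTime": start,
        "EndTime": end,
        "Period": period_seconds,
        "Statistics": ["Average", "Maximum"],
    }


def microseconds_to_ms(value):
    """Convert a latency in microseconds to milliseconds."""
    return value / 1000.0


end = datetime.datetime(2024, 1, 1, 13, 0, tzinfo=datetime.timezone.utc)
start = end - datetime.timedelta(hours=1)
for name in ("AllTraffic", "Shadow-01"):
    print(latency_query("my-endpoint", name, start, end)["Dimensions"])
```

With a CloudWatch client, each dict would be passed as `boto3.client("cloudwatch").get_metric_statistics(**query)`, and the returned datapoints compared between the two variants.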

In [None]:
storage = shadowtestdescribe["DataStorageConfig"]["Destination"] + predictor.endpoint_name + "/"
storage

In [None]:
!aws s3 ls {storage}

In [None]:
!aws s3 cp {storage} ./data/datacapture/ --recursive

In [None]:
shadowfiles = glob.glob("./data/datacapture/Shadow-01/**/*.jsonl", recursive=True)
prodfiles = glob.glob("./data/datacapture/AllTraffic/**/*.jsonl", recursive=True)

In [None]:
shadowin = []
shadowout = []
shadowid = []

for f in shadowfiles:
    print(f)
    with jsonlines.open(f) as reader:
        for obj in reader:
            try:
                infid = obj["eventMetadata"]["inferenceId"].split(" ")
                record_id = int(infid[-1])

                # input to model
                model_input = base64.b64decode(obj["captureData"]["endpointInput"]["data"])
                features = np.load(io.BytesIO(model_input))[0].tolist()

                # output from model
                model_output = base64.b64decode(obj["captureData"]["endpointOutput"]["data"])
                output = np.load(io.BytesIO(model_output))[0]
            except Exception:
                # Skip records that are missing fields or cannot be decoded
                continue

            # Append only after the whole record decoded, so the lists stay aligned
            shadowid.append(record_id)
            shadowin.append(features)
            shadowout.append(output)

In [None]:
shadowdf = pd.DataFrame(data=shadowout, index=shadowid, columns=["Shadow"])
shadowdf

In [None]:
shadowdf["Shadow"] = pd.to_numeric(shadowdf["Shadow"])
shadowdf["Shadow"] = shadowdf["Shadow"].astype(int)
shadowdf = pd.merge(shadowdf, holdout["Target"], left_index=True, right_index=True)
acc = accuracy_score(shadowdf["Target"], shadowdf["Shadow"])
wf1 = f1_score(shadowdf["Target"], shadowdf["Shadow"], average="weighted")
print(acc, wf1)

In [None]:
print(classification_report(shadowdf["Target"], shadowdf["Shadow"], zero_division=0))

In [None]:
fig, ax = plt.subplots(figsize=(10, 8))
cm = confusion_matrix(shadowdf["Target"], shadowdf["Shadow"])
normalized_cm = cm.astype("float") / cm.sum(axis=1)[:, np.newaxis]
clist = [class_list[i] for i in np.sort(shadowdf["Target"].unique())]
sns.heatmap(normalized_cm, ax=ax, annot=cm, fmt="", xticklabels=clist, yticklabels=clist)
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Shadow Endpoint Confusion Matrix")
plt.show()

In [None]:
%%time

prodin = []
prodout = []
prodid = []

for f in prodfiles:
    print(f)
    with jsonlines.open(f) as reader:
        for obj in reader:
            try:
                infid = obj["eventMetadata"]["inferenceId"].split(" ")
                record_id = int(infid[-1])

                # input to model
                model_input = base64.b64decode(obj["captureData"]["endpointInput"]["data"])
                features = np.load(io.BytesIO(model_input))[0].tolist()

                # output from model
                model_output = base64.b64decode(obj["captureData"]["endpointOutput"]["data"])
                output = np.load(io.BytesIO(model_output))[0]
            except Exception:
                # Skip records that are missing fields or cannot be decoded
                continue

            # Append only after the whole record decoded, so the lists stay aligned
            prodid.append(record_id)
            prodin.append(features)
            prodout.append(output)

In [None]:
proddf = pd.DataFrame(data=prodout, index=prodid, columns=["Prod"])
proddf

In [None]:
# Line up our production model predictions with the true value based on the index
proddf = pd.merge(proddf, holdout["Target"], left_index=True, right_index=True)

In [None]:
acc = accuracy_score(proddf["Target"], proddf["Prod"])
wf1 = f1_score(proddf["Target"], proddf["Prod"], average="weighted")
print(acc, wf1)

In [None]:
print(classification_report(proddf["Target"], proddf["Prod"]))

In [None]:
fig, ax = plt.subplots(figsize=(10, 8))
cm = confusion_matrix(proddf["Target"], proddf["Prod"])
normalized_cm = cm.astype("float") / cm.sum(axis=1)[:, np.newaxis]
sns.heatmap(normalized_cm, ax=ax, annot=cm, fmt="", xticklabels=class_list, yticklabels=class_list)
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Production Endpoint Confusion Matrix")
plt.show()

## End the experiment and promote the shadow model to production
From the above evaluation we've decided that the shadow model is ready for production. We will promote it to production as part of ending the experiment. You can also configure a similar experiment to run automatically as part of a pipeline and promote a model automatically if it meets your criteria.
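
The automatic promotion idea can be sketched as a small decision function that turns two evaluation scores into the `ModelVariantActions` mapping expected when stopping the experiment. The function name `choose_variant_actions`, the `min_gain` threshold, and the example scores are assumptions for illustration; a real pipeline would pick its own criteria.

```python
def choose_variant_actions(prod_score, shadow_score, min_gain=0.0,
                           prod_variant="AllTraffic", shadow_variant="Shadow-01"):
    """Decide what to do with each variant when the experiment ends.

    Hypothetical rule: promote the shadow variant only if its score beats
    production by more than min_gain; otherwise keep production as it is.
    """
    if shadow_score - prod_score > min_gain:
        return {shadow_variant: "Promote", prod_variant: "Remove"}
    return {shadow_variant: "Remove", prod_variant: "Retain"}


# Made-up weighted F1 scores, for illustration only.
print(choose_variant_actions(prod_score=0.91, shadow_score=0.95, min_gain=0.01))
print(choose_variant_actions(prod_score=0.91, shadow_score=0.915, min_gain=0.01))
```

The returned dict could be passed as `ModelVariantActions` to `stop_inference_experiment`, with `Reason` recording the scores that drove the decision.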

In [None]:
sm_client.stop_inference_experiment(
 Name=shadowtestname,
 ModelVariantActions={"Shadow-01": "Promote", "AllTraffic": "Remove"},
 DesiredState="Completed",
 Reason="Shadow variant performed better in validation",
)

In [None]:
# Here we show that the shadow model is now deployed to production
sm_client.describe_endpoint(EndpointName=predictor.endpoint_name)

## Clean Up

In [None]:
def wait_until_complete(test_name):
    """Poll the inference experiment until it reaches the Completed state."""
    print(f"Waiting on shadow test: {test_name}")
    while True:
        # Query the experiment that was passed in, not the global shadowtestname
        description = sm_client.describe_inference_experiment(Name=test_name)
        status = description["Status"].lower()
        print(f"Status: {status}")
        if status == "completed":
            print("Shadow test is stopped, ok to delete. Exiting Loop.")
            return
        sleep(60)

In [None]:
wait_until_complete(shadowtestname)

In [None]:
# predictor.delete_endpoint()
sm_client.delete_inference_experiment(Name=shadowtestname)
sm_client.delete_endpoint(EndpointName=predictor.endpoint_name)

# References

* A Realistic Cyber Defense Dataset (CSE-CIC-IDS2018) - https://registry.opendata.aws/cse-cic-ids2018/
* AIM362 - Re:Invent 2019 SageMaker Debugger and Model Monitor - https://github.com/aws-samples/reinvent2019-aim362-sagemaker-debugger-model-monitor
