# A/B Testing with Amazon SageMaker

***
This notebook is designed to run on the `Python 3 (Data Science 2.0)` kernel in Amazon SageMaker Studio.
***

In production ML workflows, data scientists and data engineers frequently try to improve their models in various ways, such as by performing [automatic model tuning](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning.html), training on additional or more recent data, and improving feature selection. Performing A/B testing between a new model and an old model with production traffic can be an effective final step in the validation process for a new model. In A/B testing, you test different variants of your models and compare how each variant performs relative to the others. You then replace the existing model with the new version if the new version delivers better performance.

Amazon SageMaker enables you to test multiple models or model versions behind the same endpoint using production variants. Each production variant identifies a machine learning (ML) model and the resources deployed for hosting the model. You can distribute endpoint invocation requests across multiple production variants by providing the traffic distribution for each variant, or you can invoke a specific variant directly for each request.

In this notebook, we'll:
* Evaluate models by invoking specific variants
* Gradually release a new model by specifying traffic distribution

Reference notebook example: [A/B Testing with Amazon SageMaker](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker_endpoints/a_b_testing/a_b_testing.ipynb)

## Setup
Let's set up some required imports and basic initial variables:

In [None]:
%matplotlib inline
import datetime
import time
import os, sys
import boto3
import re
import json
import pandas as pd
import numpy as np
import sagemaker
from sagemaker import get_execution_role

import csv
import matplotlib.pyplot as plt
from sklearn import metrics
p = os.path.abspath('..')
if p not in sys.path:
 sys.path.append(p)
import utils

sm_session = sagemaker.Session()
role = get_execution_role()
region = sm_session.boto_region_name
bucket = sm_session.default_bucket()
sm_client = sm_session.sagemaker_client
sm_runtime = sm_session.sagemaker_runtime_client
prefix = "sagemaker/huggingface-pytorch-sentiment-analysis"
time_now = f'{datetime.datetime.now():%Y-%m-%d-%H-%M-%S}'
time_now

In [None]:
%store
%store -r

## Step 1: Deploy the models created in the previous multi-model endpoint notebook



Uncomment the cell below to view details of the `production_variant` function defined in the SageMaker SDK:

In [None]:
# ?? sagemaker.production_variant

In [None]:
variant1 = sagemaker.production_variant(
 model_name=roberta_mme_model_name,
 instance_type="ml.c5.2xlarge",
 initial_instance_count=1,
 variant_name="Variant1",
 initial_weight=1,
)
variant2 = sagemaker.production_variant(
 model_name=distilbert_model_name,
 instance_type="ml.c5.xlarge",
 initial_instance_count=1,
 variant_name="Variant2",
 initial_weight=1,
)

(variant1, variant2)
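
SageMaker routes requests in proportion to each variant's weight divided by the sum of all variant weights, so two variants with `initial_weight=1` should each receive roughly half of the traffic. A minimal sketch of that arithmetic follows; the helper name `traffic_shares` is hypothetical and not part of the SageMaker SDK.

```python
def traffic_shares(weights):
    """Map each variant name to its expected fraction of endpoint traffic."""
    total = sum(weights.values())
    if total <= 0:
        raise ValueError("at least one variant needs a positive weight")
    return {name: weight / total for name, weight in weights.items()}


# Both variants above start with weight 1, so the expected split is even.
shares = traffic_shares({"Variant1": 1, "Variant2": 1})
print(shares)
```

Only the ratio between weights matters: weights of 1 and 1 give the same split as 50 and 50.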

#### Deploy
Let's go ahead and deploy our two variants to a SageMaker endpoint:

Uncomment the cells below to view details of these functions:

In [None]:
# ?? sm_session.create_endpoint

In [None]:
# ?? sm_session.endpoint_from_production_variants

In [None]:
endpoint_name = f"demo-hf-pytorch-variant-{time_now}"
print(f"EndpointName={endpoint_name}")

sm_session.endpoint_from_production_variants(
 name=endpoint_name, production_variants=[variant1, variant2]
)

## Step 2: Invoke the deployed models

You can now send data to this endpoint to get inferences in real time.



In [None]:
test_data = pd.read_csv("../sample_payload/test_data.csv", header=None)
json_data = {"inputs": test_data.iloc[:, 0].to_list()}
batch_data = pd.read_csv("../sample_payload/batch_data.csv", header=None)

In [None]:
%%time
predictions = []

for i in range(5):
 response = sm_runtime.invoke_endpoint(
 EndpointName=endpoint_name,
 Body=json.dumps(json_data),
 ContentType="application/json",
 )
 predictions.append(response["Body"].read().decode("utf-8"))
 time.sleep(0.5)

print(*predictions, sep='\n')
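
Because no variant was targeted in the loop above, each request was routed according to the variant weights. The `invoke_endpoint` response carries an `InvokedProductionVariant` field naming the variant that served the request, so a tally like the one below can confirm the split. This is only a sketch: the `responses` list is made-up sample data standing in for real responses, and `count_invocations` is a hypothetical helper.

```python
from collections import Counter


def count_invocations(responses):
    """Count how many responses were served by each production variant."""
    return Counter(r["InvokedProductionVariant"] for r in responses)


# Stand-in for response dicts collected from sm_runtime.invoke_endpoint(...).
responses = [
    {"InvokedProductionVariant": "Variant1"},
    {"InvokedProductionVariant": "Variant2"},
    {"InvokedProductionVariant": "Variant1"},
    {"InvokedProductionVariant": "Variant2"},
    {"InvokedProductionVariant": "Variant1"},
]
counts = count_invocations(responses)
print(dict(counts))
```

With only a handful of requests the observed split can deviate noticeably from the configured weights; it evens out as the number of requests grows.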

### Invoke a specific variant

Now, let’s use the `TargetVariant` parameter of `invoke_endpoint` to invoke a specific variant. This parameter names the production variant that should serve the request, bypassing the weighted traffic split. Let us first use it to send a request to Variant1, and then one to Variant2.

In [None]:
%%time
response = sm_runtime.invoke_endpoint(
 EndpointName=endpoint_name,
 Body=json.dumps(json_data),
 ContentType="application/json",
 TargetVariant=variant1["VariantName"],
)

print(response["Body"].read())

In [None]:
%%time
response = sm_runtime.invoke_endpoint(
 EndpointName=endpoint_name,
 Body=json.dumps(json_data),
 ContentType="application/json",
 TargetVariant=variant2["VariantName"],
)

print(response["Body"].read())

## Step 3: Evaluate variant performance

### Evaluating Variant 1

Using `TargetVariant`, let us evaluate the accuracy of Variant1 on a labeled test set:

Note that the test data comes from the [Kaggle financial sentiment analysis dataset](https://www.kaggle.com/datasets/sbhatti/financial-sentiment-analysis).

In [None]:
df_data = pd.read_csv("../sample_payload/batch_data.csv")
source_data = df_data.to_json(orient='records')
json_lst = json.loads(source_data)
json_lst[0]

In [None]:
predictions1 = utils.invoke_with_single_sentence(json_lst, endpoint_name, variant1["VariantName"])

In [None]:
# Keep the highest-scoring label for each prediction.
# pd.concat is used because DataFrame.append was removed in pandas 2.0.
rows = []
for prediction in predictions1:
    tmp_df = pd.DataFrame(json.loads(prediction)[0])
    rows.append(tmp_df[tmp_df['score'] == tmp_df['score'].max()])
df = pd.concat(rows, ignore_index=True)
df.head()
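
The same top-label selection can be done without building an intermediate DataFrame for every prediction. The sketch below assumes each prediction is a JSON string shaped like the Variant1 output parsed above, that is, a list whose first element is a list of `label`/`score` records. The sample string and the `top_label` helper are illustrative, not taken from the notebook's `utils` module.

```python
import json


def top_label(prediction):
    """Return the highest-scoring {label, score} record from one prediction."""
    candidates = json.loads(prediction)[0]
    return max(candidates, key=lambda record: record["score"])


# Made-up prediction in the assumed Variant1 response shape.
sample = json.dumps(
    [
        [
            {"label": "LABEL_0", "score": 0.02},
            {"label": "LABEL_1", "score": 0.91},
            {"label": "LABEL_2", "score": 0.07},
        ]
    ]
)
best = top_label(sample)
print(best["label"], best["score"])
```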

In [None]:
value_map = {'LABEL_0': 0, 'LABEL_1': 1, 'LABEL_2': 2}
df = df.replace({'label': value_map})
df.head()

In [None]:
# Let's get the labels of our test set; we will use these to evaluate our predictions
df_with_labels = pd.read_csv("../sample_payload/batch_data_groundtruth.csv")

value_map = {'negative': 0, 'neutral': 1, 'positive': 2}
df_with_labels = df_with_labels.replace({'sentiment': value_map})

In [None]:
test_labels = df_with_labels.iloc[:, 1]
labels = test_labels.to_numpy()
preds = df.label.to_numpy()

# Calculate accuracy
accuracy = sum(preds == labels) / len(labels)
print(f"Accuracy: {accuracy}")
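
Accuracy alone can hide differences between classes, so it may be worth looking at per-class precision, recall, and F1 as well. The following is a dependency-free sketch of those definitions; the `per_class_metrics` helper and the sample labels are made up for illustration. In the notebook itself, `metrics.precision_recall_fscore_support` from the already-imported `sklearn.metrics` would be the more usual choice.

```python
from collections import Counter


def per_class_metrics(labels, preds):
    """Return {class: (precision, recall, f1)} for every class seen."""
    tp, fp, fn = Counter(), Counter(), Counter()
    for actual, predicted in zip(labels, preds):
        if actual == predicted:
            tp[actual] += 1
        else:
            fp[predicted] += 1  # predicted class gained a false positive
            fn[actual] += 1     # actual class gained a false negative
    results = {}
    for cls in sorted(set(labels) | set(preds)):
        predicted_total = tp[cls] + fp[cls]
        actual_total = tp[cls] + fn[cls]
        precision = tp[cls] / predicted_total if predicted_total else 0.0
        recall = tp[cls] / actual_total if actual_total else 0.0
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        results[cls] = (precision, recall, f1)
    return results


# Made-up ground truth and predictions (0=negative, 1=neutral, 2=positive).
sample_labels = [0, 1, 2, 2, 1, 0]
sample_preds = [0, 1, 2, 1, 1, 2]
report = per_class_metrics(sample_labels, sample_preds)
for cls, (precision, recall, f1) in report.items():
    print(f"class {cls}: precision={precision:.2f} recall={recall:.2f} f1={f1:.2f}")
```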


### Evaluating Variant 2

In [None]:
predictions2 = utils.invoke_with_single_sentence(json_lst, endpoint_name, variant2["VariantName"])

In [None]:
# Keep the highest-scoring label for each prediction.
# pd.concat is used because DataFrame.append was removed in pandas 2.0.
rows2 = []
for prediction in predictions2:
    tmp_df = pd.DataFrame(json.loads(prediction))
    rows2.append(tmp_df[tmp_df['score'] == tmp_df['score'].max()])
df2 = pd.concat(rows2, ignore_index=True)
df2.head()

In [None]:
value_map = {'NEGATIVE': 0, 'POSITIVE': 1}
df2 = df2.replace({'label': value_map})
df2.head()

In [None]:
preds = df2.label.to_numpy()

# Calculate accuracy
accuracy = sum(preds == labels) / len(labels)
print(f"Accuracy: {accuracy}")

## Step 4: Dialing up our chosen variant in production

Now that we have determined that Variant1 performs better than Variant2, we will shift more traffic to it.

We could keep using `TargetVariant` to invoke the chosen variant on every request. A simpler approach is to update the weights assigned to each variant using `UpdateEndpointWeightsAndCapacities`. This changes the traffic distribution across your production variants without requiring updates to your endpoint.

Recall our variant weights are as follows:

In [None]:
{
 variant["VariantName"]: variant["CurrentWeight"]
 for variant in sm_client.describe_endpoint(EndpointName=endpoint_name)["ProductionVariants"]
}

We'll use the `utils.invoke_endpoint_for_two_minutes` helper to send a steady stream of requests to the endpoint, then plot the per-variant invocation counts to show the even split:

In [None]:
invocation_start_time = datetime.datetime.now()
utils.invoke_endpoint_for_two_minutes(endpoint_name)
time.sleep(20) # give metrics time to catch up
params = {
 "endpoint_name": endpoint_name, 
 "variant1": variant1, 
 "variant2": variant2, 
 "start_time":invocation_start_time
}
utils.plot_endpoint_metrics(**params)
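
`utils.plot_endpoint_metrics` is a helper from this repository whose implementation is not shown here. As a rough sketch of where such numbers come from: SageMaker publishes per-variant `Invocations` metrics to CloudWatch in the `AWS/SageMaker` namespace, with `EndpointName` and `VariantName` dimensions. The hypothetical function below only builds the arguments for CloudWatch's `get_metric_statistics`; the endpoint name and timestamps are placeholders.

```python
import datetime


def invocation_metric_query(endpoint_name, variant_name, start_time, end_time, period=60):
    """Build get_metric_statistics arguments for one variant's invocation count."""
    return {
        "Namespace": "AWS/SageMaker",
        "MetricName": "Invocations",
        "StartTime": start_time,
        "EndTime": end_time,
        "Period": period,  # seconds per data point
        "Statistics": ["Sum"],
        "Dimensions": [
            {"Name": "EndpointName", "Value": endpoint_name},
            {"Name": "VariantName", "Value": variant_name},
        ],
    }


# Placeholder time window and endpoint name, for illustration only.
end = datetime.datetime(2024, 1, 1, 12, 0, 0)
start = end - datetime.timedelta(minutes=5)
query = invocation_metric_query("my-endpoint", "Variant1", start, end)

# With AWS credentials configured, the query could be run as:
#   boto3.client("cloudwatch").get_metric_statistics(**query)
print(query["Dimensions"])
```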

Now let us shift 75% of the traffic to Variant1 by assigning new weights to each variant using `UpdateEndpointWeightsAndCapacities`. Amazon SageMaker will then send 75% of the inference requests to Variant1 and the remaining 25% to Variant2.

In [None]:
sm_client.update_endpoint_weights_and_capacities(
 EndpointName=endpoint_name,
 DesiredWeightsAndCapacities=[
 {"DesiredWeight": 75, "VariantName": variant1["VariantName"]},
 {"DesiredWeight": 25, "VariantName": variant2["VariantName"]},
 ],
)
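
A gradual release is typically a sequence of such weight updates rather than a single one. The sketch below generates the `DesiredWeightsAndCapacities` payload for each stage of a ramp. The stage percentages and the `ramp_payloads` helper are illustrative assumptions; each payload would be passed to `update_endpoint_weights_and_capacities` only once the metrics from the previous stage look healthy.

```python
def ramp_payloads(new_variant, old_variant, new_shares=(50, 75, 100)):
    """Build one DesiredWeightsAndCapacities list per ramp stage."""
    payloads = []
    for share in new_shares:
        if not 0 <= share <= 100:
            raise ValueError(f"share must be between 0 and 100, got {share}")
        payloads.append(
            [
                {"VariantName": new_variant, "DesiredWeight": share},
                {"VariantName": old_variant, "DesiredWeight": 100 - share},
            ]
        )
    return payloads


stages = ramp_payloads("Variant1", "Variant2")
for stage in stages:
    print(stage)
```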

In [None]:
print("Waiting for update to complete")
utils.endpoint_update_wait(endpoint_name)

{
 variant["VariantName"]: variant["CurrentWeight"]
 for variant in sm_client.describe_endpoint(EndpointName=endpoint_name)["ProductionVariants"]
}
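
`utils.endpoint_update_wait` is also a repository helper whose implementation is not shown. One plausible way to write such a wait is to poll `describe_endpoint` until `EndpointStatus` reaches `InService`. The sketch below takes the describe call as a parameter so it can be exercised with a stand-in; `wait_for_endpoint` and `fake_describe` are hypothetical names. The boto3 SageMaker client also provides an `endpoint_in_service` waiter that serves the same purpose.

```python
import time


def wait_for_endpoint(describe, endpoint_name, poll_seconds=30, max_polls=60):
    """Poll describe(EndpointName=...) until the endpoint is InService."""
    for _ in range(max_polls):
        status = describe(EndpointName=endpoint_name)["EndpointStatus"]
        if status == "InService":
            return status
        if status == "Failed":
            raise RuntimeError(f"endpoint {endpoint_name} entered Failed state")
        time.sleep(poll_seconds)  # still Updating (or similar); try again
    raise TimeoutError(f"endpoint {endpoint_name} not InService after {max_polls} polls")


# Stand-in for sm_client.describe_endpoint that finishes on the third poll.
_statuses = iter(["Updating", "Updating", "InService"])


def fake_describe(EndpointName):
    return {"EndpointStatus": next(_statuses)}


final_status = wait_for_endpoint(fake_describe, "my-endpoint", poll_seconds=0)
print(final_status)
```

Against a real endpoint, the call would be `wait_for_endpoint(sm_client.describe_endpoint, endpoint_name)`.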

Now let's check how that has impacted invocation metrics:

In [None]:
utils.invoke_endpoint_for_two_minutes(endpoint_name)
time.sleep(20) # give metrics time to catch up
utils.plot_endpoint_metrics(**params)

We can continue to monitor our metrics and, when we're satisfied with a variant's performance, route 100% of the traffic to that variant. We again use `UpdateEndpointWeightsAndCapacities` to update the traffic assignments for the variants. The weight for Variant1 is set to 1 and the weight for Variant2 is set to 0, so Amazon SageMaker will send 100% of all inference requests to Variant1.

In [None]:
sm_client.update_endpoint_weights_and_capacities(
 EndpointName=endpoint_name,
 DesiredWeightsAndCapacities=[
 {"DesiredWeight": 1, "VariantName": variant1["VariantName"]},
 {"DesiredWeight": 0, "VariantName": variant2["VariantName"]},
 ],
)
print("Waiting for update to complete")
utils.endpoint_update_wait(endpoint_name)

{
 variant["VariantName"]: variant["CurrentWeight"]
 for variant in sm_client.describe_endpoint(EndpointName=endpoint_name)["ProductionVariants"]
}

In [None]:
utils.invoke_endpoint_for_two_minutes(endpoint_name)
time.sleep(20) # give metrics time to catch up
utils.plot_endpoint_metrics(**params)

The Amazon CloudWatch metrics for the total invocations for each variant show that all inference requests are now being processed by Variant1 and none by Variant2.

You can now safely update your endpoint and delete Variant2 from your endpoint. You can also continue testing new models in production by adding new variants to your endpoint and following steps 2 - 4. 

## Delete the endpoint

If you do not plan to use this endpoint further, you should delete the endpoint to avoid incurring additional charges.

In [None]:
sm_session.delete_endpoint(endpoint_name)