{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Multi-Armed Bandits for Review Helpfulness\n", "\n", "In this notebook we will train two machine learning models to classify amazon reviews, first using an Amazon SageMaker Pipeline, and second using a Hyperparameter Tuning Job. When registered and approved in the Amazon SageMaker Model Registry, an MLOps A/B Testing Deployment pipeline will be triggered to create or update a multi-variant Amazon SageMaker endpoint. You will then perform an A/B testing simulation and visualize the results to identify the best performing model.\n", "\n", "We will be using the [spacy](https://spacy.io/) NLP library to tokenize the text and Amazon SageMaker [BlazingText](https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext.html) algorithm to train a classification model.\n", "\n", "## Prerequisites\n", "\n", "Prior to running this notebook, you will have:\n", "1. Created the A/B Testing API and Infrastructure\n", "2. Created an MLOPs A/B Testing Deployment project \n", "\n", "This notebook will take you thorugh a number of steps:\n", "1. [Data Prep](#Data-Prep)\n", "2. [Run SageMaker Pipeline](#Run-SageMaker-Pipeline)\n", "3. [Run Tuning Job](#Run-Tuning-Job)\n", "4. [Test Endpoint](#Test-Endpoint)\n", "5. [Run A/B Testing Simulation](#Run-A/B-Testing-Simulation)\n", "6. [Calling the winner](#Calling-the-winner)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%capture\n", "!pip install -U sagemaker pandas seaborn\n", "!pip install spacy\n", "!python -m spacy download en_core_web_sm" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data Prep\n", "\n", "Download a sample form the [Amazon Customer Reviews](https://s3.amazonaws.com/amazon-reviews-pds/readme.html) dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 cp s3://amazon-reviews-pds/tsv/amazon_reviews_us_Electronics_v1_00.tsv.gz reviews.tsv.gz" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Load the compressed reviews into pandas, selecting the review headling, body and star rating (should take approx 30 seconds)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "import pandas as pd\n", "\n", "# Load a sample of the rows\n", "df_reviews = pd.read_csv('reviews.tsv.gz', compression='gzip', error_bad_lines=False, #nrows=1000,\n", " sep='\\t', usecols=['product_id', 'product_title',\n", " 'review_headline', 'review_body', 'star_rating',\n", " 'helpful_votes', 'total_votes']).dropna()\n", "df_reviews.info()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Inspect the first few rows of the dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_reviews.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Feature engineering\n", "\n", "Filter on reviews that have at least 5 votes, calculate a helpful score based and rating sentiment." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_reviews = df_reviews[df_reviews['total_votes'] >= 5]\n", "df_reviews['helpful_score'] = df_reviews['helpful_votes'] / df_reviews['total_votes']\n", "df_reviews['sentiment'] = pd.cut(df_reviews['star_rating'], bins=[0,2,3,6], labels=['Negative','Nuetral','Positive'])\n", "df_reviews.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Visualize the helpful score grouped by sentiment. We can validate a high helpfulness count is correlated with strong negative or positive reviews." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import seaborn as sns\n", "import matplotlib.pyplot as plt\n", "\n", "plt.style.use(\"dark_background\")\n", "sns.displot(df_reviews, x='helpful_score', col='sentiment', hue='star_rating', kind='kde', palette='icefire')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Group by the product and get the count of reviews, as well as sum of helpful and total votes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_votes = df_reviews.groupby('product_id').agg({'product_id': 'count', 'helpful_votes': 'sum', 'total_votes': 'sum'})\n", "df_votes.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Filter on products that have at least 1 review and at least 5 helpful votes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "min_reviews = 1\n", "min_helpful = 5\n", "df_votes = df_votes[(df_votes['product_id']>=min_reviews) & (df_votes['helpful_votes']>=min_helpful)]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Merge reviews with minimum helpful votes " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_reviews = df_reviews.merge(df_votes, how='inner', left_on='product_id', right_index=True, suffixes=('','_total'))\n", "df_reviews.info()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Pick a helpful score cut-off such that we have 50% helpful rating" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_reviews['is_helpful'] = (df_reviews['helpful_score'] > 0.80)\n", "df_reviews['is_helpful'].sum()/df_reviews['is_helpful'].count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train/Test split\n", "\n", "Split the dataset into train (90%) / validation (5%) / test (5%) and save to file local files." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.model_selection import train_test_split\n", "\n", "train_df, val_df = train_test_split(df_reviews, test_size=0.1, random_state=42) \n", "val_df, test_df = train_test_split(val_df, test_size=0.5, random_state=42)\n", "print('split train: {}, val: {}, test: {} '.format(train_df.shape[0], val_df.shape[0], test_df.shape[0]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Save the test dataset for evaluation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_df.to_csv('test.csv', index=False, header=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define functions to oiutput blazing text label and tokenized text from dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from spacy.lang.en import English\n", "\n", "index_to_label = {0: 'NotHelpful', 1: 'Helpful'} \n", "nlp = English()\n", "tokenizer = nlp.tokenizer\n", "\n", "def labelize_df(df):\n", " return '__label__' + df['is_helpful'].apply(lambda is_helpful: index_to_label[is_helpful])\n", "\n", "def tokenize_sent(sent, max_length=1000):\n", " return ' '.join([token.text for token in tokenizer(sent)])[:max_length]\n", "\n", "def tokenize_df(df):\n", " return (df['review_headline'].apply(tokenize_sent) + ' ' + \n", " df['review_body'].apply(tokenize_sent))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Test the lableize and tokenize output " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "labelize_df(train_df.head(3)) + ' ' + tokenize_df(train_df.head(3))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create new pandas series that concatenates the label and tokenize values (this should less than 2 minutes)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "train_text = labelize_df(train_df) + ' ' + tokenize_df(train_df)\n", "val_text = labelize_df(val_df) + ' ' + tokenize_df(val_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Save the tokenized pandas series as text without any header or index" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "train_text.to_csv('train.txt', index=False, header=False)\n", "val_text.to_csv('validation.txt', index=False, header=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload Data\n", "\n", "Upload data to s3 in bucket under a key prefix." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "\n", "# Get the session and default bucket\n", "role = sagemaker.get_execution_role()\n", "session = sagemaker.session.Session()\n", "bucket = session.default_bucket()\n", "\n", "# Set the prefix for this dataset\n", "prefix = 'mab-reviews-helpfulness'\n", "\n", "s3_train_uri = session.upload_data('train.txt', bucket, prefix + '/data/training')\n", "s3_val_uri = session.upload_data('validation.txt', bucket, prefix + '/data/validation')\n", "s3_output_location = 's3://{}/{}/output'.format(bucket, prefix)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run SageMaker Pipeline\n", "\n", "### Get Project\n", "\n", "Set the project for your A/B testing pipeline\n", "\n", "👇👇👇" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "project_name = '<>' # << Update this A/B testing deployment project" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3 \n", "from botocore.config import Config\n", "\n", "# Create SageMaker client with up to 10 retries\n", "config = Config(retries={\"max_attempts\": 10, \"mode\": \"standard\"})\n", "sm_client = boto3.client('sagemaker', config=config)\n", "\n", "# Get the project name\n", "response = sm_client.describe_project(ProjectName=project_name)\n", "project_id = response['ProjectId']\n", "\n", "# Define project tags which will be associated with pipeline, experiment and tuning job\n", "project_tags = [\n", " {'Key': 'sagemaker:project-name', 'Value': project_name },\n", " {'Key': 'sagemaker:project-id', 'Value': project_id }\n", "]\n", "\n", "print(f\"Project: {project_name} ({project_id})\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create an experiment to associate training jobs to this project." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from botocore.exceptions import ClientError\n", "\n", "def create_experiment():\n", " try:\n", " sm_client.create_experiment(\n", " ExperimentName=project_id,\n", " DisplayName=project_name,\n", " Description=\"A/B Testing Pipeline experiment\",\n", " Tags=project_tags\n", " )\n", " print(f\"Experiment '{project_id}' created\")\n", " except ClientError as err:\n", " errcontent = err.response[\"Error\"]\n", " if (errcontent[\"Code\"] == \"ValidationException\" and \"must be unique\" in errcontent[\"Message\"]): \n", " print(f\"Experiment '{project_id}' already exists\")\n", " else:\n", " raise err\n", "\n", "def create_trial(trial_name, display_name):\n", " try:\n", " sm_client.create_trial(\n", " TrialName=trial_name,\n", " DisplayName=display_name,\n", " ExperimentName=project_id,\n", " Tags=project_tags\n", " )\n", " print(f\"Trial '{trial_name}' created\")\n", " except ClientError as err:\n", " errcontent = err.response[\"Error\"]\n", " if (errcontent[\"Code\"] == \"ValidationException\" and \"must be unique\" in errcontent[\"Message\"]):\n", " print(f\"Trial '{trial_name}' already exists\")\n", " else:\n", " raise err\n", "\n", "create_experiment()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define the pipeline\n", "\n", "Create a SageMaker pipeline to run the training job" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.parameters import (\n", " ParameterInteger,\n", " ParameterString,\n", ")\n", "\n", "training_instance_type = ParameterString(\n", " name=\"TrainingInstanceType\",\n", " default_value=\"ml.c5.4xlarge\"\n", ")\n", "training_instance_count = ParameterInteger(\n", " name=\"TrainingInstanceCount\",\n", " default_value=1\n", ")\n", "input_train_data = ParameterString(\n", " name=\"InputDataTrain\",\n", " default_value=s3_train_uri,\n", ")\n", "input_val_data = ParameterString(\n", " name=\"InputDataValidation\",\n", " default_value=s3_val_uri,\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the estimator that takes the input parameters" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "from sagemaker.estimator import Estimator\n", "\n", "region_name = boto3.Session().region_name\n", "image_uri = sagemaker.image_uris.retrieve(\"blazingtext\", region_name)\n", "\n", "estimator = Estimator(image_uri=image_uri,\n", " role=role, \n", " instance_count=training_instance_count, # Param\n", " instance_type=training_instance_type, # Param\n", " volume_size = 30,\n", " max_run = 360000,\n", " input_mode= 'File',\n", " output_path=s3_output_location,\n", " sagemaker_session=session)\n", "\n", "estimator.set_hyperparameters(mode=\"supervised\",\n", " epochs=10,\n", " min_epochs=5, # Min epochs before early stopping is introduced\n", " early_stopping=True,\n", " patience=2,\n", " learning_rate=0.01,\n", " min_count=2, # words that appear less than min_count are discarded \n", " word_ngrams=1, # the number of word n-gram features to use.\n", " vector_dim=16, # dimensions of embedding layer\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, let's create the steps. 
First the training step" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.inputs import TrainingInput\n", "from sagemaker.workflow.steps import TrainingStep\n", "\n", "step_train = TrainingStep(\n", " name=\"TrainModel\",\n", " estimator=estimator,\n", " inputs={\n", " \"train\": TrainingInput(s3_data=input_train_data, content_type=\"text/plain\"),\n", " \"validation\": TrainingInput(s3_data=input_val_data, content_type=\"text/plain\"),\n", " },\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then, the Register step that will add a new version to the `champion` model package group in the Model Registry" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.step_collections import RegisterModel\n", "\n", "champion_model_group = f\"{project_name}-champion\"\n", "\n", "step_register = RegisterModel(\n", " name=\"RegisterModel\",\n", " estimator=estimator,\n", " model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", " content_types=[\"text/plain\"],\n", " response_types=[\"text/csv\"],\n", " inference_instances=[\"ml.t2.large\", \"ml.m5.xlarge\"],\n", " transform_instances=[\"ml.m5.xlarge\"],\n", " model_package_group_name=champion_model_group\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we can create the pipeline." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.pipeline import Pipeline\n", "import time\n", "\n", "ts = time.strftime('%Y-%m-%d-%H-%M-%S')\n", "\n", "pipeline_name = f\"helpful-reviews-{ts}\"\n", "pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " training_instance_type,\n", " training_instance_count, \n", " input_train_data,\n", " input_val_data\n", " ],\n", " steps=[step_train, step_register],\n", " sagemaker_session=session,\n", ")\n", "\n", "try:\n", " response = pipeline.create(role_arn=role)\n", "except ClientError as e:\n", " error = e.response[\"Error\"]\n", " if error[\"Code\"] == \"ValidationError\" and \"Pipeline names must be unique within\" in error[\"Message\"]:\n", " print(error[\"Message\"])\n", " response = pipeline.describe()\n", " else:\n", " raise\n", "\n", "pipeline_arn = response[\"PipelineArn\"]\n", "\n", "# Attach this to a specific project\n", "tags = sm_client.add_tags(\n", " ResourceArn=pipeline_arn,\n", " Tags=project_tags\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start Pipeline\n", "\n", "And then, start pipeline and wait for it to complete." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "start_response = pipeline.start(parameters={\n", " \"TrainingInstanceCount\": \"1\"\n", "})\n", "\n", "pipeline_execution_arn = start_response.arn\n", "\n", "while True:\n", " resp = sm_client.describe_pipeline_execution(PipelineExecutionArn=pipeline_execution_arn)\n", " if resp['PipelineExecutionStatus'] == 'Executing':\n", " print('Running...')\n", " else:\n", " print(resp['PipelineExecutionStatus'])\n", " break\n", " time.sleep(15)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Track Lineage\n", "\n", "List lineage associated with the training job." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.lineage.visualizer import LineageTableVisualizer\n", "\n", "viz = LineageTableVisualizer(sagemaker.session.Session())\n", "\n", "# Get the training job \n", "steps = sm_client.list_pipeline_execution_steps(PipelineExecutionArn=pipeline_execution_arn)['PipelineExecutionSteps']\n", "training_job_arn = [s['Metadata']['TrainingJob']['Arn'] for s in steps if s['StepName'] == 'TrainModel'][0]\n", " \n", "viz.show(training_job_name=training_job_arn.split('/')[-1])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create an experiment trial for the pipeline, and associate the training jobs." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def associate_trial_component(source_arn, trial_name):\n", " for tc in sm_client.list_trial_components(SourceArn=source_arn)['TrialComponentSummaries']:\n", " print(f\"Associated '{tc['TrialComponentName']}'\")\n", " response = sm_client.associate_trial_component(\n", " TrialComponentName=tc['TrialComponentName'],\n", " TrialName=trial_name\n", " )\n", "\n", "trial_name = f\"{project_id}-pipeline\"\n", "create_trial(trial_name, \"pipeline\")\n", "associate_trial_component(training_job_arn, trial_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then list the metrics for this experiment." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.analytics import ExperimentAnalytics\n", "import pandas as pd\n", "\n", "# Pick the key columns we are interested inv\n", "analytics_df = ExperimentAnalytics(experiment_name=project_id).dataframe()\n", "\n", "pd.set_option('display.max_colwidth', 100) # Increase column width to show full copmontent name\n", "cols = ['TrialComponentName', 'SageMaker.InstanceType', \n", " 'train:accuracy - Last', 'validation:accuracy - Last'] # return the last accuracy for training and validation\n", "analytics_df[cols].head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Approve for deployment\n", "\n", "Finally, approve the model to trigger the A/B testing deployment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# list all packages and select the latest one\n", "packages = sm_client.list_model_packages(ModelPackageGroupName=champion_model_group,\n", " SortBy='CreationTime', SortOrder='Descending',\n", " MaxResults=1)['ModelPackageSummaryList']\n", "\n", "# Approve the top model\n", "for package in packages:\n", " latest_model_package_arn = package['ModelPackageArn']\n", " model_package_version = latest_model_package_arn.split('/')[-1]\n", " if package['ModelApprovalStatus'] == 'PendingManualApproval':\n", " print(f\"Approving Champion Version: {model_package_version}\")\n", " model_package_update_response = sm_client.update_model_package(\n", " ModelPackageArn=latest_model_package_arn,\n", " ModelApprovalStatus=\"Approved\",\n", " )\n", " else:\n", " print(f\"Champion Version: {model_package_version} approved\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run Tuning Job\n", "\n", "To try and improve on our model, let's run a tuning job to find the parameters to maximize accuracy, and register this model.\n", "\n", "### Setup Hyperparamter Tuning\n", "\n", "Create the [Blazing Text](https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext.html) binary classifier for review helpfulness." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "from sagemaker.estimator import Estimator\n", "from sagemaker.inputs import TrainingInput\n", "from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner\n", "\n", "region_name = boto3.Session().region_name\n", "image_uri = sagemaker.image_uris.retrieve(\"blazingtext\", region_name)\n", "print(f'Using container: {image_uri}')\n", "\n", "estimator = Estimator(image_uri,\n", " role, \n", " instance_count=1, \n", " instance_type='ml.c5.4xlarge',\n", " volume_size = 30,\n", " max_run = 360000,\n", " input_mode= 'File',\n", " output_path=s3_output_location,\n", " sagemaker_session=session)\n", "\n", "estimator.set_hyperparameters(mode=\"supervised\",\n", " epochs=10,\n", " min_epochs=5, # Min epochs before early stopping is introduced\n", " early_stopping=False,\n", " learning_rate=0.01,\n", " min_count=2, # words that appear less than min_count are discarded \n", " word_ngrams=1, # the number of word n-gram features to use.\n", " vector_dim=32, # dimensions of embedding layer\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Tune an Amazon SageMaker BlazingText text classification model with the following [hyperparameters](https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext-tuning.html).\n", "\n", "\n", "| Parameter Name | Parameter Type | Recommended Ranges or Values | \n", "| --- | --- | --- | \n", "| buckets | `IntegerParameterRange` | \\[1000000\\-10000000\\] | \n", "| epochs | `IntegerParameterRange` | \\[5\\-15\\] | \n", "| learning\\_rate | `ContinuousParameterRange` | MinValue: 0\\.005, MaxValue: 0\\.01 | \n", "| min\\_count | `IntegerParameterRange` | \\[0\\-100\\] | \n", "| mode | `CategoricalParameterRange` | \\[`'supervised'`\\] | \n", "| vector\\_dim | `IntegerParameterRange` | \\[32\\-300\\] | \n", "| word\\_ngrams | `IntegerParameterRange` | \\[1\\-3\\] | " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hyperparameter_ranges = {'epochs': IntegerParameter(5, 50),\n", " 'learning_rate': ContinuousParameter(0.005, 0.01),\n", " 'min_count': IntegerParameter(0, 100),\n", " 'vector_dim': ContinuousParameter(1, 10),\n", " 'word_ngrams': IntegerParameter(1, 3),\n", " 'vector_dim': IntegerParameter(32, 300)}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we'll create a `HyperparameterTuner` object, to which we pass:\n", "\n", "* The `BlazingText` estimator we created above\n", "* Our hyperparameter ranges\n", "* Objective metric name and definition\n", "\n", "Tuning resource configurations such as Number of training jobs to run in total and how many training jobs can be run in parallel." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "max_jobs = 9\n", "objective_name = 'validation:accuracy'\n", "tuner = HyperparameterTuner(estimator, \n", " objective_name,\n", " hyperparameter_ranges,\n", " tags=project_tags,\n", " max_jobs=max_jobs,\n", " max_parallel_jobs=3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Launch Tuning\n", "\n", "Now we can launch a hyperparameter tuning job by calling `fit()` function. After the hyperparameter tuning job is created, we can go to SageMaker console to track the progress of the hyperparameter tuning job until it is completed (will take approx 30 minutes)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "input_train = TrainingInput(s3_data=s3_train_uri, content_type=\"text/plain\")\n", "input_val = TrainingInput(s3_data=s3_val_uri, content_type=\"text/plain\")\n", "data_channels = {'train': input_train, 'validation': input_val}\n", "\n", "tuner.fit(inputs=data_channels)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's just run a quick check of the hyperparameter tuning jobs status to make sure it started successfully." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job_name = tuner.latest_tuning_job.job_name\n", "tuning_job_status = sm_client.describe_hyper_parameter_tuning_job(\n", " HyperParameterTuningJobName=job_name)['HyperParameterTuningJobStatus']\n", "if tuning_job_status != 'Completed':\n", " raise Exception(f'Tuning Jobs status is {tuning_job_status}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Analyse results\n", "\n", "Get the top list of training jobs, sorted by the objective metric" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tuner_analytics = sagemaker.HyperparameterTuningJobAnalytics(job_name)\n", "jobs_df = tuner_analytics.dataframe()\n", "jobs_df = jobs_df[jobs_df['TrainingJobStatus'] == 'Completed'].sort_values('FinalObjectiveValue', ascending=False)\n", "jobs_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You may want to know the correlation between your objective metric and individual hyperparameters you've selected to tune. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import bokeh\n", "from bokeh.io import output_notebook\n", "from bokeh.plotting import figure, show\n", "from bokeh.models import HoverTool\n", "\n", "output_notebook()\n", "\n", "class HoverHelper():\n", " def __init__(self, tuning_analytics):\n", " self.tuner = tuning_analytics\n", "\n", " def hovertool(self):\n", " tooltips = [\n", " (\"FinalObjectiveValue\", \"@FinalObjectiveValue\"),\n", " (\"TrainingJobName\", \"@TrainingJobName\"),\n", " ]\n", " for k in self.tuner.tuning_ranges.keys():\n", " tooltips.append( (k, \"@{%s}\" % k) )\n", " ht = HoverTool(tooltips=tooltips)\n", " return ht\n", "\n", " def tools(self, standard_tools='pan,crosshair,wheel_zoom,zoom_in,zoom_out,undo,reset'):\n", " return [self.hovertool(), standard_tools]\n", "\n", "def compare_metrics(tuner_analytics, jobs_df):\n", " hover = HoverHelper(tuner_analytics)\n", " figures = []\n", " for hp_name, hp_range in tuner_analytics.tuning_ranges.items():\n", " categorical_args = {}\n", " if hp_range.get('Values'):\n", " # This is marked as categorical. Check if all options are actually numbers.\n", " def is_num(x):\n", " try:\n", " float(x)\n", " return 1\n", " except:\n", " return 0 \n", " vals = hp_range['Values']\n", " if sum([is_num(x) for x in vals]) == len(vals):\n", " # Bokeh has issues plotting a \"categorical\" range that's actually numeric, so plot as numeric\n", " print(f\"Hyperparameter {hp_name} is tuned as categorical, but all values are numeric\")\n", " else:\n", " # Set up extra options for plotting categoricals. 
A bit tricky when they're actually numbers.\n", " categorical_args['x_range'] = vals\n", "\n", " # Now plot it\n", " p = figure(plot_width=500, plot_height=500, \n", " title=f\"Objective vs {hp_name}\",\n", " tools=hover.tools(),\n", " x_axis_label=hp_name, y_axis_label=objective_name,\n", " **categorical_args)\n", " p.circle(source=jobs_df, x=hp_name, y='FinalObjectiveValue')\n", " figures.append(p)\n", " show(bokeh.layouts.Column(*figures))\n", " \n", "compare_metrics(tuner_analytics, jobs_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create an experiment trial for the tuning job, and associate the training jobs (this will take < 20 seconds)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "# Create the tuning trial\n", "trial_name = f\"{project_id}-tuning-job\"\n", "create_trial(trial_name, \"tuning-job\")\n", "\n", "for i, tjs in enumerate(tuner_analytics.training_job_summaries()):\n", " associate_trial_component(tjs['TrainingJobArn'], trial_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Register Model\n", "\n", "Register and approve the best performing model in the `challenger` model package group in the Model Registry." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "best_estimator = tuner.best_estimator()\n", "\n", "challenger_model_group = f\"{project_name}-challenger\"\n", "\n", "model_package = best_estimator.register(\n", " content_types=[\"text/plain\"],\n", " response_types=[\"text/csv\"],\n", " inference_instances=[\"ml.t2.large\", \"ml.m5.xlarge\"],\n", " transform_instances=[\"ml.m5.xlarge\"],\n", " model_package_group_name=challenger_model_group,\n", " approval_status=\"Approved\"\n", ")\n", "\n", "model_package_version = model_package.model_package_arn.split('/')[-1]\n", "print(f\"Registered and Approved Challenger Version: {model_package_version}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "List the top `champion` and `challenger` approved models to verify that these have been approved." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "cols = ['ModelPackageGroupName', 'ModelPackageVersion', 'CreationTime', 'ModelApprovalStatus']\n", "champion_response = sm_client.list_model_packages(ModelPackageGroupName=champion_model_group)\n", "challenger_response = sm_client.list_model_packages(ModelPackageGroupName=challenger_model_group)\n", "champion_df = pd.DataFrame(champion_response['ModelPackageSummaryList'], columns=cols).head(1)\n", "challenger_df = pd.DataFrame(challenger_response['ModelPackageSummaryList'], columns=cols).head(1)\n", "pd.concat([champion_df, challenger_df])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You should now have an AWS CodePipeline job running that will update your endpoint with these latest models." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Test Endpoint\n", "\n", "### Invoke endpoint\n", "\n", "Ensure that our endpoint is in service. 
Set the `stage_name` here if not `dev`\n", "\n", "👇👇👇" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "stage_name = 'dev' # << Update this with the stage name\n", "endpoint_name = f'sagemaker-{project_name}-{stage_name}' \n", "\n", "endpoint_status = sm_client.describe_endpoint(EndpointName = endpoint_name)['EndpointStatus']\n", "if endpoint_status != 'InService':\n", "    raise Exception(f'Endpoint {endpoint_name} status is: {endpoint_status}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, we load back the test dataset that we held out during training." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "test_df = pd.read_csv(\"test.csv\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a method to get predictions in batches" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import json\n", "\n", "runtime = boto3.Session().client('sagemaker-runtime')\n", "\n", "def chunker(seq, batch_size):\n", "    return (seq[pos:pos + batch_size] for pos in range(0, len(seq), batch_size))\n", "\n", "def parse_predictions(results):\n", "    return [(result['label'][0] == '__label__Helpful', result['prob'][0]) for result in results] \n", "\n", "def predict(endpoint_name, variant_name, data, batch_size=50):\n", "    all_predictions = []\n", "    for array in chunker(data, batch_size):\n", "        payload = {\"instances\" : array, \"configuration\": {\"k\": 1} }\n", "        try:\n", "            response = runtime.invoke_endpoint(\n", "                EndpointName = endpoint_name, \n", "                TargetVariant = variant_name,\n", "                ContentType = 'application/json', \n", "                Body = json.dumps(payload))\n", "            predictions = json.loads(response['Body'].read()) \n", "            all_predictions += parse_predictions(predictions)\n", "        except Exception as e:\n", "            print(e)\n", "            print(payload)\n", "    return all_predictions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get the variant names for the deployed endpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "variant_names = [pv['VariantName'] for pv in sm_client.describe_endpoint(EndpointName=endpoint_name)['ProductionVariants']]\n", "if len(variant_names) < 2:\n", "    raise Exception('Require at least 2 variants for A/B Testing')\n", "\n", "variant_names" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Make the predictions in batches of 50 (should take < 20 seconds)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "batch_size = 50\n", "test_input = tokenize_df(test_df).to_list()\n", "test_preds = [predict(endpoint_name, variant_name, test_input, batch_size) for variant_name in variant_names]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Join the predictions with the test set to evaluate performance" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def join_test(test_df, predictions, variant_name):\n", "    pred_df = pd.DataFrame(predictions, columns=['is_helpful_prediction', 'is_helpful_prob'])\n", "    pred_df['variant_name'] = variant_name\n", "    return test_df[['is_helpful']].join(pred_df)\n", "\n", "# Create a combined dataset for each variant and output accuracy\n", "join_df = pd.concat([join_test(test_df, predictions, variant_names[i]) for (i, predictions) in enumerate(test_preds)])" ] }, { "cell_type": "markdown",
"metadata": {}, "source": [ "### Evaulate variants\n", "\n", "Output the test accuracy metric for each variant" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.metrics import accuracy_score, f1_score\n", "\n", "join_df.groupby('variant_name').apply(lambda g: accuracy_score(g['is_helpful_prediction'], g['is_helpful']))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get the confusion matrix for this binary classifier" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.metrics import confusion_matrix\n", "import seaborn as sns\n", "import numpy as np\n", "\n", "def draw_heatmap(data, **kwargs):\n", " cf_matrix = confusion_matrix(data['is_helpful'], data['is_helpful_prediction'])\n", " print(data['variant_name'].unique()[0], cf_matrix)\n", " cf_percent = cf_matrix / cf_matrix.astype(np.float).sum(axis=1)\n", " sns.heatmap(cf_percent, annot=True, fmt=\".4%\")\n", "\n", "fg = sns.FacetGrid(join_df, col='variant_name', height=4)\n", "fg.map_dataframe(draw_heatmap)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Plot the ROC curve for the binary classification." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.metrics import roc_curve, roc_auc_score\n", "import matplotlib.pyplot as plt\n", "\n", "def plot_roc(data):\n", " # Create horizontally stack plots (max is to to create at least 2 charts)\n", " fig, axs = plt.subplots(1, max(len(variant_names), 2), figsize=(9,4))\n", " fig.suptitle('Receiver Operating Chracteristic (ROC) curves by Variant')\n", " colors = sns.color_palette(\"Paired\")\n", " # Plot the ROC curves by variant name\n", " for i, (variant_name, pred_df) in enumerate(data.groupby('variant_name')): \n", " # Get true probability for ROC\n", " tp = pred_df.apply(lambda r: r['is_helpful_prob'] if r['is_helpful_prediction'] else \n", " 1-r['is_helpful_prob'], axis=1)\n", " fpr, tpr, _ = roc_curve(pred_df['is_helpful'], tp)\n", " auc = roc_auc_score(pred_df['is_helpful'], tp)\n", " axs[i].plot(fpr,tpr,color=colors[i],label=f\"{variant_name} auc={auc:.4f}\") \n", " axs[i].plot([0, 1], [0, 1], linestyle='--')\n", " axs[i].set_xlabel(\"False Poitive Rate\")\n", " axs[i].set_ylabel(\"True Positive Rate\")\n", " axs[i].legend(loc=4)\n", " \n", "plot_roc(join_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run A/B Testing Simulation\n", "\n", "### Define API Methods\n", "\n", "Set the REST API Endpoint to the value output in the AWS CloudFormation that creatied the A/B testing infrastructure.\n", "\n", "👇👇👇" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rest_api = 'https://<>.execute-api.<>.amazonaws.com/prod/' # << Update this with Rest API output" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can run a simulation by sending traffic to the REST API is a wrapper around your deployed SageMaker endpoint. The API will\n", "\n", "1. Assigning users to variants based on the **Multi-Armed Bandit** strategy configured for the `endpoint_name`.\n", "2. Invoking the target endpoint variant, and capturing the `invocation` event.\n", "3. 
Record a `conversion` against a user with an optional `reward`.\n", "\n", "We define some methods to call the REST API using the Python requests library:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "import os\n", "import requests\n", "\n", "def api_invocation(user_id, text_array):\n", "    payload = {\n", "        \"user_id\": str(user_id),\n", "        \"endpoint_name\": endpoint_name, \n", "        \"content_type\": \"application/json\",\n", "        \"data\": json.dumps({\"instances\" : text_array, \"configuration\": { \"k\": 1 }}), \n", "    }\n", "    rest_url = os.path.join(rest_api, 'invocation')\n", "    r = requests.post(rest_url, data=json.dumps(payload))\n", "    return r.json()\n", "\n", "def api_conversion(payload):\n", "    rest_url = os.path.join(rest_api, 'conversion')\n", "    r = requests.post(rest_url, data=json.dumps(payload))\n", "    return r.json()\n", "\n", "def api_stats():\n", "    payload = {\n", "        \"endpoint_name\": endpoint_name, \n", "    }\n", "    rest_url = os.path.join(rest_api, 'stats')\n", "    r = requests.post(rest_url, data=json.dumps(payload))\n", "    return r.json()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Confirm that our metrics are all clear before we start our test." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "api_stats()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create small batches of inputs and test results, to find the most helpful review per batch" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "batch_size = 20\n", "input_batch = list(chunker(test_input, batch_size))\n", "test_batch = list(chunker(test_df, batch_size))\n", "len(input_batch), len(test_batch)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Call the invocation API with a batch of inputs; if the most helpful review matches the ground truth, call the conversion API."
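] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The variant selection itself happens inside the A/B testing API. As a purely illustrative sketch (not the deployed implementation), a Thompson sampling bandit would pick a variant by drawing from a Beta posterior built from each variant's invocation and reward counts; the counts below are hypothetical. The batch prediction helper described above is defined in the next cell." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "\n", "rng = np.random.default_rng(42)\n", "\n", "def thompson_sample(counts):\n", "    # counts maps variant name -> (invocations, rewards); draw from Beta(1+rewards, 1+failures)\n", "    draws = {name: rng.beta(1 + rewards, 1 + invocations - rewards)\n", "             for name, (invocations, rewards) in counts.items()}\n", "    return max(draws, key=draws.get)\n", "\n", "# Hypothetical running totals, for illustration only\n", "example_counts = {'Champion': (120, 78), 'Challenger': (80, 60)}\n", "thompson_sample(example_counts)"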
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def api_predict(i):\n", " result = api_invocation(i, input_batch[i])\n", " if 'predictions' in result:\n", " predictions = parse_predictions(result['predictions']) \n", " # Join predictions with test results\n", " pred_df = join_test(test_batch[i].reset_index(drop=True), predictions, result['endpoint_variant'])\n", " # Set the batch, invocattion and reward fields\n", " pred_df['strategy'] = result['strategy']\n", " pred_df['batch'], pred_df['invocation'] = i, 1\n", " # Set the reward based on prediction having a probability above a threshold\n", " pred_df['reward'] = pred_df.apply(lambda r: r['is_helpful'] == r['is_helpful_prediction'], axis=1).astype(int)\n", " # Get top helpful review based on probability\n", " pred_top = pred_df.sort_values(['is_helpful_prediction', 'is_helpful_prob'], ascending=False).head(1)\n", " reward = float(pred_top['reward'].sum())\n", " # Record converison if reward\n", " if reward > 0:\n", " api_conversion({ **result, 'reward': reward })\n", " return pred_top\n", " else:\n", " print('error, no predictions', result)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get predictions for the batch of results, output progress (this should take approx 2 minutes)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import tqdm.notebook as tq\n", "\n", "batch_df = pd.concat(api_predict(i) for i in tq.tqdm(range(len(input_batch)))).reset_index(drop=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Output the bandit strategies used these invocations" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "batch_df.groupby(['variant_name', 'strategy'])['is_helpful_prediction'].count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Visualize variant reward\n", "\n", "Group the batch results by varianta rate to summarise the total number of invcations, reward and calculated rate." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "variant_df = batch_df.groupby('variant_name').agg({\n", " 'invocation': 'count', \n", " 'reward': 'sum'\n", "})\n", "variant_df['reward_rate'] = variant_df.apply(lambda r: r['reward']/r['invocation'], axis=1)\n", "variant_df.sort_values('reward_rate', ascending=False)\n", "variant_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Add cummulative invocation and reward by variant to compare performance over batch" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cum_df = batch_df.groupby(\n", " ['variant_name', 'batch'])[['invocation','reward']].sum().groupby(level=0).cumsum() #.reset_index()\n", "cum_df['reward_rate'] = cum_df.apply(lambda r: 1.0 * r['reward'] / r['invocation'], axis=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Graph the cumiative reward by variant" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import seaborn as sns\n", "\n", "sns.lineplot(data=cum_df, x=\"batch\", y=\"reward\", hue=\"variant_name\", palette=\"Paired\").set_title(\"Reward by Variant\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Graph the reward rate by variant" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sns.lineplot(data=cum_df, x=\"batch\", y=\"reward_rate\", hue=\"variant_name\", palette=\"Paired\").set_title(\"Reward Rate by Variant\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Plot Beta Distribution\n", "\n", "Creates a series of 9 buckets to get invocation and reward for plotting " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Cut the results into 9 bucket, and plot on a 3x3 chart\n", "bucket = 9\n", "beta_df = cum_df.reset_index()\n", "beta_df['bucket'] = pd.cut(beta_df['batch'], [0,2,5,10,20,50,100,200,500,1000], right=True)\n", "beta_df = beta_df.groupby(['variant_name', 'bucket'])[['invocation', 'reward']].max()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Plot the beta distribution for each bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from scipy import stats\n", "\n", "%matplotlib inline\n", "colors = sns.color_palette(\"Paired\")\n", "plt.style.use(\"dark_background\")\n", "\n", "def plot_beta(beta_df):\n", " fig, axs = plt.subplots(3, 3, figsize=(12, 12))\n", " fig.subplots_adjust(hspace = .5, wspace=.1)\n", " fig.suptitle('Probability Distribution of Each Variant', fontsize=16, y=1.02)\n", " fig.subplots_adjust(top=0.88)\n", " axs = axs.ravel()\n", " \n", " x = np.arange (0, 1.001, 0.001)\n", " for i, (b, bg) in enumerate(beta_df.groupby('bucket')):\n", " for j, (variant_name, vg) in enumerate(bg.groupby('variant_name')):\n", " v = vg.max()\n", " alpha, beta = 1+v['reward'], 1+v['invocation']-v['reward']\n", " axs[i].plot(x, stats.beta.pdf(x, alpha, beta), label=variant_name, color=colors[j])\n", " axs[i].set_title(f'Batch #{int(b.right)}')\n", " axs[i].grid(False)\n", "\n", " handles, labels = axs[0].get_legend_handles_labels()\n", " fig.legend(handles, labels, loc='upper right')\n", " fig.text(0.5, -.01, 'Expected Reward', ha='center', fontsize=16)\n", " fig.text(-.01, 0.5, 'Probability Density', va='center', fontsize=16, rotation='vertical')\n", " fig.tight_layout()\n", " plt.show()\n", " \n", "plot_beta(beta_df)" ] }, { "cell_type": 
"markdown", "metadata": {}, "source": [ "## Calling the winner\n", "\n", "Assuming a normal distribution, let's evaluate a confidence score for the best performing variant." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from scipy.stats import norm\n", "\n", "def plot_distribution(variant_a, rate_a, num_a, variant_b, rate_b, num_b):\n", " # Calculate the standard error for a and b\n", " std_a = np.sqrt(rate_a * (1 - rate_a) / num_a)\n", " std_b = np.sqrt(rate_b * (1 - rate_b) / num_b)\n", " z_score = (rate_b - rate_a) / np.sqrt(std_a**2 + std_b**2)\n", " p_value = norm().sf(z_score)\n", " conf = norm.cdf(z_score)\n", " # Plot distribution\n", " click_rate = np.linspace(0, 1.0, 200)\n", " prob_a = norm(rate_a, std_a).pdf(click_rate)\n", " prob_b = norm(rate_b, std_b).pdf(click_rate) \n", " plt.plot(click_rate, prob_a, label=variant_a, color=colors[0])\n", " plt.plot(click_rate, prob_b, label=variant_b, color=colors[1])\n", " plt.legend(frameon=True)\n", " plt.xlabel(\"Conversion rate\"); plt.ylabel(\"Probability\");\n", " plt.title(f\"zscore is {z_score:0.3f}, with p-value {p_value:0.3f}\")\n", " return z_score, p_value, conf\n", "\n", "# Lookup metrics for the variants\n", "variant_a, variant_b = variant_names[:2]\n", "rate_a, num_a = variant_df.loc[variant_a]['reward_rate'], variant_df.loc[variant_a]['invocation']\n", "rate_b, num_b = variant_df.loc[variant_b]['reward_rate'], variant_df.loc[variant_b]['invocation']\n", "rate_diff = (rate_b - rate_a) / rate_a\n", "rate_dir = f'{rate_diff:.2%} higher' if rate_diff > 0 else f'{-rate_diff:.2%} lower'\n", "print(f'{variant_b} conversion rate ({rate_b:.2%}) is {rate_dir} than {variant_a} rate ({rate_a:.2%})')\n", "\n", "# Plot distribution and get confidence interval\n", "z_score, p_value, conf = plot_distribution(variant_a, rate_a, num_a, variant_b, rate_b, num_b)\n", "\n", "if rate_diff > 0 and conf > 0.95:\n", " print(f'The result is statisticially significant')\n", "conf" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Promote Model\n", "\n", "If our new estimator is a winning model, we can register that model in the `Champion` model package group in the registry to trigger a new deployment with this single variant." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_package = best_estimator.register(\n", " content_types=[\"text/plain\"],\n", " response_types=[\"text/csv\"],\n", " inference_instances=[\"ml.t2.large\", \"ml.m5.xlarge\"],\n", " transform_instances=[\"ml.m5.xlarge\"],\n", " model_package_group_name=champion_model_group,\n", " approval_status=\"Approved\"\n", ")\n", "\n", "model_package_version = model_package.model_package_arn.split('/')[-1]\n", "print(f\"Registered and Approved Champion Version: {model_package_version}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Monitoring\n", "\n", "### Metrics\n", "\n", "AWS CloudWatch metrics are published to every time metrics are updated in Amazon DynamoDB.\n", "\n", "The following metrics are recorded against dimensions `EndpointName` and `VariantName` in namespace `aws/sagemaker/Endpoints/ab-testing`\n", "* `Invocations`\n", "* `Conversions`\n", "* `Rewards`\n", "\n", "### Traces\n", "\n", "The API Lambda functions are instrumented with [AWS X-Ray](https://aws.amazon.com/xray/) so you can inspect the latency for all downstream services including\n", "* DynamoDB\n", "* Amazon SageMaker\n", "* Kinesis Firehose" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean up\n", "\n", "Start by deleting the AWS CloudFormation stack created to provision the SageMaker endpoint. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "deploy_stack = f\"sagemaker-{project_name}-deploy-{stage_name}\"\n", "!aws cloudformation delete-stack --stack-name $deploy_stack" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then empty the S3 bucket containing the artifacts output from the A/B Testing deployment pipeline." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_artifact_path = f\"s3://sagemaker-{project_id}-artifact-{stage_name}-{region_name}\"\n", "!aws s3 rm --recursive $s3_artifact_path" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once complete, delete the project which removes the AWS CloudFormation stack that created the CodePipeline." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws sagemaker delete-project --project-name $project_name" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-west-1:742091327244:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }