# Model Deployment and Monitoring

In this notebook, you will manually carry out a DevOps workflow: take the model you trained in the previous notebook, deploy it into production, and monitor the model endpoint.

Start by deploying the model into production.

Follow the steps below to manually deploy the most recent training job and set up the endpoint with data capture enabled.

For this notebook, you will use the public [Credit Card default dataset](https://archive.ics.uci.edu/ml/datasets/default+of+credit+card+clients) downloaded from UCI. The data set was originally presented as part of the paper cited below.

 Yeh, I. C., & Lien, C. H. (2009). The comparisons of data mining techniques for the predictive accuracy of probability of default of credit card clients. Expert Systems with Applications, 36(2), 2473-2480.

Because this notebook is not connected to the internet, the dataset has been provided locally.

### Load necessary libraries

In [None]:
# Let's inspect the role we have created for our notebook here:
import boto3
import json
import numpy as np
import pandas as pd
import os
import sagemaker
from sagemaker import get_execution_role
from time import sleep, gmtime, strftime
import time
from threading import Thread

role = get_execution_role()
sess = sagemaker.Session()
region = boto3.session.Session().region_name
sm = boto3.Session().client('sagemaker')
print ("Executing in region {} with role {}".format (region, role))

# retrieve stored variables from previous notebook
%store -r trial_name 
%store -r experiment_name 
%store -r training_job_name

In [None]:
# Import SageMaker Experiments 
from sagemaker.analytics import ExperimentAnalytics
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker

In [None]:
# let's load the trial from the previous session
cc_trial = Trial.load(sagemaker_boto_client=sm, trial_name=trial_name)

In [None]:
# Import Model Monitor API
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker import RealTimePredictor
from sagemaker.predictor import csv_serializer

In [None]:
# Create Networking configuration required for all APIs
from sagemaker.network import NetworkConfig
import sagemaker_environment as smenv

cmk_id = smenv.SAGEMAKER_KMS_KEY_ID 
sec_groups = smenv.SAGEMAKER_SECURITY_GROUPS
subnets = smenv.SAGEMAKER_SUBNETS
network_config = NetworkConfig(security_group_ids=sec_groups, subnets=subnets)

In [None]:
# You have already created buckets as part of the Secure Data Science Workshop. Create references to these buckets for later use.

# raw_bucket: stores raw data and any preprocessing job related code.
# data_bucket: stores train/test data for training/validating ML models.
# output_bucket: where the model artifacts and outputs will be stored.
# For this demo, raw_bucket and data_bucket are the same bucket, but as a best practice you probably want to keep them separate with different permissions.

raw_bucket = smenv.SAGEMAKER_DATA_BUCKET #alternatively you can replace with your own buckets
data_bucket = smenv.SAGEMAKER_DATA_BUCKET # alternatively you can replace with your own buckets
output_bucket = smenv.SAGEMAKER_MODEL_BUCKET # alternatively you can replace with your own buckets
prefix = 'secure-sagemaker-demo'
print("Data bucket is s3://{}".format (data_bucket))

## Set up an endpoint with Data Capture enabled

### Load Trained Model

First, set up a live endpoint that captures inference requests.

In [None]:
sm_client = boto3.client('sagemaker')

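# Find the most recently created training job. Note that this overrides the
# training_job_name restored from the previous notebook with %store.
latest_training_job = sm_client.list_training_jobs(
    MaxResults=1,
    SortBy='CreationTime',
    SortOrder='Descending')

training_job_name = latest_training_job['TrainingJobSummaries'][0]['TrainingJobName']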

training_job_description = sm_client.describe_training_job(TrainingJobName=training_job_name)

model_data = training_job_description['ModelArtifacts']['S3ModelArtifacts']
container_uri = training_job_description['AlgorithmSpecification']['TrainingImage']
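The cell above picks whichever training job was created most recently, even if that job failed or is still running. A minimal sketch of a safer selection, assuming a response shaped like boto3's `list_training_jobs` output (the helper name and the sample response are hypothetical). With the real client you could instead pass `StatusEquals='Completed'` to `list_training_jobs`.

```python
from datetime import datetime, timezone


def latest_completed_job(list_response):
    """Return the name of the most recently created training job whose status is Completed.

    Assumes `list_response` has the shape of a boto3 list_training_jobs response:
    a 'TrainingJobSummaries' list with TrainingJobName, TrainingJobStatus and CreationTime.
    """
    completed = [
        summary
        for summary in list_response.get("TrainingJobSummaries", [])
        if summary.get("TrainingJobStatus") == "Completed"
    ]
    if not completed:
        raise ValueError("No completed training jobs in the response")
    newest = max(completed, key=lambda summary: summary["CreationTime"])
    return newest["TrainingJobName"]


# Hypothetical response: the newest job is still running, so the older completed one is chosen.
sample_response = {
    "TrainingJobSummaries": [
        {"TrainingJobName": "xgb-job-2", "TrainingJobStatus": "InProgress",
         "CreationTime": datetime(2020, 5, 2, tzinfo=timezone.utc)},
        {"TrainingJobName": "xgb-job-1", "TrainingJobStatus": "Completed",
         "CreationTime": datetime(2020, 5, 1, tzinfo=timezone.utc)},
    ]
}
print(latest_completed_job(sample_response))  # xgb-job-1
```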

To deploy the endpoint into your VPC, you need to provide the security groups and subnets to use.
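A small helper can make the VPC requirement explicit before calling `create_model`. This is a hypothetical sketch (the `build_vpc_config` name and the placeholder IDs are illustrative, not part of the workshop); it only checks that both lists are non-empty and does not verify that the IDs exist in your account.

```python
def build_vpc_config(security_group_ids, subnet_ids):
    """Return a VpcConfig dict for create_model, rejecting empty inputs."""
    # Accept a single ID string as well as a list of IDs.
    if isinstance(security_group_ids, str):
        security_group_ids = [security_group_ids]
    if isinstance(subnet_ids, str):
        subnet_ids = [subnet_ids]
    if not security_group_ids:
        raise ValueError("At least one security group ID is required")
    if not subnet_ids:
        raise ValueError("At least one subnet ID is required")
    return {"SecurityGroupIds": list(security_group_ids), "Subnets": list(subnet_ids)}


# Placeholder IDs for illustration only.
vpc_config = build_vpc_config("sg-0123456789abcdef0", ["subnet-0aaa", "subnet-0bbb"])
print(vpc_config)
```

In the notebook this would be used as `VpcConfig=build_vpc_config(sec_groups, subnets)`.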

In [None]:
# Create a SageMaker model from the training job's artifacts, attached to your VPC.
def create_model(role, model_name, container_uri, model_data):
    return sm_client.create_model(
        ModelName=model_name,
        PrimaryContainer={
            'Image': container_uri,
            'ModelDataUrl': model_data,
        },
        VpcConfig={
            'SecurityGroupIds': sec_groups,
            'Subnets': subnets
        },
        ExecutionRoleArn=role,
        EnableNetworkIsolation=False
    )

model_name = "{}-model".format(training_job_name)
try:
    model = create_model(role, model_name, container_uri, model_data)
except Exception as e:
    # A model with this name probably exists already: delete it and retry.
    sm_client.delete_model(ModelName=model_name)
    model = create_model(role, model_name, container_uri, model_data)

print('Model created: ' + model['ModelArn'])


Specify a Capture Config to capture a percentage of the incoming requests being served by the endpoint. Here you set the capture percentage to `100` to capture all traffic.
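In production you would often capture only a sample of traffic. A minimal sketch of a builder for the same dictionary defined in the next cell, with basic validation (the `make_data_capture_config` helper and the bucket name are hypothetical; the keys mirror the notebook's own configuration).

```python
def make_data_capture_config(destination_s3_uri, sampling_percentage=100,
                             capture_input=True, capture_output=True):
    """Build a DataCaptureConfig dict for create_endpoint_config (hypothetical helper)."""
    if not 0 <= sampling_percentage <= 100:
        raise ValueError("sampling_percentage must be between 0 and 100")
    if not destination_s3_uri.startswith("s3://"):
        raise ValueError("destination_s3_uri must start with s3://")
    capture_options = []
    if capture_input:
        capture_options.append({"CaptureMode": "Input"})
    if capture_output:
        capture_options.append({"CaptureMode": "Output"})
    if not capture_options:
        raise ValueError("Capture at least one of input or output")
    return {
        "EnableCapture": True,
        "InitialSamplingPercentage": int(sampling_percentage),
        "DestinationS3Uri": destination_s3_uri,
        "CaptureOptions": capture_options,
        "CaptureContentTypeHeader": {
            "CsvContentTypes": ["text/csv"],
            "JsonContentTypes": ["application/json"],
        },
    }


# Example: capture roughly 10% of requests to a placeholder bucket.
config = make_data_capture_config("s3://example-bucket/monitoring/datacapture", 10)
print(config["InitialSamplingPercentage"], config["CaptureOptions"])
```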

In [None]:
s3_capture_upload_path = 's3://{}/{}/monitoring/datacapture'.format(data_bucket, prefix)
data_capture_configuration = {
    "EnableCapture": True,
    "InitialSamplingPercentage": 100,
    "DestinationS3Uri": s3_capture_upload_path,
    "CaptureOptions": [
        {"CaptureMode": "Output"},
        {"CaptureMode": "Input"}
    ],
    "CaptureContentTypeHeader": {
        "CsvContentTypes": ["text/csv"],
        "JsonContentTypes": ["application/json"]
    }
}

In [None]:
def create_endpoint_config(endpoint_name, model_name, data_capture_config):
    return sm_client.create_endpoint_config(
        EndpointConfigName="{}-config".format(endpoint_name),
        ProductionVariants=[
            {
                'VariantName': 'AllTraffic',
                'ModelName': model_name,
                'InitialInstanceCount': 1,
                'InstanceType': 'ml.m5.xlarge',
                'InitialVariantWeight': 1.0,
            },
        ],
        DataCaptureConfig=data_capture_config)


endpoint_name = "{}-endpoint".format(training_job_name)
try:
    endpoint_config = create_endpoint_config(endpoint_name, model_name, data_capture_configuration)
except Exception as e:
    # An endpoint configuration with this name probably exists already: delete it and retry.
    sm_client.delete_endpoint_config(EndpointConfigName=endpoint_name + '-config')
    endpoint_config = create_endpoint_config(endpoint_name, model_name, data_capture_configuration)

print('Endpoint configuration created: ' + endpoint_config['EndpointConfigArn'])


In [None]:
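def create_endpoint(endpoint_name, config_name):
    return sm_client.create_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=config_name)


try:
    endpoint = create_endpoint(endpoint_name, endpoint_name + '-config')
except Exception as e:
    # An endpoint with this name probably exists already. Deletion is asynchronous,
    # so wait until the old endpoint is gone before recreating it.
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.get_waiter('endpoint_deleted').wait(EndpointName=endpoint_name)
    endpoint = create_endpoint(endpoint_name, endpoint_name + '-config')

print('Endpoint created: ' + endpoint['EndpointArn'])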


**WAIT**

Even though the cell above reports that the endpoint has been created, the endpoint may still be in the `Creating` state, because setting up an HTTPS endpoint behind the scenes takes some time. Run the command below and wait for the status to reach **InService** before proceeding.
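The polling cell below loops only while the status is `Creating` and has no time limit. A slightly more defensive sketch, which also treats update states as transitional, returns the final status (for example `InService` or `Failed`) and gives up after a timeout. The helper name and the set of transitional states are assumptions; boto3 also provides a built-in waiter, `sm_client.get_waiter('endpoint_in_service')`.

```python
import time

# Statuses in which the endpoint is still being set up or changed (assumed list).
TRANSITIONAL_STATES = {"Creating", "Updating", "SystemUpdating"}


def wait_for_endpoint(describe, endpoint_name, poll_seconds=60,
                      timeout_seconds=3600, sleep=time.sleep):
    """Poll `describe` until the endpoint leaves a transitional state; return the final status.

    `describe` is expected to behave like boto3's describe_endpoint.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = describe(EndpointName=endpoint_name)["EndpointStatus"]
        if status not in TRANSITIONAL_STATES:
            return status  # e.g. "InService" or "Failed"
        if time.monotonic() >= deadline:
            raise TimeoutError("Endpoint {} still {} after {} s".format(
                endpoint_name, status, timeout_seconds))
        sleep(poll_seconds)


# Offline demo with a fake describe function that reports "Creating" twice.
fake_statuses = iter(["Creating", "Creating", "InService"])


def fake_describe(EndpointName):
    return {"EndpointStatus": next(fake_statuses)}


print(wait_for_endpoint(fake_describe, "demo-endpoint", sleep=lambda seconds: None))  # InService
```

In the notebook the call would be `wait_for_endpoint(sm_client.describe_endpoint, endpoint_name)`.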

In [None]:
from time import sleep

# Poll every 60 seconds until the endpoint leaves the "Creating" state
status = sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']
print(status)
while status == 'Creating':
    sleep(60)
    status = sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']
    print(status)

### Test the endpoint

Send a few payloads to this endpoint and make some predictions.

In [None]:
predictor = RealTimePredictor(endpoint_name, content_type = 'text/csv')

In [None]:
!head -10 test_data.csv > test_sample.csv

In [None]:
with open('test_sample.csv', 'r') as f:
    for row in f:
        payload = row.rstrip('\n')
        # Drop the leading label and comma (assumes a single-character label such as "0,")
        response = predictor.predict(data=payload[2:])
        print(response)
        sleep(0.5)
print('done!')
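The cell above strips the label with `payload[2:]`, which works only because the label is a single character (0 or 1) followed by a comma. A sketch that removes the first CSV field regardless of its length, assuming plain CSV without quoted fields containing commas (`strip_label` is a hypothetical helper):

```python
def strip_label(csv_row):
    """Drop the first CSV field (the label) and return the remaining feature values.

    Assumes plain CSV without quoted fields that contain commas.
    """
    _label, _sep, features = csv_row.rstrip("\r\n").partition(",")
    return features


print(strip_label("1,20000,2,2,1,24\n"))  # 20000,2,2,1,24
print(strip_label("10,500,3\n"))          # 500,3 (payload[2:] would give ",500,3")
```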


In [None]:
# List the captured JSON Lines files (they can take a few minutes to appear).
data_capture_prefix = '{}/monitoring/datacapture'.format(prefix)
s3_client = boto3.Session().client('s3')
current_endpoint_capture_prefix = '{}/{}/AllTraffic'.format(data_capture_prefix, endpoint_name)
result = s3_client.list_objects(Bucket=data_bucket, Prefix=current_endpoint_capture_prefix)
if 'Contents' not in result:
    print("No capture files present yet.")
else:
    capture_files = [capture_file.get("Key") for capture_file in result.get('Contents')]
    print("Found {} Capture Files:".format(len(capture_files)))
    for capture_file in capture_files[-5:]:
        print("s3://{}/{}".format(data_bucket, capture_file))


In [None]:
# View contents of the captured file.
def get_obj_body(bucket, obj_key):
 return s3_client.get_object(Bucket=data_bucket, Key=obj_key).get('Body').read().decode("utf-8")

capture_file = get_obj_body(data_bucket, capture_files[-1])
print(json.dumps(json.loads(capture_file.split('\n')[5]), indent = 2, sort_keys =True))
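Each capture file holds one JSON record per line. A minimal parser, assuming the record layout in the hypothetical sample below (`captureData.endpointInput` / `endpointOutput` with `data` and `encoding`, plus `eventMetadata.inferenceTime`); compare it against the record printed above before relying on it.

```python
import base64
import json


def parse_capture_line(line):
    """Extract inference time, input and output from one data capture JSON line."""
    record = json.loads(line)
    capture = record["captureData"]

    def decode(part):
        # Payloads are stored either as plain text (e.g. CSV) or base64-encoded.
        if part.get("encoding") == "BASE64":
            return base64.b64decode(part["data"]).decode("utf-8")
        return part["data"]

    output = capture.get("endpointOutput")
    return {
        "inference_time": record["eventMetadata"]["inferenceTime"],
        "input": decode(capture["endpointInput"]),
        "output": decode(output) if output else None,
    }


# Hypothetical capture record, for illustration only.
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "20000,2,2,1,24", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT",
                           "data": base64.b64encode(b"0.35").decode("ascii"),
                           "encoding": "BASE64"},
    },
    "eventMetadata": {"eventId": "example-event-id", "inferenceTime": "2020-06-01T14:25:00Z"},
    "eventVersion": "0",
})
print(parse_capture_line(sample_line))
```

In the notebook, `[parse_capture_line(l) for l in capture_file.splitlines() if l]` would parse a whole file.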


### Part 7: Real-Time Model Monitoring

Now set up Model Monitor to monitor the live endpoint.

The baseline is computed from the training dataset, which must be in Amazon S3 (if you already have it in Amazon S3, you can reuse it).
Monitoring outputs are kept in a bucket separate from the data to provide flexibility and security for teams that require different team members to have different levels of permissions.

The captured inference requests are stored in the data bucket configured above, while the model bucket (`output_bucket`) stores the baselining results and monitoring reports for a given model.

In [None]:
model_prefix = prefix + "/" + model_name
baseline_prefix = model_prefix + '/baselining'
baseline_data_prefix = baseline_prefix + '/data'
baseline_results_prefix = baseline_prefix + '/results'

baseline_data_uri = 's3://{}/{}'.format(output_bucket,baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(output_bucket, baseline_results_prefix)
print('Baseline data uri: {}'.format(baseline_data_uri))
print('Baseline results uri: {}'.format(baseline_results_uri))

In [None]:
train_data_header_location = "s3://" + data_bucket + '/' + prefix + '/train_headers'
print(train_data_header_location)

### Start a baselining job

To monitor data drift, you first need a baseline to monitor against. To create this baseline, have Model Monitor extract baseline statistics and suggested constraints from your training dataset.

All the outputs generated by the monitoring service are stored in `output_bucket`.
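To build intuition for what a baseline is, here is a simplified stand-in: per-feature summary statistics of the training data, plus a toy drift rule that flags a feature when its mean moves by more than `k` baseline standard deviations. This is a swapped-in illustration, not how Model Monitor computes its statistics or constraints; its own checks are more thorough. The helper names and toy data are hypothetical.

```python
import pandas as pd


def baseline_stats(df):
    """Per-feature summary statistics of a (training) DataFrame."""
    return pd.DataFrame({
        "mean": df.mean(),
        "std": df.std(),
        "min": df.min(),
        "max": df.max(),
        "completeness": df.notna().mean(),  # fraction of non-missing values
    })


def mean_shift_flags(baseline, current, k=1.0):
    """Flag features whose mean moved by more than k baseline standard deviations."""
    shift = (current.mean() - baseline["mean"]).abs()
    return shift > k * baseline["std"]


# Toy data; the "current" frame mimics the mutations applied later in this notebook.
train = pd.DataFrame({"LIMIT_BAL": [20000, 50000, 80000, 120000], "EDUCATION": [1, 2, 2, 3]})
current = pd.DataFrame({"LIMIT_BAL": train["LIMIT_BAL"] / 1000, "EDUCATION": -train["EDUCATION"]})

base = baseline_stats(train)
print(base)
print(mean_shift_flags(base, current))
```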

In [None]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_default_monitor = DefaultModelMonitor(
 role=role,
 instance_count=1,
 instance_type='ml.m5.xlarge',
 volume_size_in_gb=20,
 max_runtime_in_seconds=3600,
 network_config=network_config,
 volume_kms_key=cmk_id
)

my_default_monitor.suggest_baseline(
 baseline_dataset=train_data_header_location +'/train_data_headers.csv',
 #dataset_format=DatasetFormat.json(lines=True),
 dataset_format=DatasetFormat.csv(header=True),
 output_s3_uri=baseline_results_uri,
 wait=True
)

Have a look at the outputs!

In [None]:
s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=output_bucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Files:")
print("\n".join(report_files))

baseline_job = my_default_monitor.latest_baselining_job
# pd.json_normalize requires pandas >= 1.0 (pd.io.json.json_normalize is deprecated)
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df

In [None]:
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df

### Create a Monitoring Schedule

Model Monitor runs monitoring jobs on a cron schedule; each job inspects the inference requests captured from the endpoint for data drift. To enable this, first create a monitoring schedule.
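The schedule created below uses `CronExpressionGenerator.hourly()`, and a later cell in this notebook notes that executions kick off on the hour plus a buffer of 0 to 20 minutes. Under that assumption, a sketch of when to expect the next execution (`next_hourly_window` is a hypothetical helper):

```python
from datetime import datetime, timedelta, timezone


def next_hourly_window(now, buffer_minutes=20):
    """Return (earliest, latest) expected start of the next hourly execution after `now`.

    Assumes executions start on the hour, plus a scheduling buffer of up to
    `buffer_minutes`.
    """
    top_of_next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return top_of_next_hour, top_of_next_hour + timedelta(minutes=buffer_minutes)


earliest, latest = next_hourly_window(datetime(2020, 6, 1, 14, 25, tzinfo=timezone.utc))
print(earliest, latest)  # 2020-06-01 15:00:00+00:00 2020-06-01 15:20:00+00:00
```

In the notebook, `next_hourly_window(datetime.now(timezone.utc))` gives the window to check.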

In [None]:
code_prefix = '{}/code'.format(prefix)
reports_prefix = '{}/reports'.format(prefix)
s3_report_path = 's3://{}/{}'.format(output_bucket,reports_prefix)
print(s3_report_path)

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

mon_schedule_name = 'xgb-credit-score-model-monitor-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=predictor.endpoint,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)


### Generate artificial traffic 

For this demo, test the Model Monitor capability by altering the input data and then invoking the endpoint repeatedly. Because the monitoring job runs on an hourly schedule, it may take up to an hour to see any initial results or violations; let the traffic run for a few hours and you should see some violations appear.

Send the endpoint some "fake" traffic in which the distributions of a few columns have been drastically altered.
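When mutating test data to simulate drift, it is safer to work on an explicit copy (in pandas, plain assignment such as `b = a` binds a second name to the same DataFrame rather than copying it) and to fix the random seed so the simulated drift is reproducible. A sketch of the same two mutations under those choices (the helper and toy data are illustrative):

```python
import pandas as pd


def make_drifted_copy(df, frac_balance=0.5, frac_education=0.75, seed=0):
    """Return a copy of df with LIMIT_BAL shrunk and EDUCATION negated on random rows."""
    drifted = df.copy()  # work on a copy so the caller's DataFrame is unchanged
    drifted["LIMIT_BAL"] = drifted["LIMIT_BAL"].astype(float)  # allow fractional values

    rows = drifted.sample(frac=frac_balance, random_state=seed).index
    drifted.loc[rows, "LIMIT_BAL"] = drifted.loc[rows, "LIMIT_BAL"] / 1000

    rows = drifted.sample(frac=frac_education, random_state=seed + 1).index
    drifted.loc[rows, "EDUCATION"] = -drifted.loc[rows, "EDUCATION"]
    return drifted


original = pd.DataFrame({"LIMIT_BAL": [20000, 50000, 80000, 120000], "EDUCATION": [1, 2, 2, 3]})
drifted = make_drifted_copy(original)
print(drifted)
print(original["EDUCATION"].tolist())  # [1, 2, 2, 3], unchanged
```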

In [None]:
COLS = ['Label', 'LIMIT_BAL', 'SEX', 'EDUCATION', 'MARRIAGE', 'AGE', 'PAY_0',
 'PAY_2', 'PAY_3', 'PAY_4', 'PAY_5', 'PAY_6', 'BILL_AMT1', 'BILL_AMT2',
 'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6', 'PAY_AMT1',
 'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6']
sample_data = pd.read_csv(
 'test_data.csv', 
 names = ['Label'] +['PAY_AMT1','BILL_AMT1'] + list(COLS[1:])[:11] + list(COLS[1:])[12:17] + list(COLS[1:])[18:])
sample_data.head(5)

In [None]:
# Create a modified copy of the test data: divide LIMIT_BAL by 1000 for a random 50% of rows,
# and make EDUCATION negative for a random 75% of rows.
faketestdata = sample_data.copy()  # copy, so sample_data itself is not modified

balance_mutate = faketestdata.sample(frac=0.5)
balance_mutate['LIMIT_BAL'] = balance_mutate['LIMIT_BAL'] / 1000
faketestdata.update(balance_mutate, overwrite=True)

education_mutate = faketestdata.sample(frac=0.75)
education_mutate['EDUCATION'] = -education_mutate['EDUCATION']
faketestdata.update(education_mutate)

faketestdata.head(10)


In [None]:
# save the dataset
faketestdata.drop(columns=['Label']).to_csv('test-data-input-cols.csv', index = None, header = None)

In [None]:
runtime_client = boto3.client('runtime.sagemaker')
run_flag = True

# (Repeats the invocation code from above so this section can run independently.)
def invoke_endpoint(ep_name, file_name, runtime_client):
    with open(file_name, 'r') as f:
        for row in f:
            payload = row.rstrip('\n')
            response = runtime_client.invoke_endpoint(
                EndpointName=ep_name,
                ContentType='text/csv',
                Body=payload)
            time.sleep(0.1)  # ~6,000 records at 0.1 s each: at least 10 minutes per pass

def invoke_endpoint_forever():
    while run_flag:
        invoke_endpoint(endpoint_name, 'test-data-input-cols.csv', runtime_client)

thread = Thread(target=invoke_endpoint_forever)
thread.start()
# To stop the invocations, set run_flag = False in another cell (the thread exits after
# finishing its current pass through the file), or restart the kernel.
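Instead of a module-level flag, a `threading.Event` gives the traffic generator a clean stop signal that also interrupts the pause between requests. A minimal sketch (the `start_traffic` helper is hypothetical); in the notebook, `send_one` could be a function that calls `runtime_client.invoke_endpoint(EndpointName=endpoint_name, ContentType='text/csv', Body=row)`.

```python
import threading
import time


def start_traffic(send_one, rows, delay_seconds=0.1):
    """Send `rows` repeatedly on a background thread until the returned Event is set."""
    if not rows:
        raise ValueError("rows must not be empty")
    stop = threading.Event()

    def loop():
        while not stop.is_set():
            for row in rows:
                if stop.is_set():
                    break
                send_one(row)
                stop.wait(delay_seconds)  # pause between requests, waking early on stop

    worker = threading.Thread(target=loop, daemon=True)
    worker.start()
    return stop, worker


# Offline demo: "send" rows by appending them to a list.
sent = []
stop, worker = start_traffic(sent.append, ["row-1", "row-2", "row-3"], delay_seconds=0.01)
time.sleep(0.1)
stop.set()
worker.join(timeout=1)
print(len(sent) > 0, worker.is_alive())  # True False
```

Calling `stop.set()` ends the traffic after the current request, without restarting the kernel.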


In [None]:
desc_schedule_result = my_default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))


### List Model Monitor Outputs

It may take a while for anything to show up in your S3 bucket, since Model Monitor runs on a schedule.

Let Model Monitor collect data from your endpoint for a couple of hours and occasionally run the cells below. As the service collects more data, it will find new violations against the baseline dataset provided earlier.

In [None]:
mon_executions = my_default_monitor.list_executions()
print("We created an hourly schedule above, and it will kick off executions on the hour (plus a 0-20 min buffer).\nWe will have to wait until we hit the hour...")

while len(mon_executions) == 0:
    print("Waiting for the 1st execution to happen...")
    time.sleep(600)
    mon_executions = my_default_monitor.list_executions()

print("{} executions of the monitor have occurred".format(len(mon_executions)))

In [None]:
mon_executions[-1].describe()

### Inspect the latest execution and generate a report.

All the API calls used here can also be made outside this notebook, for example behind Amazon API Gateway or from other tools. Because the schedule was created with `enable_cloudwatch_metrics=True`, Model Monitor can also send alerts and notifications through CloudWatch whenever drift is detected.

In [None]:
latest_execution = mon_executions[-1]  # the latest execution's index is -1, the second to last is -2, and so on
latest_execution.wait(logs=False)

latest_job = latest_execution.describe()
print("Latest execution status: {}".format(latest_job['ProcessingJobStatus']))
print("Latest execution result: {}".format(latest_job.get('ExitMessage')))

if latest_job['ProcessingJobStatus'] != 'Completed':
    print("====STOP==== \n No completed executions to inspect further. Please wait until an execution completes or investigate previously reported failures.")

In [None]:
report_uri=latest_execution.output.destination
print('Report Uri: {}'.format(report_uri))

In [None]:
from urllib.parse import urlparse
s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip('/')
print('Report bucket: {}'.format(report_bucket))
print('Report key: {}'.format(report_key))

s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=report_bucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Report Files:")
print("\n ".join(report_files))

In [None]:
pd.set_option('display.max_colwidth', None)  # None disables truncation; -1 is deprecated in newer pandas
violations = my_default_monitor.latest_monitoring_constraint_violations()
constraints_df = pd.json_normalize(violations.body_dict["violations"])
constraints_df

Note that Model Monitor detects a large drift from the baseline in certain features, particularly LIMIT_BAL and EDUCATION, which we modified ourselves.

These violations can now be used to trigger model retraining, or to drive CloudWatch alarms that inform users when data drift is detected.
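A sketch of turning the violations report into a retraining decision, assuming each entry has `feature_name`, `constraint_check_type` and `description` keys as in the violations table above. The check-type names used as defaults and the "retrain when any watched feature is flagged" policy are assumptions for illustration; compare the names with the `constraint_check_type` column you actually see.

```python
def features_with_violations(violations_body,
                             checks=("baseline_drift_check", "data_type_check")):
    """Map feature name -> list of violated check types, keeping only `checks`."""
    flagged = {}
    for violation in violations_body.get("violations", []):
        check = violation.get("constraint_check_type")
        if check in checks:
            flagged.setdefault(violation["feature_name"], []).append(check)
    return flagged


def should_retrain(flagged, min_features=1):
    """Hypothetical policy: retrain once at least `min_features` features are flagged."""
    return len(flagged) >= min_features


# Hypothetical violations body, shaped like violations.body_dict above.
sample_violations = {
    "violations": [
        {"feature_name": "LIMIT_BAL", "constraint_check_type": "baseline_drift_check",
         "description": "example description"},
        {"feature_name": "EDUCATION", "constraint_check_type": "baseline_drift_check",
         "description": "example description"},
        {"feature_name": "AGE", "constraint_check_type": "completeness_check",
         "description": "example description"},
    ]
}
flagged = features_with_violations(sample_violations)
print(flagged)  # {'LIMIT_BAL': ['baseline_drift_check'], 'EDUCATION': ['baseline_drift_check']}
print(should_retrain(flagged))  # True
```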

## Part 8. Reproducibility

Finally, we showcase reproducibility by ensuring that the model trained above can be redeployed. This is an essential requirement for many financial services companies who need to track the model lineage to the source code level to ensure that if a particular version of the code was re-run, it would produce the same model with the same outputs.

First, run a simple function that retrieves the details of specific commits from AWS CodeCommit.

Next we log this history in SageMaker Experiments using the Tracker feature.
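Both tracker cells later in this section build the same four parameters from a commit. A small helper can remove that duplication; it is sketched under the assumption that responses have the `commit` → `commitId` / `author` / `message` shape that those cells index into (the helper name and sample values are hypothetical).

```python
def commit_to_params(get_commit_response):
    """Flatten a CodeCommit get_commit response into Tracker parameters."""
    commit = get_commit_response["commit"]
    return {
        "commit": commit["commitId"],
        "author": commit["author"]["email"],
        "date": commit["author"]["date"],
        # Same transformation as the notebook: replace '-' with '_' and keep the first line.
        "message": commit["message"].replace("-", "_").split("\n")[0],
    }


# Hypothetical response, trimmed to the fields used above.
sample_commit = {
    "commit": {
        "commitId": "0123456789abcdef0123456789abcdef01234567",
        "author": {"name": "Jane Doe", "email": "jane@example.com", "date": "1591020000 +0000"},
        "message": "Added Model Deployment Notebook\n\nlonger-description",
    }
}
print(commit_to_params(sample_commit))
```

With it, a tracker cell reduces to `sourcetracker.log_parameters(commit_to_params(sclineage))`.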

### Commit your Notebook to CodeCommit

Check both notebooks into the CodeCommit repository you created, using a Terminal window in Amazon SageMaker.

Navigate to the Jupyter environment that contains these notebooks and the code. From the **New** drop-down, choose **Terminal**.

In the Terminal window, navigate to the local directory containing your notebooks and run the following commands.

```bash
cd SageMaker/
git add 02_SageMaker-DevOps-Workflow
git commit -m "Added Model Deployment Notebook"
git push -u origin master
git log --pretty=oneline # you should see one entry for each of your two commits
```


Keep track of your CommitIDs. We will use them in the next step. 

**Note**: The function below will only work if you push this notebook to your CodeCommit Workshop repository.

In [None]:
# Retrieve the details of a commit from CodeCommit.
# Note: this assumes the workshop repository is the first repository returned by list_repositories().
def get_codecommit(commit_id):
    codecommitclient = boto3.client('codecommit')

    reponame = codecommitclient.list_repositories()['repositories'][0]['repositoryName']

    return codecommitclient.get_commit(repositoryName=reponame,
                                       commitId=commit_id)

# Navigate to CodeCommit to obtain the commit IDs of your own commits and enter them below.
# If you committed your code only once, use the same commit ID for both sclineage and endpointlineage.

sclineage = get_codecommit('1796a0a6972c8df97fd2d279557a6f94cfe91eae')  # Enter your CommitID here

endpointlineage = get_codecommit('a6df1799849f52295864754a8f1e30e604fefd00')  # Enter your CommitID here



In [None]:
# Track the code version and user_id who produced this commit to the source code as well as the deployed endpoint.
with Tracker.create(display_name="source-control-lineage", sagemaker_boto_client=sm) as sourcetracker:
 sourcetracker.log_parameters({
 "commit": sclineage['commit']['commitId'],
 "author":sclineage['commit']['author']['email'],
 "date":sclineage['commit']['author']['date'],
 "message":sclineage['commit']['message'].replace('-', '_').split('\n')[0] 
 })
 

with Tracker.create(display_name="prod-endpoint-lineage", sagemaker_boto_client=sm) as endtracker:
 endtracker.log_parameters({
 "commit": endpointlineage['commit']['commitId'],
 "author":endpointlineage['commit']['author']['email'],
 "date":endpointlineage['commit']['author']['date'],
 "message":endpointlineage['commit']['message'].replace('-', '_').split('\n')[0] 
 })
 endtracker.log_input(name="endpoint-name", value=endpoint_name)
 
cc_trial.add_trial_component(sourcetracker.trial_component)
cc_trial.add_trial_component(endtracker.trial_component)
 
# Present the Model Lineage as a dataframe
from sagemaker.session import Session
sess = boto3.Session()
lineage_table = ExperimentAnalytics(
 sagemaker_session=Session(sess, sm), 
 search_expression={
 "Filters":[{
 "Name": "Parents.TrialName",
 "Operator": "Equals",
 "Value": trial_name
 }]
 },
 sort_by="CreationTime",
 sort_order="Ascending",
)
lineagedf= lineage_table.dataframe()

lineagedf

We have now captured the lineage of the model for reproducibility.

At the **Source Code** level we have captured:
 * Commit ID
 * Author email
 * Timestamp
 * Commit message

At the **Preprocessing** stage:
 * Source data location
 * Processed training data location
 * Processed validation data location
 * Processing parameters 

At the **modeling** stage:
 * Docker container registry for algorithm
 * Training job name
 * Training job Hyperparameters
 * Model artifact location

At the **endpoint** stage:
 * Production endpoint name
 * Commit ID of the production deployment pipeline code
 * Author email
 * Timestamp

Although we do not cover it here, an important related topic is versioning your data. Data versioning tools such as DVC (https://dvc.org/) are becoming increasingly popular as a way to ensure that training jobs and hosted models can be traced back to the correct data version for reproducibility.

One approach that uses **SageMaker Experiments**, which you have learned about in this lab, is to enable versioning on your S3 buckets; this way, the different S3 versions of the data will be tracked by SageMaker Experiments. Alternatively, you can enter the S3 object VersionId as a parameter in the **Tracker** utility used in this lab to keep track of the data version.
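As a complement to S3 versioning (a swapped-in technique, not something SageMaker Experiments does for you), you can record a content hash of each data file as a Tracker parameter, so any change to the data changes the logged value. A minimal sketch; the parameter name `train_data_sha256` in the usage note is hypothetical.

```python
import hashlib
import os
import tempfile


def file_sha256(path, chunk_size=1 << 20):
    """Return the SHA-256 hex digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Demo on a temporary file; in the notebook you might hash a local data file instead.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"Label,LIMIT_BAL\n0,20000\n")
    tmp_path = tmp.name
demo_hash = file_sha256(tmp_path)
print(demo_hash)
os.remove(tmp_path)
```

For example: `tracker.log_parameters({"train_data_sha256": file_sha256("test_data.csv")})`.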

# Part E: Conclusion

This concludes our demo of building a secure data science workflow within SageMaker. The key features we demonstrated in this notebook include:

1/ **DevOps Manual Deployment Workflow:** We demonstrated how to securely deploy a trained model using the same network configuration as the data scientist portion of the workflow.

2/ **Auditability and Reproducibility:** We demonstrated the use of SageMaker Experiments for model auditability: tracking the lineage of the processing jobs, model artifacts, and hyperparameters, as well as the CodeCommit commit IDs that track the latest code changes.

3/ **Monitoring Data Drift:** Finally, we showed how to monitor your models in production for data drift using Amazon SageMaker Model Monitor.


# Part F: Delete Underlying Resources and Monitoring Jobs

**Best practice:** once you are done monitoring, delete the monitoring schedule and the endpoint to avoid incurring ongoing costs.

In [None]:
my_default_monitor.delete_monitoring_schedule()
time.sleep(60)  # give the schedule deletion time to complete before deleting the endpoint

In [None]:
run_flag = False  # stop the traffic-generation thread started earlier
sm.delete_endpoint(EndpointName=endpoint_name)
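The cells above remove the monitoring schedule and the endpoint, but the endpoint configuration and the model created earlier remain. A sketch that deletes all three with boto3's `delete_endpoint`, `delete_endpoint_config` and `delete_model` calls, continuing past failures such as already-deleted resources. The helper and the fake client are illustrative; in the notebook you would call `delete_hosting_resources(sm_client, endpoint_name, endpoint_name + '-config', model_name)` after deleting the monitoring schedule.

```python
def delete_hosting_resources(sm_client, endpoint_name, endpoint_config_name, model_name):
    """Delete the endpoint, its endpoint configuration and the model, in that order.

    Each step is attempted even if an earlier one fails (e.g. already deleted).
    Returns the labels of the steps that succeeded.
    """
    steps = [
        ("endpoint", sm_client.delete_endpoint, {"EndpointName": endpoint_name}),
        ("endpoint_config", sm_client.delete_endpoint_config,
         {"EndpointConfigName": endpoint_config_name}),
        ("model", sm_client.delete_model, {"ModelName": model_name}),
    ]
    succeeded = []
    for label, call, kwargs in steps:
        try:
            call(**kwargs)
            succeeded.append(label)
        except Exception as err:  # broad on purpose: report and continue
            print("Could not delete {} ({}): {}".format(label, kwargs, err))
    return succeeded


class FakeSageMakerClient:
    """Offline stand-in that records calls; the real client is boto3.client('sagemaker')."""

    def __init__(self):
        self.calls = []

    def delete_endpoint(self, **kwargs):
        self.calls.append(("delete_endpoint", kwargs))

    def delete_endpoint_config(self, **kwargs):
        self.calls.append(("delete_endpoint_config", kwargs))

    def delete_model(self, **kwargs):
        raise RuntimeError("model not found")  # simulate an already-deleted model


fake = FakeSageMakerClient()
cleanup_result = delete_hosting_resources(fake, "demo-endpoint", "demo-endpoint-config", "demo-model")
print(cleanup_result)  # ['endpoint', 'endpoint_config']
```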

The information included in this notebook is for illustrative purposes only. Nothing in this notebook is intended to provide you legal, compliance, or regulatory guidance. You should review the laws that apply to you.