{ "cells": [
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Model Deployment and Monitoring\n", "\n", "In this notebook, you will manually walk through a DevOps workflow: taking the model you trained in the previous notebook, deploying it into production, and monitoring the model endpoint. " ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Start this notebook by first deploying a model into production.\n", "\n", "Follow the steps below to manually deploy the most recent training job and set up the endpoint with data capture enabled.\n", "\n", "For this notebook, you will use the public [Credit Card default dataset](https://archive.ics.uci.edu/ml/datasets/default+of+credit+card+clients) downloaded from UCI. The dataset was originally presented as part of the paper cited below.\n", "\n", " Yeh, I. C., & Lien, C. H. (2009). The comparisons of data mining techniques for the predictive accuracy of probability of default of credit card clients. Expert Systems with Applications, 36(2), 2473-2480.\n", "\n", "Since this notebook is not connected to the internet, the dataset has been provided locally for you." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Load necessary libraries" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Let's inspect the role we have created for our notebook here:\n", "import boto3\n", "import json\n", "import numpy as np\n", "import pandas as pd\n", "import os\n", "import sagemaker\n", "from sagemaker import get_execution_role\n", "from time import sleep, gmtime, strftime\n", "import time\n", "from threading import Thread\n", "\n", "role = get_execution_role()\n", "sess = sagemaker.Session()\n", "region = boto3.session.Session().region_name\n", "sm = boto3.Session().client('sagemaker')\n", "print(\"Executing in region {} with role {}\".format(region, role))\n", "\n", "# retrieve stored variables from the previous notebook\n", "%store -r trial_name\n", "%store -r experiment_name\n", "%store -r training_job_name" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import SageMaker Experiments\n", "from sagemaker.analytics import ExperimentAnalytics\n", "from smexperiments.experiment import Experiment\n", "from smexperiments.trial import Trial\n", "from smexperiments.trial_component import TrialComponent\n", "from smexperiments.tracker import Tracker" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# let's load the trial from the previous session\n", "cc_trial = Trial.load(sagemaker_boto_client=sm, trial_name=trial_name)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import Model Monitor API\n", "from sagemaker.model_monitor import DataCaptureConfig\n", "from sagemaker import RealTimePredictor\n", "from sagemaker.predictor import csv_serializer" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create the networking configuration required for all APIs\n", "from sagemaker.network import NetworkConfig\n", "import sagemaker_environment as smenv\n", "\n", "cmk_id = smenv.SAGEMAKER_KMS_KEY_ID\n", "sec_groups = smenv.SAGEMAKER_SECURITY_GROUPS\n", "subnets = smenv.SAGEMAKER_SUBNETS\n", "network_config = NetworkConfig(security_group_ids=sec_groups, subnets=subnets)" ] }
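,
{ "cell_type": "markdown", "metadata": {}, "source": [ "If you are running outside of the workshop environment, the `sagemaker_environment` module will not exist. A minimal sketch of supplying the same networking values yourself follows; the IDs below are hypothetical placeholders, not real resources:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch only -- hypothetical placeholder IDs. Replace with the KMS key,\n", "# security groups, and subnets from your own VPC before uncommenting.\n", "# cmk_id = 'arn:aws:kms:us-east-1:111122223333:key/EXAMPLE-KEY-ID'\n", "# sec_groups = ['sg-EXAMPLE']\n", "# subnets = ['subnet-EXAMPLE1', 'subnet-EXAMPLE2']\n", "# network_config = NetworkConfig(security_group_ids=sec_groups, subnets=subnets)" ] }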
,
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# You have already created buckets as part of the Secure Data Science Workshop. Create references to these buckets for later use.\n", "\n", "# raw_bucket: stores raw data and any preprocessing-job-related code.\n", "# data_bucket: stores train/test data for training/validating ML models.\n", "# output_bucket: where the model artifacts and outputs will be stored.\n", "# For this demo, these buckets are the same, but as a best practice you should keep them separate with different permissions.\n", "\n", "raw_bucket = smenv.SAGEMAKER_DATA_BUCKET  # alternatively, you can replace this with your own bucket\n", "data_bucket = smenv.SAGEMAKER_DATA_BUCKET  # alternatively, you can replace this with your own bucket\n", "output_bucket = smenv.SAGEMAKER_MODEL_BUCKET  # alternatively, you can replace this with your own bucket\n", "prefix = 'secure-sagemaker-demo'\n", "print(\"Data bucket is s3://{}\".format(data_bucket))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Set up an endpoint with Data Capture enabled\n", "\n", "### Load Trained Model\n", "\n", "First, we set up a live endpoint that captures inference requests." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm_client = boto3.client('sagemaker')\n", "\n", "latest_training_job = sm_client.list_training_jobs(\n", "    MaxResults=1,\n", "    SortBy='CreationTime',\n", "    SortOrder='Descending')\n", "\n", "training_job_name = latest_training_job['TrainingJobSummaries'][0]['TrainingJobName']\n", "\n", "training_job_description = sm_client.describe_training_job(TrainingJobName=training_job_name)\n", "\n", "model_data = training_job_description['ModelArtifacts']['S3ModelArtifacts']\n", "container_uri = training_job_description['AlgorithmSpecification']['TrainingImage']" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "To deploy the endpoint into your VPC, you need to provide a security group and a subnet." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create a model\n", "def create_model(role, model_name, container_uri, model_data):\n", "    return sm_client.create_model(\n", "        ModelName=model_name,\n", "        PrimaryContainer={\n", "            'Image': container_uri,\n", "            'ModelDataUrl': model_data,\n", "        },\n", "        VpcConfig={\n", "            'SecurityGroupIds': sec_groups,\n", "            'Subnets': subnets\n", "        },\n", "        ExecutionRoleArn=role,\n", "        EnableNetworkIsolation=False\n", "    )\n", "\n", "model_name = \"{}-model\".format(training_job_name)\n", "try:\n", "    model = create_model(role, model_name, container_uri, model_data)\n", "except Exception as e:\n", "    # if a model with this name already exists, delete it and recreate it\n", "    sm_client.delete_model(ModelName=model_name)\n", "    model = create_model(role, model_name, container_uri, model_data)\n", "\n", "print('Model created: ' + model['ModelArn'])\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Specify a Capture Config to capture a percentage of the incoming requests being served by the endpoint. Here you set the capture percentage to `100` to capture all traffic." ] }
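,
{ "cell_type": "markdown", "metadata": {}, "source": [ "Capturing 100% of requests is convenient for a demo; on a high-traffic production endpoint you would typically sample a smaller percentage. A minimal variant sketch, not used below:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch only (not used below): capture roughly 10% of requests instead of all of them.\n", "# sampled_capture_configuration = dict(data_capture_configuration,\n", "#                                      InitialSamplingPercentage=10)" ] }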
,
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_capture_upload_path = 's3://{}/{}/monitoring/datacapture'.format(data_bucket, prefix)\n", "data_capture_configuration = {\n", "    \"EnableCapture\": True,\n", "    \"InitialSamplingPercentage\": 100,\n", "    \"DestinationS3Uri\": s3_capture_upload_path,\n", "    \"CaptureOptions\": [\n", "        { \"CaptureMode\": \"Output\" },\n", "        { \"CaptureMode\": \"Input\" }\n", "    ],\n", "    \"CaptureContentTypeHeader\": {\n", "        \"CsvContentTypes\": [\"text/csv\"],\n", "        \"JsonContentTypes\": [\"application/json\"]\n", "    }\n", "}" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_endpoint_config(endpoint_name, model_name, data_capture_config):\n", "    return sm_client.create_endpoint_config(\n", "        EndpointConfigName=\"{}-config\".format(endpoint_name),\n", "        ProductionVariants=[\n", "            {\n", "                'VariantName': 'AllTraffic',\n", "                'ModelName': model_name,\n", "                'InitialInstanceCount': 1,\n", "                'InstanceType': 'ml.m5.xlarge',\n", "                'InitialVariantWeight': 1.0,\n", "            },\n", "        ],\n", "        DataCaptureConfig=data_capture_config)\n", "\n", "\n", "endpoint_name = \"{}-endpoint\".format(training_job_name)\n", "try:\n", "    endpoint_config = create_endpoint_config(endpoint_name, model_name, data_capture_configuration)\n", "except Exception as e:\n", "    # if a configuration with this name already exists, delete it and recreate it\n", "    sm_client.delete_endpoint_config(EndpointConfigName=endpoint_name + '-config')\n", "    endpoint_config = create_endpoint_config(endpoint_name, model_name, data_capture_configuration)\n", "\n", "print('Endpoint configuration created: ' + endpoint_config['EndpointConfigArn'])\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_endpoint(endpoint_name, config_name):\n", "    return sm_client.create_endpoint(\n", "        EndpointName=endpoint_name,\n", "        EndpointConfigName=config_name)\n", "\n", "\n", "try:\n", "    endpoint = create_endpoint(endpoint_name, endpoint_name + '-config')\n", "except Exception as e:\n", "    # if an endpoint with this name already exists, delete it and recreate it\n", "    sm_client.delete_endpoint(EndpointName=endpoint_name)\n", "    endpoint = create_endpoint(endpoint_name, endpoint_name + '-config')\n", "\n", "print('Endpoint created: ' + endpoint['EndpointArn'])\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "**WAIT**\n", "Even though it says that the endpoint has been created, it may still be in the \"Creating\" stage, as it takes some time to set up an HTTPS endpoint behind the scenes. Run the command below and wait for the endpoint to reach the **InService** status before proceeding." ] }
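,
{ "cell_type": "markdown", "metadata": {}, "source": [ "Instead of polling by hand, boto3 also ships a built-in waiter for this; a minimal alternative sketch to the polling cell below:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Alternative to the polling cell below: block until the endpoint leaves the\n", "# 'Creating' state, using boto3's built-in waiter for SageMaker endpoints.\n", "sm_client.get_waiter('endpoint_in_service').wait(EndpointName=endpoint_name)" ] }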
,
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from time import sleep\n", "\n", "status = sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']\n", "print(status)\n", "while status == 'Creating':\n", "    sleep(60)\n", "    status = sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']\n", "    print(status)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Test the endpoint\n", "\n", "Let's send some payloads to this endpoint and make predictions." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor = RealTimePredictor(endpoint_name, content_type='text/csv')" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!head -10 test_data.csv > test_sample.csv" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "with open('test_sample.csv', 'r') as f:\n", "    for row in f:\n", "        payload = row.rstrip('\\n')\n", "        # payload[2:] drops the leading label column (the first character and its comma)\n", "        response = predictor.predict(data=payload[2:])\n", "        print(response)\n", "        sleep(0.5)\n", "print('done!')\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# List the captured JSON files.\n", "data_capture_prefix = '{}/monitoring/datacapture'.format(prefix)\n", "s3_client = boto3.Session().client('s3')\n", "current_endpoint_capture_prefix = '{}/{}/AllTraffic'.format(data_capture_prefix, endpoint_name)\n", "result = s3_client.list_objects(Bucket=data_bucket, Prefix=current_endpoint_capture_prefix)\n", "if 'Contents' not in result:\n", "    print(\"No capture files present yet.\")\n", "else:\n", "    capture_files = [capture_file.get(\"Key\") for capture_file in result.get('Contents')]\n", "    print(\"Found {} Capture Files:\".format(len(capture_files)))\n", "    for capture_file in capture_files[-5:]:\n", "        print(\"s3://{}/{}\".format(data_bucket, capture_file))\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# View the contents of a captured file.\n", "def get_obj_body(bucket, obj_key):\n", "    return s3_client.get_object(Bucket=bucket, Key=obj_key).get('Body').read().decode(\"utf-8\")\n", "\n", "capture_file = get_obj_body(data_bucket, capture_files[-1])\n", "# pretty-print one of the captured records (the sixth line of the file)\n", "print(json.dumps(json.loads(capture_file.split('\\n')[5]), indent=2, sort_keys=True))\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Part 7: Real-Time Model Monitoring\n", "\n", "Now we set up Model Monitor in real time.\n", "\n", "Copy the training dataset over to Amazon S3 (if you already have it in Amazon S3, you can reuse it).\n", "Everything is logged in a separate bucket to provide flexibility and security for teams whose members require different levels of permissions.\n", "\n", "Use the model bucket to capture API calls to monitored models and also to log monitoring calls made for a given model." ] }
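,
{ "cell_type": "markdown", "metadata": {}, "source": [ "If the training dataset with headers is not already in Amazon S3, a minimal sketch of copying it there with the SageMaker session (this assumes a local `train_data_headers.csv` produced in the previous notebook):" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch only: upload the local training CSV (with headers) for baselining.\n", "# sess.upload_data('train_data_headers.csv',\n", "#                  bucket=data_bucket,\n", "#                  key_prefix='{}/train_headers'.format(prefix))" ] }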
,
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_prefix = prefix + \"/\" + model_name\n", "baseline_prefix = model_prefix + '/baselining'\n", "baseline_data_prefix = baseline_prefix + '/data'\n", "baseline_results_prefix = baseline_prefix + '/results'\n", "\n", "baseline_data_uri = 's3://{}/{}'.format(output_bucket, baseline_data_prefix)\n", "baseline_results_uri = 's3://{}/{}'.format(output_bucket, baseline_results_prefix)\n", "print('Baseline data uri: {}'.format(baseline_data_uri))\n", "print('Baseline results uri: {}'.format(baseline_results_uri))" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_data_header_location = \"s3://\" + data_bucket + '/' + prefix + '/train_headers'\n", "print(train_data_header_location)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Start a baselining job\n", "\n", "To monitor data drift, you first need a baseline to monitor against. To create this baseline, have the Model Monitor service extract baseline statistics from your training dataset.\n", "\n", "All outputs generated by the monitoring service will be stored in the `output_bucket`." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "from sagemaker.model_monitor import DefaultModelMonitor\n", "from sagemaker.model_monitor.dataset_format import DatasetFormat\n", "\n", "my_default_monitor = DefaultModelMonitor(\n", "    role=role,\n", "    instance_count=1,\n", "    instance_type='ml.m5.xlarge',\n", "    volume_size_in_gb=20,\n", "    max_runtime_in_seconds=3600,\n", "    network_config=network_config,\n", "    volume_kms_key=cmk_id\n", ")\n", "\n", "my_default_monitor.suggest_baseline(\n", "    baseline_dataset=train_data_header_location + '/train_data_headers.csv',\n", "    dataset_format=DatasetFormat.csv(header=True),\n", "    output_s3_uri=baseline_results_uri,\n", "    wait=True\n", ")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Have a look at the outputs!" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_client = boto3.Session().client('s3')\n", "result = s3_client.list_objects(Bucket=output_bucket, Prefix=baseline_results_prefix)\n", "report_files = [report_file.get(\"Key\") for report_file in result.get('Contents')]\n", "print(\"Found Files:\")\n", "print(\"\\n\".join(report_files))\n", "\n", "baseline_job = my_default_monitor.latest_baselining_job\n", "schema_df = pd.io.json.json_normalize(baseline_job.baseline_statistics().body_dict[\"features\"])\n", "schema_df" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "constraints_df = pd.io.json.json_normalize(baseline_job.suggested_constraints().body_dict[\"features\"])\n", "constraints_df" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Create a Monitoring Schedule\n", "\n", "Model Monitor sets up a cron job that inspects the inference requests sent to the endpoint for data drift. For this, we first need to create a schedule." ] }
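,
{ "cell_type": "markdown", "metadata": {}, "source": [ "For reference, `CronExpressionGenerator` offers a few ready-made schedules; a quick sketch of the common ones:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.model_monitor import CronExpressionGenerator\n", "\n", "# The generator returns standard cron expressions accepted by Model Monitor.\n", "print(CronExpressionGenerator.hourly())                # run at the top of every hour\n", "print(CronExpressionGenerator.daily())                 # run once a day\n", "print(CronExpressionGenerator.daily_every_x_hours(6))  # run every 6 hours" ] }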
,
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "code_prefix = '{}/code'.format(prefix)\n", "reports_prefix = '{}/reports'.format(prefix)\n", "s3_report_path = 's3://{}/{}'.format(output_bucket, reports_prefix)\n", "print(s3_report_path)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.model_monitor import CronExpressionGenerator\n", "from time import gmtime, strftime\n", "\n", "mon_schedule_name = 'xgb-credit-score-model-monitor-schedule-' + strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "my_default_monitor.create_monitoring_schedule(\n", "    monitor_schedule_name=mon_schedule_name,\n", "    endpoint_input=predictor.endpoint,\n", "    output_s3_uri=s3_report_path,\n", "    statistics=my_default_monitor.baseline_statistics(),\n", "    constraints=my_default_monitor.suggested_constraints(),\n", "    schedule_cron_expression=CronExpressionGenerator.hourly(),\n", "    enable_cloudwatch_metrics=True,\n", ")\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Generate artificial traffic\n", "\n", "For this demo, let's now test the Model Monitor capability by altering our input data and then repeatedly invoking our endpoint. Since the monitoring job runs on a cron schedule, it may take up to an hour to see any initial results or violations, but let it keep running for a few hours and you should see some violations appear.\n", "\n", "Let's send our endpoint some \"fake\" traffic where a few of the column distributions have been drastically altered!" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "COLS = ['Label', 'LIMIT_BAL', 'SEX', 'EDUCATION', 'MARRIAGE', 'AGE', 'PAY_0',\n", "        'PAY_2', 'PAY_3', 'PAY_4', 'PAY_5', 'PAY_6', 'BILL_AMT1', 'BILL_AMT2',\n", "        'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6', 'PAY_AMT1',\n", "        'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6']\n", "# The column names below are deliberately reordered relative to the training data\n", "# (PAY_AMT1 and BILL_AMT1 are mapped to the wrong columns) to simulate drifted inputs.\n", "sample_data = pd.read_csv(\n", "    'test_data.csv',\n", "    names=['Label'] + ['PAY_AMT1', 'BILL_AMT1'] + list(COLS[1:])[:11] + list(COLS[1:])[12:17] + list(COLS[1:])[18:])\n", "sample_data.head(5)" ] }
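,
{ "cell_type": "markdown", "metadata": {}, "source": [ "Before mutating the data, it can be handy to snapshot the original distributions so you can see how far the fake traffic drifts; a quick sketch:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Snapshot summary statistics of the columns we are about to mutate.\n", "original_stats = sample_data[['LIMIT_BAL', 'EDUCATION']].describe()\n", "print(original_stats)" ] }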
,
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Store test data where EDUCATION is flipped to a negative value for a random\n", "# subset of rows. We will also modify the distribution of LIMIT_BAL.\n", "faketestdata = sample_data.copy()  # copy so the original sample_data is left untouched\n", "\n", "balance_mutate = faketestdata.sample(frac=0.5)\n", "balance_mutate['LIMIT_BAL'] = balance_mutate['LIMIT_BAL'] / 1000\n", "faketestdata.update(balance_mutate, overwrite=True)\n", "\n", "education_mutate = faketestdata.sample(frac=0.75)\n", "education_mutate['EDUCATION'] = -education_mutate['EDUCATION']\n", "faketestdata.update(education_mutate)\n", "\n", "faketestdata.head(10)\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# save the dataset\n", "faketestdata.drop(columns=['Label']).to_csv('test-data-input-cols.csv', index=False, header=False)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "runtime_client = boto3.client('runtime.sagemaker')\n", "run_flag = True\n", "\n", "# (repeating the invocation code from above so this section can run independently)\n", "def invoke_endpoint(ep_name, file_name, runtime_client):\n", "    with open(file_name, 'r') as f:\n", "        for row in f:\n", "            payload = row.rstrip('\\n')\n", "            response = runtime_client.invoke_endpoint(\n", "                EndpointName=ep_name,\n", "                ContentType='text/csv',\n", "                Body=payload)\n", "            time.sleep(0.1)  # try to send all 6000 records in 10 min\n", "\n", "def invoke_endpoint_forever():\n", "    while run_flag:\n", "        invoke_endpoint(endpoint_name, 'test-data-input-cols.csv', runtime_client)\n", "\n", "thread = Thread(target=invoke_endpoint_forever)\n", "thread.start()\n", "# Note that you need to stop the kernel to stop the invocations\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "desc_schedule_result = my_default_monitor.describe_schedule()\n", "print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### List Model Monitor Outputs\n", "\n", "It may take a while for anything to show up in your S3 bucket, since Model Monitor runs on a schedule.\n", "\n", "Let the Model Monitor service collect data from your endpoint for a couple of hours and occasionally run the APIs below. You will see that as the service collects more data, it finds new violations against the baseline dataset we provided earlier." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mon_executions = my_default_monitor.list_executions()\n", "print(\"We created an hourly schedule above; it kicks off executions on the hour (plus a 0-20 minute buffer).\\nWe will have to wait until we hit the hour...\")\n", "\n", "while len(mon_executions) == 0:\n", "    print(\"Waiting for the 1st execution to happen...\")\n", "    time.sleep(600)\n", "    mon_executions = my_default_monitor.list_executions()\n", "\n", "print(\"{} executions of the monitor have occurred\".format(len(mon_executions)))" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mon_executions[-1].describe()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Inspect the latest execution and generate a report\n", "\n", "All the API calls used here can be implemented separately using API Gateway or other tools. Model Monitor can also be set up to send alerts and notifications through CloudWatch whenever drift is detected." ] }
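,
{ "cell_type": "markdown", "metadata": {}, "source": [ "Because we enabled CloudWatch metrics on the schedule, Model Monitor publishes per-feature drift metrics that you can alarm on. A hedged sketch follows; the namespace and metric name below follow the Model Monitor defaults, but verify the exact names published in your account before relying on them:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch only: alarm when the baseline drift metric for LIMIT_BAL exceeds a threshold.\n", "# Namespace/metric/dimension names are assumptions based on Model Monitor defaults.\n", "# cw = boto3.client('cloudwatch')\n", "# cw.put_metric_alarm(\n", "#     AlarmName='credit-model-limit-bal-drift',\n", "#     Namespace='aws/sagemaker/Endpoints/data-metrics',\n", "#     MetricName='feature_baseline_drift_LIMIT_BAL',\n", "#     Dimensions=[{'Name': 'Endpoint', 'Value': endpoint_name},\n", "#                 {'Name': 'MonitoringSchedule', 'Value': mon_schedule_name}],\n", "#     Statistic='Maximum', Period=3600, EvaluationPeriods=1,\n", "#     Threshold=0.1, ComparisonOperator='GreaterThanThreshold')" ] }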
,
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "latest_execution = mon_executions[-1]  # the latest execution's index is -1, second to last is -2, and so on\n", "latest_execution.wait(logs=False)\n", "\n", "print(\"Latest execution status: {}\".format(latest_execution.describe()['ProcessingJobStatus']))\n", "print(\"Latest execution result: {}\".format(latest_execution.describe()['ExitMessage']))\n", "\n", "latest_job = latest_execution.describe()\n", "if (latest_job['ProcessingJobStatus'] != 'Completed'):\n", "    print(\"====STOP==== \\n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures.\")" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "report_uri = latest_execution.output.destination\n", "print('Report Uri: {}'.format(report_uri))" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "from urllib.parse import urlparse\n", "s3uri = urlparse(report_uri)\n", "report_bucket = s3uri.netloc\n", "report_key = s3uri.path.lstrip('/')\n", "print('Report bucket: {}'.format(report_bucket))\n", "print('Report key: {}'.format(report_key))\n", "\n", "s3_client = boto3.Session().client('s3')\n", "result = s3_client.list_objects(Bucket=report_bucket, Prefix=report_key)\n", "report_files = [report_file.get(\"Key\") for report_file in result.get('Contents')]\n", "print(\"Found Report Files:\")\n", "print(\"\\n \".join(report_files))" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pd.set_option('display.max_colwidth', -1)\n", "violations = my_default_monitor.latest_monitoring_constraint_violations()\n", "constraints_df = pd.io.json.json_normalize(violations.body_dict[\"violations\"])\n", "constraints_df" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Note that Model Monitor detected a large drift from the baseline in certain parameters, particularly LIMIT_BAL and EDUCATION, which we modified ourselves.\n", "\n", "This can now be used to trigger model retraining or CloudWatch alarms that inform users when data drift is detected. " ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Part 8. Reproducibility\n", "\n", "Finally, we showcase reproducibility by ensuring that the model trained above can be redeployed. This is an essential requirement for many financial services companies that need to track model lineage down to the source-code level, ensuring that re-running a particular version of the code would produce the same model with the same outputs.\n", "\n", "First, we run a simple script that pulls the latest source code version history from AWS CodeCommit.\n", "\n", "Next, we log this history in SageMaker Experiments using the Tracker feature." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Commit your Notebook to CodeCommit\n", "\n", "Check both notebooks into the CodeCommit repository you created. Navigate to your Jupyter environment that contains these notebooks and the code, open the **New** drop-down, and click **Terminal**.\n", "\n", "In the terminal window, navigate to the local directory containing your notebooks and run the following commands.\n",
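"\n", "If git has not yet been configured to authenticate against CodeCommit from this environment, you may first need the CodeCommit credential helper (a standard one-time setup; skip it if pushes already work):\n", "\n", "```bash\n", "git config --global credential.helper '!aws codecommit credential-helper $@'\n", "git config --global credential.UseHttpPath true\n", "```\n",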
"\n", "```bash\n", "cd SageMaker/\n", "git add 02_SageMaker-DevOps-Workflow\n", "git commit -m \"Added Model Deployment Notebook\"\n", "git push -u origin master\n", "git log --pretty=oneline # you should see entries for both commits\n", "```\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Keep track of your commit IDs; we will use them in the next step.\n", "\n", "**Note**: The function below will only work if you push this notebook to your CodeCommit workshop repository." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# let's run a script to keep track of commits from Git\n", "def get_codecommit(commit_id):\n", "    codecommitclient = boto3.client('codecommit')\n", "\n", "    reponame = codecommitclient.list_repositories()['repositories'][0]['repositoryName']\n", "\n", "    return codecommitclient.get_commit(repositoryName=reponame,\n", "                                       commitId=commit_id\n", "                                       )\n", "\n", "# Navigate to CodeCommit to obtain the corresponding commit IDs for the code you committed.\n", "# If you only commit your code once, use the same commit ID for sclineage and endpointlineage.\n", "\n", "sclineage = get_codecommit('1796a0a6972c8df97fd2d279557a6f94cfe91eae')  # Enter your CommitID here\n", "\n", "endpointlineage = get_codecommit('a6df1799849f52295864754a8f1e30e604fefd00')  # Enter your CommitID here\n" ] }
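,
{ "cell_type": "markdown", "metadata": {}, "source": [ "If you would rather not copy commit IDs by hand, you can fetch the tip of the branch programmatically; a sketch, assuming (as above) that the first listed repository is your workshop repo:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch only: look up the latest commit ID on master instead of hardcoding it.\n", "# codecommitclient = boto3.client('codecommit')\n", "# reponame = codecommitclient.list_repositories()['repositories'][0]['repositoryName']\n", "# latest_commit_id = codecommitclient.get_branch(\n", "#     repositoryName=reponame, branchName='master')['branch']['commitId']\n", "# sclineage = get_codecommit(latest_commit_id)" ] }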
,
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Track the code version and the author who produced this commit, for both the source code and the deployed endpoint.\n", "with Tracker.create(display_name=\"source-control-lineage\", sagemaker_boto_client=sm) as sourcetracker:\n", "    sourcetracker.log_parameters({\n", "        \"commit\": sclineage['commit']['commitId'],\n", "        \"author\": sclineage['commit']['author']['email'],\n", "        \"date\": sclineage['commit']['author']['date'],\n", "        \"message\": sclineage['commit']['message'].replace('-', '_').split('\\n')[0]\n", "    })\n", "\n", "with Tracker.create(display_name=\"prod-endpoint-lineage\", sagemaker_boto_client=sm) as endtracker:\n", "    endtracker.log_parameters({\n", "        \"commit\": endpointlineage['commit']['commitId'],\n", "        \"author\": endpointlineage['commit']['author']['email'],\n", "        \"date\": endpointlineage['commit']['author']['date'],\n", "        \"message\": endpointlineage['commit']['message'].replace('-', '_').split('\\n')[0]\n", "    })\n", "    endtracker.log_input(name=\"endpoint-name\", value=endpoint_name)\n", "\n", "cc_trial.add_trial_component(sourcetracker.trial_component)\n", "cc_trial.add_trial_component(endtracker.trial_component)\n", "\n", "# Present the model lineage as a dataframe\n", "from sagemaker.session import Session\n", "boto_sess = boto3.Session()  # named boto_sess to avoid shadowing the sagemaker Session above\n", "lineage_table = ExperimentAnalytics(\n", "    sagemaker_session=Session(boto_sess, sm),\n", "    search_expression={\n", "        \"Filters\": [{\n", "            \"Name\": \"Parents.TrialName\",\n", "            \"Operator\": \"Equals\",\n", "            \"Value\": trial_name\n", "        }]\n", "    },\n", "    sort_by=\"CreationTime\",\n", "    sort_order=\"Ascending\",\n", ")\n", "lineagedf = lineage_table.dataframe()\n", "\n", "lineagedf" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "We can now capture the lineage of the model for reproducibility.\n", "\n", "At the **Source Code** level we have captured:\n", " * Most recent commit\n", " * User ID\n", " * Commit ID\n", " * Timestamp\n", "\n", "At the **Preprocessing** stage:\n", " * Source data location\n", " * Processed training data location\n", " * Processed validation data location\n", " * Processing parameters\n", "\n", "At the **Modeling** stage:\n", " * Docker container registry for the algorithm\n", " * Training job name\n", " * Training job hyperparameters\n", " * Model artifact location\n", "\n", "At the **Endpoint** stage:\n", " * Production endpoint name\n", " * Commit ID of the production deployment pipeline code\n", " * User ID\n", " * Timestamp\n", "\n", "Although we do not cover it here, an important related topic is versioning your data. Data versioning tools such as DVC (https://dvc.org/) are becoming increasingly popular as a way to ensure that your training jobs and hosted models can be traced back to the correct data version for reproducibility purposes.\n", "\n", "One approach using **SageMaker Experiments**, which you have learned about in this lab, is to enable versioning on your S3 buckets; this way, the different S3 versions of the data will be tracked by SageMaker Experiments. Alternatively, you can log the VersionId as a parameter in the **Tracker** utility that we used in this lab to keep track of the data version. " ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Part E: Conclusion" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "This concludes our demo of building a secure data science workflow within SageMaker. The key features we demonstrated in this notebook include:\n", "\n", "1/ **DevOps Manual Deployment Workflow:** We demonstrated how to securely deploy a trained model using the same network configurations as the data scientist portion of the workflow.\n", "\n", "2/ **Auditability and Reproducibility:** We demonstrated the use of SageMaker Experiments for model auditability: how to track the lineage of processing jobs, model artifacts, and hyperparameters, as well as the CodeCommit commit ID and pull request that track the latest code changes.\n", "\n", "3/ **Monitoring Data Drift:** Finally, we showed how to monitor your models in production for data/concept drift using Model Monitor.\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Part F: Delete Underlying Resources and Monitoring Jobs\n", "\n", "**Best practice:** once you are done monitoring your jobs, be sure to delete the endpoint to avoid incurring costs. " ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "my_default_monitor.delete_monitoring_schedule()\n", "time.sleep(60)  # wait for the schedule deletion to complete before deleting the endpoint" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm.delete_endpoint(EndpointName=endpoint_name)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "The information included in this notebook is for illustrative purposes only. Nothing in this notebook is intended to provide you legal, compliance, or regulatory guidance. You should review the laws that apply to you." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }
], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 4 }