In [None]:
import boto3
import sagemaker
import json
import os
from utils import get_aws_profile_name, get_aws_iam_role
from datetime import datetime 

# Toggle: True  -> authenticate with a named AWS profile and look the role up via IAM;
#         False -> use the default credential chain and the SageMaker execution role.
LOCAL_EXECUTION = True

# Pick the boto3 session first; the SageMaker client is built the same way in both modes.
if LOCAL_EXECUTION:
    sess = boto3.Session(profile_name=get_aws_profile_name())
else:
    sess = boto3.Session()
sm = sess.client("sagemaker")

# Resolve the role ARN that the processing job will run under.
if LOCAL_EXECUTION:
    iam = sess.client('iam')
    role = iam.get_role(RoleName=get_aws_iam_role())['Role']['Arn']
else:
    role = sagemaker.get_execution_role()

# SageMaker SDK session bound to the boto3 session chosen above.
sagemaker_session = sagemaker.Session(boto_session=sess)
bucket = sagemaker_session.default_bucket()
prefix = "model-monitor-bring-your-own-model/"  # already ends with "/"
region = sess.region_name

# Timestamped S3 location so each run writes its results to a separate folder.
output_uri = f"s3://{bucket}/{prefix}results-model-quality-{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}"

In [None]:
from sagemaker.processing import (
 ProcessingInput,
 ProcessingOutput,
 Processor,
)
from sagemaker.model_monitor.dataset_format import DatasetFormat
from utils import get_baseline_uri, get_dataset_uri


# Look up the Model Monitor container image URI for the session's region.
model_monitor_container_uri = sagemaker.image_uris.retrieve(
    framework="model-monitor",
    version="latest",
    region=region,
)

# The dataset analysed by the job is described as CSV.
dataset_format = DatasetFormat.csv()

# Settings for the model-quality analysis; passed to the Processor as `env`.
# All /opt/ml/processing/... values are paths inside the container and must
# line up with the ProcessingInput/ProcessingOutput locations used for the job.
env = {
    # --- data location and format ---
    "dataset_format": json.dumps(dataset_format),
    "dataset_source": "/opt/ml/processing/input/baseline_dataset_input",
    "output_path": "/opt/ml/processing/output",
    "publish_cloudwatch_metrics": "Disabled",
    # --- analysis definition ---
    "analysis_type": "MODEL_QUALITY",
    "problem_type": "BinaryClassification",
    "inference_attribute": "prediction",  # column holding the predicted label
    "probability_attribute": "prediction_probability",  # column holding the predicted probability
    "ground_truth_attribute": "credit_risk",
    # --- baseline artifacts to compare against ---
    "baseline_constraints": "/opt/ml/processing/baseline/constraints/constraints.json",
    "baseline_statistics": "/opt/ml/processing/baseline/stats/statistics.json",
}

In [None]:
from sagemaker.processing import (
 ProcessingInput,
 ProcessingOutput,
 Processor,
)

# Processing job definition: runs the Model Monitor container with the
# analysis settings from `env` on a single instance.
mode_quality_monitor_analyzer = Processor(
    image_uri=model_monitor_container_uri,
    env=env,
    role=role,
    sagemaker_session=sagemaker_session,
    base_job_name="model-monitor-byom",
    instance_type='ml.m5.xlarge',
    instance_count=1,
    max_runtime_in_seconds=1800,
)

# Launch the job. Each input is copied to the container path that `env`
# points at; whatever the container writes to the output path is uploaded
# to `output_uri`.
mode_quality_monitor_analyzer.run(
    inputs=[
        # Dataset to analyse.
        ProcessingInput(
            input_name="baseline_dataset_input",
            source=get_dataset_uri('model-quality-modified-data'),
            destination="/opt/ml/processing/input/baseline_dataset_input",
        ),
        # Baseline constraints to evaluate against.
        ProcessingInput(
            input_name="constraints",
            source=get_baseline_uri('model-quality-constraints'),
            destination="/opt/ml/processing/baseline/constraints",
        ),
        # Baseline statistics.
        ProcessingInput(
            input_name="baseline",
            source=get_baseline_uri('model-quality-statistics'),
            destination="/opt/ml/processing/baseline/stats",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="monitoring_output",
            source="/opt/ml/processing/output",
            destination=output_uri,
        ),
    ],
)

In [None]:
from utils import save_baseline
# The violations report is expected directly under the job's first (and only
# declared) output destination.
violations_uri = (
    mode_quality_monitor_analyzer.latest_job.outputs[0].destination
    + '/constraint_violations.json'
)

# Record the report location under a named key via the project helper.
# NOTE(review): 'violoations' looks like a typo of 'violations'; left as-is
# because other code may look the entry up under this exact key - confirm
# before renaming.
save_baseline('model-quality-violoations', violations_uri)

In [None]:
# Display the constraint-violations report produced by the monitoring job.
#
# ClientError is imported here so this cell is self-contained: the handler
# below needs the name in scope, otherwise a failed download surfaces as a
# NameError instead of being handled.
from botocore.exceptions import ClientError

try:
    violations_json = sagemaker.s3.S3Downloader().read_file(
        violations_uri, sagemaker_session=sagemaker_session
    )
except ClientError as ex:
    # A missing object is an expected outcome here (presumably the job writes
    # no report when nothing was violated - TODO confirm).
    if ex.response['Error']['Code'] == 'NoSuchKey':
        print("No violation file found")
    else:
        # Anything else (access denied, missing bucket, throttling, ...) is a
        # real failure and must not be swallowed silently.
        raise
else:
    print(json.loads(violations_json))