# Amazon SageMaker Model Monitor

본 노트북은 [링크로 연결된 SageMaker Model Monitor 예제](https://sagemaker-examples.readthedocs.io/en/latest/sagemaker_model_monitor/introduction/SageMaker-ModelMonitoring.html)의 한글번역이며 아래 사항을 다루고 있습니다.:
* Amazon SageMaker에서 기계 학습 모델을 호스팅하고 추론 요청, 결과 및 메타데이터 캡처하는 방법
* 훈련 단계에서의 데이터 세트를 분석하여 데이터에 대한 basline 제약 조건 생성
* 호스팅되는 SageMaker 엔드포인트로부터 baseline 제약 조건을 위반하는 추론데이터 라이브 모니터링 
     
---
## 배경

Amazon SageMaker는 모든 개발자와 데이터 과학자에게 기계 학습 모델을 빠르게 구축, 교육 및 배포할 수 있는 기능을 제공합니다. Amazon SageMaker는 전체 기계 학습 워크플로를 포괄하는 완전 관리형 서비스입니다. 데이터에 레이블을 지정하고 준비하고, 알고리즘을 선택하고, 모델을 교육한 다음 배포를 위해 조정 및 최적화할 수 있습니다. Amazon SageMaker를 사용하여 모델을 프로덕션에 배포하여 이전보다 예측하고 비용을 절감할 수 있습니다.

또한 Amazon SageMaker를 사용하면 배포하는 모델 호출에 대한 입력, 출력 및 메타데이터를 캡처할 수 있습니다. 또한 데이터를 분석하고 품질을 모니터링할 수 있습니다. 이 노트북에서는 Amazon SageMaker가 이러한 기능을 활성화하는 방법을 살펴봅니다.

---
## 시작하기

시작시 아래 조건이 만족하는지 확인하십시오.
* 모델을 호스팅할 AWS 리전을 선택합니다.
* Amazon SageMaker에서 Amazon Simple Storage Service(Amazon S3)의 데이터에 대한 액세스 권한을 가지는지 확인합니다. 
* 모델 훈련에 사용되는 데이터, 추가 모델 데이터 및 모델 호출에서 캡처한 데이터를 저장하기 위한 S3 버킷을 생성합니다. 본 예제는 데모를 위해 동일한 버킷을 사용하고 있지만 실제로는 서로 다른 보안 정책으로 이들을 분리할 수 있습니다.


In [2]:
%%time

# Handful of configuration

import os
import boto3
import re
import json
from sagemaker import get_execution_role, session

region = boto3.Session().region_name

role = get_execution_role()
print("RoleArn: {}".format(role))

# You can use a different bucket, but make sure the role you chose for this notebook
# has the s3:PutObject permissions. This is the bucket into which the data is captured
bucket = session.Session(boto3.Session()).default_bucket()
print("Demo Bucket: {}".format(bucket))
prefix = "sagemaker/DEMO-ModelMonitor"

data_capture_prefix = "{}/datacapture".format(prefix)
s3_capture_upload_path = "s3://{}/{}".format(bucket, data_capture_prefix)
reports_prefix = "{}/reports".format(prefix)
s3_report_path = "s3://{}/{}".format(bucket, reports_prefix)
code_prefix = "{}/code".format(prefix)
s3_code_preprocessor_uri = "s3://{}/{}/{}".format(bucket, code_prefix, "preprocessor.py")
s3_code_postprocessor_uri = "s3://{}/{}/{}".format(bucket, code_prefix, "postprocessor.py")

print("Capture path: {}".format(s3_capture_upload_path))
print("Report path: {}".format(s3_report_path))
print("Preproc Code path: {}".format(s3_code_preprocessor_uri))
print("Postproc Code path: {}".format(s3_code_postprocessor_uri))

RoleArn: arn:aws:iam::095389425207:role/SC-095389425207-pp-ppbl7kvl-SageMakerExecutionRole-O99WVTKX81MU
Demo Bucket: sagemaker-ap-northeast-2-095389425207
Capture path: s3://sagemaker-ap-northeast-2-095389425207/sagemaker/DEMO-ModelMonitor/datacapture
Report path: s3://sagemaker-ap-northeast-2-095389425207/sagemaker/DEMO-ModelMonitor/reports
Preproc Code path: s3://sagemaker-ap-northeast-2-095389425207/sagemaker/DEMO-ModelMonitor/code/preprocessor.py
Postproc Code path: s3://sagemaker-ap-northeast-2-095389425207/sagemaker/DEMO-ModelMonitor/code/postprocessor.py
CPU times: user 728 ms, sys: 118 ms, total: 845 ms
Wall time: 1.42 s


이 노트북이 이후 단계의 작업을 진행하는 데 필요한 권한이 있는지 빠르게 확인할 수 있습니다. 위에서 지정한 S3 버킷에 간단한 테스트 객체를 넣어봅니다. 이 명령이 실패하면 버킷에 대한 `s3:PutObject` 권한을 갖도록 IAM 역할을 업데이트하고 다시 시도하십시오.

In [3]:
# Upload some test files
boto3.Session().resource("s3").Bucket(bucket).Object("test_upload/test.txt").upload_file(
    "test_data/upload-test-file.txt"
)
print("Success! You are all set to proceed.")

Success! You are all set to proceed.


# PART A: SageMaker 엔드포인트로부터 실시간 추론 데이터 캡처하기 
SageMaker 엔드포인트에 데이터 캡처 기능을 추가하고 이를 모니터링해 보겠습니다. 

### 사전 훈련된 모델을 Amazon S3에 업로드 
이 코드는 이미 학습이 완료된 XGBoost 모델을 S3에 업로드한 후 엔드포인트로 배포할 것입니다. 이 모델은 SageMaker의 XGB Churn Prediction Notebook을 사용하여 학습되었습니다. Amazon S3에 이미 사전 훈련된 모델이 있다면 해당 모델을 사용할 수도 있습니다. 이 경우 s3_key를 지정하여 해당 모델로 교체합니다.


In [4]:
model_file = open("model/xgb-churn-prediction-model.tar.gz", "rb")
s3_key = os.path.join(prefix, "xgb-churn-prediction-model.tar.gz")
boto3.Session().resource("s3").Bucket(bucket).Object(s3_key).upload_fileobj(model_file)

### 모델을 Amazon SageMaker로 배포하기

먼저 SageMaker Endpoint를 만들겠습니다.
Churn Prediction 모델을 가리키는 url과 XGBoost 이미지를 이용하여 모델 객체를 만듭니다. 


In [5]:
from time import gmtime, strftime
from sagemaker.model import Model
from sagemaker.image_uris import retrieve

model_name = "DEMO-xgb-churn-pred-model-monitor-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
model_url = "https://{}.s3-{}.amazonaws.com/{}/xgb-churn-prediction-model.tar.gz".format(
    bucket, region, prefix
)

image_uri = retrieve("xgboost", boto3.Session().region_name, "0.90-1")

model = Model(image_uri=image_uri, model_data=model_url, role=role)

모델 데이터 품질을 모니터링하는 데이터 캡처를 활성화하기 위해 `DataCaptureConfig`라는 새 캡처 옵션을 지정합니다. 이 구성으로 요청 페이로드, 응답 페이로드 또는 둘 다를 캡처할 수 있습니다. 캡처 구성은 모든 `variants`에 적용됩니다. 그리고 이 설정을 이용하여 모델 배포를 진행합니다.

In [6]:
from sagemaker.model_monitor import DataCaptureConfig

endpoint_name = "DEMO-xgb-churn-pred-model-monitor-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("EndpointName={}".format(endpoint_name))

data_capture_config = DataCaptureConfig(
    enable_capture=True, sampling_percentage=100, destination_s3_uri=s3_capture_upload_path
)

predictor = model.deploy(
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
)

EndpointName=DEMO-xgb-churn-pred-model-monitor-2021-11-08-12-23-43
---------------!

## 배포된 모델의 호출

이제 이 엔드포인트로 추론 데이터를 보내고 실시간으로 추론할 수 있습니다. 이전 단계에서 데이터 캡처를 활성화했기 때문에 요청 및 응답 페이로드는 일부 추가 메타데이터와 함께 DataCaptureConfig에서 지정한 Amazon Simple Storage Service(Amazon S3) 버킷에 저장됩니다.


이 단계는 약 3분 동안 샘플 데이터를 이용하여 SageMaker 엔드포인트를 호출합니다. 앞서 지정한 샘플링 비율에 따라 데이터가 캡처되고 데이터 캡처 옵션이 해제될 때까지 캡처가 계속됩니다.


In [7]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
import time

predictor = Predictor(endpoint_name=endpoint_name, serializer=CSVSerializer())

# get a subset of test data for a quick test
!head -180 test_data/test-dataset-input-cols.csv > test_data/test_sample.csv
print("Sending test traffic to the endpoint {}. \nPlease wait...".format(endpoint_name))

with open("test_data/test_sample.csv", "r") as f:
    for row in f:
        payload = row.rstrip("\n")
        response = predictor.predict(data=payload)
        time.sleep(1)

print("Done!")

Sending test traffic to the endpoint DEMO-xgb-churn-pred-model-monitor-2021-11-08-12-23-43. 
Please wait...
Done!


## 캡처된 데이터 확인

이제 Amazon S3에 저장된 데이터 캡처 파일을 리스트업해 봅니다. 호출이 발생한 시간을 기준으로 서로 다른 파일이 표시될 것입니다. Amazon S3 경로의 형식은 다음과 같습니다.

`s3://{destination-bucket-prefix}/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/filename.jsonl`

In [8]:
s3_client = boto3.Session().client("s3")
current_endpoint_capture_prefix = "{}/{}".format(data_capture_prefix, endpoint_name)
result = s3_client.list_objects(Bucket=bucket, Prefix=current_endpoint_capture_prefix)
capture_files = [capture_file.get("Key") for capture_file in result.get("Contents")]
print("Found Capture Files:")
print("\n ".join(capture_files))

Found Capture Files:
sagemaker/DEMO-ModelMonitor/datacapture/DEMO-xgb-churn-pred-model-monitor-2021-11-08-12-23-43/AllTraffic/2021/11/08/12/48-44-057-5d1da6d1-0f64-4206-bc18-620308e0b0c6.jsonl
 sagemaker/DEMO-ModelMonitor/datacapture/DEMO-xgb-churn-pred-model-monitor-2021-11-08-12-23-43/AllTraffic/2021/11/08/12/49-44-989-5c8d94b0-6571-4108-873d-2950ba7df4e6.jsonl


다음으로 단일 캡처 파일의 내용을 봅니다. 캡처된 모든 데이터가 JSON 라인 형식으로 표시되어야 합니다. 캡처된 파일의 처음 몇 줄을 빠르게 살펴보십시오.

In [9]:
def get_obj_body(obj_key):
    return s3_client.get_object(Bucket=bucket, Key=obj_key).get("Body").read().decode("utf-8")


capture_file = get_obj_body(capture_files[-1])
print(capture_file[:2000])

{"captureData":{"endpointInput":{"observedContentType":"text/csv","mode":"INPUT","data":"130,0,263.7,113,186.5,103,195.3,99,18.3,6,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,1,0","encoding":"CSV"},"endpointOutput":{"observedContentType":"text/csv; charset=utf-8","mode":"OUTPUT","data":"0.3791300654411316","encoding":"CSV"}},"eventMetadata":{"eventId":"ad8a4b12-6f2e-4532-bbb5-1a8309bc5ac4","inferenceTime":"2021-11-08T12:49:44Z"},"eventVersion":"0"}
{"captureData":{"endpointInput":{"observedContentType":"text/csv","mode":"INPUT","data":"85,0,210.3,66,195.8,76,221.6,82,11.2,7,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,1,0","encoding":"CSV"},"endpointOutput":{"observedContentType":"text/csv; charset=utf-8","mode":"OUTPUT","data":"0.017852725461125374","encoding":"CSV"}},"eventMetadata":{"eventId":"100ba175-20a8-473e-acb4-4ca4322787b5","inferenceTime"

조금 더 눈에 띄는 형식으로 살펴봅니다. 

In [10]:
import json

print(json.dumps(json.loads(capture_file.split("\n")[0]), indent=2))

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "130,0,263.7,113,186.5,103,195.3,99,18.3,6,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,1,0",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "0.3791300654411316",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "ad8a4b12-6f2e-4532-bbb5-1a8309bc5ac4",
    "inferenceTime": "2021-11-08T12:49:44Z"
  },
  "eventVersion": "0"
}


각 추론요청은 jsonl 파일에서 하나의 line으로 표시되어 있습니다. 해당 라인은 입력과 출력을 모두 포함합니다. 예제에서 엔드포인트 호출 ContentType이 `text/csv` 로 지정되었기 때문에 `observedContentType` 속성값이 이를 반영하고 있고 `encoding` 항목에서도 마찬가지로 적용된 형식을 알려주고 있습니다. 

본 단계에서는 엔드포인트에 대한 입력과 출력 페이로드를 어떻게 캡처하는지 살펴보았습니다. 그리고 캡처한 데이터의 형식이 S3에 어떻게 저장되는지 살펴보았습니다. 다음 단계에서는 이렇게 수집한 데이터를 모니터링하고자 할 때 SageMaker가 어떻게 도움을 줄 수 있는지 살펴보겠습니다. 


# PART B: Model Monitor - 기준선(Baseline) 설정과 지속적인 모니터링 

추론 데이터 수집 외에도 Amazon SageMaker는 엔드포인트에서 캡처한 데이터를 모니터링하고 평가할 수 있는 기능을 제공합니다. 이를 위해:

1. 실시간 트래픽을 비교할 기준선(baseline)이 필요합니다.
2. 기준선이 준비되면 스케줄을 설정하여 기준선을 지속적으로 평가하고 비교합니다.

## 1. 기준선/학습 데이터 세트를 사용하여 제약 조건 제안

모델을 훈련한 훈련 데이터셋은 일반적으로 잘 정제된 데이터들입니다. 그리고 훈련 데이터셋의 스키마와 추론 데이터셋의 스키마는 정확히 일치해야 합니다(i.e. 기능의 수, 순서 등).

Amazon SageMaker는 학습단계에서 훈련 데이터셋을 탐색하여 기술 통계(`statistics`)를 생성고 이를 이용하여 모니터링용 제약 조건(`constraints`)를 제안하는 기능을 제공합니다. 본 예제에서는 다음 코드를 이용하여 학습에 사용한 훈련 데이터셋을 S3에 업로드하겠습니다. 


In [11]:
# copy over the training dataset to Amazon S3 (if you already have it in Amazon S3, you could reuse it)
baseline_prefix = prefix + "/baselining"
baseline_data_prefix = baseline_prefix + "/data"
baseline_results_prefix = baseline_prefix + "/results"

baseline_data_uri = "s3://{}/{}".format(bucket, baseline_data_prefix)
baseline_results_uri = "s3://{}/{}".format(bucket, baseline_results_prefix)
print("Baseline data uri: {}".format(baseline_data_uri))
print("Baseline results uri: {}".format(baseline_results_uri))

Baseline data uri: s3://sagemaker-ap-northeast-2-095389425207/sagemaker/DEMO-ModelMonitor/baselining/data
Baseline results uri: s3://sagemaker-ap-northeast-2-095389425207/sagemaker/DEMO-ModelMonitor/baselining/results


In [12]:
training_data_file = open("test_data/training-dataset-with-header.csv", "rb")
s3_key = os.path.join(baseline_prefix, "data", "training-dataset-with-header.csv")
boto3.Session().resource("s3").Bucket(bucket).Object(s3_key).upload_fileobj(training_data_file)

### 훈련 데이터셋에 대한 기준선 작업 생성

이제 S3에 훈련 데이터가 준비되었으므로 제약 조건을 제안(`suggest`)하는 작업을 시작합니다. `DefaultModelMonitor.suggest_baseline(..)`은 Amazon SageMaker에서 제공하는 Model Monitor 컨테이너를 사용하여 `ProcessingJob`을 시작하고 제약 조건을 생성합니다.


In [13]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri + "/training-dataset-with-header.csv",
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True,
)


Job Name:  baseline-suggestion-job-2021-11-08-13-18-16-155
Inputs:  [{'InputName': 'baseline_dataset_input', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-northeast-2-095389425207/sagemaker/DEMO-ModelMonitor/baselining/data/training-dataset-with-header.csv', 'LocalPath': '/opt/ml/processing/input/baseline_dataset_input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'monitoring_output', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-ap-northeast-2-095389425207/sagemaker/DEMO-ModelMonitor/baselining/results', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
........................[34m2021-11-08 13:22:07,806 - matplotlib.font_manager - INFO - Generating new fontManager, this may take some time...[0m
[34m2021-11-08 13:22:08.320004: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcu

<sagemaker.processing.ProcessingJob at 0x7fce67e0a410>

### 생성된 제약 조건 및 통계 탐색

In [14]:
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get("Contents")]
print("Found Files:")
print("\n ".join(report_files))

Found Files:
sagemaker/DEMO-ModelMonitor/baselining/results/constraints.json
 sagemaker/DEMO-ModelMonitor/baselining/results/statistics.json


In [15]:
import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.io.json.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

  after removing the cwd from sys.path.


Unnamed: 0,name,inferred_type,numerical_statistics.common.num_present,numerical_statistics.common.num_missing,numerical_statistics.mean,numerical_statistics.sum,numerical_statistics.std_dev,numerical_statistics.min,numerical_statistics.max,numerical_statistics.distribution.kll.buckets,numerical_statistics.distribution.kll.sketch.parameters.c,numerical_statistics.distribution.kll.sketch.parameters.k,numerical_statistics.distribution.kll.sketch.data
0,Churn,Integral,2333,0,0.139306,325.0,0.346265,0.0,1.0,"[{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou...",0.64,2048.0,"[[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0,..."
1,Account Length,Integral,2333,0,101.276897,236279.0,39.552442,1.0,243.0,"[{'lower_bound': 1.0, 'upper_bound': 25.2, 'co...",0.64,2048.0,"[[119.0, 100.0, 111.0, 181.0, 95.0, 104.0, 70...."
2,VMail Message,Integral,2333,0,8.214316,19164.0,13.776908,0.0,51.0,"[{'lower_bound': 0.0, 'upper_bound': 5.1, 'cou...",0.64,2048.0,"[[19.0, 0.0, 0.0, 40.0, 36.0, 0.0, 0.0, 24.0, ..."
3,Day Mins,Fractional,2333,0,180.226489,420468.4,53.987179,0.0,350.8,"[{'lower_bound': 0.0, 'upper_bound': 35.08, 'c...",0.64,2048.0,"[[178.1, 160.3, 197.1, 105.2, 283.1, 113.6, 23..."
4,Day Calls,Integral,2333,0,100.259323,233905.0,20.165008,0.0,165.0,"[{'lower_bound': 0.0, 'upper_bound': 16.5, 'co...",0.64,2048.0,"[[110.0, 138.0, 117.0, 61.0, 112.0, 87.0, 122...."
5,Eve Mins,Fractional,2333,0,200.050107,466716.9,50.015928,31.2,361.8,"[{'lower_bound': 31.2, 'upper_bound': 64.26, '...",0.64,2048.0,"[[212.8, 221.3, 227.8, 341.3, 286.2, 158.6, 29..."
6,Eve Calls,Integral,2333,0,99.573939,232306.0,19.675578,12.0,170.0,"[{'lower_bound': 12.0, 'upper_bound': 27.8, 'c...",0.64,2048.0,"[[100.0, 92.0, 128.0, 79.0, 86.0, 98.0, 112.0,..."
7,Night Mins,Fractional,2333,0,201.388598,469839.6,50.627961,23.2,395.0,"[{'lower_bound': 23.2, 'upper_bound': 60.37999...",0.64,2048.0,"[[226.3, 150.4, 214.0, 165.7, 261.7, 187.7, 20..."
8,Night Calls,Integral,2333,0,100.227175,233830.0,19.282029,42.0,175.0,"[{'lower_bound': 42.0, 'upper_bound': 55.3, 'c...",0.64,2048.0,"[[123.0, 120.0, 101.0, 97.0, 129.0, 87.0, 112...."
9,Intl Mins,Fractional,2333,0,10.253065,23920.4,2.778766,0.0,18.4,"[{'lower_bound': 0.0, 'upper_bound': 1.8399999...",0.64,2048.0,"[[10.0, 11.2, 9.3, 6.3, 11.3, 10.5, 0.0, 9.7, ..."


In [16]:
constraints_df = pd.io.json.json_normalize(
    baseline_job.suggested_constraints().body_dict["features"]
)
constraints_df.head(10)

  


Unnamed: 0,name,inferred_type,completeness,num_constraints.is_non_negative
0,Churn,Integral,1.0,True
1,Account Length,Integral,1.0,True
2,VMail Message,Integral,1.0,True
3,Day Mins,Fractional,1.0,True
4,Day Calls,Integral,1.0,True
5,Eve Mins,Fractional,1.0,True
6,Eve Calls,Integral,1.0,True
7,Night Mins,Fractional,1.0,True
8,Night Calls,Integral,1.0,True
9,Intl Mins,Fractional,1.0,True


## 2. 데이터 품질 모니터링을 위해 수집 데이터 분석하기

이제 데이터를 수집작업에 대한 모니터링 스케줄을 설정합니다. 

### 스케줄 생성

In [17]:
# First, copy over some test scripts to the S3 bucket so that they can be used for pre and post processing
boto3.Session().resource("s3").Bucket(bucket).Object(code_prefix + "/preprocessor.py").upload_file(
    "preprocessor.py"
)
boto3.Session().resource("s3").Bucket(bucket).Object(code_prefix + "/postprocessor.py").upload_file(
    "postprocessor.py"
)

앞서 생성한 엔드포인트에 대한 모델 모니터링 스케줄을 생성할 수 있습니다. 기준선 정보(제약 조건 및 통계)를 사용하여 실시간 추론 트래픽과 비교합니다.


In [18]:
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

mon_schedule_name = "DEMO-xgb-churn-pred-model-monitor-schedule-" + strftime(
    "%Y-%m-%d-%H-%M-%S", gmtime()
)
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=predictor.endpoint,
    # record_preprocessor_script=pre_processor_script,
    post_analytics_processor_script=s3_code_postprocessor_uri,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

The endpoint attribute has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


### 인공 트래픽 생성 시작

아래 셀은 일부 트래픽을 엔드포인트로 보내기 위해 스레드를 시작합니다. 이 스레드를 종료하려면 커널을 중지해야 합니다. 트래픽이 없으면 처리할 데이터가 없기 때문에 모니터링 작업은 실패(`Failed`)로 표시됩니다.


In [19]:
from threading import Thread
from time import sleep
import time

endpoint_name = predictor.endpoint
runtime_client = boto3.client("runtime.sagemaker")

# (just repeating code from above for convenience/ able to run this section independently)
def invoke_endpoint(ep_name, file_name, runtime_client):
    with open(file_name, "r") as f:
        for row in f:
            payload = row.rstrip("\n")
            response = runtime_client.invoke_endpoint(
                EndpointName=ep_name, ContentType="text/csv", Body=payload
            )
            response["Body"].read()
            time.sleep(1)


def invoke_endpoint_forever():
    while True:
        invoke_endpoint(endpoint_name, "test_data/test-dataset-input-cols.csv", runtime_client)


thread = Thread(target=invoke_endpoint_forever)
thread.start()

# Note that you need to stop the kernel to stop the invocations

The endpoint attribute has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


### 생성된 스케줄 살펴보기 

`MonitoringScheduleStatus`가 Scheduled로 변경되는 것을 확인합니다.

In [21]:
desc_schedule_result = my_default_monitor.describe_schedule()
print("Schedule status: {}".format(desc_schedule_result["MonitoringScheduleStatus"]))

Schedule status: Scheduled


### 실행 이력 살펴보기

스케줄 생성시 지정한 시간 간격으로 분석 작업이 진행됩니다. 여기에 최근 실행결과가 리스트업됩니다. 우리는 1시간 간격으로 스케줄을 만들었으므로 스케줄 생성한 이후단,  바로 이 작업을 시작하는 경우 실행이 비어 있을 수 있습니다. 실행이 시작되는 것을 보려면 시간 경계(UTC 기준)를 넘을 때까지 기다려야 할 수도 있습니다. 아래 코드에 대기 로직이 있습니다.

참고: 시간별 일정의 경우에도 Amazon SageMaker에는 실행까지 20분의 버퍼 기간이 있습니다. 그래서 매 시간 경계에서 0분에서 ~20분 사이에 실행이 시작되는 것을 볼 수 있습니다. 


In [24]:
mon_executions = my_default_monitor.list_executions()
print(
    "We created a hourly schedule above and it will kick off executions ON the hour (plus 0 - 20 min buffer.\nWe will have to wait till we hit the hour..."
)

while len(mon_executions) == 0:
    print("Waiting for the 1st execution to happen...")
    time.sleep(60)
    mon_executions = my_default_monitor.list_executions()

No executions found for schedule. monitoring_schedule_name: DEMO-xgb-churn-pred-model-monitor-schedule-2021-11-08-13-37-22
We created a hourly schedule above and it will kick off executions ON the hour (plus 0 - 20 min buffer.
We will have to wait till we hit the hour...
Waiting for the 1st execution to happen...
No executions found for schedule. monitoring_schedule_name: DEMO-xgb-churn-pred-model-monitor-schedule-2021-11-08-13-37-22
Waiting for the 1st execution to happen...


### 특정 실행결과 (최근 실행) 살펴보기 

이전 셀에서는 가장 최근에 완료 또는 실패한 스케줄 실행을 살펴보았습니다. 여기서 가능한 터미널 상태와 각각의 의미는 다음과 같습니다.

* Completed - 모니터링 실행이 완료되었으며 위반 보고서에서 문제가 발견되지 않았음을 의미합니다.
* CompletedWithViolations - 실행이 완료되었지만 제약 조건 위반이 감지되었음을 의미합니다.
* Failed - 클라이언트 오류(아마도 잘못된 역할 권한) 또는 인프라 문제로 인해 모니터링 실행이 실패했습니다. 정확히 무슨 일이 일어났는지 확인하려면 FailureReason 및 ExitMessage에 대한 추가 조사가 필요합니다.
* Stopped - 작업이 최대 런타임을 초과했거나 수동으로 중지되었습니다.
     

In [28]:
latest_execution = mon_executions[
    -1
]  # latest execution's index is -1, second to last is -2 and so on..
time.sleep(60)
latest_execution.wait(logs=False)

print("Latest execution status: {}".format(latest_execution.describe()["ProcessingJobStatus"]))
print("Latest execution result: {}".format(latest_execution.describe()["ExitMessage"]))

latest_job = latest_execution.describe()
if latest_job["ProcessingJobStatus"] != "Completed":
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )

!Latest execution status: Completed
Latest execution result: CompletedWithViolations: Job completed successfully with 60 violations.


In [29]:
report_uri = latest_execution.output.destination
print("Report Uri: {}".format(report_uri))

Report Uri: s3://sagemaker-ap-northeast-2-095389425207/sagemaker/DEMO-ModelMonitor/reports/DEMO-xgb-churn-pred-model-monitor-2021-11-08-12-23-43/DEMO-xgb-churn-pred-model-monitor-schedule-2021-11-08-13-37-22/2021/11/08/14


### 생성된 보고서 리스트업

In [30]:
from urllib.parse import urlparse

s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip("/")
print("Report bucket: {}".format(report_bucket))
print("Report key: {}".format(report_key))

s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=report_bucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get("Contents")]
print("Found Report Files:")
print("\n ".join(report_files))

Report bucket: sagemaker-ap-northeast-2-095389425207
Report key: sagemaker/DEMO-ModelMonitor/reports/DEMO-xgb-churn-pred-model-monitor-2021-11-08-12-23-43/DEMO-xgb-churn-pred-model-monitor-schedule-2021-11-08-13-37-22/2021/11/08/14
Found Report Files:
sagemaker/DEMO-ModelMonitor/reports/DEMO-xgb-churn-pred-model-monitor-2021-11-08-12-23-43/DEMO-xgb-churn-pred-model-monitor-schedule-2021-11-08-13-37-22/2021/11/08/14/constraint_violations.json
 sagemaker/DEMO-ModelMonitor/reports/DEMO-xgb-churn-pred-model-monitor-2021-11-08-12-23-43/DEMO-xgb-churn-pred-model-monitor-schedule-2021-11-08-13-37-22/2021/11/08/14/constraints.json
 sagemaker/DEMO-ModelMonitor/reports/DEMO-xgb-churn-pred-model-monitor-2021-11-08-12-23-43/DEMO-xgb-churn-pred-model-monitor-schedule-2021-11-08-13-37-22/2021/11/08/14/statistics.json


### 위반 보고서

기준과 비교하여 위반 사항이 있는 경우 여기에 나열됩니다.

In [31]:
violations = my_default_monitor.latest_monitoring_constraint_violations()
pd.set_option("display.max_colwidth", -1)
constraints_df = pd.io.json.json_normalize(violations.body_dict["violations"])
constraints_df.head(10)

  
  This is separate from the ipykernel package so we can avoid doing imports until


Unnamed: 0,feature_name,constraint_check_type,description
0,State_MD,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.63768115942028% of data is Integral."
1,State_TX,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.63768115942028% of data is Integral."
2,State_MA,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.63768115942028% of data is Integral."
3,State_RI,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.63768115942028% of data is Integral."
4,State_UT,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.63768115942028% of data is Integral."
5,State_CT,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.63768115942028% of data is Integral."
6,State_MT,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.63768115942028% of data is Integral."
7,State_PA,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.63768115942028% of data is Integral."
8,State_WA,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.63768115942028% of data is Integral."
9,Churn,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 0.0% of data is Integral."


### 스케줄 시작/종료

다음 코드를 이용하여 모니터링 스케줄을 중지하거나 시작할 수 있습니다.


In [None]:
# my_default_monitor.stop_monitoring_schedule()
# my_default_monitor.start_monitoring_schedule()

## 리소스 삭제

엔드포인트를 계속 실행하여 데이터 캡처를 계속할 수 있습니다. 더 많은 데이터를 수집하거나 이 엔드포인트를 더 이상 사용할 계획이 없는 경우 추가 요금이 발생하지 않도록 엔드포인트를 삭제해야 합니다. 엔드포인트를 삭제해도 모델 호출 중에 캡처된 데이터는 삭제되지 않습니다. 해당 데이터는 사용자가 직접 삭제할 때까지 Amazon S3에 유지됩니다.

엔드포인트를 삭제하기 위해서는 그 전에 먼저 스케줄을 삭제해야 합니다.

In [None]:
my_default_monitor.delete_monitoring_schedule()
time.sleep(60)  # actually wait for the deletion

In [None]:
predictor.delete_endpoint()

In [None]:
predictor.delete_model()