# データ品質モニタリングのステップA

このノートブックを実行する時のヒント：   
- KernelはPython3（Data Science）で動作確認をしています。
- デフォルトではSageMakerのデフォルトBucketを利用します。必要に応じて変更することも可能です。
- 実際に動かさなくても出力を確認できるようにセルのアウトプットを残しています。きれいな状態から実行したい場合は、右クリックメニューから "Clear All Outputs"を選択して出力をクリアしてから始めてください。
- 作成されたスケジュールはSageMaker Studioの`SageMaker resource` （左側ペインの一番下）のEndpointメニューからも確認可能です

<span style="color: orange; font-size: 120%; ">モデル名は実行前に設定変更が必要です</span>

In [4]:
# step-0-train-model.ipynb でトレーニングしたモデルの名前に変更してください
model_name = 'nyctaxi-xgboost-regression-2022-12-15-10-58-19-model'

複数のノートブックで共通で使用する変数

In [5]:
# エンドポイント名を指定する
endpoint_name = 'nyctaxi-xgboost-endpoint'

# エンドポイントConfigの名前を指定する
endpoint_config_name = f'{endpoint_name}-config'

# データ品質のモニタリングスケジュールの名前を指定する
data_quality_monitoring_schedule = f'{endpoint_name}-data-quality-schedule'

モニタリング結果を保管するための、ベースラインやレポートのS3上のPrefixを設定します

In [6]:
# ベースラインの出力先Prefixを設定する
baseline_prefix = 'model_monitor/data_quality_baseline'

# 時系列での可視化のために、複数のレポートに共通するPrefixを設定する
report_prefix = 'model_monitor/data_quality_monitoring_report'

### A1. 推論エンドポイントにデータキャプチャの設定を行う

#### データキャプチャの設定を入れた新しい推論エンドポイントを作成する場合

In [7]:
import boto3
import time
import sagemaker

In [8]:
sm_client = boto3.client('sagemaker')
bucket = sagemaker.Session().default_bucket()

# Create endpoint config
create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[{
        'InstanceType': 'ml.t2.medium',
        'InitialVariantWeight': 1,
        'InitialInstanceCount': 1,
        'ModelName': model_name,
        'VariantName': 'AllTraffic'}],
    # Set data capture config
    DataCaptureConfig={
        'EnableCapture': True,
        'InitialSamplingPercentage': 100,
        'DestinationS3Uri': f's3://{bucket}/model_monitor/endpoint-data-capture',
        'CaptureOptions': [{'CaptureMode': 'Input'}, {'CaptureMode': 'Output'}],
        'CaptureContentTypeHeader': {
            'CsvContentTypes': ['text/csv'],
            'JsonContentTypes': ['application/json']
        }
    }
)

<span style="color: orange; ">エンドポイントのデプロイに10分程度の時間がかかります</span>

In [9]:
def wait_for_endpoint_creation(endpoint_name):
    sm_client = boto3.client('sagemaker')
    
    # Check endpoint creation status
    resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
    status = resp['EndpointStatus']
    while status=='Creating':
        print("Status: " + status)
        time.sleep(60)
        resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
        status = resp['EndpointStatus']
        
    print('Finished!', status)

    
# Create endpoint
create_endpoint_response = sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name
)
wait_for_endpoint_creation(endpoint_name)

Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Finished! InService


### A2. ベースラインを作成する
<span style="color: orange; ">ベースラインの計算には20分から25分程度の時間がかかります</span>

In [None]:
import sagemaker
from sagemaker import model_monitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_default_monitor = model_monitor.DefaultModelMonitor(
    role=sagemaker.get_execution_role(),
    instance_count=1,
    instance_type='ml.t3.large',
    volume_size_in_gb=100,
    max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
    baseline_dataset=f's3://{bucket}/model_monitor/data_quality_baseline_input/',
    dataset_format=DatasetFormat.csv(header=True, output_columns_position='START'),
    output_s3_uri=f's3://{bucket}/{baseline_prefix}',
    wait=True
)

### A3. モニタリングをスケジュールする

ベースラインを作成したModelMonitorインスタンスをそのまま使う場合

In [11]:
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=data_quality_monitoring_schedule,
    endpoint_input=endpoint_name,
    output_s3_uri=f's3://{bucket}/{report_prefix}',
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=model_monitor.CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

作成されたスケジュールを確認

In [12]:
sm_client.describe_monitoring_schedule(MonitoringScheduleName=data_quality_monitoring_schedule)

{'MonitoringScheduleArn': 'arn:aws:sagemaker:ap-northeast-1:370828233696:monitoring-schedule/nyctaxi-xgboost-endpoint-data-quality-schedule',
 'MonitoringScheduleName': 'nyctaxi-xgboost-endpoint-data-quality-schedule',
 'MonitoringScheduleStatus': 'Pending',
 'MonitoringType': 'DataQuality',
 'CreationTime': datetime.datetime(2022, 12, 16, 10, 19, 36, 357000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 12, 16, 10, 19, 36, 447000, tzinfo=tzlocal()),
 'MonitoringScheduleConfig': {'ScheduleConfig': {'ScheduleExpression': 'cron(0 * ? * * *)'},
  'MonitoringJobDefinitionName': 'data-quality-job-definition-2022-12-16-10-19-35-972',
  'MonitoringType': 'DataQuality'},
 'EndpointName': 'nyctaxi-xgboost-endpoint',
 'LastMonitoringExecutionSummary': {'MonitoringScheduleName': 'nyctaxi-xgboost-endpoint-data-quality-schedule',
  'ScheduledTime': datetime.datetime(2022, 12, 16, 9, 0, tzinfo=tzlocal()),
  'CreationTime': datetime.datetime(2022, 12, 16, 9, 5, 34, 301000, tzinfo=tz

# ステップAに必要なコードの実行はここまで

# 以下は参考

### 参考｜既存の推論エンドポイントにデータキャプチャの設定を行う場合

In [None]:
# データキャプチャの設定をする既存の推論エンドポイントの名前を設定
endpoint_name = 'nyctaxi-xgboost-endpoint'

In [None]:
from sagemaker.model_monitor import DataCaptureConfig
import sagemaker

bucket = sagemaker.Session().default_bucket()
data_capture_config = DataCaptureConfig(
    enable_capture = True,
    sampling_percentage=100,
    destination_s3_uri=f's3://{bucket}/model_monitor/endpoint-data-capture',
    capture_options=["REQUEST", "RESPONSE"],
    csv_content_types=["text/csv"],
    json_content_types=["application/json"]
)

predictor = sagemaker.Predictor(endpoint_name=endpoint_name)
predictor.update_data_capture_config(data_capture_config=data_capture_config)

### 参考｜過去に作成したベースラインを利用してスケジュールを作成する場合

In [61]:
from sagemaker import model_monitor

statistics_from_s3 = model_monitor.Statistics.from_s3_uri(f's3://{bucket}/{baseline_prefix}/statistics.json',)
constraints_from_s3 = model_monitor.Constraints.from_s3_uri(f's3://{bucket}/{baseline_prefix}/constraints.json',)

In [66]:
my_default_monitor = model_monitor.DefaultModelMonitor(
    role=sagemaker.get_execution_role(),
    instance_count=1,
    instance_type='ml.t3.large',
    volume_size_in_gb=100,
    max_runtime_in_seconds=3600,
)

my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=data_quality_monitoring_schedule,
    endpoint_input=endpoint_name,
    output_s3_uri=f's3://{bucket}/{report_prefix}',
    statistics=statistics_from_s3,
    constraints=constraints_from_s3,
    schedule_cron_expression=model_monitor.CronExpressionGenerator.daily(),
    enable_cloudwatch_metrics=True,
)

### 参考｜スケジュール用cron式の生成例

In [None]:
print(model_monitor.CronExpressionGenerator.hourly())
print(model_monitor.CronExpressionGenerator.daily())
print(model_monitor.CronExpressionGenerator.daily_every_x_hours(6))
print(model_monitor.CronExpressionGenerator.daily_every_x_hours(6, starting_hour=2))