## SageMaker Pipelines integration with Model Monitor and Clarify



This notebook shows how Model Monitor and Clarify steps can be integrated with SageMaker Pipelines. This lets you calculate baselines for data quality, model quality, bias, and explainability checks by running the underlying Model Monitor and Clarify containers as pipeline steps.

## Data/Model Quality, Bias, and Model Explainability Checks in SageMaker Pipelines

This notebook introduces two new step types in SageMaker Pipelines -
* `QualityCheckStep`
* `ClarifyCheckStep`

With these two steps, the pipeline can calculate the baselines that serve as the standard against which data and model quality issues, including bias and explainability drift, are detected.

These steps leverage SageMaker pre-built containers:

* `QualityCheckStep` (for Data/Model Quality): [sagemaker-model-monitor-analyzer](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-pre-built-container.html)
* `ClarifyCheckStep` (for Data/Model Bias and Model Explainability): [sagemaker-clarify-processing](https://docs.aws.amazon.com/sagemaker/latest/dg/clarify-configure-processing-jobs.html#clarify-processing-job-configure-container)

The dataset you used to train the model is usually a good baseline dataset. The schema of the training dataset and the schema of the inference dataset must match exactly (same number and order of features). Note that the prediction/output columns are assumed to be the first columns in the training dataset. From the training dataset, you can ask SageMaker to suggest a set of baseline constraints and to generate descriptive statistics for exploring the data.

These two new steps will always calculate new baselines using the dataset provided.
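A minimal sketch of the schema requirement described above, assuming headerless CSV samples with the label in column 0, as in this notebook. `feature_count` and `check_schema_matches` are hypothetical helpers (not part of the SageMaker SDK), and the sample rows are made up. Only the feature count can be checked this way; column order cannot be recovered from headerless data.

```python
import csv
import io


def feature_count(csv_text, leading_output_columns=0):
    """Count feature columns in a headerless CSV sample.

    All rows must have the same width. The first `leading_output_columns`
    columns (label or prediction) are not counted as features.
    """
    widths = {len(row) for row in csv.reader(io.StringIO(csv_text)) if row}
    if len(widths) != 1:
        raise ValueError(f"expected one row width, found {sorted(widths)}")
    return widths.pop() - leading_output_columns


def check_schema_matches(train_csv, inference_csv, n_output_columns=1):
    """Hypothetical check: training rows are [outputs..., features...],
    inference rows are [features...]; the feature counts must agree.

    Column order cannot be verified from headerless data, so keeping it
    identical remains the caller's responsibility.
    """
    n_train = feature_count(train_csv, leading_output_columns=n_output_columns)
    n_infer = feature_count(inference_csv)
    return n_train == n_infer


# Label (ring count) first, then three made-up feature values per row.
train_sample = "9.0,0.1,0.2,0.3\n12.0,0.4,0.5,0.6\n"
# Inference rows carry the same three features, in the same order.
inference_sample = "0.7,0.8,0.9\n"
print(check_schema_matches(train_sample, inference_sample))
```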

### Drift Check Baselines in the Model Registry

The `RegisterStep` has a new parameter called `drift_check_baselines`. This refers to the baseline files associated with the model. When deployed, these baseline files are used by Model Monitor for Model Quality/Data Quality checks. In addition, these baselines can be used in `QualityCheckStep` and `ClarifyCheckStep` to compare newly trained models against models that have already been registered in the Model Registry.

### Step Properties

The `QualityCheckStep` has the following properties -

* `CalculatedBaselineStatistics` : The baseline statistics file calculated by the underlying Model Monitor container.
* `CalculatedBaselineConstraints` : The baseline constraints file calculated by the underlying Model Monitor container.
* `BaselineUsedForDriftCheckStatistics` and `BaselineUsedForDriftCheckConstraints` : These two properties are used to set `drift_check_baselines` in the Model Registry. The values they hold depend on the parameters passed to the step, as described in the table below.

The `ClarifyCheckStep` has the following properties -

* `CalculatedBaselineConstraints` : The baseline constraints file calculated by the underlying Clarify container.
* `BaselineUsedForDriftCheckConstraints` : This property is used to set `drift_check_baselines` in the Model Registry. The value it holds depends on the parameters passed to the step, as described in the table below.
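The sketch below records one plausible pairing between each check step's `BaselineUsedForDriftCheck*` properties and the keyword arguments of the SDK's `DriftCheckBaselines` class (imported later in this notebook). It is plain data, so it runs without AWS access; the step variable names match those defined later, but the pairing itself is an assumption based on the argument names. In a pipeline, each property would typically be wrapped in `MetricsSource(s3_uri=..., content_type="application/json")` before being passed to `DriftCheckBaselines`.

```python
# (check step variable, step property) -> DriftCheckBaselines keyword argument.
DRIFT_CHECK_WIRING = {
    ("data_quality_check_step", "BaselineUsedForDriftCheckStatistics"): "model_data_statistics",
    ("data_quality_check_step", "BaselineUsedForDriftCheckConstraints"): "model_data_constraints",
    ("model_quality_check_step", "BaselineUsedForDriftCheckStatistics"): "model_statistics",
    ("model_quality_check_step", "BaselineUsedForDriftCheckConstraints"): "model_constraints",
    ("data_bias_check_step", "BaselineUsedForDriftCheckConstraints"): "bias_pre_training_constraints",
    ("model_bias_check_step", "BaselineUsedForDriftCheckConstraints"): "bias_post_training_constraints",
    ("model_explainability_check_step", "BaselineUsedForDriftCheckConstraints"): "explainability_constraints",
}


def arguments_fed_by(step_name):
    """List the DriftCheckBaselines arguments that one check step supplies."""
    return sorted(arg for (step, _prop), arg in DRIFT_CHECK_WIRING.items() if step == step_name)


# A ClarifyCheckStep exposes only a constraints property, so it feeds one argument.
for step in ("data_quality_check_step", "model_bias_check_step"):
    print(step, "->", arguments_fed_by(step))
```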

### Notebook Overview

This notebook should be run with `Python 3.9` using the SageMaker Studio `Python3 (Data Science)` kernel.

Let's start by installing the SageMaker Python SDK, boto, and AWS CLI.

In [None]:
! pip install botocore boto3 awscli --upgrade
! pip install "sagemaker>=2.99.0"

In [None]:
import os
import json
import boto3
import sagemaker
import sagemaker.session

from sagemaker import utils
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput, CreateModelInput, TransformInput
from sagemaker.model import Model
from sagemaker.transformer import Transformer

from sagemaker.model_metrics import MetricsSource, ModelMetrics, FileSource
from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

from sagemaker.workflow.parameters import (
    ParameterBoolean,
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
    CreateModelStep,
    TransformStep,
)
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession

# Importing new steps and helper functions

from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.clarify_check_step import (
    DataBiasCheckConfig,
    ClarifyCheckStep,
    ModelBiasCheckConfig,
    ModelPredictedLabelConfig,
    ModelExplainabilityCheckConfig,
    SHAPConfig,
)
from sagemaker.workflow.quality_check_step import (
    DataQualityCheckConfig,
    ModelQualityCheckConfig,
    QualityCheckStep,
)
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join
from sagemaker.model_monitor import DatasetFormat, model_monitoring
from sagemaker.clarify import BiasConfig, DataConfig, ModelConfig

### Create the SageMaker Session

In [None]:
region = sagemaker.Session().boto_region_name
sm_client = boto3.client("sagemaker")
boto_session = boto3.Session(region_name=region)
sagemaker_session = sagemaker.session.Session(boto_session=boto_session, sagemaker_client=sm_client)
pipeline_session = PipelineSession()
prefix = "model-monitor-clarify-step-pipeline"

### Define variables and parameters needed for the Pipeline steps

In [None]:
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
base_job_prefix = "model-monitor-clarify"
model_package_group_name = "model-monitor-clarify-group"
pipeline_name = "model-monitor-clarify-pipeline"

### Define pipeline parameters

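Both `QualityCheckStep` and `ClarifyCheckStep` use two boolean flags, `skip_check` and `register_new_baseline`, to control their behavior, together with parameters that determine which existing baselines a drift check is compared against.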

* `skip_check` : This determines if a drift check is executed or not.
* `register_new_baseline` : This determines whether the newly calculated baselines (in the step properties `CalculatedBaseline*`) are also set in the step properties `BaselineUsedForDriftCheck*`.
* `supplied_baseline_statistics` and `supplied_baseline_constraints` : If `skip_check` is set to False, baselines can be provided to the step through these parameters. If provided, the step compares the newly calculated baselines (`CalculatedBaseline*`) against them instead of looking up the latest baselines in the Model Registry. `ClarifyCheckStep` accepts only `supplied_baseline_constraints`; `QualityCheckStep` uses both parameters.
* `model_package_group_name` : The step will use the `drift_check_baselines` from the latest approved model in the model package group for the drift check. If `supplied_baseline_*` is provided, this field will be ignored.
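The precedence rules in the list above can be sketched in a few lines of Python. `resolve_drift_check_baseline` is a hypothetical helper, not an SDK function; `latest_approved` stands in for the Model Registry lookup the step performs, and the S3 paths are placeholders. An empty string counts as "not supplied", matching the `default_value=""` pipeline parameters defined later in this notebook, and a `None` result corresponds to a first run, when the group has no approved model yet.

```python
def resolve_drift_check_baseline(supplied_statistics, supplied_constraints, latest_approved):
    """Choose the baseline a drift check is compared against.

    Supplied baselines win. Otherwise fall back to the drift_check_baselines
    of the latest approved model package, which is None when the model
    package group has no approved model yet.
    """
    if supplied_statistics or supplied_constraints:
        return {
            "statistics": supplied_statistics or None,
            "constraints": supplied_constraints or None,
        }
    return latest_approved


# Placeholder registry entry for the latest approved model package.
registry_baseline = {
    "statistics": "s3://example-bucket/previous/statistics.json",
    "constraints": "s3://example-bucket/previous/constraints.json",
}
print(resolve_drift_check_baseline("", "", registry_baseline))
print(resolve_drift_check_baseline("", "s3://example-bucket/mine/constraints.json", registry_baseline))
print(resolve_drift_check_baseline("", "", None))
```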

The first time the pipeline is run, set both `skip_check` and `register_new_baseline` to True through the pipeline execution parameters, so that no drift check is executed and the newly calculated baselines are registered (the T / T row in the table below).

### Combining Pipeline parameters

This table summarizes how the pipeline parameters work when combined.

The parameter `drift_check_baselines` is used to supply baselines to the `RegisterStep` that will be used for all drift checks involving the model.

Newly calculated baselines can be referenced through the properties `CalculatedBaselineStatistics` and `CalculatedBaselineConstraints` on the `QualityCheckStep`, and `CalculatedBaselineConstraints` on the `ClarifyCheckStep`.

For example, `data_quality_check_step.properties.CalculatedBaselineStatistics` and `data_quality_check_step.properties.CalculatedBaselineConstraints` refer to the baselines calculated when the data quality check step is executed.


| `skip_check` / `register_new_baseline` | Does the step run a drift check? | Value of step property `CalculatedBaseline*` | Value of step property `BaselineUsedForDriftCheck*` | Possible circumstances for this parameter combination |
| --- | --- | --- | --- | --- |
| F / F | Drift check executed against existing baselines. | New baselines calculated by the step execution | Baseline from the latest approved model in the Model Registry, or the baseline supplied as a step parameter | Regular retraining with checks enabled to get a new model version, carrying over the previous baselines as DriftCheckBaselines in the Registry for the new model version. |
| F / T | Drift check executed against existing baselines. | New baselines calculated by the step execution | Newly calculated baselines from the step execution (value of property `CalculatedBaseline*`) | Regular retraining with checks enabled to get a new model version, refreshing the DriftCheckBaselines in the Registry with the newly calculated baselines for the new model version. |
| T / F | No drift check. | New baselines calculated by the step execution | Baseline from the latest approved model in the Model Registry, or the baseline supplied as a step parameter | Model Monitor detected a violation on the endpoint for a particular type of check, and the pipeline is triggered to retrain a new model. Skip the check against previous baselines, but carry over the previous baselines as DriftCheckBaselines in the Registry for the new model version. |
| T / T | No drift check. | New baselines calculated by the step execution | Newly calculated baselines from the step execution (value of property `CalculatedBaseline*`) | a. Initial run of the pipeline, building the first model version and generating the initial baselines. <br>b. Model Monitor detected a violation on the endpoint for a particular type of check, and the pipeline is triggered to retrain a new model. Skip the check against previous baselines and refresh the DriftCheckBaselines in the Registry directly with the newly calculated baselines. |
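The table can be condensed into a few lines of plain Python. This is a hypothetical model of the documented behavior, not SDK code: `StepOutcome` and `check_step_outcome` are illustrative names, and `existing_baseline` stands for either the supplied baseline or the latest approved model's baseline.

```python
from typing import NamedTuple, Optional


class StepOutcome(NamedTuple):
    drift_check_executed: bool
    calculated_baseline: str
    baseline_used_for_drift_check: Optional[str]


def check_step_outcome(skip_check, register_new_baseline, new_baseline, existing_baseline):
    """Model one row of the skip_check / register_new_baseline table."""
    # A new baseline is always calculated, whatever the flags say.
    used = new_baseline if register_new_baseline else existing_baseline
    return StepOutcome(
        drift_check_executed=not skip_check,
        calculated_baseline=new_baseline,
        baseline_used_for_drift_check=used,
    )


# Print the four rows of the table in the same F/T order.
for skip in (False, True):
    for register in (False, True):
        outcome = check_step_outcome(skip, register, "new", "existing")
        print(f"{'T' if skip else 'F'} / {'T' if register else 'F'}: {outcome}")
```

The T / T row is the only one that needs no existing baseline at all, which is why it fits the initial run.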

In [None]:
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
# The dataset used here is the open source Abalone dataset that can be found
# here - https://archive.ics.uci.edu/ml/datasets/abalone
input_data = ParameterString(
    name="InputDataUrl",
    default_value=f"s3://sagemaker-example-files-prod-{region}/datasets/tabular/uci_abalone/abalone.csv",
)

# for data quality check step
skip_check_data_quality = ParameterBoolean(name="SkipDataQualityCheck", default_value=False)
register_new_baseline_data_quality = ParameterBoolean(
    name="RegisterNewDataQualityBaseline", default_value=False
)
supplied_baseline_statistics_data_quality = ParameterString(
    name="DataQualitySuppliedStatistics", default_value=""
)
supplied_baseline_constraints_data_quality = ParameterString(
    name="DataQualitySuppliedConstraints", default_value=""
)

# for data bias check step
skip_check_data_bias = ParameterBoolean(name="SkipDataBiasCheck", default_value=False)
register_new_baseline_data_bias = ParameterBoolean(
    name="RegisterNewDataBiasBaseline", default_value=False
)
supplied_baseline_constraints_data_bias = ParameterString(
    name="DataBiasSuppliedBaselineConstraints", default_value=""
)

# for model quality check step
skip_check_model_quality = ParameterBoolean(name="SkipModelQualityCheck", default_value=False)
register_new_baseline_model_quality = ParameterBoolean(
    name="RegisterNewModelQualityBaseline", default_value=False
)
supplied_baseline_statistics_model_quality = ParameterString(
    name="ModelQualitySuppliedStatistics", default_value=""
)
supplied_baseline_constraints_model_quality = ParameterString(
    name="ModelQualitySuppliedConstraints", default_value=""
)

# for model bias check step
skip_check_model_bias = ParameterBoolean(name="SkipModelBiasCheck", default_value=False)
register_new_baseline_model_bias = ParameterBoolean(
    name="RegisterNewModelBiasBaseline", default_value=False
)
supplied_baseline_constraints_model_bias = ParameterString(
    name="ModelBiasSuppliedBaselineConstraints", default_value=""
)

# for model explainability check step
skip_check_model_explainability = ParameterBoolean(
    name="SkipModelExplainabilityCheck", default_value=False
)
register_new_baseline_model_explainability = ParameterBoolean(
    name="RegisterNewModelExplainabilityBaseline", default_value=False
)
supplied_baseline_constraints_model_explainability = ParameterString(
    name="ModelExplainabilitySuppliedBaselineConstraints", default_value=""
)
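Because each check has its own pair of flags, a small helper keeps the overrides consistent across all five checks. This sketch assumes the parameter names defined in the cell above; `baseline_parameter_overrides` is a hypothetical helper, and the resulting dict could be passed as `pipeline.start(parameters=first_run)` once the `Pipeline` object exists.

```python
# Check names as they appear in the pipeline parameter names defined above.
CHECK_NAMES = ("DataQuality", "DataBias", "ModelQuality", "ModelBias", "ModelExplainability")


def baseline_parameter_overrides(skip_check, register_new_baseline, checks=CHECK_NAMES):
    """Build pipeline-parameter overrides for the skip/register flags of every check."""
    overrides = {}
    for check in checks:
        overrides[f"Skip{check}Check"] = skip_check
        overrides[f"RegisterNew{check}Baseline"] = register_new_baseline
    return overrides


# First run: no drift checks, register the newly calculated baselines (T / T).
first_run = baseline_parameter_overrides(skip_check=True, register_new_baseline=True)
for name, value in sorted(first_run.items()):
    print(name, value)
```

Later runs with checks enabled and previous baselines carried over would use `baseline_parameter_overrides(False, False)`, the F / F row.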

### Processing step for feature engineering

In [None]:
!mkdir -p code

In [None]:
%%writefile code/preprocess.py

"""Feature engineers the abalone dataset."""
import argparse
import logging
import os
import pathlib

import boto3
import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


# Since we get a headerless CSV file we specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64,
}
label_column_dtype = {"rings": np.float64}


def merge_two_dicts(x, y):
    """Merges two dicts, returning a new copy."""
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    logger.debug("Starting preprocessing.")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str, required=True)
    args = parser.parse_args()

    base_dir = "/opt/ml/processing"
    pathlib.Path(f"{base_dir}/data").mkdir(parents=True, exist_ok=True)
    input_data = args.input_data
    bucket = input_data.split("/")[2]
    key = "/".join(input_data.split("/")[3:])

    logger.info("Downloading data from bucket: %s, key: %s", bucket, key)
    fn = f"{base_dir}/data/abalone-dataset.csv"
    s3 = boto3.resource("s3")
    s3.Bucket(bucket).download_file(key, fn)

    logger.debug("Reading downloaded data.")
    df = pd.read_csv(
        fn,
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )
    os.unlink(fn)

    logger.debug("Defining transformers.")
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]
    )

    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    logger.info("Applying transforms.")
    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    X = np.concatenate((y_pre, X_pre), axis=1)

    logger.info("Splitting %d rows of data into train, validation, test datasets.", len(X))
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    logger.info("Writing out datasets to %s.", base_dir)
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

In [None]:
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)
processor_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocess.py",
    arguments=["--input-data", input_data],
)
step_process = ProcessingStep(name="PreprocessAbaloneData", step_args=processor_args)

### Calculating the Data Quality

`CheckJobConfig` is a helper class that defines the job configuration (IAM role, instance count and type, volume size, and session) used by the check steps. Because the job configuration is kept separate from the step parameters, the same `CheckJobConfig` can be reused across multiple steps, as is done for all of the quality and Clarify checks in this notebook.

The `DataQualityCheckConfig` defines the quality check job by specifying:

* the dataset used to calculate the baseline, in this case the training dataset from the data processing step;
* the dataset format, in this case a CSV file with no header and with the label in the first column (`output_columns_position="START"`);
* the output S3 path for the results of the data quality check.

In [None]:
check_job_config = CheckJobConfig(
    role=role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
    volume_size_in_gb=120,
    sagemaker_session=sagemaker_session,
)

data_quality_check_config = DataQualityCheckConfig(
    baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
    dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"),
    output_s3_uri=Join(
        on="/",
        values=[
            "s3:/",
            default_bucket,
            base_job_prefix,
            ExecutionVariables.PIPELINE_EXECUTION_ID,
            "dataqualitycheckstep",
        ],
    ),
)

data_quality_check_step = QualityCheckStep(
    name="DataQualityCheckStep",
    skip_check=skip_check_data_quality,
    register_new_baseline=register_new_baseline_data_quality,
    quality_check_config=data_quality_check_config,
    check_job_config=check_job_config,
    supplied_baseline_statistics=supplied_baseline_statistics_data_quality,
    supplied_baseline_constraints=supplied_baseline_constraints_data_quality,
    model_package_group_name=model_package_group_name,
)
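`Join(on="/", values=[...])` concatenates its values with `/` when the pipeline runs, which is why the first value is `"s3:/"` with a single slash: the separator supplies the second one. Including `ExecutionVariables.PIPELINE_EXECUTION_ID` gives each execution its own output folder. The sketch reproduces that string arithmetic with `str.join`; `join_s3_uri` is a hypothetical helper, and the bucket name and execution ID are made-up placeholders.

```python
def join_s3_uri(*parts):
    """Mimic Join(on="/", values=["s3:/", bucket, ...]) with plain strings."""
    return "/".join(["s3:/", *parts])


uri = join_s3_uri("example-bucket", "model-monitor-clarify", "abc123", "dataqualitycheckstep")
print(uri)  # s3://example-bucket/model-monitor-clarify/abc123/dataqualitycheckstep
```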

### Calculating the Data Bias

The job configuration from the previous step is reused here, and the `DataConfig` class defines how the `ClarifyCheckStep` computes the data bias. The training dataset is used again for the bias evaluation, the column holding the label is specified through the `label` parameter (column 0), and a `BiasConfig` is provided.

In the `BiasConfig`, we specify the facet (the feature that is the focal point of the bias calculation, here column 8, the first one-hot-encoded `sex` column), the facet value or threshold that defines the group of interest, and the label value or threshold that defines a positive outcome.

More details on `BiasConfig` can be found [here](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.clarify.BiasConfig).

In [None]:
data_bias_analysis_cfg_output_path = (
    f"s3://{default_bucket}/{base_job_prefix}/databiascheckstep/analysis_cfg"
)

data_bias_data_config = DataConfig(
    s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs[
        "train"
    ].S3Output.S3Uri,
    s3_output_path=Join(
        on="/",
        values=[
            "s3:/",
            default_bucket,
            base_job_prefix,
            ExecutionVariables.PIPELINE_EXECUTION_ID,
            "databiascheckstep",
        ],
    ),
    label=0,
    dataset_type="text/csv",
    s3_analysis_config_output_path=data_bias_analysis_cfg_output_path,
)


data_bias_config = BiasConfig(
    label_values_or_threshold=[15.0], facet_name=[8], facet_values_or_threshold=[[0.5]]
)

data_bias_check_config = DataBiasCheckConfig(
    data_config=data_bias_data_config,
    data_bias_config=data_bias_config,
)

data_bias_check_step = ClarifyCheckStep(
    name="DataBiasCheckStep",
    clarify_check_config=data_bias_check_config,
    check_job_config=check_job_config,
    skip_check=skip_check_data_bias,
    register_new_baseline=register_new_baseline_data_bias,
    supplied_baseline_constraints=supplied_baseline_constraints_data_bias,
    model_package_group_name=model_package_group_name,
)
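To make the `BiasConfig` thresholds concrete, the sketch computes two of Clarify's pre-training bias metrics, class imbalance (CI) and difference in proportions of labels (DPL), on a handful of made-up rows. It is a simplified illustration, not Clarify's implementation: it assumes that values above each threshold mark membership in the facet group and a positive outcome, respectively, and the function names are hypothetical.

```python
def in_group(values, threshold):
    """Assumption: a value is 'in the group' when it exceeds the threshold."""
    return [v > threshold for v in values]


def class_imbalance(facet_flags):
    """CI = (n_a - n_d) / (n_a + n_d); d is the facet group, a is everyone else."""
    n_d = sum(facet_flags)
    n_a = len(facet_flags) - n_d
    return (n_a - n_d) / (n_a + n_d)


def difference_in_proportions_of_labels(facet_flags, positive_flags):
    """DPL = q_a - q_d, where q is the share of positive labels in each group."""
    a = [pos for is_d, pos in zip(facet_flags, positive_flags) if not is_d]
    d = [pos for is_d, pos in zip(facet_flags, positive_flags) if is_d]
    return sum(a) / len(a) - sum(d) / len(d)


# Made-up rows: a 0/1 one-hot facet column and the ring-count label.
facet_column = [1.0, 0.0, 0.0, 1.0, 0.0, 0.0]
rings = [16.0, 9.0, 17.0, 18.0, 8.0, 20.0]

facet_flags = in_group(facet_column, 0.5)  # mirrors facet_values_or_threshold=[[0.5]]
positive_flags = in_group(rings, 15.0)     # mirrors label_values_or_threshold=[15.0]
ci = class_imbalance(facet_flags)
dpl = difference_in_proportions_of_labels(facet_flags, positive_flags)
print(f"CI={ci:.3f} DPL={dpl:.3f}")
```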

### Train an XGBoost Model

In [None]:
model_path = f"s3://{sagemaker_session.default_bucket()}/{base_job_prefix}/AbaloneTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    base_job_name=f"{base_job_prefix}/abalone-train",
    sagemaker_session=pipeline_session,
    role=role,
)

xgb_train.set_hyperparameters(
    objective="reg:squarederror",  # "reg:linear" is deprecated in XGBoost 1.0+
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0,
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)
step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=train_args,
    depends_on=[data_bias_check_step.name, data_quality_check_step.name],
)

### Create the model

The model is created so that a batch transform job can generate predictions on the test dataset. These predictions are used to calculate the model quality baseline. The model bias and model explainability checks also use this model: Clarify queries it through a temporary shadow endpoint configured by `ModelConfig`.

In [None]:
model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

step_create_model = ModelStep(
    name="AbaloneCreateModel",
    step_args=model.create(instance_type="ml.m5.large"),
)

### Transform Output

The transform step sends each test record to the model without its first column (`input_filter="$[1:]"` drops the label), joins the prediction back onto the original record (`join_source="Input"` appends the output to the end of the input), and keeps only the first and last values (`output_filter="$[0,-1]"`). The output format is therefore <br>
`original label, prediction`

In [None]:
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    accept="text/csv",
    assemble_with="Line",
    output_path=f"s3://{default_bucket}/AbaloneTransform",
)

step_transform = TransformStep(
    name="AbaloneTransform",
    transformer=transformer,
    inputs=TransformInput(
        data=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        input_filter="$[1:]",
        join_source="Input",
        output_filter="$[0,-1]",
        content_type="text/csv",
        split_type="Line",
    ),
)

### Check the Model Quality

In this `QualityCheckStep`, we calculate the baseline statistics and constraints from the predictions that the model generates on the test dataset (the output of the `TransformStep`). In the `ModelQualityCheckConfig`, we set the problem type to `Regression` and specify which columns hold the model's prediction and the ground-truth label. Since the dataset has no header, the auto-generated column names `_c0` (the original label) and `_c1` (the prediction) are used in the `ModelQualityCheckConfig`.

In [None]:
model_quality_check_config = ModelQualityCheckConfig(
    baseline_dataset=step_transform.properties.TransformOutput.S3OutputPath,
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri=Join(
        on="/",
        values=[
            "s3:/",
            default_bucket,
            base_job_prefix,
            ExecutionVariables.PIPELINE_EXECUTION_ID,
            "modelqualitycheckstep",
        ],
    ),
    problem_type="Regression",
    inference_attribute="_c1",  # prediction: last value of the joined record (auto-generated header)
    ground_truth_attribute="_c0",  # original label: first value of the joined record (auto-generated header)
)

model_quality_check_step = QualityCheckStep(
    name="ModelQualityCheckStep",
    skip_check=skip_check_model_quality,
    register_new_baseline=register_new_baseline_model_quality,
    quality_check_config=model_quality_check_config,
    check_job_config=check_job_config,
    supplied_baseline_statistics=supplied_baseline_statistics_model_quality,
    supplied_baseline_constraints=supplied_baseline_constraints_model_quality,
    model_package_group_name=model_package_group_name,
)
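The `DataProcessing` settings on `TransformInput` can be simulated on a single record to see which auto-generated column ends up holding which value. This plain-Python sketch assumes the batch transform behavior documented for CSV data, where the joined record is the original input followed by the inference result; `transform_output_row` and `fake_predict` are hypothetical names, and the row values are made up.

```python
def transform_output_row(record, predict):
    """Mimic the DataProcessing settings on TransformInput for one CSV record."""
    model_input = record[1:]            # input_filter="$[1:]": drop the label column
    prediction = predict(model_input)   # the model sees only the features
    joined = record + [prediction]      # join_source="Input": original input, then output
    return [joined[0], joined[-1]]      # output_filter="$[0,-1]": first and last values


def fake_predict(features):
    """Hypothetical stand-in for the XGBoost model: returns a constant."""
    return 10.5


row = [12.0, 0.1, -0.3, 1.0]  # label first, then made-up features
output = transform_output_row(row, fake_predict)
columns = {f"_c{i}": value for i, value in enumerate(output)}
print(columns)
```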

### Check for Model Bias

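Similar to the data bias check step, a `BiasConfig` is defined, and Clarify calculates the model bias using the training dataset and the model. The `ModelConfig` tells Clarify which model to query (the model created by `step_create_model`, hosted on a temporary shadow endpoint), and `ModelPredictedLabelConfig` describes how to read the predicted label from the model's output.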

In [None]:
model_bias_analysis_cfg_output_path = (
    f"s3://{default_bucket}/{base_job_prefix}/modelbiascheckstep/analysis_cfg"
)

model_bias_data_config = DataConfig(
    s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs[
        "train"
    ].S3Output.S3Uri,
    s3_output_path=Join(
        on="/",
        values=[
            "s3:/",
            default_bucket,
            base_job_prefix,
            ExecutionVariables.PIPELINE_EXECUTION_ID,
            "modelbiascheckstep",
        ],
    ),
    s3_analysis_config_output_path=model_bias_analysis_cfg_output_path,
    label=0,
    dataset_type="text/csv",
)

model_config = ModelConfig(
    model_name=step_create_model.properties.ModelName,
    instance_count=1,
    instance_type="ml.m5.xlarge",
)

# This bias config tells Clarify to measure bias with respect to column 8, the first one-hot-encoded `sex` column in the featurized dataset
model_bias_config = BiasConfig(
    label_values_or_threshold=[15.0], facet_name=[8], facet_values_or_threshold=[[0.5]]
)

model_bias_check_config = ModelBiasCheckConfig(
    data_config=model_bias_data_config,
    data_bias_config=model_bias_config,
    model_config=model_config,
    model_predicted_label_config=ModelPredictedLabelConfig(),
)

model_bias_check_step = ClarifyCheckStep(
    name="ModelBiasCheckStep",
    clarify_check_config=model_bias_check_config,
    check_job_config=check_job_config,
    skip_check=skip_check_model_bias,
    register_new_baseline=register_new_baseline_model_bias,
    supplied_baseline_constraints=supplied_baseline_constraints_model_bias,
    model_package_group_name=model_package_group_name,
)

### Check Model Explainability

SageMaker Clarify uses a model-agnostic feature attribution approach, which you can use to understand why a model made a prediction after training and to provide per-instance explanations during inference. It includes a scalable and efficient implementation of SHAP, which is based on the Shapley value from cooperative game theory and assigns each feature an importance value for a particular prediction.

For model explainability, Clarify requires an explainability configuration. In this example, we use `SHAPConfig`. For more information on `explainability_config`, see the [Clarify documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/clarify-model-explainability.html).

In [None]:
model_explainability_analysis_cfg_output_path = "s3://{}/{}/{}/{}".format(
    default_bucket, base_job_prefix, "modelexplainabilitycheckstep", "analysis_cfg"
)

model_explainability_data_config = DataConfig(
    s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs[
        "train"
    ].S3Output.S3Uri,
    s3_output_path=Join(
        on="/",
        values=[
            "s3:/",
            default_bucket,
            base_job_prefix,
            ExecutionVariables.PIPELINE_EXECUTION_ID,
            "modelexplainabilitycheckstep",
        ],
    ),
    s3_analysis_config_output_path=model_explainability_analysis_cfg_output_path,
    label=0,
    dataset_type="text/csv",
)
shap_config = SHAPConfig(seed=123, num_samples=10)
model_explainability_check_config = ModelExplainabilityCheckConfig(
    data_config=model_explainability_data_config,
    model_config=model_config,
    explainability_config=shap_config,
)
model_explainability_check_step = ClarifyCheckStep(
    name="ModelExplainabilityCheckStep",
    clarify_check_config=model_explainability_check_config,
    check_job_config=check_job_config,
    skip_check=skip_check_model_explainability,
    register_new_baseline=register_new_baseline_model_explainability,
    supplied_baseline_constraints=supplied_baseline_constraints_model_explainability,
    model_package_group_name=model_package_group_name,
)
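Clarify estimates SHAP values with Kernel SHAP, sampling `num_samples` feature coalitions (10 in `shap_config` above) against a baseline, which Clarify computes automatically when `SHAPConfig` is given none. To show what is being approximated, the sketch below computes exact Shapley values by enumerating every coalition. This brute-force method is a swapped-in illustration, not Clarify's algorithm, and it only scales to a few features; `shapley_values` and `linear_model` are hypothetical names.

```python
from itertools import combinations
from math import factorial


def shapley_values(f, x, baseline):
    """Exact Shapley values of model f at point x, relative to baseline.

    Features outside a coalition take their baseline value. Every coalition
    is visited, so the cost grows as 2^n in the number of features.
    """
    n = len(x)

    def value(coalition):
        point = [x[i] if i in coalition else baseline[i] for i in range(n)]
        return f(point)

    phi = [0.0] * n
    for i in range(n):
        others = [j for j in range(n) if j != i]
        for size in range(n):
            # Shapley weight |S|! (n - |S| - 1)! / n! for coalitions S of this size.
            weight = factorial(size) * factorial(n - size - 1) / factorial(n)
            for subset in combinations(others, size):
                s = set(subset)
                phi[i] += weight * (value(s | {i}) - value(s))
    return phi


def linear_model(p):
    return 2.0 * p[0] - 1.0 * p[1] + 0.5 * p[2]


x = [1.0, 2.0, 3.0]
baseline = [0.0, 0.0, 0.0]
phi = shapley_values(linear_model, x, baseline)
print(phi)
# Efficiency property: attributions sum to f(x) - f(baseline).
print(sum(phi), linear_model(x) - linear_model(baseline))
```

For a linear model each attribution reduces to weight times (feature value minus baseline value), which makes the output easy to check by hand.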

### Evaluate the performance of the model

Using a processing job, evaluate the performance of the model. The performance is used in the Condition Step to determine if the model should be registered or not.

In [None]:
%%writefile code/evaluate.py

"""Evaluation script for measuring mean squared error."""
import json
import logging
import pathlib
import pickle
import tarfile

import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


if __name__ == "__main__":
    logger.debug("Starting evaluation.")
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    logger.debug("Loading xgboost model.")
    # Use a context manager so the model file handle is closed after loading.
    with open("xgboost-model", "rb") as f:
        model = pickle.load(f)

    logger.debug("Reading test data.")
    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    logger.debug("Splitting labels (first column) from features.")
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    X_test = xgboost.DMatrix(df.values)

    logger.info("Performing predictions against test data.")
    predictions = model.predict(X_test)

    logger.debug("Calculating mean squared error.")
    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    logger.info("Writing out evaluation report with mse: %f", mse)
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

In [None]:
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name=f"{base_job_prefix}/script-abalone-eval",
    sagemaker_session=pipeline_session,
    role=role,
)
evaluation_report = PropertyFile(
    name="AbaloneEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluate.py",
)
step_eval = ProcessingStep(
    name="EvaluateAbaloneModel",
    step_args=eval_args,
    property_files=[evaluation_report],
)

### Define the metrics to be registered with the model in the Model Registry

In [None]:
model_metrics = ModelMetrics(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_check_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    bias_pre_training=MetricsSource(
        s3_uri=data_bias_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    model_statistics=MetricsSource(
        s3_uri=model_quality_check_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_constraints=MetricsSource(
        s3_uri=model_quality_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    bias_post_training=MetricsSource(
        s3_uri=model_bias_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    explainability=MetricsSource(
        s3_uri=model_explainability_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
)

drift_check_baselines = DriftCheckBaselines(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_check_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    bias_pre_training_constraints=MetricsSource(
        s3_uri=data_bias_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    bias_config_file=FileSource(
        s3_uri=model_bias_check_config.monitoring_analysis_config_uri,
        content_type="application/json",
    ),
    model_statistics=MetricsSource(
        s3_uri=model_quality_check_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_constraints=MetricsSource(
        s3_uri=model_quality_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    bias_post_training_constraints=MetricsSource(
        s3_uri=model_bias_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    explainability_constraints=MetricsSource(
        s3_uri=model_explainability_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    explainability_config_file=FileSource(
        s3_uri=model_explainability_check_config.monitoring_analysis_config_uri,
        content_type="application/json",
    ),
)

### Register the model

The two parameters of `model.register()` that hold the metrics calculated by the `ClarifyCheckStep` and `QualityCheckStep` steps are `model_metrics` and `drift_check_baselines`.

`drift_check_baselines` - these are the baseline files that will be used for drift checks in `QualityCheckStep` or `ClarifyCheckStep` and model monitoring jobs that are set up on endpoints hosting this model.

`model_metrics` - these should be the latest baselines calculated in the pipeline run. They can be set using the step properties `CalculatedBaselineStatistics` and `CalculatedBaselineConstraints`.

The intention behind these parameters is to give users a way to configure the baselines associated with a model so they can be used in drift checks or model monitoring jobs. Each time a pipeline is executed, users can choose to update the `drift_check_baselines` with newly calculated baselines. The `model_metrics` can be used to register the newly calculated baselines or any other metrics associated with the model.

Calculating a new baseline does not mean that the baselines used for drift checks must be updated to it. In some cases, users may keep an older baseline file for drift checks and choose not to register the new baselines calculated in the pipeline run.
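The relationship between `register_new_baseline` and the two kinds of step properties can be modeled in a few lines. According to the SageMaker documentation's summary, `CalculatedBaseline*` always holds the newly calculated baseline. `BaselineUsedForDriftCheck*` holds the new baseline only when `register_new_baseline` is `True`, independently of `skip_check`. Otherwise it holds a supplied or previously registered baseline. This is a simplified sketch, not the service's implementation. The helper and type names are hypothetical. It also assumes that a baseline supplied as a step parameter takes precedence over the latest one in the model package group.

```python
from typing import NamedTuple, Optional


class BaselineOutcome(NamedTuple):
    calculated: str  # what the CalculatedBaseline* property reports
    used_for_drift_check: Optional[str]  # what BaselineUsedForDriftCheck* reports


def resolve_baselines(
    calculated: str,
    register_new_baseline: bool,
    supplied: Optional[str] = None,
    latest_registered: Optional[str] = None,
) -> BaselineOutcome:
    """Hypothetical model of which baseline a check step reports.

    Assumption: a supplied baseline takes precedence over the latest
    baseline registered in the model package group.
    """
    if register_new_baseline:
        used = calculated
    else:
        used = supplied if supplied is not None else latest_registered
    return BaselineOutcome(calculated=calculated, used_for_drift_check=used)


# Later run that keeps the run-1 baseline for drift checks (toy URIs).
outcome = resolve_baselines(
    "s3://example/run2/constraints.json",
    register_new_baseline=False,
    latest_registered="s3://example/run1/constraints.json",
)
print(outcome)
```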

In [None]:
register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    drift_check_baselines=drift_check_baselines,
)

step_register = ModelStep(name="RegisterAbaloneModel", step_args=register_args)

In [None]:
# condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=6.0,
)
step_cond = ConditionStep(
    name="CheckMSEAbaloneEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register],
    else_steps=[],
)
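At run time, `JsonGet` reads `regression_metrics.mse.value` from the `evaluation.json` property file. `ConditionLessThanOrEqualTo` then compares that value to `6.0`, and the register step runs only when the comparison holds. The local sketch below mimics that decision so the threshold can be checked against a report before running the pipeline. The helper names are hypothetical, and only simple dotted paths are handled, not general JsonPath.

```python
from functools import reduce


def json_get(document: dict, dotted_path: str):
    """Follow a simple dotted path such as "regression_metrics.mse.value".

    Hypothetical helper: JsonGet evaluates the path against the PropertyFile
    inside the pipeline; this only mimics the lookup locally.
    """
    return reduce(lambda node, key: node[key], dotted_path.split("."), document)


def should_register(report: dict, threshold: float = 6.0) -> bool:
    """Mirror ConditionLessThanOrEqualTo(left=<mse>, right=threshold)."""
    mse = json_get(report, "regression_metrics.mse.value")
    return mse <= threshold


toy_report = {"regression_metrics": {"mse": {"value": 4.2, "standard_deviation": 1.1}}}
print(should_register(toy_report))  # True: the if_steps branch (registration) would run
```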

### Create the Pipeline

In [None]:
# pipeline instance
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        skip_check_data_quality,
        register_new_baseline_data_quality,
        supplied_baseline_statistics_data_quality,
        supplied_baseline_constraints_data_quality,
        skip_check_data_bias,
        register_new_baseline_data_bias,
        supplied_baseline_constraints_data_bias,
        skip_check_model_quality,
        register_new_baseline_model_quality,
        supplied_baseline_statistics_model_quality,
        supplied_baseline_constraints_model_quality,
        skip_check_model_bias,
        register_new_baseline_model_bias,
        supplied_baseline_constraints_model_bias,
        skip_check_model_explainability,
        register_new_baseline_model_explainability,
        supplied_baseline_constraints_model_explainability,
    ],
    steps=[
        step_process,
        data_quality_check_step,
        data_bias_check_step,
        step_train,
        step_create_model,
        step_transform,
        model_quality_check_step,
        model_bias_check_step,
        model_explainability_check_step,
        step_eval,
        step_cond,
    ],
    sagemaker_session=pipeline_session,
)

### Get Pipeline definition

In [None]:
import json

definition = json.loads(pipeline.definition())
definition
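The parsed definition is a plain dictionary, so ordinary Python can inspect it. The sketch below lists each step's name and type. It assumes the definition has a top-level `"Steps"` list whose entries carry `"Name"` and `"Type"` keys. It also assumes that condition steps nest their branches under `Arguments.IfSteps` and `Arguments.ElseSteps`. It is exercised on a hand-written toy dictionary, not on real output, so the names and types in `toy_definition` are illustrative only.

```python
def list_steps(steps, depth=0):
    """Return (indented name, type) pairs, recursing into condition branches."""
    rows = []
    for step in steps:
        rows.append(("  " * depth + step["Name"], step["Type"]))
        args = step.get("Arguments", {})
        for branch in ("IfSteps", "ElseSteps"):
            rows.extend(list_steps(args.get(branch, []), depth + 1))
    return rows


# Hand-written toy definition; real usage would be list_steps(definition["Steps"]).
toy_definition = {
    "Steps": [
        {"Name": "EvaluateAbaloneModel", "Type": "Processing", "Arguments": {}},
        {
            "Name": "CheckMSEAbaloneEvaluation",
            "Type": "Condition",
            "Arguments": {
                "IfSteps": [{"Name": "RegisterAbaloneModel-RegisterModel", "Type": "RegisterModel"}],
                "ElseSteps": [],
            },
        },
    ]
}

for name, step_type in list_steps(toy_definition["Steps"]):
    print(f"{name:40} {step_type}")
```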

In [None]:
pipeline.upsert(role_arn=role)

### First-time execution

The first time the pipeline runs, there is typically no previously registered baseline to check against. Override the parameters so that the checks are skipped and the newly calculated baselines are registered.
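The ten overrides in the next cell follow a regular naming pattern: `Skip<Check>Check` and `RegisterNew<Check>Baseline` for each of the five checks. The sketch below builds the same dictionary programmatically. The pattern is inferred from the parameter names used in this notebook, not from an SDK convention, and the helper name is hypothetical. Usage would be `pipeline.start(parameters=first_run_overrides())`.

```python
CHECK_NAMES = ["DataQuality", "DataBias", "ModelQuality", "ModelBias", "ModelExplainability"]


def first_run_overrides(check_names=CHECK_NAMES):
    """Skip every check and register every newly calculated baseline."""
    overrides = {}
    for check in check_names:
        overrides[f"Skip{check}Check"] = True
        overrides[f"RegisterNew{check}Baseline"] = True
    return overrides


print(first_run_overrides())
```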

In [None]:
execution = pipeline.start(
    parameters=dict(
        SkipDataQualityCheck=True,
        RegisterNewDataQualityBaseline=True,
        SkipDataBiasCheck=True,
        RegisterNewDataBiasBaseline=True,
        SkipModelQualityCheck=True,
        RegisterNewModelQualityBaseline=True,
        SkipModelBiasCheck=True,
        RegisterNewModelBiasBaseline=True,
        SkipModelExplainabilityCheck=True,
        RegisterNewModelExplainabilityBaseline=True,
    )
)

### Wait for the pipeline execution to complete

In [None]:
execution.wait()
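If the execution does not succeed, the step summaries returned by `execution.list_steps()` can help locate the cause. The sketch below assumes each summary is a dictionary with `StepName`, `StepStatus`, and an optional `FailureReason` key, following the shape of the `ListPipelineExecutionSteps` API response. It is tested on toy data, and the failure reason shown is a placeholder. Real usage would be `failed_steps(execution.list_steps())`.

```python
def failed_steps(step_summaries):
    """Return (step name, failure reason) for every step whose status is "Failed"."""
    return [
        (step["StepName"], step.get("FailureReason", "<no reason reported>"))
        for step in step_summaries
        if step.get("StepStatus") == "Failed"
    ]


toy_steps = [
    {"StepName": "EvaluateAbaloneModel", "StepStatus": "Succeeded"},
    {"StepName": "ModelExplainabilityCheckStep", "StepStatus": "Failed", "FailureReason": "example reason"},
]
print(failed_steps(toy_steps))
```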

### Cleaning up resources

Users are responsible for cleaning up the resources created by this notebook. Specify the ModelName, ModelPackageName, and ModelPackageGroupName to delete. The model names are generated by the CreateModel step of the pipeline, and their property values are available only in the pipeline context. To find the models created by this pipeline, look in the Model Registry and the SageMaker console.
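A model package group typically has to be empty before it can be deleted. The sketch below therefore deletes every package in the group first, using the boto3 `list_model_packages` paginator and `delete_model_package`. It collects all ARNs before deleting anything, so the deletions do not interfere with pagination. The helper names are hypothetical, and the group name in the usage comment is the one from the cleanup cell. Review what the helper would delete before running it against a real account.

```python
def package_arns(pages):
    """Extract model package ARNs from ListModelPackages response pages."""
    return [s["ModelPackageArn"] for page in pages for s in page["ModelPackageSummaryList"]]


def delete_group_and_packages(sm_client, group_name):
    """Delete every model package in a group, then the group itself.

    `sm_client` is a boto3 SageMaker client, e.g. boto3.client("sagemaker").
    Returns the ARNs of the deleted model packages.
    """
    paginator = sm_client.get_paginator("list_model_packages")
    # Collect all ARNs first so deletions do not affect pagination.
    arns = package_arns(paginator.paginate(ModelPackageGroupName=group_name))
    for arn in arns:
        # Versioned model packages are deleted by ARN.
        sm_client.delete_model_package(ModelPackageName=arn)
    sm_client.delete_model_package_group(ModelPackageGroupName=group_name)
    return arns


# Usage (not run here):
# import boto3
# sm_client = boto3.client("sagemaker")
# delete_group_and_packages(sm_client, "model-monitor-clarify-group")
```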


In [None]:
# Create a SageMaker client
# import boto3
# sm_client = boto3.client("sagemaker")

# # Delete SageMaker Models
# sm_client.delete_model(ModelName="...")

# # Delete Model Packages
# sm_client.delete_model_package(ModelPackageName="...")

# # Delete the Model Package Group
# sm_client.delete_model_package_group(ModelPackageGroupName="model-monitor-clarify-group")

# # Delete the Pipeline
# sm_client.delete_pipeline(PipelineName="model-monitor-clarify-pipeline")

## Notebook CI Test Results

This notebook was tested in multiple regions. The test results are as follows, except for us-west-2, which is shown at the top of the notebook.

![This us-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-1/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This us-east-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-east-2/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This us-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/us-west-1/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This ca-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ca-central-1/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This sa-east-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/sa-east-1/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This eu-west-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-1/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This eu-west-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-2/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This eu-west-3 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-west-3/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This eu-central-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-central-1/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This eu-north-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/eu-north-1/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This ap-southeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-1/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This ap-southeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-southeast-2/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This ap-northeast-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-1/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This ap-northeast-2 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-northeast-2/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)

![This ap-south-1 badge failed to load. Check your device's internet connectivity, otherwise the service is currently unavailable](https://h75twx4l60.execute-api.us-west-2.amazonaws.com/sagemaker-nb/ap-south-1/sagemaker-pipelines|tabular|model-monitor-clarify-pipelines|sagemaker-pipeline-model-monitor-clarify-steps.ipynb)
