# Data Scientist - Model Building

This notebook demonstrates a sample of the activities and artifacts prepared by a Data Scientist to establish the Model Building pipeline.

***
*This notebook should work well with the Python 3 (Data Science) kernel in SageMaker Studio*
***

#### Environment setup
Import libraries, set up logging, and define a few variables.

In [None]:
import json
import logging
import shutil
from pathlib import Path

import sagemaker
from sagemaker.lambda_helper import Lambda

In [None]:
%load_ext autoreload
%autoreload 2

Set up a logger

In [None]:
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

Define the local path where all the development artifacts are stored before they are committed to the ModelBuilding Git repository.

In [None]:
model_building_path = Path("model_building")
model_building_path.mkdir(exist_ok=True)

In [None]:
model_group_dev_name = "mlops-demo-dev"

## Pipeline definition
We define the model building pipeline as the `get_pipeline()` function in a Python script, using the SageMaker Python SDK to define the steps and the DAG that connects them.
After writing the file to local storage, we `import` the `get_pipeline()` function to create and test the Model Building pipeline. Thanks to the `autoreload` extension, we can update the `get_pipeline()` definition simply by saving an updated version of `model_building/pipelines/xgboost_pipeline.py`.

This SageMaker Pipeline manages all the tasks necessary for model training:
- extract data from Feature Store
- process data specifically for model training, e.g., joining datasets and splitting them into train/test sets
- train the model
- evaluate the initial bias in the dataset across features
- record a baseline of the training dataset distribution for use by Model Monitor
- evaluate model performance on the `test` dataset, creating a baseline to monitor model performance
- register the model if the model performance metric (in this case, AUC) is greater than or equal to a set threshold
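SageMaker Pipelines infers the DAG from the step properties each step references (for example, training reads `create_dataset_step.properties...`), so no explicit ordering is declared. The sketch below mirrors that idea in plain Python with `graphlib`. The dependency map is hand-written from the references in `get_pipeline()` and is only an illustration; in particular, `TestScoring` stands in for the steps generated by `EstimatorTransformer`.

```python
from graphlib import TopologicalSorter

# Hand-written illustration: step name -> steps whose properties it references.
# "TestScoring" is a simplification of the steps created by EstimatorTransformer.
step_dependencies = {
    "CreateDataset": set(),
    "DataQualityCheckStep": {"CreateDataset"},
    "DataBiasCheckStep": {"CreateDataset"},
    "ModelTraining": {"CreateDataset"},
    "TestScoring": {"CreateDataset", "ModelTraining"},
    "ModelQualityCheckStep": {"TestScoring"},
    "LambdaExtractMetrics": {"ModelQualityCheckStep"},
    "CheckAUC": {"LambdaExtractMetrics"},
    # RegisterModel sits in the if-branch of CheckAUC and reads the baselines
    # produced by the check steps to build its model metrics.
    "RegisterModel": {
        "CheckAUC",
        "ModelTraining",
        "DataQualityCheckStep",
        "DataBiasCheckStep",
        "ModelQualityCheckStep",
    },
}

# One valid execution order that respects every dependency
execution_order = list(TopologicalSorter(step_dependencies).static_order())
print(execution_order)
```

Independent steps such as the data quality and data bias checks have no edge between them, which is what lets SageMaker run them in parallel.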

To compare model performance against the threshold, we use a `LambdaStep` to parse the report generated by the model quality `QualityCheckStep` (`ModelQualityCheckStep`), and a `ConditionStep` to act on the extracted value. The code for the Lambda function is described below.
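The decision made by the `LambdaStep` and `ConditionStep` can be mirrored locally. This is a sketch assuming the report layout the Lambda reads (`binary_classification_metrics` → metric → `value`); `extract_metric`, `passes_threshold` and the sample report are illustrative, not part of the SageMaker SDK. Both the Lambda output `metric_value` and the `ModelMinAcceptableAUC` parameter are declared as strings in the pipeline, so the sketch converts them to `float` to make the intended numeric comparison explicit.

```python
import json


def extract_metric(report: dict, metric_name: str) -> str:
    """Mimic the Lambda: read a binary-classification metric and JSON-encode it."""
    return json.dumps(report["binary_classification_metrics"][metric_name]["value"])


def passes_threshold(metric_value: str, threshold: str) -> bool:
    """Compare the extracted metric with the threshold as numbers (>=, like the ConditionStep)."""
    return float(metric_value) >= float(threshold)


# Hypothetical, heavily trimmed model quality report
sample_report = {"binary_classification_metrics": {"auc": {"value": 0.82}}}

metric_value = extract_metric(sample_report, "auc")
print(metric_value, passes_threshold(metric_value, "0.75"))  # 0.82 True
```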

In [None]:
model_building_pipelines_path = model_building_path / "pipelines"
model_building_pipelines_path.mkdir(exist_ok=True, parents=True)

In [None]:
%%writefile {model_building_pipelines_path}/xgboost_pipeline.py
from typing import Dict

import sagemaker
from sagemaker.clarify import BiasConfig, DataConfig
from sagemaker.dataset_definition.inputs import (
    AthenaDatasetDefinition,
    DatasetDefinition,
)
from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.inputs import TransformInput
from sagemaker.lambda_helper import Lambda
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.clarify_check_step import ClarifyCheckStep, DataBiasCheckConfig
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join
from sagemaker.workflow.lambda_step import (
    LambdaOutput,
    LambdaOutputTypeEnum,
    LambdaStep,
)
from sagemaker.workflow.parameters import (
    ParameterFloat,
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.quality_check_step import (
    DataQualityCheckConfig,
    ModelQualityCheckConfig,
    QualityCheckStep,
)
from sagemaker.workflow.step_collections import EstimatorTransformer, RegisterModel
from sagemaker.workflow.steps import CacheConfig, ProcessingStep, Step, TrainingStep
from sagemaker.xgboost.estimator import XGBoost


def get_pipeline(
    role: str,
    pipeline_name: str,
    sagemaker_session: sagemaker.Session = None,
    **kwargs,
) -> Pipeline:
    """Build the pipeline that creates the dataset, trains and evaluates the model,
    and registers it when its AUC meets the threshold."""
    # Fall back to a default session when none is provided
    if sagemaker_session is None:
        sagemaker_session = sagemaker.Session()

    cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
    create_dataset_instance_count = 1
    transformer_instance_count = 1

    default_bucket = sagemaker_session.default_bucket()
    prefix = kwargs["prefix"]
    model_package_group_name = kwargs["model_package_group_name"]

    # Pipeline parameters
    train_instance_count = ParameterInteger(
        name="TrainingInstanceCount",
        default_value=1,
    )
    train_instance_type = ParameterString(
        name="TrainingInstance",
        default_value="ml.m4.xlarge",
    )

    baseline_instance_type = ParameterString(
        name="BaselineInstanceType",
        default_value="ml.c5.xlarge",
    )
    baseline_instance_count = ParameterInteger(
        name="BaselineInstanceCount",
        default_value=1,
    )
    model_threshold_auc = ParameterString(
        name="ModelMinAcceptableAUC",
        default_value="0.75",
    )
    model_approval_status = ParameterString(
        name="ModelApprovalStatus",
        default_value="PendingManualApproval",
        enum_values=[
            "PendingManualApproval",
            "Approved",
        ],
    )

    # Compute configuration shared by the data quality, data bias and model quality checks
    check_job_config = CheckJobConfig(
        role=role,
        instance_count=baseline_instance_count,
        instance_type=baseline_instance_type,
        volume_size_in_gb=120,
        sagemaker_session=sagemaker_session,
    )

    # Create dataset
    create_dataset_step = get_dataset_step(
        role=role,
        sagemaker_session=sagemaker_session,
        instance_count=create_dataset_instance_count,
        cache_config=cache_config,
        **kwargs,
    )

    # Data quality baseline
    data_quality_baseline_step = get_data_quality_step(
        role=role,
        sagemaker_session=sagemaker_session,
        dataset_uri=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "baseline"
        ].S3Output.S3Uri,
        check_job_config=check_job_config,
        cache_config=cache_config,
        **kwargs,
    )

    # Model training
    training_step = get_model_training_step(
        role=role,
        sagemaker_session=sagemaker_session,
        dataset_uri=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "train_data"
        ].S3Output.S3Uri,
        instance_count=train_instance_count,
        instance_type=train_instance_type,
        cache_config=cache_config,
        **kwargs,
    )

    # Batch scoring of the test dataset: input_filter drops the first column before
    # scoring, join_source appends the prediction to the original record, and
    # output_filter keeps the first column (ground truth) and the prediction
    transformer = EstimatorTransformer(
        name="TestScoring-",
        estimator=training_step.estimator,
        model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
        model_inputs=None,
        instance_type=train_instance_type,
        instance_count=transformer_instance_count,
        transform_inputs=TransformInput(
            data=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
                "test_data"
            ].S3Output.S3Uri,
            content_type="text/csv",
            data_type="S3Prefix",
            split_type="Line",
            input_filter="$[1:]",
            output_filter="$[0, -1]",
            join_source="Input",
        ),
        accept="text/csv",
        assemble_with="Line",
        description="Scoring of test dataset",
        output_path=Join(
            on="/",
            values=[
                "s3:/",
                default_bucket,
                prefix,
                ExecutionVariables.PIPELINE_EXECUTION_ID,
                "test_step",
                "output",
            ],
        ),
    )

    # Model quality baseline
    model_quality_baseline_step = get_model_quality_step(
        role=role,
        sagemaker_session=sagemaker_session,
        dataset_uri=transformer.steps[-1].properties.TransformOutput.S3OutputPath,
        check_job_config=check_job_config,
        **kwargs,
    )

    # Data bias analysis
    bias_step = get_data_bias_step(
        role=role,
        sagemaker_session=sagemaker_session,
        dataset_uri=create_dataset_step.properties.ProcessingOutputConfig.Outputs[
            "train_data"
        ].S3Output.S3Uri,
        check_job_config=check_job_config,
        **kwargs,
    )

    # Metrics attached to the model package in the Model Registry
    model_metrics = ModelMetrics(
        model_data_statistics=MetricsSource(
            s3_uri=data_quality_baseline_step.properties.CalculatedBaselineStatistics,
            content_type="application/json",
        ),
        model_data_constraints=MetricsSource(
            s3_uri=data_quality_baseline_step.properties.CalculatedBaselineConstraints,
            content_type="application/json",
        ),
        bias_pre_training=MetricsSource(
            s3_uri=bias_step.properties.CalculatedBaselineConstraints,
            content_type="application/json",
        ),
        model_statistics=MetricsSource(
            s3_uri=model_quality_baseline_step.properties.CalculatedBaselineStatistics,
            content_type="application/json",
        ),
        model_constraints=MetricsSource(
            s3_uri=model_quality_baseline_step.properties.CalculatedBaselineConstraints,
            content_type="application/json",
        ),
        bias=MetricsSource(
            s3_uri=bias_step.properties.CalculatedBaselineConstraints,
            content_type="application/json",
        ),
    )

    # Baselines used by future drift checks
    drift_check_baselines = DriftCheckBaselines(
        model_data_statistics=MetricsSource(
            s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,
            content_type="application/json",
        ),
        model_data_constraints=MetricsSource(
            s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,
            content_type="application/json",
        ),
        bias_pre_training_constraints=MetricsSource(
            s3_uri=bias_step.properties.BaselineUsedForDriftCheckConstraints,
            content_type="application/json",
        ),
        model_statistics=MetricsSource(
            s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,
            content_type="application/json",
        ),
        model_constraints=MetricsSource(
            s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,
            content_type="application/json",
        ),
    )

    register_step = RegisterModel(
        name="RegisterModel",
        estimator=training_step.estimator,
        model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.t2.medium", "ml.t2.large", "ml.m5.large"],
        transform_instances=["ml.m5.xlarge"],
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        model_metrics=model_metrics,
        drift_check_baselines=drift_check_baselines,
        description="Binary classification model based on XGBoost",
    )

    # Extract the AUC from the model quality report
    lambda_step = get_lambda_step(
        sagemaker_session=sagemaker_session,
        function_arn=kwargs["metric_extraction_lambda_arn"],
        model_quality_report_uri=model_quality_baseline_step.properties.CalculatedBaselineStatistics,
        metric_name="auc",
    )

    # Register the model only if AUC >= threshold
    cond_gte = ConditionGreaterThanOrEqualTo(
        left=lambda_step.properties.Outputs["metric_value"],
        right=model_threshold_auc,
    )

    step_cond = ConditionStep(
        name="CheckAUC",
        conditions=[cond_gte],
        if_steps=[register_step],
        else_steps=[],
    )

    # Pipeline instance
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            baseline_instance_type,
            baseline_instance_count,
            train_instance_type,
            train_instance_count,
            model_approval_status,
            model_threshold_auc,
        ],
        steps=[
            create_dataset_step,
            bias_step,
            data_quality_baseline_step,
            training_step,
            transformer,
            model_quality_baseline_step,
            lambda_step,
            step_cond,
        ],
        sagemaker_session=sagemaker_session,
    )

    return pipeline


def get_model_training_step(
    role: str,
    sagemaker_session: sagemaker.Session,
    dataset_uri: str,
    instance_count: int,
    instance_type: str,
    cache_config: CacheConfig = None,
    **kwargs,
):
    default_bucket = sagemaker_session.default_bucket()
    prefix = kwargs["prefix"]
    model_entry_point = kwargs["model_training_script_path"]

    metric_uri = f"{prefix}/training_jobs/metrics_output/metrics.json"
    hyperparameters = {
        "max_depth": "3",
        "eta": "0.2",
        "objective": "binary:logistic",
        "num_round": "100",
        "bucket": f"{default_bucket}",
        "object": f"{metric_uri}",
    }
    estimator = XGBoost(
        entry_point=model_entry_point,
        hyperparameters=hyperparameters,
        role=role,
        instance_count=instance_count,
        instance_type=instance_type,
        framework_version="1.0-1",
        sagemaker_session=sagemaker_session,
    )

    train_step = TrainingStep(
        name="ModelTraining",
        estimator=estimator,
        inputs={"train": sagemaker.inputs.TrainingInput(s3_data=dataset_uri)},
        cache_config=cache_config,
    )

    return train_step


def get_lambda_step(
    sagemaker_session: sagemaker.Session,
    function_arn: str,
    model_quality_report_uri: str,
    metric_name: str,
) -> Step:
    output_param_1 = LambdaOutput(
        output_name="statusCode",
        output_type=LambdaOutputTypeEnum.String,
    )
    output_param_2 = LambdaOutput(
        output_name="body",
        output_type=LambdaOutputTypeEnum.String,
    )
    output_param_3 = LambdaOutput(
        output_name="metric_value",
        output_type=LambdaOutputTypeEnum.String,
    )

    step = LambdaStep(
        name="LambdaExtractMetrics",
        lambda_func=Lambda(function_arn=function_arn, session=sagemaker_session),
        inputs={
            "model_quality_report_uri": model_quality_report_uri,
            "metric_name": metric_name,
        },
        outputs=[output_param_1, output_param_2, output_param_3],
    )
    return step


def get_data_bias_step(
    sagemaker_session: sagemaker.Session,
    dataset_uri: str,
    check_job_config: CheckJobConfig,
    cache_config: CacheConfig = None,
    **kwargs,
) -> Step:
    prefix = kwargs["prefix"]
    default_bucket = sagemaker_session.default_bucket()
    label_name = kwargs["label_name"]

    data_bias_analysis_cfg_output_path = (
        f"s3://{default_bucket}/{prefix}/databiascheckstep/analysis_cfg"
    )

    data_bias_data_config = DataConfig(
        s3_data_input_path=dataset_uri,
        s3_output_path=Join(
            on="/",
            values=[
                "s3:/",
                default_bucket,
                prefix,
                ExecutionVariables.PIPELINE_EXECUTION_ID,
                "databiascheckstep",
            ],
        ),
        label=label_name,
        dataset_type="text/csv",
        s3_analysis_config_output_path=data_bias_analysis_cfg_output_path,
    )

    data_bias_config = BiasConfig(
        label_values_or_threshold=[0],
        facet_name="customer_gender_female",
        facet_values_or_threshold=[1],
    )

    data_bias_check_config = DataBiasCheckConfig(
        data_config=data_bias_data_config,
        data_bias_config=data_bias_config,
    )

    data_bias_check_step = ClarifyCheckStep(
        name="DataBiasCheckStep",
        skip_check=True,
        clarify_check_config=data_bias_check_config,
        check_job_config=check_job_config,
        register_new_baseline=True,
        cache_config=cache_config,
    )
    return data_bias_check_step


def get_model_quality_step(
    sagemaker_session: sagemaker.Session,
    dataset_uri: str,
    check_job_config: CheckJobConfig,
    cache_config: CacheConfig = None,
    **kwargs,
) -> Step:
    prefix = kwargs["prefix"]
    default_bucket = sagemaker_session.default_bucket()

    model_quality_check_config = ModelQualityCheckConfig(
        baseline_dataset=dataset_uri,
        dataset_format=DatasetFormat.csv(header=False),
        output_s3_uri=Join(
            on="/",
            values=[
                "s3:/",
                default_bucket,
                prefix,
                ExecutionVariables.PIPELINE_EXECUTION_ID,
                "modelqualitycheckstep",
            ],
        ),
        problem_type="BinaryClassification",
        probability_attribute="_c1",
        ground_truth_attribute="_c0",
        probability_threshold_attribute=".1",
    )

    model_quality_check_step = QualityCheckStep(
        name="ModelQualityCheckStep",
        skip_check=True,
        register_new_baseline=True,
        quality_check_config=model_quality_check_config,
        check_job_config=check_job_config,
        cache_config=cache_config,
    )
    return model_quality_check_step


def get_data_quality_step(
    sagemaker_session: sagemaker.Session,
    dataset_uri: str,
    check_job_config: CheckJobConfig,
    cache_config: CacheConfig = None,
    **kwargs,
) -> Step:
    prefix = kwargs["prefix"]
    default_bucket = sagemaker_session.default_bucket()

    data_quality_check_config = DataQualityCheckConfig(
        baseline_dataset=dataset_uri,
        dataset_format=DatasetFormat.csv(header=True, output_columns_position="START"),
        output_s3_uri=Join(
            on="/",
            values=[
                "s3:/",
                default_bucket,
                prefix,
                ExecutionVariables.PIPELINE_EXECUTION_ID,
                "dataqualitycheckstep",
            ],
        ),
    )

    data_quality_check_step = QualityCheckStep(
        name="DataQualityCheckStep",
        skip_check=True,
        register_new_baseline=True,
        quality_check_config=data_quality_check_config,
        check_job_config=check_job_config,
        cache_config=cache_config,
    )

    return data_quality_check_step


def get_dataset_step(
    role: str,
    sagemaker_session: sagemaker.Session,
    instance_count: int = 1,
    cache_config: CacheConfig = None,
    **kwargs,
) -> Step:
    default_bucket = sagemaker_session.default_bucket()
    script_path = kwargs["create_dataset_script_path"]
    prefix = kwargs["prefix"]

    # Local path where the Athena query results are made available to the job
    athena_data_path = "/opt/ml/processing/athena"

    # Processor that runs the dataset creation script
    create_dataset_processor = SKLearnProcessor(
        framework_version="0.23-1",
        role=role,
        instance_type="ml.m5.xlarge",
        instance_count=instance_count,
        base_job_name=f"{prefix}/create-dataset",
        sagemaker_session=sagemaker_session,
    )

    # Input: result of an Athena query joining the claims and customers Feature Groups
    data_sources = [
        ProcessingInput(
            input_name="athena_dataset",
            dataset_definition=DatasetDefinition(
                local_path=athena_data_path,
                data_distribution_type="FullyReplicated",
                athena_dataset_definition=AthenaDatasetDefinition(
                    **generate_query(kwargs, sagemaker_session=sagemaker_session),
                    output_s3_uri=Join(
                        on="/",
                        values=[
                            "s3:/",
                            default_bucket,
                            prefix,
                            ExecutionVariables.PIPELINE_EXECUTION_ID,
                            "raw_dataset",
                        ],
                    ),
                    output_format="PARQUET",
                ),
            ),
        )
    ]

    # Outputs: train and test splits, plus the dataset used for the data quality baseline
    step = ProcessingStep(
        name="CreateDataset",
        processor=create_dataset_processor,
        cache_config=cache_config,
        inputs=data_sources,
        outputs=[
            ProcessingOutput(
                output_name="train_data",
                source="/opt/ml/processing/output/train",
                destination=Join(
                    on="/",
                    values=[
                        "s3:/",
                        default_bucket,
                        prefix,
                        ExecutionVariables.PIPELINE_EXECUTION_ID,
                        "train_dataset",
                    ],
                ),
            ),
            ProcessingOutput(
                output_name="test_data",
                source="/opt/ml/processing/output/test",
                destination=Join(
                    on="/",
                    values=[
                        "s3:/",
                        default_bucket,
                        prefix,
                        ExecutionVariables.PIPELINE_EXECUTION_ID,
                        "test_dataset",
                    ],
                ),
            ),
            ProcessingOutput(
                output_name="baseline",
                source="/opt/ml/processing/output/baseline",
                destination=Join(
                    on="/",
                    values=[
                        "s3:/",
                        default_bucket,
                        prefix,
                        ExecutionVariables.PIPELINE_EXECUTION_ID,
                        "baseline_dataset",
                    ],
                ),
            ),
        ],
        job_arguments=[
            "--athena-data",
            athena_data_path,
        ],
        code=script_path,
    )
    return step


def generate_query(dataset_dict: Dict, sagemaker_session: sagemaker.Session):
    """Build the Athena query that joins the claims and customers Feature Groups."""
    customer_fg_info = get_fg_info(
        dataset_dict["customers_fg_name"],
        sagemaker_session=sagemaker_session,
    )
    claims_fg_info = get_fg_info(
        dataset_dict["claims_fg_name"],
        sagemaker_session=sagemaker_session,
    )

    label_name = dataset_dict["label_name"]
    features_names = dataset_dict["features_names"]
    training_columns = [label_name] + features_names
    training_columns_string = ", ".join(f'"{c}"' for c in training_columns)

    query_string = f"""SELECT DISTINCT {training_columns_string}
    FROM "{claims_fg_info.table_name}" claims LEFT JOIN "{customer_fg_info.table_name}" customers
    ON claims.policy_id = customers.policy_id
    """
    return dict(
        catalog=claims_fg_info.catalog,
        database=claims_fg_info.database,
        query_string=query_string,
    )


def get_fg_info(fg_name: str, sagemaker_session: sagemaker.Session):
    """Return the Athena query helper (catalog, database, table name) of a Feature Group."""
    boto_session = sagemaker_session.boto_session
    featurestore_runtime = sagemaker_session.sagemaker_featurestore_runtime_client
    feature_store_session = sagemaker.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_session.sagemaker_client,
        sagemaker_featurestore_runtime_client=featurestore_runtime,
    )
    fg = FeatureGroup(name=fg_name, sagemaker_session=feature_store_session)
    return fg.athena_query()

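All S3 locations in the script are built with `Join(on="/", values=["s3:/", ...])`. Because `Join` places `/` between every element, the leading `"s3:/"` becomes `s3://`. `Join` is resolved by SageMaker at run time (the execution ID is only known then), so the sketch below emulates it locally with `str.join`; `emulate_join`, the bucket name and the execution ID are hypothetical.

```python
def emulate_join(on: str, values: list) -> str:
    """Local stand-in for sagemaker.workflow.functions.Join, which SageMaker resolves at run time."""
    return on.join(str(v) for v in values)


# Hypothetical values for the bucket and the pipeline execution ID
uri = emulate_join(
    on="/",
    values=["s3:/", "my-default-bucket", "mlops-demo", "abc123execution", "train_dataset"],
)
print(uri)  # s3://my-default-bucket/mlops-demo/abc123execution/train_dataset
```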

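The batch transform settings (`input_filter="$[1:]"`, `join_source="Input"`, `output_filter="$[0, -1]"`) are what make the scoring output line up with `ground_truth_attribute="_c0"` and `probability_attribute="_c1"` in the model quality check. The sketch below emulates that data flow on CSV lines, assuming the test data has no header and the label in the first column; `score` is a hypothetical stand-in for the XGBoost model.

```python
import csv
import io


def score(features: list) -> float:
    """Hypothetical stand-in for the model: returns a fixed probability."""
    return 0.9 if float(features[0]) > 1 else 0.1


def emulate_batch_transform(csv_text: str) -> list:
    """Sketch of input_filter="$[1:]", join_source="Input", output_filter="$[0, -1]"."""
    results = []
    for record in csv.reader(io.StringIO(csv_text)):
        model_input = record[1:]                 # input_filter: drop the label column
        prediction = score(model_input)          # the model only sees the features
        joined = record + [str(prediction)]      # join_source: append output to the original record
        results.append([joined[0], joined[-1]])  # output_filter: keep label and prediction
    return results


# Hypothetical test rows: label first, then features, no header
rows = emulate_batch_transform("0,0.5,3\n1,2.5,1\n")
print(rows)  # [['0', '0.1'], ['1', '0.9']]
```

Each output line therefore has two columns, `_c0` (ground truth) and `_c1` (predicted probability), which is the layout the `ModelQualityCheckConfig` expects with `header=False`.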
We can now import `get_pipeline()` from `model_building/pipelines/xgboost_pipeline.py`.

In [None]:
from model_building.pipelines.xgboost_pipeline import get_pipeline

#### Model Quality Check Lambda
As mentioned previously, the pipeline requires a Lambda function to parse the model quality report. Here we define the Lambda source code and use `sagemaker.lambda_helper` to create the function for development.

In [None]:
lambda_code_path = model_building_path / "lambdas/extract_metrics/extract_metrics.py"
lambda_code_path.parent.mkdir(exist_ok=True, parents=True)

In [None]:
%%writefile {lambda_code_path}

"""
This Lambda function parses the statistics report produced by the model quality
check step and extracts the value of a specific metric.
"""

import json

import boto3

s3 = boto3.resource("s3")


def lambda_handler(event, context):
    # S3 URI of the model quality report and name of the metric to extract
    model_quality_report_uri = event["model_quality_report_uri"]
    metric_name = event["metric_name"]

    # Download and parse the report once
    report_object = s3.Object(*split_s3_path(model_quality_report_uri))
    metrics = json.load(report_object.get()["Body"])

    metric_value = metrics["binary_classification_metrics"][metric_name]["value"]

    return {
        "statusCode": 200,
        "body": json.dumps(f"{metric_name} extracted"),
        "metric_value": json.dumps(metric_value),
    }


def split_s3_path(s3_path):
    """Split an s3://bucket/key URI into its (bucket, key) components."""
    path_parts = s3_path.replace("s3://", "").split("/")
    bucket = path_parts.pop(0)
    key = "/".join(path_parts)
    return bucket, key

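The Lambda's `split_s3_path` helper strips the scheme with a string replacement. A possible alternative, sketched below, relies on the standard library's `urllib.parse.urlparse` and rejects anything that is not an `s3://` URI; `split_s3_uri` is a hypothetical name and the example path is illustrative.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri: str) -> tuple:
    """Split s3://bucket/key into (bucket, key) using the standard library URL parser."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3":
        raise ValueError(f"Not an S3 URI: {s3_uri}")
    return parsed.netloc, parsed.path.lstrip("/")


print(split_s3_uri("s3://my-bucket/mlops-demo/run-1/modelqualitycheckstep/statistics.json"))
# ('my-bucket', 'mlops-demo/run-1/modelqualitycheckstep/statistics.json')
```

Failing fast on a malformed URI gives a clearer error in the `LambdaExtractMetrics` step than a missing-object error from S3.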

In [None]:
# The Lambda helper class can be used to create the Lambda function
lambda_fn = Lambda(
    function_name="sagemaker_test_lambda",
    execution_role_arn=f"arn:aws:iam::{sagemaker.Session().account_id()}:role/service-role/AmazonSageMakerServiceCatalogProductsUseRole",
    script=lambda_code_path.as_posix(),
    handler=f"{lambda_code_path.stem}.lambda_handler",
    timeout=10,
    memory_size=512,
)

# Update the function if it already exists, otherwise create it
try:
    retval = lambda_fn.update()
except Exception:
    retval = lambda_fn.create()
function_arn = retval["FunctionArn"]
print(json.dumps(retval, indent=2))

## Test the Pipeline
To test the pipeline, we create a dictionary with all the arguments expected by `get_pipeline()`. This dictionary will later serve as the configuration for operationalizing the Model Building pipeline.

The creation (and successful execution!) of the pipeline depends on the existence of the *Feature Groups* for the `claims` and `customers` data. The code below points at the FGs created by the *FeatureIngestion* CI/CD pipeline. To use the FGs created with the [DataScientist-01-FeatureEng notebook](DataScientist-01-FeatureEng.ipynb) instead, replace the `*_fg_name` values accordingly.
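Because `get_pipeline()` and its helpers read their settings through `kwargs[...]`, a missing key only surfaces as a `KeyError` while the pipeline is being built. A minimal pre-flight check, sketched below, lists the keys the script reads; `REQUIRED_PIPELINE_KEYS` and `missing_pipeline_keys` are hypothetical helpers, not part of the SageMaker SDK.

```python
# Keys read via kwargs[...] in model_building/pipelines/xgboost_pipeline.py
REQUIRED_PIPELINE_KEYS = {
    "prefix",
    "model_package_group_name",
    "model_training_script_path",
    "metric_extraction_lambda_arn",
    "create_dataset_script_path",
    "customers_fg_name",
    "claims_fg_name",
    "label_name",
    "features_names",
}


def missing_pipeline_keys(config: dict) -> set:
    """Return the keys get_pipeline() needs that the configuration does not provide."""
    return REQUIRED_PIPELINE_KEYS - set(config)


# Example: a configuration that forgot the Lambda ARN
incomplete = {key: "..." for key in REQUIRED_PIPELINE_KEYS - {"metric_extraction_lambda_arn"}}
print(missing_pipeline_keys(incomplete))  # {'metric_extraction_lambda_arn'}
```

In the notebook, `missing_pipeline_keys(pipeline_kwargs)` should return an empty set before calling `get_pipeline()`.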

In [None]:
dataset_dict = {
    "create_dataset_script_path": "scripts/create_dataset.py",
    "customers_fg_name": "customers",
    "claims_fg_name": "claims",
    "label_name": "fraud",
    "features_names": [
        "incident_severity",
        "num_vehicles_involved",
        "num_injuries",
        "num_witnesses",
        "police_report_available",
        "injury_claim",
        "vehicle_claim",
        "total_claim_amount",
        "incident_month",
        "incident_day",
        "incident_dow",
        "incident_hour",
        "driver_relationship_self",
        "driver_relationship_na",
        "driver_relationship_spouse",
        "driver_relationship_child",
        "driver_relationship_other",
        "incident_type_collision",
        "incident_type_breakin",
        "incident_type_theft",
        "collision_type_front",
        "collision_type_rear",
        "collision_type_side",
        "collision_type_na",
        "authorities_contacted_police",
        "authorities_contacted_none",
        "authorities_contacted_fire",
        "authorities_contacted_ambulance",
        "customer_age",
        "customer_education",
        "months_as_customer",
        "policy_deductable",
        "policy_annual_premium",
        "policy_liability",
        "auto_year",
        "num_claims_past_year",
        "num_insurers_past_5_years",
        "customer_gender_male",
        "customer_gender_female",
        "policy_state_ca",
        "policy_state_wa",
        "policy_state_az",
        "policy_state_or",
        "policy_state_nv",
        "policy_state_id",
    ],
}

pipeline_kwargs = {
    "prefix": "mlops-demo",
    "model_package_group_name": model_group_dev_name,
    "model_training_script_path": "scripts/xgboost_starter_script.py",
    "metric_extraction_lambda_arn": function_arn,
    **dataset_dict,
}

model_training_pipeline = get_pipeline(
    role=sagemaker.get_execution_role(),
    pipeline_name="dev-mlops-demo-xgboost",
    sagemaker_session=sagemaker.Session(),
    **pipeline_kwargs,
)

Let's check the pipeline definition:

In [None]:
json.loads(model_training_pipeline.definition())

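The definition JSON can be long. A small helper, sketched below, lists every step name and type, including the steps nested in the `ConditionStep` branches. It assumes the definition layout of a top-level `Steps` list whose condition steps carry `IfSteps`/`ElseSteps` under `Arguments`; `summarize_steps` and the trimmed sample definition are hypothetical.

```python
def summarize_steps(definition: dict) -> list:
    """Return (name, type) pairs for all steps, including those nested in condition branches."""
    summary = []
    for step in definition.get("Steps", []):
        summary.append((step["Name"], step["Type"]))
        if step["Type"] == "Condition":
            arguments = step.get("Arguments", {})
            nested = arguments.get("IfSteps", []) + arguments.get("ElseSteps", [])
            summary += summarize_steps({"Steps": nested})
    return summary


# Hypothetical, heavily trimmed definition used only to exercise the helper
sample_definition = {
    "Steps": [
        {"Name": "CreateDataset", "Type": "Processing", "Arguments": {}},
        {
            "Name": "CheckAUC",
            "Type": "Condition",
            "Arguments": {
                "IfSteps": [{"Name": "RegisterModel", "Type": "RegisterModel"}],
                "ElseSteps": [],
            },
        },
    ]
}
print(summarize_steps(sample_definition))
```

In the notebook, the helper would be applied to `json.loads(model_training_pipeline.definition())`.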
### Create the pipeline

In [None]:
payload = dict(
    role_arn=sagemaker.get_execution_role(), description="Training XGBoost model"
)
# Create the pipeline, or update it if it already exists
retval = model_training_pipeline.upsert(**payload)
retval

### Start an execution

In [None]:
model_training_pipeline_execution = model_training_pipeline.start(
    execution_display_name="demo-run"
)

In [None]:
model_training_pipeline_execution.describe()

In [None]:
model_training_pipeline_execution.list_steps()

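`list_steps()` returns one dictionary per step. The sketch below condenses that list into step statuses and failure reasons, assuming the `StepName`, `StepStatus` and optional `FailureReason` keys; `step_statuses`, `failed_steps` and the sample entries are hypothetical.

```python
def step_statuses(steps: list) -> dict:
    """Map each step name to its current status."""
    return {step["StepName"]: step["StepStatus"] for step in steps}


def failed_steps(steps: list) -> dict:
    """Map each failed step to its failure reason (empty string if none is reported)."""
    return {
        step["StepName"]: step.get("FailureReason", "")
        for step in steps
        if step["StepStatus"] == "Failed"
    }


# Hypothetical entries with the keys used above
sample_steps = [
    {"StepName": "ModelTraining", "StepStatus": "Failed", "FailureReason": "example reason"},
    {"StepName": "CreateDataset", "StepStatus": "Succeeded"},
]
print(step_statuses(sample_steps))
print(failed_steps(sample_steps))
```

In the notebook, `step_statuses(model_training_pipeline_execution.list_steps())` gives a quick overview; calling `model_training_pipeline_execution.wait()` first blocks until the execution finishes.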
## Prepare artifacts for operationalization

The artifacts necessary for operationalization are:
- a Python script containing the `get_pipeline()` function, which returns a SageMaker Pipeline object
- any script required by any step of the Pipeline
- the source code of any Lambda function invoked by the Pipeline
- a `*.pipeline.json` configuration file

In [None]:
conf_path = model_building_path / "configurations"
conf_path.mkdir(exist_ok=True, parents=True)

In [None]:
pipeline_config = {
    "prefix": "xgboost_build",
    "model_package_group_name": "mlops-demo-fraud-classification",
    "model_training_script_path": "scripts/xgboost_starter_script.py",
    "metric_extraction_lambda_arn": "to be replaced",  # <-- replaced by CI/CD
    **dataset_dict,
}
build_config = dict(
    pipeline_name="build-xgboost",
    code_file_path="pipelines/xgboost_pipeline.py",
    pipeline_configuration=pipeline_config,
    lambdas=[
        dict(
            arn_handler="metric_extraction_lambda_arn",
            function_name="extract_metrics",
            script="lambdas/extract_metrics/extract_metrics.py",
            handler="extract_metrics.lambda_handler",
            timeout=10,
            memory_size=128,
            runtime="python3.8",
        )
    ],
)
with (conf_path / "xgboost.pipeline.json").open("w") as f:
    json.dump(build_config, f, indent=2)

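Before committing the `model_building` folder, it can help to check that every file referenced by the configuration is actually there. This is a sketch assuming all configured paths are relative to the repository root passed in (a hypothetical convention); `missing_artifacts` and the demo paths are illustrative.

```python
import tempfile
from pathlib import Path


def missing_artifacts(root: Path, build_config: dict) -> list:
    """Return the files referenced by a build configuration that are missing under root."""
    referenced = [build_config["code_file_path"]]
    # Source code of each Lambda function invoked by the pipeline
    referenced += [fn["script"] for fn in build_config.get("lambdas", [])]
    # Scripts run by pipeline steps (dataset creation and training)
    conf = build_config.get("pipeline_configuration", {})
    referenced += [
        conf[key]
        for key in ("create_dataset_script_path", "model_training_script_path")
        if key in conf
    ]
    return [path for path in referenced if not (root / path).is_file()]


# Demo on a throwaway directory that only contains the pipeline definition
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "pipelines").mkdir()
    (root / "pipelines" / "xgboost_pipeline.py").write_text("")
    demo_config = {
        "code_file_path": "pipelines/xgboost_pipeline.py",
        "lambdas": [{"script": "lambdas/extract_metrics/extract_metrics.py"}],
        "pipeline_configuration": {"model_training_script_path": "scripts/xgboost_starter_script.py"},
    }
    demo_missing = missing_artifacts(root, demo_config)

print(demo_missing)
# ['lambdas/extract_metrics/extract_metrics.py', 'scripts/xgboost_starter_script.py']
```

Applied as `missing_artifacts(model_building_path, build_config)`, the check would also report the `scripts/` files, since this notebook does not write them under `model_building`.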
## Cleanup

Delete the model building pipeline

In [None]:
model_training_pipeline.delete()

Delete all model packages created by the development pipeline, and then the Model Package Group itself, from the Model Registry.

In [None]:
sagemaker_session = sagemaker.Session()
sagemaker_client = sagemaker_session.sagemaker_client

In [None]:
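# Collect the ARNs of all model packages in the development group (the list API is paginated)
package_arns = []
list_kwargs = {"ModelPackageGroupName": model_group_dev_name}
while True:
    response = sagemaker_client.list_model_packages(**list_kwargs)
    package_arns += [p["ModelPackageArn"] for p in response["ModelPackageSummaryList"]]
    if not response.get("NextToken"):
        break
    list_kwargs["NextToken"] = response["NextToken"]

# Delete the model packages, then the now-empty Model Package Group
for arn in package_arns:
    sagemaker_client.delete_model_package(ModelPackageName=arn)

sagemaker_client.delete_model_package_group(ModelPackageGroupName=model_group_dev_name)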