{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Data Scientist - Model Building\n", "\n", "This notebook demonstrates a sample of the activities and artifacts prepared by a Data Scientist to establish the Model Building pipeline.\n", "\n", "***\n", "*This notebook should work well with the Python 3 (Data Science) kernel in SageMaker Studio*\n", "***" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Environment setup\n", "Import libraries, setup logging, and define few variables. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "import logging\n", "import shutil\n", "from pathlib import Path\n", "\n", "import sagemaker\n", "from sagemaker.lambda_helper import Lambda" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%load_ext autoreload\n", "%autoreload 2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Set up a logger" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "logger = logging.getLogger(\"__name__\")\n", "logger.setLevel(logging.INFO)\n", "logger.addHandler(logging.StreamHandler())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define the local path where to store all the development artifact that will eventually be committed to the ModelBuilding Git repository." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_building_path = Path(\"model_building\")\n", "model_building_path.mkdir(exist_ok=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_group_dev_name = \"mlops-demo-dev\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Pipeline definition\n", "We define the model building pipeline as the `get_pipeline()` function in a python script. We use SageMaker SDK to define the steps and define the DAG. \n", "After writing the file to the local storage, we will `import` the `get_pipeline()` function to create and test the Model Building pipeline. Thanks to the use of the `autoreload` extension, we can update the `get_pipeline()` definition by saving an updated version of `model_building/xgboost_pipeline.py`. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This SageMaker Pipeline manages all the tasks necessary for model training:\n", "- data extraction from Feature Store\n", "- data processing specific to Model Training, e.g., joining datasets, train/test split\n", "- train the model\n", "- evaluate initial dataset bias across features\n", "- record a baseline of the distribution of the training dataset for Model Monitor use\n", "- evaluate model performance on the `test` dataset, creating a baseline to monitor the model performance\n", "- Register the model if the model performance metric (in this case, AUC) against a set threshold\n", "\n", "To compare the model performance against a set threshold, we use a `LambdaStep` to parse the report generated by the `ModelQualityStep`. The code for the lambda is described below." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_building_pipelines_path = model_building_path / \"pipelines\"\n", "model_building_pipelines_path.mkdir(exist_ok=True, parents=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile {model_building_pipelines_path}/xgboost_pipeline.py\n", "from typing import Dict\n", "\n", "import sagemaker\n", "from sagemaker.clarify import BiasConfig, DataConfig\n", "from sagemaker.dataset_definition.inputs import (\n", " AthenaDatasetDefinition,\n", " DatasetDefinition,\n", ")\n", "from sagemaker.drift_check_baselines import DriftCheckBaselines\n", "from sagemaker.feature_store.feature_group import FeatureGroup\n", "from sagemaker.inputs import TransformInput\n", "from sagemaker.lambda_helper import Lambda\n", "from sagemaker.model_metrics import MetricsSource, ModelMetrics\n", "from sagemaker.model_monitor.dataset_format import DatasetFormat\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.workflow.check_job_config import CheckJobConfig\n", "from sagemaker.workflow.clarify_check_step import ClarifyCheckStep, DataBiasCheckConfig\n", "from sagemaker.workflow.condition_step import ConditionStep\n", "from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo\n", "from sagemaker.workflow.execution_variables import ExecutionVariables\n", "from sagemaker.workflow.functions import Join\n", "from sagemaker.workflow.lambda_step import (\n", " LambdaOutput,\n", " LambdaOutputTypeEnum,\n", " LambdaStep,\n", ")\n", "from sagemaker.workflow.parameters import (\n", " ParameterFloat,\n", " ParameterInteger,\n", " ParameterString,\n", ")\n", "from sagemaker.workflow.pipeline import Pipeline\n", "from sagemaker.workflow.quality_check_step import (\n", " DataQualityCheckConfig,\n", " ModelQualityCheckConfig,\n", " QualityCheckStep,\n", ")\n", "from sagemaker.workflow.step_collections import EstimatorTransformer, RegisterModel\n", "from sagemaker.workflow.steps import CacheConfig, ProcessingStep, Step, TrainingStep\n", "from sagemaker.xgboost.estimator import XGBoost\n", "\n", "\n", "def get_pipeline(\n", " role: str,\n", " pipeline_name: str,\n", " sagemaker_session: sagemaker.Session = None,\n", " **kwargs,\n", ") -> Pipeline:\n", " cache_config = CacheConfig(enable_caching=True, expire_after=\"PT1H\")\n", " create_dataset_instance_count = 1\n", " transformer_instance_count = 1\n", "\n", " default_bucket = sagemaker_session.default_bucket()\n", " prefix = kwargs[\"prefix\"]\n", " model_package_group_name = kwargs[\"model_package_group_name\"]\n", "\n", " # Pipeline parameters\n", " train_instance_count = ParameterInteger(\n", " name=\"TrainingInstanceCount\",\n", " default_value=1,\n", " )\n", " train_instance_type = ParameterString(\n", " name=\"TrainingInstance\",\n", " default_value=\"ml.m4.xlarge\",\n", " )\n", "\n", " baseline_instance_type = ParameterString(\n", " name=\"BaselineInstanceType\",\n", " default_value=\"ml.c5.xlarge\",\n", " )\n", " baseline_instance_count = ParameterInteger(\n", " name=\"BaselineInstanceCount\",\n", " default_value=1,\n", " )\n", " model_threshold_auc = ParameterString(\n", " name=\"ModelMinAcceptableAUC\",\n", " default_value=\"0.75\",\n", " )\n", " model_approval_status = ParameterString(\n", " name=\"ModelApprovalStatus\",\n", " default_value=\"PendingManualApproval\",\n", " enum_values=[\n", " 
\"PendingManualApproval\",\n", " \"Approved\",\n", " ],\n", " )\n", "\n", " check_job_config = CheckJobConfig(\n", " role=role,\n", " instance_count=baseline_instance_count,\n", " instance_type=baseline_instance_type,\n", " volume_size_in_gb=120,\n", " sagemaker_session=sagemaker_session,\n", " )\n", "\n", " ##### Create Dataset\n", " create_dataset_step = get_dataset_step(\n", " role=role,\n", " sagemaker_session=sagemaker_session,\n", " instance_count=create_dataset_instance_count,\n", " cache_config=cache_config,\n", " **kwargs,\n", " )\n", "\n", " #### Data Quality Baseline\n", " data_quality_baseline_step = get_data_quality_step(\n", " role=role,\n", " sagemaker_session=sagemaker_session,\n", " dataset_uri=create_dataset_step.properties.ProcessingOutputConfig.Outputs[\n", " \"baseline\"\n", " ].S3Output.S3Uri,\n", " check_job_config=check_job_config,\n", " cache_config=cache_config,\n", " **kwargs,\n", " )\n", "\n", " # Model training step\n", " training_step = get_model_training_step(\n", " role=role,\n", " sagemaker_session=sagemaker_session,\n", " dataset_uri=create_dataset_step.properties.ProcessingOutputConfig.Outputs[\n", " \"train_data\"\n", " ].S3Output.S3Uri,\n", " instance_count=train_instance_count,\n", " instance_type=train_instance_type,\n", " cache_config=cache_config,\n", " **kwargs,\n", " )\n", "\n", " transformer = EstimatorTransformer(\n", " name=\"TestScoring-\",\n", " estimator=training_step.estimator,\n", " model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,\n", " model_inputs=None,\n", " instance_type=train_instance_type,\n", " instance_count=transformer_instance_count,\n", " transform_inputs=TransformInput(\n", " data=create_dataset_step.properties.ProcessingOutputConfig.Outputs[\n", " \"test_data\"\n", " ].S3Output.S3Uri,\n", " content_type=\"text/csv\",\n", " data_type=\"S3Prefix\",\n", " split_type=\"Line\",\n", " input_filter=\"$[1:]\",\n", " output_filter=\"$[0, -1]\",\n", " join_source=\"Input\",\n", " ),\n", " accept=\"text/csv\",\n", " assemble_with=\"Line\",\n", " description=\"Scoring of test dataset\",\n", " output_path=Join(\n", " on=\"/\",\n", " values=[\n", " \"s3:/\",\n", " default_bucket,\n", " prefix,\n", " ExecutionVariables.PIPELINE_EXECUTION_ID,\n", " \"test_step\",\n", " \"output\",\n", " ],\n", " ),\n", " )\n", "\n", " ### Model Quality Baseline\n", " model_quality_baseline_step = get_model_quality_step(\n", " role=role,\n", " sagemaker_session=sagemaker_session,\n", " dataset_uri=transformer.steps[-1].properties.TransformOutput.S3OutputPath,\n", " check_job_config=check_job_config,\n", " **kwargs,\n", " )\n", "\n", " ### Data bias analysis\n", " bias_step = get_data_bias_step(\n", " role=role,\n", " sagemaker_session=sagemaker_session,\n", " dataset_uri=create_dataset_step.properties.ProcessingOutputConfig.Outputs[\n", " \"train_data\"\n", " ].S3Output.S3Uri,\n", " check_job_config=check_job_config,\n", " **kwargs,\n", " )\n", "\n", " model_metrics = ModelMetrics(\n", " model_data_statistics=MetricsSource(\n", " s3_uri=data_quality_baseline_step.properties.CalculatedBaselineStatistics,\n", " content_type=\"application/json\",\n", " ),\n", " model_data_constraints=MetricsSource(\n", " s3_uri=data_quality_baseline_step.properties.CalculatedBaselineConstraints,\n", " content_type=\"application/json\",\n", " ),\n", " bias_pre_training=MetricsSource(\n", " s3_uri=bias_step.properties.CalculatedBaselineConstraints,\n", " content_type=\"application/json\",\n", " ),\n", " model_statistics=MetricsSource(\n", " 
s3_uri=model_quality_baseline_step.properties.CalculatedBaselineStatistics,\n", " content_type=\"application/json\",\n", " ),\n", " model_constraints=MetricsSource(\n", " s3_uri=model_quality_baseline_step.properties.CalculatedBaselineConstraints,\n", " content_type=\"application/json\",\n", " ),\n", " bias=MetricsSource(\n", " s3_uri=bias_step.properties.CalculatedBaselineConstraints,\n", " content_type=\"application/json\",\n", " ),\n", " )\n", "\n", " drift_check_baselines = DriftCheckBaselines(\n", " model_data_statistics=MetricsSource(\n", " s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,\n", " content_type=\"application/json\",\n", " ),\n", " model_data_constraints=MetricsSource(\n", " s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,\n", " content_type=\"application/json\",\n", " ),\n", " bias_pre_training_constraints=MetricsSource(\n", " s3_uri=bias_step.properties.BaselineUsedForDriftCheckConstraints,\n", " content_type=\"application/json\",\n", " ),\n", " model_statistics=MetricsSource(\n", " s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,\n", " content_type=\"application/json\",\n", " ),\n", " model_constraints=MetricsSource(\n", " s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,\n", " content_type=\"application/json\",\n", " ),\n", " )\n", "\n", " register_step = RegisterModel(\n", " name=\"RegisterModel\",\n", " estimator=training_step.estimator,\n", " model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,\n", " content_types=[\"text/csv\"],\n", " response_types=[\"text/csv\"],\n", " inference_instances=[\"ml.t2.medium\", \"ml.t2.large\", \"ml.m5.large\"],\n", " transform_instances=[\"ml.m5.xlarge\"],\n", " model_package_group_name=model_package_group_name,\n", " approval_status=model_approval_status,\n", " model_metrics=model_metrics,\n", " drift_check_baselines=drift_check_baselines,\n", " description=\"Binary classification model based on XGBoost\",\n", " )\n", "\n", " lambda_step = get_lambda_step(\n", " sagemaker_session=sagemaker_session,\n", " function_arn=kwargs[\"metric_extraction_lambda_arn\"],\n", " model_quality_report_uri=model_quality_baseline_step.properties.CalculatedBaselineStatistics,\n", " metric_name=\"auc\",\n", " )\n", "\n", " cond_lte = ConditionGreaterThanOrEqualTo(\n", " left=lambda_step.properties.Outputs[\"metric_value\"],\n", " right=model_threshold_auc,\n", " )\n", "\n", " step_cond = ConditionStep(\n", " name=\"CheckAUC\",\n", " conditions=[cond_lte],\n", " if_steps=[register_step],\n", " else_steps=[],\n", " )\n", "\n", " # pipeline instance\n", " pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " baseline_instance_type,\n", " baseline_instance_count,\n", " train_instance_type,\n", " train_instance_count,\n", " model_approval_status,\n", " model_threshold_auc,\n", " ],\n", " steps=[\n", " create_dataset_step,\n", " bias_step,\n", " data_quality_baseline_step,\n", " training_step,\n", " transformer,\n", " model_quality_baseline_step,\n", " lambda_step,\n", " step_cond,\n", " ],\n", " sagemaker_session=sagemaker_session,\n", " )\n", "\n", " return pipeline\n", "\n", "\n", "def get_model_training_step(\n", " role: str,\n", " sagemaker_session: sagemaker.Session,\n", " dataset_uri: str,\n", " instance_count: int,\n", " instance_type: str,\n", " cache_config: CacheConfig = None,\n", " **kwargs,\n", "):\n", " default_bucket = sagemaker_session.default_bucket()\n", " prefix = 
kwargs[\"prefix\"]\n", " model_entry_point = kwargs[\"model_training_script_path\"]\n", "\n", " metric_uri = f\"{prefix}/training_jobs/metrics_output/metrics.json\"\n", " hyperparameters = {\n", " \"max_depth\": \"3\",\n", " \"eta\": \"0.2\",\n", " \"objective\": \"binary:logistic\",\n", " \"num_round\": \"100\",\n", " \"bucket\": f\"{default_bucket}\",\n", " \"object\": f\"{metric_uri}\",\n", " }\n", " estimator = XGBoost(\n", " entry_point=model_entry_point,\n", " hyperparameters=hyperparameters,\n", " role=role,\n", " instance_count=instance_count,\n", " instance_type=instance_type,\n", " framework_version=\"1.0-1\",\n", " sagemaker_session=sagemaker_session,\n", " )\n", "\n", " train_step = TrainingStep(\n", " name=\"ModelTraining\",\n", " estimator=estimator,\n", " inputs={\"train\": sagemaker.inputs.TrainingInput(s3_data=dataset_uri)},\n", " cache_config=cache_config,\n", " )\n", "\n", " return train_step\n", "\n", "\n", "def get_lambda_step(\n", " sagemaker_session: sagemaker.Session,\n", " function_arn: str,\n", " model_quality_report_uri: str,\n", " metric_name: str,\n", ") -> Step:\n", " output_param_1 = LambdaOutput(\n", " output_name=\"statusCode\",\n", " output_type=LambdaOutputTypeEnum.String,\n", " )\n", " output_param_2 = LambdaOutput(\n", " output_name=\"body\",\n", " output_type=LambdaOutputTypeEnum.String,\n", " )\n", " output_param_3 = LambdaOutput(\n", " output_name=\"metric_value\",\n", " output_type=LambdaOutputTypeEnum.String,\n", " )\n", "\n", " step = LambdaStep(\n", " name=\"LambdaExtractMetrics\",\n", " lambda_func=Lambda(function_arn=function_arn, session=sagemaker_session),\n", " inputs={\n", " \"model_quality_report_uri\": model_quality_report_uri,\n", " \"metric_name\": metric_name,\n", " },\n", " outputs=[output_param_1, output_param_2, output_param_3],\n", " )\n", " return step\n", "\n", "\n", "def get_data_bias_step(\n", " sagemaker_session: sagemaker.Session,\n", " dataset_uri: str,\n", " check_job_config: CheckJobConfig,\n", " cache_config: CacheConfig = None,\n", " **kwargs,\n", ") -> Step:\n", " prefix = kwargs[\"prefix\"]\n", " default_bucket = sagemaker_session.default_bucket()\n", " label_name = kwargs[\"label_name\"]\n", "\n", " data_bias_analysis_cfg_output_path = (\n", " f\"s3://{default_bucket}/{prefix}/databiascheckstep/analysis_cfg\"\n", " )\n", "\n", " data_bias_data_config = DataConfig(\n", " s3_data_input_path=dataset_uri,\n", " s3_output_path=Join(\n", " on=\"/\",\n", " values=[\n", " \"s3:/\",\n", " default_bucket,\n", " prefix,\n", " ExecutionVariables.PIPELINE_EXECUTION_ID,\n", " \"databiascheckstep\",\n", " ],\n", " ),\n", " label=label_name,\n", " dataset_type=\"text/csv\",\n", " s3_analysis_config_output_path=data_bias_analysis_cfg_output_path,\n", " )\n", "\n", " data_bias_config = BiasConfig(\n", " label_values_or_threshold=[0],\n", " facet_name=\"customer_gender_female\",\n", " facet_values_or_threshold=[1],\n", " )\n", "\n", " data_bias_check_config = DataBiasCheckConfig(\n", " data_config=data_bias_data_config,\n", " data_bias_config=data_bias_config,\n", " )\n", "\n", " data_bias_check_step = ClarifyCheckStep(\n", " name=\"DataBiasCheckStep\",\n", " skip_check=True,\n", " clarify_check_config=data_bias_check_config,\n", " check_job_config=check_job_config,\n", " register_new_baseline=True,\n", " cache_config=cache_config,\n", " )\n", " return data_bias_check_step\n", "\n", "\n", "def get_model_quality_step(\n", " sagemaker_session: sagemaker.Session,\n", " dataset_uri: str,\n", " check_job_config: CheckJobConfig,\n", " 
cache_config: CacheConfig = None,\n", " **kwargs,\n", ") -> Step:\n", "\n", " prefix = kwargs[\"prefix\"]\n", " default_bucket = sagemaker_session.default_bucket()\n", "\n", " model_quality_check_config = ModelQualityCheckConfig(\n", " baseline_dataset=dataset_uri,\n", " dataset_format=DatasetFormat.csv(header=False),\n", " output_s3_uri=Join(\n", " on=\"/\",\n", " values=[\n", " \"s3:/\",\n", " default_bucket,\n", " prefix,\n", " ExecutionVariables.PIPELINE_EXECUTION_ID,\n", " \"modelqualitycheckstep\",\n", " ],\n", " ),\n", " problem_type=\"BinaryClassification\",\n", " probability_attribute=\"_c1\",\n", " ground_truth_attribute=\"_c0\",\n", " probability_threshold_attribute=\".1\",\n", " )\n", "\n", " model_quality_check_step = QualityCheckStep(\n", " name=\"ModelQualityCheckStep\",\n", " skip_check=True,\n", " register_new_baseline=True,\n", " quality_check_config=model_quality_check_config,\n", " check_job_config=check_job_config,\n", " cache_config=cache_config,\n", " )\n", " return model_quality_check_step\n", "\n", "\n", "def get_data_quality_step(\n", " sagemaker_session: sagemaker.Session,\n", " dataset_uri: str,\n", " check_job_config: CheckJobConfig,\n", " cache_config: CacheConfig = None,\n", " **kwargs,\n", ") -> Step:\n", "\n", " prefix = kwargs[\"prefix\"]\n", " default_bucket = sagemaker_session.default_bucket()\n", "\n", " data_quality_check_config = DataQualityCheckConfig(\n", " baseline_dataset=dataset_uri,\n", " dataset_format=DatasetFormat.csv(header=True, output_columns_position=\"START\"),\n", " output_s3_uri=Join(\n", " on=\"/\",\n", " values=[\n", " \"s3:/\",\n", " default_bucket,\n", " prefix,\n", " ExecutionVariables.PIPELINE_EXECUTION_ID,\n", " \"dataqualitycheckstep\",\n", " ],\n", " ),\n", " )\n", "\n", " data_quality_check_step = QualityCheckStep(\n", " name=\"DataQualityCheckStep\",\n", " skip_check=True,\n", " register_new_baseline=True,\n", " quality_check_config=data_quality_check_config,\n", " check_job_config=check_job_config,\n", " cache_config=cache_config,\n", " )\n", "\n", " return data_quality_check_step\n", "\n", "\n", "def get_dataset_step(\n", " role: str,\n", " sagemaker_session: sagemaker.Session,\n", " instance_count: int = 1,\n", " cache_config: CacheConfig = None,\n", " **kwargs,\n", ") -> Step:\n", " default_bucket = sagemaker_session.default_bucket()\n", " script_path = kwargs[\"create_dataset_script_path\"]\n", " prefix = kwargs[\"prefix\"]\n", "\n", " athena_data_path = \"/opt/ml/processing/athena\"\n", "\n", " # Create dataset step\n", " create_dataset_processor = SKLearnProcessor(\n", " framework_version=\"0.23-1\",\n", " role=role,\n", " instance_type=\"ml.m5.xlarge\",\n", " instance_count=instance_count,\n", " base_job_name=f\"{prefix}/create-dataset\",\n", " sagemaker_session=sagemaker_session,\n", " )\n", "\n", " data_sources = [\n", " ProcessingInput(\n", " input_name=\"athena_dataset\",\n", " dataset_definition=DatasetDefinition(\n", " local_path=athena_data_path,\n", " data_distribution_type=\"FullyReplicated\",\n", " athena_dataset_definition=AthenaDatasetDefinition(\n", " **generate_query(kwargs, sagemaker_session=sagemaker_session),\n", " output_s3_uri=Join(\n", " on=\"/\",\n", " values=[\n", " \"s3:/\",\n", " default_bucket,\n", " prefix,\n", " ExecutionVariables.PIPELINE_EXECUTION_ID,\n", " \"raw_dataset\",\n", " ],\n", " ),\n", " output_format=\"PARQUET\",\n", " ),\n", " ),\n", " )\n", " ]\n", "\n", " step = ProcessingStep(\n", " name=\"CreateDataset\",\n", " processor=create_dataset_processor,\n", " 
cache_config=cache_config,\n", "        inputs=data_sources,\n", "        outputs=[\n", "            ProcessingOutput(\n", "                output_name=\"train_data\",\n", "                source=\"/opt/ml/processing/output/train\",\n", "                destination=Join(\n", "                    on=\"/\",\n", "                    values=[\n", "                        \"s3:/\",\n", "                        default_bucket,\n", "                        prefix,\n", "                        ExecutionVariables.PIPELINE_EXECUTION_ID,\n", "                        \"train_dataset\",\n", "                    ],\n", "                ),\n", "            ),\n", "            ProcessingOutput(\n", "                output_name=\"test_data\",\n", "                source=\"/opt/ml/processing/output/test\",\n", "                destination=Join(\n", "                    on=\"/\",\n", "                    values=[\n", "                        \"s3:/\",\n", "                        default_bucket,\n", "                        prefix,\n", "                        ExecutionVariables.PIPELINE_EXECUTION_ID,\n", "                        \"test_dataset\",\n", "                    ],\n", "                ),\n", "            ),\n", "            ProcessingOutput(\n", "                output_name=\"baseline\",\n", "                source=\"/opt/ml/processing/output/baseline\",\n", "                destination=Join(\n", "                    on=\"/\",\n", "                    values=[\n", "                        \"s3:/\",\n", "                        default_bucket,\n", "                        prefix,\n", "                        ExecutionVariables.PIPELINE_EXECUTION_ID,\n", "                        \"baseline_dataset\",\n", "                    ],\n", "                ),\n", "            ),\n", "        ],\n", "        job_arguments=[\n", "            \"--athena-data\",\n", "            athena_data_path,\n", "        ],\n", "        code=script_path,\n", "    )\n", "    return step\n", "\n", "\n", "def generate_query(dataset_dict: Dict, sagemaker_session: sagemaker.Session):\n", "    customer_fg_info = get_fg_info(\n", "        dataset_dict[\"customers_fg_name\"],\n", "        sagemaker_session=sagemaker_session,\n", "    )\n", "    claims_fg_info = get_fg_info(\n", "        dataset_dict[\"claims_fg_name\"],\n", "        sagemaker_session=sagemaker_session,\n", "    )\n", "\n", "    label_name = dataset_dict[\"label_name\"]\n", "    features_names = dataset_dict[\"features_names\"]\n", "    training_columns = [label_name] + features_names\n", "    training_columns_string = \", \".join(f'\"{c}\"' for c in training_columns)\n", "\n", "    query_string = f\"\"\"SELECT DISTINCT {training_columns_string}\n", "    FROM \"{claims_fg_info.table_name}\" claims LEFT JOIN \"{customer_fg_info.table_name}\" customers\n", "    ON claims.policy_id = customers.policy_id\n", "    \"\"\"\n", "    return dict(\n", "        catalog=claims_fg_info.catalog,\n", "        database=claims_fg_info.database,\n", "        query_string=query_string,\n", "    )\n", "\n", "\n", "def get_fg_info(fg_name: str, sagemaker_session: sagemaker.Session):\n", "    boto_session = sagemaker_session.boto_session\n", "    featurestore_runtime = sagemaker_session.sagemaker_featurestore_runtime_client\n", "    feature_store_session = sagemaker.Session(\n", "        boto_session=boto_session,\n", "        sagemaker_client=sagemaker_session.sagemaker_client,\n", "        sagemaker_featurestore_runtime_client=featurestore_runtime,\n", "    )\n", "    fg = FeatureGroup(name=fg_name, sagemaker_session=feature_store_session)\n", "    return fg.athena_query()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can now import `get_pipeline()` from `model_building/pipelines/xgboost_pipeline.py`. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from model_building.pipelines.xgboost_pipeline import get_pipeline" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Model Quality Check lambda\n", "As mentioned previously, the pipeline we defined requires a lambda function to parse the Model Quality Report. Here we define the lambda source code and use `sagemaker.lambda_helper` to create the lambda for development."
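 ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `LambdaStep` invokes the function with the `inputs` mapping defined in `get_lambda_step()`, so the handler receives an event of the shape sketched below (the URI value is a made-up placeholder):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Example event passed to the lambda handler; the keys mirror the LambdaStep\n", "# inputs, while the URI value here is purely illustrative\n", "example_event = {\n", "    \"model_quality_report_uri\": \"s3://example-bucket/mlops-demo/.../statistics.json\",\n", "    \"metric_name\": \"auc\",\n", "}"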
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "lambda_code_path = model_building_path / \"lambdas/extract_metrics/extract_metrics.py\"\n", "lambda_code_path.parent.mkdir(exist_ok=True, parents=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile {lambda_code_path}\n", "\n", "\"\"\"\n", "This Lambda parses the output of ModelQualityStep to extract the value of a specific metric\n", "\"\"\"\n", "\n", "import json\n", "import boto3\n", "\n", "sm_client = boto3.client(\"sagemaker\")\n", "s3 = boto3.resource('s3')\n", "\n", "def lambda_handler(event, context):\n", " # model quality report URI\n", " model_quality_report_uri = event['model_quality_report_uri']\n", " metric_name = event['metric_name']\n", " \n", " \n", " o = s3.Object(*split_s3_path(model_quality_report_uri))\n", " retval = json.load(o.get()['Body'])\n", " \n", " metrics = json.load(o.get()['Body'])\n", "\n", " return {\n", " \"statusCode\": 200,\n", " \"body\": json.dumps(f\"{metric_name} extracted\"),\n", " \"metric_value\": json.dumps(metrics['binary_classification_metrics'][metric_name]['value'])\n", " }\n", "\n", "def split_s3_path(s3_path):\n", " path_parts=s3_path.replace(\"s3://\",\"\").split(\"/\")\n", " bucket=path_parts.pop(0)\n", " key=\"/\".join(path_parts)\n", " return bucket, key\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Lambda helper class can be used to create the Lambda function\n", "lambda_fn = Lambda(\n", " function_name=\"sagemaker_test_lambda\",\n", " execution_role_arn=f\"arn:aws:iam::{sagemaker.Session().account_id()}:role/service-role/AmazonSageMakerServiceCatalogProductsUseRole\",\n", " script=lambda_code_path.as_posix(),\n", " handler=f\"{lambda_code_path.stem}.lambda_handler\",\n", " timeout=10,\n", " memory_size=512,\n", ")\n", "\n", "try:\n", " retval = lambda_fn.update()\n", "except:\n", " retval = lambda_fn.create()\n", "function_arn = retval[\"FunctionArn\"]\n", "print(json.dumps(retval, indent=2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Test the Pipeline\n", "To test the pipeline we create a dictionary with all the arguments expected by `get_pipeline`. This dictionary will then serve as configuration for the operationalization of the Model Building pipeline.\n", "\n", "The creation (and successful execution!) of the pipeline depends on the existence of the *Feature Groups* for `claims` and `customers` data. The code below points at the FGs created by the *FeatureIngestion* CI/CD pipeline. To use instead the FGs created using the [DataScientist-01-FeatureEng notebook](DataScientist-01-FeatureEng.ipynb), replace the `*_fg_name` appropriately. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dataset_dict = {\n", " \"create_dataset_script_path\": \"scripts/create_dataset.py\",\n", " \"customers_fg_name\": \"customers\",\n", " \"claims_fg_name\": \"claims\",\n", " \"label_name\": \"fraud\",\n", " \"features_names\": [\n", " \"incident_severity\",\n", " \"num_vehicles_involved\",\n", " \"num_injuries\",\n", " \"num_witnesses\",\n", " \"police_report_available\",\n", " \"injury_claim\",\n", " \"vehicle_claim\",\n", " \"total_claim_amount\",\n", " \"incident_month\",\n", " \"incident_day\",\n", " \"incident_dow\",\n", " \"incident_hour\",\n", " \"driver_relationship_self\",\n", " \"driver_relationship_na\",\n", " \"driver_relationship_spouse\",\n", " \"driver_relationship_child\",\n", " \"driver_relationship_other\",\n", " \"incident_type_collision\",\n", " \"incident_type_breakin\",\n", " \"incident_type_theft\",\n", " \"collision_type_front\",\n", " \"collision_type_rear\",\n", " \"collision_type_side\",\n", " \"collision_type_na\",\n", " \"authorities_contacted_police\",\n", " \"authorities_contacted_none\",\n", " \"authorities_contacted_fire\",\n", " \"authorities_contacted_ambulance\",\n", " \"customer_age\",\n", " \"customer_education\",\n", " \"months_as_customer\",\n", " \"policy_deductable\",\n", " \"policy_annual_premium\",\n", " \"policy_liability\",\n", " \"auto_year\",\n", " \"num_claims_past_year\",\n", " \"num_insurers_past_5_years\",\n", " \"customer_gender_male\",\n", " \"customer_gender_female\",\n", " \"policy_state_ca\",\n", " \"policy_state_wa\",\n", " \"policy_state_az\",\n", " \"policy_state_or\",\n", " \"policy_state_nv\",\n", " \"policy_state_id\",\n", " ],\n", "}\n", "\n", "pipeline_kwargs = {\n", " \"prefix\": \"mlops-demo\",\n", " \"model_package_group_name\": model_group_dev_name,\n", " \"model_training_script_path\": \"scripts/xgboost_starter_script.py\",\n", " \"metric_extraction_lambda_arn\": function_arn,\n", " **dataset_dict,\n", "}\n", "\n", "model_training_pipeline = get_pipeline(\n", " role=sagemaker.get_execution_role(),\n", " pipeline_name=\"dev-mlops-demo-xgboost\",\n", " sagemaker_session=sagemaker.Session(),\n", " **pipeline_kwargs\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's check the pipeline definition" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "json.loads(model_training_pipeline.definition())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "payload = dict(\n", " role_arn=sagemaker.get_execution_role(), description=\"Training XGBoost model\"\n", ")\n", "try:\n", " retval = model_training_pipeline.update(**payload)\n", "except:\n", " retval = model_training_pipeline.create(**payload)\n", "retval" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start an execution" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_training_pipeline_execution = model_training_pipeline.start(\n", " execution_display_name=\"demo-run\"\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_training_pipeline_execution.describe()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_training_pipeline_execution.list_steps()" ] }, { "cell_type": "markdown", "metadata": {}, 
"source": [ "## Prepare artifacts for operationalization" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The artifact necessary for the operationalization are:\n", "- python script containing `get_pipeline()` functions, that returns a SageMaker Pipeline object\n", "- any script required by any step of the Pipeline\n", "- the source code of any lambda invoked by the Pipeline\n", "- a `*.pipeline.json` configuration file" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "conf_path = model_building_path / \"configurations\"\n", "conf_path.mkdir(exist_ok=True, parents=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline_config = {\n", " \"prefix\": \"xgboost_build\",\n", " \"model_package_group_name\": \"mlops-demo-fraud-classification\",\n", " \"model_training_script_path\": \"scripts/xgboost_starter_script.py\",\n", " \"metric_extraction_lambda_arn\": \"to be replaced\", # <-- replaced by CI/CD\n", " **dataset_dict,\n", "}\n", "build_config = dict(\n", " pipeline_name=\"build-xgboost\",\n", " code_file_path=\"pipelines/xgboost_pipeline.py\",\n", " pipeline_configuration=pipeline_config,\n", " lambdas=[\n", " dict(\n", " arn_handler=\"metric_extraction_lambda_arn\",\n", " function_name=\"extract_metrics\",\n", " script=\"lambdas/extract_metrics/extract_metrics.py\",\n", " handler=\"extract_metrics.lambda_handler\",\n", " timeout=10,\n", " memory_size=128,\n", " runtime=\"python3.8\",\n", " )\n", " ],\n", ")\n", "with (conf_path / \"xgboost.pipeline.json\").open(\"w\") as f:\n", " json.dump(build_config, f, indent=2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cleanup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Delete the model building pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_training_pipeline.delete()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Delete all model packages created by the development pipeline and the Model Package Group from the Model registry" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_session = sagemaker.Session()\n", "sagemaker_client = sagemaker_session.sagemaker_client" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "[\n", " sagemaker_client.delete_model_package(ModelPackageName=k['ModelPackageArn'])\n", " for k\n", " in sagemaker_client.list_model_packages(ModelPackageGroupName=model_group_dev_name)['ModelPackageSummaryList']\n", "]\n", "\n", "sagemaker_client.delete_model_package_group(ModelPackageGroupName=model_group_dev_name)" ] } ], "metadata": { "interpreter": { "hash": "1c353f3a68dd98206e82f3d50f1fc89596d19746233993630d3adac6ea32e63e" }, "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.10" } }, "nbformat": 4, "nbformat_minor": 4 }