{ "cells": [ { "cell_type": "markdown", "id": "43b70349-6d60-439b-96f9-6d51ad738101", "metadata": {}, "source": [ "## Introduction" ] }, { "cell_type": "markdown", "id": "a26a66ee-baca-4d20-b713-5950c7e18248", "metadata": {}, "source": [ "This is the fourth notebook in the series. It dives deep into automating machine learning workflows to create a more repeatable path to production. \n", "\n", "Here, we will put on the hat of an `ML Engineer` and automate the tasks within our machine learning workflows as well as orchestrate the steps. For this, we'll build pipeline steps that combine the components from all the previous notebooks into a single entity. This pipeline provides a repeatable ML workflow with some reliability built in through minimal quality gates. \n", "\n", "For this task we will use Amazon SageMaker Pipelines to build an end-to-end machine learning pipeline. \n", "\n", "Let's get started!" ] }, { "cell_type": "code", "execution_count": null, "id": "6e7fd54d-9a9a-469c-9682-47aebd040627", "metadata": { "tags": [] }, "outputs": [], "source": [ "!pip install -U sagemaker" ] }, { "cell_type": "code", "execution_count": null, "id": "2fbbd01a-0abf-4553-b43c-fd8d5bd27f27", "metadata": { "tags": [] }, "outputs": [], "source": [ "!pip show sagemaker" ] }, { "cell_type": "code", "execution_count": null, "id": "8bddc040-672a-4f39-b587-5ac5059e690c", "metadata": { "tags": [] }, "outputs": [], "source": [ "%store -r" ] }, { "cell_type": "code", "execution_count": null, "id": "4c5d65d2-2733-4e6c-b858-ad25ae277706", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Processing imports\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor\n", "\n", "# SageMaker Pipeline imports\n", "from sagemaker.workflow.properties import PropertyFile\n", "from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo\n", 
"from sagemaker.workflow.condition_step import ConditionStep\n", "from sagemaker.workflow.functions import JsonGet\n", "\n", "from sagemaker.workflow.pipeline import Pipeline\n", "from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep, TransformStep, TuningStep\n", "from sagemaker.workflow.model_step import ModelStep\n", "\n", "from sagemaker.workflow.parameters import (\n", " ParameterInteger,\n", " ParameterString,\n", " ParameterFloat,\n", ")\n", "\n", "# Other imports\n", "import json\n", "import time\n", "from time import gmtime, strftime\n", "from sagemaker.sklearn.estimator import SKLearn\n", "from sagemaker.model import Model\n", "from sagemaker.tuner import IntegerParameter, HyperparameterTuner\n", "from sagemaker.inputs import TrainingInput\n", "from sagemaker.lambda_helper import Lambda\n", "from sagemaker.workflow.lambda_step import (\n", " LambdaStep,\n", " LambdaOutput,\n", " LambdaOutputTypeEnum,\n", ")\n", "\n", "# To test the endpoint once it's deployed\n", "from sagemaker.predictor import Predictor\n", "from sagemaker.serializers import CSVSerializer\n", "from sagemaker.deserializers import JSONDeserializer, CSVDeserializer\n", "from sagemaker.workflow.pipeline_context import PipelineSession\n", "from sagemaker.tuner import IntegerParameter, ContinuousParameter, HyperparameterTuner\n", "\n", "import sagemaker\n", "import json\n", "import boto3\n", "from sagemaker.model_metrics import ModelMetrics, MetricsSource\n", "import pandas as pd\n", "from sagemaker.feature_store.feature_group import FeatureGroup\n", "from helper_library import *\n", "\n", "from sagemaker.workflow.steps import CacheConfig" ] }, { "cell_type": "markdown", "id": "c2bebc0d-cffb-4969-a44e-1cf5501603e1", "metadata": {}, "source": [ "**Session variables**" ] }, { "cell_type": "code", "execution_count": null, "id": "fc34280e-cb38-4d9b-a32a-7ba90908998d", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Useful SageMaker variables\n", "session = 
PipelineSession()\n", "bucket = session.default_bucket()\n", "role_arn = sagemaker.get_execution_role()\n", "region = session.boto_region_name\n", "sagemaker_client = boto3.client('sagemaker')\n", "aws_account_id = boto3.client('sts').get_caller_identity().get('Account')\n", "#lambda_role = create_lambda_iam_role('LambdaSageMakerExecutionRole')\n", "# Data paths in S3\n", "s3_prefix = 'aws-sm-ray-pipeline-workshop'\n", "bucket_prefix = f'{s3_prefix}/data/feature-store'\n", "model_prefix = f'{s3_prefix}/models'\n", "output_path = f's3://{bucket}/{s3_prefix}/data/sm_processed'\n", "fs_s3_path = f's3://{bucket}/{s3_prefix}/data/feature-store'" ] }, { "cell_type": "code", "execution_count": null, "id": "4256471a-5c63-4015-a77d-f215226234f0", "metadata": { "tags": [] }, "outputs": [], "source": [ "processing_instance_type = 'ml.m5.2xlarge'\n", "num_actors = 10\n", "train_instance_type = 'ml.c5.xlarge'\n", "train_instance_count = 3" ] }, { "cell_type": "markdown", "id": "7576aa33-7ca5-4719-9c72-a65e66da61e4", "metadata": {}, "source": [ "## Model Build pipeline with SageMaker Pipelines" ] }, { "cell_type": "markdown", "id": "0bcc1681-35c5-4977-99e7-386f69f34bf2", "metadata": { "tags": [] }, "source": [ "[Amazon SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) provides the ability to create a directed acyclic graph (DAG) containing the pipeline steps needed to build and/or deploy machine learning models. Each pipeline, created through the provided Python SDK, is a series of interconnected steps. This same pipeline can also be exported as a JSON pipeline definition. \n", "\n", "The structure of a pipeline's DAG is determined by the data dependencies between steps. These data dependencies are created when the properties of a step's output are passed as the input to another step. 
The following image shows the pipeline DAG that we'll create for our training pipeline:\n", "\n", "![](images/sagemaker-pipelines-dag.png)" ] }, { "cell_type": "markdown", "id": "c2a62deb-04a6-4e0d-adc4-fcf3227af690", "metadata": {}, "source": [ "#### Pipeline Parameters" ] }, { "cell_type": "markdown", "id": "fb653db1-edae-4dbc-b501-45c6388382c5", "metadata": {}, "source": [ "SageMaker Pipelines supports [pipeline parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html), which let you provide runtime values for each run of your pipeline. This lets you change key inputs (e.g., the raw input data) for each run without changing your pipeline code.\n", "\n", "Here, we'll identify the parameters and set their default values. Parameters also make the pipeline reusable: you'll be able to override these inputs when executing the pipeline later in the notebook." ] }, { "cell_type": "code", "execution_count": null, "id": "aad95589-c285-4d96-8338-fb2ddba81ddf", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Upload raw data to S3\n", "local_data_path_ray = \"data/raw/ray/house_pricing.csv\"\n", "\n", "raw_data_s3_prefix = '{}/data/raw'.format(s3_prefix)\n", "raw_s3 = session.upload_data(path=local_data_path_ray, key_prefix=raw_data_s3_prefix)" ] }, { "cell_type": "code", "execution_count": null, "id": "9e78b39b-6458-4be5-894e-c12c151ad426", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Optional step\n", "# Delete all files in the S3 prefix before beginning preprocessing. 
\n", "# This is to prevent duplication of data when running this workshop multiple times.\n", "\n", "s3 = boto3.resource('s3')\n", "print(bucket)\n", "bucket_obj = s3.Bucket(bucket)\n", "print(f\"{s3_prefix}/data/sm_processed/\")\n", "files = bucket_obj.objects.filter(Prefix=f\"{s3_prefix}/data/sm_processed/\")\n", "files.delete()" ] }, { "cell_type": "code", "execution_count": null, "id": "00c38492-2c0a-48d0-a655-143dea5c2f50", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Optional step\n", "# Delete all Feature Groups whose names contain fs-ray-. \n", "# This is to prevent duplication of feature stores when running this workshop multiple times.\n", "\n", "# Use the session's region rather than a hard-coded one so the cleanup targets the right account region\n", "sm_client = boto3.client('sagemaker', region_name=region)\n", "sagemaker_session = sagemaker.Session(boto3.Session(region_name=region))\n", "response = sm_client.list_feature_groups(\n", " NameContains='fs-ray-'\n", ")\n", "\n", "for feature in response[\"FeatureGroupSummaries\"]:\n", " print(f'deleting {feature[\"FeatureGroupName\"]}')\n", " resp = sm_client.delete_feature_group(\n", " FeatureGroupName=feature[\"FeatureGroupName\"]\n", " )" ] }, { "cell_type": "code", "execution_count": null, "id": "43f661e0-2184-4032-adea-921f9636a3c2", "metadata": { "tags": [] }, "outputs": [], "source": [ "processing_instance_count = ParameterInteger(\n", " name='ProcessingInstanceCount',\n", " default_value=1\n", ")\n", "\n", "feature_group_name = ParameterString(\n", " name='FeatureGroupName',\n", " default_value='fs-ray-synthetic-housing-data'\n", ")\n", "\n", "bucket_prefix = ParameterString(\n", " name='Bucket_Prefix',\n", " default_value='aws-ray-mlops-workshop/feature-store'\n", ")\n", "\n", "rmse_threshold = ParameterFloat(name=\"RMSEThreshold\", default_value=15000.0)\n", "\n", "train_size = ParameterString(\n", " name='TrainSize',\n", " default_value=\"0.6\"\n", ")\n", "\n", "val_size = ParameterString(\n", " name='ValidationSize',\n", " default_value=\"0.2\"\n", ")\n", "\n", "test_size = 
ParameterString(\n", " name='TestSize',\n", " default_value=\"0.2\"\n", ")\n" ] }, { "cell_type": "markdown", "id": "83c6fb4d-3dd5-45b4-a8aa-35cd8ed001b0", "metadata": {}, "source": [ "#### Setup Step Caching Configuration\n", "\n", "When caching is enabled on a pipeline step, SageMaker Pipelines automatically checks for a previous successful run of that step with the same values for specific parameters. If one is found, Pipelines propagates the results of that run to the next step without re-running the step, saving both time and compute costs. The `expire_after` value below is an ISO 8601 duration: `PT12H` means cached results are reused for up to 12 hours." ] }, { "cell_type": "code", "execution_count": null, "id": "c9f47071-47af-4d4b-afb3-2489b4d3d42b", "metadata": { "tags": [] }, "outputs": [], "source": [ "cache_config = CacheConfig(enable_caching=True, expire_after=\"PT12H\")" ] }, { "cell_type": "markdown", "id": "4c0df27b-cd2a-46ab-a180-535640cb629f", "metadata": {}, "source": [ "#### SageMaker Processing step" ] }, { "cell_type": "markdown", "id": "c0802af5-b304-4bb3-9865-20d0061a7e41", "metadata": {}, "source": [ "This should look very similar to the SageMaker Processing Job you configured in notebook 2. The only new line of code is the `ProcessingStep` line at the bottom of the cell below, which lets us take the Processing Job configuration and include it as a pipeline step. 
" ] }, { "cell_type": "code", "execution_count": null, "id": "35d19ba9-3b71-460b-ac77-84c24ce62cf1", "metadata": { "tags": [] }, "outputs": [], "source": [ "from sagemaker.workflow.functions import Join\n", "\n", "feature_store_ingestion = SKLearnProcessor(\n", " framework_version='1.0-1',\n", " role=role_arn,\n", " instance_type=processing_instance_type,\n", " instance_count=processing_instance_count,\n", " base_job_name='feature-store-ingestion',\n", " sagemaker_session=session\n", ")\n", "\n", "fs_processor_args = feature_store_ingestion.run(\n", " code=\"./pipeline_scripts/feature-store/script-fs.py\",\n", " inputs=[\n", " ProcessingInput(\n", " source=raw_s3,\n", " destination='/opt/ml/processing/input',\n", " s3_data_distribution_type='ShardedByS3Key'\n", " )\n", " ], \n", " arguments=[\n", " \"--feature_group_name\", feature_group_name,\n", " \"--num_actors\", str(num_actors),\n", " \"--bucket_prefix\", bucket_prefix,\n", " \"--role_arn\", role_arn,\n", " \"--region\", region,\n", " ]\n", ")\n", " \n", "feature_store_ingestion_step = ProcessingStep(\n", " name='FeatureStoreIngestion',\n", " step_args=fs_processor_args,\n", " cache_config=cache_config\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "02ac87ff-c072-46da-9a58-9fc5cba77fd5", "metadata": { "tags": [] }, "outputs": [], "source": [ "f'{output_path}/test'" ] }, { "cell_type": "code", "execution_count": null, "id": "b091a470-28e5-4657-99ee-f03c6d614f5e", "metadata": { "tags": [] }, "outputs": [], "source": [ "preprocess_data_processor = SKLearnProcessor(\n", " framework_version='1.0-1',\n", " role=role_arn,\n", " instance_type=processing_instance_type,\n", " instance_count=processing_instance_count,\n", " base_job_name='preprocess-data',\n", " sagemaker_session=session,\n", " \n", ")\n", "\n", "processor_args = preprocess_data_processor.run(\n", " code=\"./pipeline_scripts/preprocessing/script.py\",\n", " outputs=[\n", " ProcessingOutput(\n", " output_name='train',\n", " 
destination=f'{output_path}/train',\n", " source='/opt/ml/processing/output/train'\n", " ),\n", " ProcessingOutput(\n", " output_name='validation',\n", " destination=f'{output_path}/validation',\n", " source='/opt/ml/processing/output/validation'\n", " ),\n", " ProcessingOutput(\n", " output_name='test',\n", " destination=f'{output_path}/test',\n", " source='/opt/ml/processing/output/test'\n", " )\n", " ], \n", " arguments=[\n", " \"--feature_group_name\", feature_group_name,\n", " \"--train_size\", train_size,\n", " \"--val_size\", val_size,\n", " \"--test_size\", test_size,\n", " \"--role_arn\", role_arn,\n", " \"--region\", region\n", " ]\n", ")\n", "\n", "preprocess_dataset_step = ProcessingStep(\n", " name='PreprocessData',\n", " step_args=processor_args,\n", " cache_config=cache_config\n", ")\n", "preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])" ] }, { "cell_type": "markdown", "id": "08a4eb68-b40d-44f0-bee9-3a80f9c44ca9", "metadata": { "tags": [] }, "source": [ "## Hyperparameter Tuning Step\n", "Amazon SageMaker automatic model tuning, also known as hyperparameter tuning, finds the best version of a model by running many training jobs on your dataset using the algorithm and ranges of hyperparameters that you specify. It then chooses the hyperparameter values that result in a model that performs the best, as measured by a metric that you choose.\n", "\n", "This configuration should also look very similar to the SageMaker Training job you did in notebook 2. The only new line of code is the `TuningStep` line at the bottom of the cell below to allow us to run the training job as a step in our pipeline.\n", "\n", "You can learn more about [Hyperparameter Tuning](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning-how-it-works.html) in the SageMaker docs." 
] }, { "cell_type": "code", "execution_count": null, "id": "fb00126a-b940-468b-a848-cdff040d1ad2", "metadata": { "tags": [] }, "outputs": [], "source": [ "role_arn" ] }, { "cell_type": "code", "execution_count": null, "id": "26ee0931-91cf-4bda-9865-3f2de1316a95", "metadata": { "tags": [] }, "outputs": [], "source": [ "from sagemaker.xgboost.estimator import XGBoost\n", "\n", "hyperparams = {\n", " # Tuned hyperparameters\n", " #\"max_depth\": \"4\",\n", " #\"eta\": \"0.810249\",\n", " #\"min_child_weight\": \"79\",\n", " #\"subsample\": \"0.984023\",\n", " #\"objective\": \"reg:squarederror\",\n", " # Training job params\n", " # \"feature_group_name\": feature_group_name.to_string(),\n", " #\"validation_feature_group_name\": validation_feature_group_name,\n", " \"role_arn\": role_arn,\n", " \"region\": region,\n", "}\n", "\n", "estimator_parameters = {\n", " 'source_dir': './pipeline_scripts/train/',\n", " 'entry_point': 'script.py',\n", " 'framework_version': '1.7-1',\n", " 'instance_type': train_instance_type,\n", " 'instance_count': train_instance_count,\n", " 'hyperparameters': hyperparams,\n", " 'role': role_arn,\n", " 'base_job_name': 'XGBoost-model',\n", " 'output_path': f's3://{bucket}/{s3_prefix}/',\n", " 'image_scope': 'training',\n", "}\n", "\n", "estimator = XGBoost(**estimator_parameters)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "5b87ae6d-0d77-488d-9add-c64c93b89303", "metadata": { "tags": [] }, "outputs": [], "source": [ "hyperparameter_ranges = {\n", " \"max_depth\": IntegerParameter(1, 8),\n", " \"eta\": ContinuousParameter(0.2, 1),\n", " \"min_child_weight\": IntegerParameter(0, 120),\n", " \"subsample\": ContinuousParameter(0.2, 1),\n", "}\n", "\n", "objective_metric_name = 'validation:rmse'\n", "objective_type = 'Minimize'\n", "tuner_parameters = {\n", " 'estimator': estimator,\n", " 'objective_metric_name': objective_metric_name,\n", " 'hyperparameter_ranges': hyperparameter_ranges,\n", " # 'metric_definitions': 
metric_definitions,\n", " 'max_jobs': 4,\n", " 'max_parallel_jobs': 4,\n", " 'objective_type': objective_type\n", " }\n", " \n", "tuner = HyperparameterTuner(**tuner_parameters)\n", "\n", "tuning_step = TuningStep(\n", " name=\"HPTuning\",\n", " tuner=tuner,\n", " inputs={\n", " \"train\": TrainingInput(\n", " s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[\n", " \"train\"\n", " ].S3Output.S3Uri,\n", " content_type=\"text/csv\"\n", " ),\n", " \"validation\": TrainingInput(\n", " s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[\n", " \"validation\"\n", " ].S3Output.S3Uri,\n", " content_type=\"text/csv\"\n", " )\n", " },\n", " #step_args=hpo_args,\n", " cache_config=cache_config,\n", ")\n", "tuning_step.add_depends_on([preprocess_dataset_step])" ] }, { "cell_type": "markdown", "id": "e6fb88d3-1a6a-4a93-8e83-c0c4ad169097", "metadata": { "tags": [] }, "source": [ "## Creating and Registering the best models" ] }, { "cell_type": "markdown", "id": "2ae5126f-4886-408a-aa8c-3e984362b7d4", "metadata": {}, "source": [ "After the Hyperparameter Tuning step completes successfully, you can either create SageMaker models from the model artifacts produced by the `TuningStep` training jobs or register the models in the Model Registry. \n", "\n", "In this example we will implement a minimum quality gate that compares the best model's RMSE on the test set against a threshold (the `rmse_threshold` pipeline parameter).\n", "\n", "To do that, we can create a SageMaker Processing Step that runs the evaluation code (`pipeline_scripts/evaluate/script.py`). 
The evaluation script computes the evaluation metrics (MAE and RMSE) for the best estimator from the hyperparameter tuning job and writes them to an `evaluation.json` file. `ConditionLessThanOrEqualTo` can then use that file to enforce a minimum threshold that the model must meet before it is registered in the Model Registry.\n", "\n", "[More information about regression metrics](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html)" ] }, { "cell_type": "code", "execution_count": null, "id": "51d44800-f9b4-4456-b82b-85e4a860e0e6", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%writefile ./pipeline_scripts/evaluate/script.py\n", "import subprocess\n", "import sys\n", "subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker', 'ray', 'modin[ray]', 'pydantic==1.10.10', 'xgboost_ray'])\n", "import os\n", "import time\n", "import tarfile\n", "import argparse\n", "import json\n", "import logging\n", "import boto3\n", "import sagemaker\n", "import glob\n", "\n", "import pathlib\n", "import numpy as np\n", "from math import sqrt\n", "from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score\n", "\n", "# Sagemaker specific imports\n", "from sagemaker.session import Session\n", "import pandas as pd\n", "import xgboost as xgb\n", "\n", "# Ray specific imports\n", "# import ray\n", "# from ray.air.checkpoint import Checkpoint\n", "# from ray.train.xgboost import XGBoostCheckpoint, XGBoostPredictor\n", "# import ray.cloudpickle as cloudpickle\n", "\n", "logger = logging.getLogger(__name__)\n", "logger.setLevel(logging.DEBUG)\n", "logger.addHandler(logging.StreamHandler(sys.stdout))\n", "\n", "if __name__ == \"__main__\":\n", " logger.debug('Starting evaluation.')\n", " \n", " model_dir = '/opt/ml/processing/model'\n", " for file in os.listdir(model_dir):\n", " logger.info(file)\n", " \n", " model_path = os.path.join(model_dir, 'model.tar.gz')\n", " with tarfile.open(model_path) as tar:\n", " tar.extractall(path=model_dir)\n", " \n", " for file 
in os.listdir(model_dir):\n", " logger.info(file)\n", " \n", " logger.debug('Loading XGBoost model.')\n", " model = xgb.Booster()\n", " model.load_model(os.path.join(model_dir, 'model.xgb'))\n", "\n", " logger.debug('Reading test data.')\n", "\n", " test_path = \"/opt/ml/processing/test/\"\n", " # Get list of all csv files in folder\n", " csv_files = glob.glob(f'{test_path}*.csv')\n", " \n", " # Read each CSV file into DataFrame\n", " # This creates a generator of DataFrames\n", " df_list = (pd.read_csv(file, header=0) for file in csv_files)\n", "\n", " # Concatenate all DataFrames\n", " df = pd.concat(df_list, ignore_index=True)\n", " df.reset_index(drop=True, inplace=True)\n", " print(df.head(5))\n", " y_test = df[\"PRICE\"].to_numpy()\n", " df.drop(columns=[\"PRICE\"], inplace=True)\n", "\n", " X_test = xgb.DMatrix(df.values)\n", " \n", " logger.info('Performing predictions against test data.')\n", " predictions = model.predict(X_test)\n", "\n", " # See the regression metrics\n", " # see: https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html\n", " logger.debug('Calculating metrics.')\n", " mae = mean_absolute_error(y_test, predictions)\n", " mse = mean_squared_error(y_test, predictions)\n", " rmse = sqrt(mse)\n", " std = np.std(y_test - predictions)\n", " report_dict = {\n", " 'regression_metrics': {\n", " 'mae': {\n", " 'value': mae,\n", " \"standard_deviation\": std\n", " },\n", " 'rmse': {\n", " 'value': rmse,\n", " \"standard_deviation\": std\n", " },\n", " },\n", " }\n", "\n", " output_dir = '/opt/ml/processing/evaluation'\n", " pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)\n", "\n", " logger.info('Writing out evaluation report with rmse: %f', rmse)\n", " evaluation_path = f'{output_dir}/evaluation.json'\n", " with open(evaluation_path, 'w') as f:\n", " f.write(json.dumps(report_dict))" ] }, { "cell_type": "code", "execution_count": null, "id": "40b3789c-e416-407a-8d2e-893998e34979", "metadata": { "tags": 
[] }, "outputs": [], "source": [ "evaluation_processor = SKLearnProcessor(\n", " framework_version='0.23-1',\n", " role=role_arn,\n", " instance_type='ml.m5.xlarge',\n", " instance_count=processing_instance_count,\n", " base_job_name='evaluation',\n", " sagemaker_session=session,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "a8dc41ce-2f02-44a1-ad06-fc0034d091aa", "metadata": { "tags": [] }, "outputs": [], "source": [ "# Specify where we'll store the model evaluation results so\n", "# that other steps can access those results\n", "evaluation_report = PropertyFile(\n", " name='EvaluationReport',\n", " output_name='evaluation',\n", " path='evaluation.json',\n", ")\n", "\n", "# A ProcessingStep is used to evaluate the performance of a selected model from the HPO step. \n", "# In this case, the top performing model is evaluated. \n", "evaluation_step = ProcessingStep(\n", " name='EvaluateModel',\n", " processor=evaluation_processor,\n", " inputs=[\n", " ProcessingInput(\n", " source=tuning_step.get_top_model_s3_uri(\n", " top_k=0, s3_bucket=bucket, prefix=s3_prefix\n", " ),\n", " destination='/opt/ml/processing/model',\n", " ),\n", " ProcessingInput(\n", " source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,\n", " destination='/opt/ml/processing/test',\n", " ),\n", " ],\n", " outputs=[\n", " ProcessingOutput(\n", " output_name='evaluation', source='/opt/ml/processing/evaluation'\n", " ),\n", " ],\n", " code='./pipeline_scripts/evaluate/script.py',\n", " property_files=[evaluation_report],\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "473526c9-aea1-400d-8809-dba4fac0c7b5", "metadata": { "tags": [] }, "outputs": [], "source": [ "from sagemaker.xgboost.model import XGBoostModel\n", "from sagemaker.utils import unique_name_from_base\n", "\n", "model_package_group_name = unique_name_from_base('synthetic-housing-models-ray-')\n", "\n", "model_metrics = ModelMetrics(\n", " 
model_statistics=MetricsSource(\n", " s3_uri='{}/evaluation.json'.format(\n", " evaluation_step.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output'][\n", " 'S3Uri'\n", " ]\n", " ),\n", " content_type='application/json',\n", " )\n", ")\n", "\n", "# Based on the results of the evaluation, the model is registered into the Model Registry using a ConditionStep.\n", "best_model = XGBoostModel(\n", " #image_uri=estimator.training_image_uri(),\n", " model_data=tuning_step.get_top_model_s3_uri(top_k=0, s3_bucket=bucket, prefix=s3_prefix),\n", " # source_dir=estimator.source_dir,\n", " #entry_point=estimator.entry_point,\n", " entry_point=\"./pipeline_scripts/inference/script.py\",\n", " role=role_arn,\n", " sagemaker_session=session,\n", " framework_version=\"1.7-1\"\n", ")\n", "\n", "model_registry_args = best_model.register(\n", " content_types=['text/csv'],\n", " response_types=['application/json'],\n", " inference_instances=['ml.t2.medium', 'ml.m5.xlarge'],\n", " transform_instances=['ml.m5.xlarge'],\n", " model_package_group_name=model_package_group_name,\n", " approval_status='PendingManualApproval',\n", " description='XGBoost model to predict synthetic housing prices',\n", " model_metrics=model_metrics\n", ")\n", "\n", "register_step = ModelStep(\n", " name='RegisterTrainedModel',\n", " step_args=model_registry_args\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "6c077ce3-3d75-4fdd-a813-f3ec4e38452a", "metadata": { "tags": [] }, "outputs": [], "source": [ "from sagemaker.workflow.fail_step import FailStep\n", "from sagemaker.workflow.functions import Join\n", "\n", "metrics_fail_step = FailStep(\n", " name=\"RMSEFail\",\n", " error_message=Join(on=\" \", values=[\"Execution failed due to RMSE >\", rmse_threshold]),\n", ")\n", "\n", "# Condition step for evaluating model quality and branching execution\n", "cond_lte = ConditionLessThanOrEqualTo(\n", " left=JsonGet(\n", " step_name=evaluation_step.name,\n", " 
property_file=evaluation_report,\n", " json_path='regression_metrics.rmse.value',\n", " ),\n", " right=rmse_threshold,\n", ")\n", "condition_step = ConditionStep(\n", " name='CheckEvaluation',\n", " conditions=[cond_lte],\n", " if_steps=[register_step],\n", " else_steps=[metrics_fail_step],\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "f8eb3ff7-e490-4bfa-94aa-e11f980280dc", "metadata": { "tags": [] }, "outputs": [], "source": [ "# pipeline_name = 'synthetic-housing-training-pipeline-{}'.format(strftime('%d-%H-%M-%S', gmtime()))\n", "pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'\n", "step_list = [\n", " feature_store_ingestion_step,\n", " preprocess_dataset_step,\n", " tuning_step,\n", " evaluation_step,\n", " condition_step\n", " ]\n", "\n", "training_pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " processing_instance_count,\n", " feature_group_name,\n", " train_size,\n", " val_size,\n", " test_size,\n", " bucket_prefix,\n", " rmse_threshold\n", " ],\n", " steps=step_list\n", ")\n", "\n", "# Note: If an existing pipeline has the same name it will be overwritten.\n", "training_pipeline.upsert(role_arn=role_arn)\n", "\n", "# Viewing the pipeline definition with all the string variables interpolated may help debug the pipeline. 
It is commented out here due to length.\n", "#json.loads(training_pipeline.definition())" ] }, { "cell_type": "code", "execution_count": null, "id": "ea2c9b73-5313-408a-83b4-a38ae7d7212e", "metadata": { "tags": [] }, "outputs": [], "source": [ "json.loads(training_pipeline.definition())" ] }, { "cell_type": "code", "execution_count": null, "id": "61e7059e-74fa-42dc-be57-bdb79c5f3056", "metadata": { "tags": [] }, "outputs": [], "source": [ "# This is where you could optionally override parameter defaults \n", "execution = training_pipeline.start()" ] }, { "cell_type": "code", "execution_count": null, "id": "9381d7d5-d9b1-4603-aa63-b80a56904201", "metadata": { "tags": [] }, "outputs": [], "source": [ "execution.describe()" ] }, { "cell_type": "code", "execution_count": null, "id": "0aae7749-b46c-47f8-8ca8-3e92d0c6e14f", "metadata": { "pycharm": { "name": "#%%\n" }, "tags": [] }, "outputs": [], "source": [ "execution.wait()" ] }, { "cell_type": "markdown", "id": "340e3571-c4a7-4673-9bd9-4c6f9dd5cee2", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "List the steps in the execution. These are the steps in the pipeline that have been resolved by the step executor service." ] }, { "cell_type": "code", "execution_count": null, "id": "654562dd-b494-4e10-b06c-b61e45c4b32e", "metadata": { "pycharm": { "name": "#%%\n" }, "tags": [] }, "outputs": [], "source": [ "execution.list_steps()" ] }, { "cell_type": "markdown", "id": "52d5f2c8-e163-4237-98a8-61768860a0d6", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "### Examining the Evaluation\n", "\n", "Examine the resulting model evaluation after the pipeline completes. Download the resulting `evaluation.json` file from S3 and print the report." 
] }, { "cell_type": "code", "execution_count": null, "id": "63f57fd2-bd0f-4047-a26e-372e3df90eab", "metadata": { "pycharm": { "name": "#%%\n" }, "tags": [] }, "outputs": [], "source": [ "from pprint import pprint\n", "\n", "\n", "evaluation_json = sagemaker.s3.S3Downloader.read_file(\n", " \"{}/evaluation.json\".format(\n", " evaluation_step.arguments[\"ProcessingOutputConfig\"][\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n", " )\n", ")\n", "pprint(json.loads(evaluation_json))" ] }, { "cell_type": "markdown", "id": "eaff8dba-3bcd-4cd3-bac1-cb011ef50aff", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "### Lineage\n", "\n", "Review the lineage of the artifacts generated by the pipeline." ] }, { "cell_type": "code", "execution_count": null, "id": "d69b71a4-df65-4014-8b0d-e0c6609d4e69", "metadata": { "pycharm": { "name": "#%%\n" }, "tags": [] }, "outputs": [], "source": [ "import time\n", "from sagemaker.lineage.visualizer import LineageTableVisualizer\n", "\n", "\n", "viz = LineageTableVisualizer(sagemaker.session.Session())\n", "for execution_step in reversed(execution.list_steps()):\n", " print(execution_step)\n", " display(viz.show(pipeline_execution_step=execution_step))\n", " time.sleep(5)" ] }, { "cell_type": "markdown", "id": "e99f119a-e712-483b-9cd5-5440e581a864", "metadata": {}, "source": [ "## Cleaning up resources\n", "\n", "Users are responsible for cleaning up resources created when running this notebook. Specify the ModelName, ModelPackageName, and ModelPackageGroupName that need to be deleted. The model names are generated by the CreateModel step of the Pipeline and the property values are available only in the Pipeline context. To delete the models created by this pipeline, navigate to the Model Registry and Console to find the models to delete." 
] }, { "cell_type": "code", "execution_count": null, "id": "69e1cb0c-1df2-422f-9c03-4a967f692e49", "metadata": { "tags": [] }, "outputs": [], "source": [ "# # Create a SageMaker client\n", "# sm_client = boto3.client(\"sagemaker\")\n", "\n", "# # Delete SageMaker Models\n", "# sm_client.delete_model(ModelName=\"...\")\n", "\n", "# # Delete Model Packages\n", "# sm_client.delete_model_package(ModelPackageName=\"...\")\n", "\n", "# # Delete the Model Package Group\n", "# sm_client.delete_model_package_group(ModelPackageGroupName=\"...\")\n", "\n", "# # Delete the Pipeline\n", "# sm_client.delete_pipeline(PipelineName=pipeline_name)\n", "\n", "# # Delete created dataset\n", "# !rm -rf ./data/processed/*" ] }, { "cell_type": "code", "execution_count": null, "id": "349bf33c-59df-4192-9d54-719b3a3567b7", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "availableInstances": [ { "_defaultOrder": 0, "_isFastLaunch": true, "category": "General purpose", "gpuNum": 0, "memoryGiB": 4, "name": "ml.t3.medium", "vcpuNum": 2 }, { "_defaultOrder": 1, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 8, "name": "ml.t3.large", "vcpuNum": 2 }, { "_defaultOrder": 2, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 16, "name": "ml.t3.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 3, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 32, "name": "ml.t3.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 4, "_isFastLaunch": true, "category": "General purpose", "gpuNum": 0, "memoryGiB": 8, "name": "ml.m5.large", "vcpuNum": 2 }, { "_defaultOrder": 5, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 16, "name": "ml.m5.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 6, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 32, "name": "ml.m5.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 7, "_isFastLaunch": false, "category": "General purpose", 
"gpuNum": 0, "memoryGiB": 64, "name": "ml.m5.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 8, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 128, "name": "ml.m5.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 9, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 192, "name": "ml.m5.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 10, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 256, "name": "ml.m5.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 11, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 384, "name": "ml.m5.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 12, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 8, "name": "ml.m5d.large", "vcpuNum": 2 }, { "_defaultOrder": 13, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 16, "name": "ml.m5d.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 14, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 32, "name": "ml.m5d.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 15, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 64, "name": "ml.m5d.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 16, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 128, "name": "ml.m5d.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 17, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 192, "name": "ml.m5d.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 18, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 256, "name": "ml.m5d.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 19, "_isFastLaunch": false, "category": "General purpose", "gpuNum": 0, "memoryGiB": 384, "name": "ml.m5d.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 20, "_isFastLaunch": true, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 4, 
"name": "ml.c5.large", "vcpuNum": 2 }, { "_defaultOrder": 21, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 8, "name": "ml.c5.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 22, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 16, "name": "ml.c5.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 23, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 32, "name": "ml.c5.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 24, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 72, "name": "ml.c5.9xlarge", "vcpuNum": 36 }, { "_defaultOrder": 25, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 96, "name": "ml.c5.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 26, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 144, "name": "ml.c5.18xlarge", "vcpuNum": 72 }, { "_defaultOrder": 27, "_isFastLaunch": false, "category": "Compute optimized", "gpuNum": 0, "memoryGiB": 192, "name": "ml.c5.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 28, "_isFastLaunch": true, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 16, "name": "ml.g4dn.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 29, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 32, "name": "ml.g4dn.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 30, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 64, "name": "ml.g4dn.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 31, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 128, "name": "ml.g4dn.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 32, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 4, "memoryGiB": 192, "name": "ml.g4dn.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 33, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, 
"memoryGiB": 256, "name": "ml.g4dn.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 34, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 61, "name": "ml.p3.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 35, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 4, "memoryGiB": 244, "name": "ml.p3.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 36, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 8, "memoryGiB": 488, "name": "ml.p3.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 37, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 8, "memoryGiB": 768, "name": "ml.p3dn.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 38, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 16, "name": "ml.r5.large", "vcpuNum": 2 }, { "_defaultOrder": 39, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 32, "name": "ml.r5.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 40, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 64, "name": "ml.r5.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 41, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 128, "name": "ml.r5.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 42, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 256, "name": "ml.r5.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 43, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 384, "name": "ml.r5.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 44, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 512, "name": "ml.r5.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 45, "_isFastLaunch": false, "category": "Memory Optimized", "gpuNum": 0, "memoryGiB": 768, "name": "ml.r5.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 46, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, 
"memoryGiB": 16, "name": "ml.g5.xlarge", "vcpuNum": 4 }, { "_defaultOrder": 47, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 32, "name": "ml.g5.2xlarge", "vcpuNum": 8 }, { "_defaultOrder": 48, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 64, "name": "ml.g5.4xlarge", "vcpuNum": 16 }, { "_defaultOrder": 49, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 128, "name": "ml.g5.8xlarge", "vcpuNum": 32 }, { "_defaultOrder": 50, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 1, "memoryGiB": 256, "name": "ml.g5.16xlarge", "vcpuNum": 64 }, { "_defaultOrder": 51, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 4, "memoryGiB": 192, "name": "ml.g5.12xlarge", "vcpuNum": 48 }, { "_defaultOrder": 52, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 4, "memoryGiB": 384, "name": "ml.g5.24xlarge", "vcpuNum": 96 }, { "_defaultOrder": 53, "_isFastLaunch": false, "category": "Accelerated computing", "gpuNum": 8, "memoryGiB": 768, "name": "ml.g5.48xlarge", "vcpuNum": 192 } ], "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science 3.0)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-data-science-310-v1" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.6" } }, "nbformat": 4, "nbformat_minor": 5 }