{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Use SageMaker Pipelines to track and manage multiple trials\n", "\n", "In this notebook, we show how to organize the manual steps of an ML lifecycle into an automated SageMaker Pipeline. If you haven't already, we recommend reviewing `01-Experiments.ipynb` first to understand the context of this exercise.\n", "\n", "[Amazon SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines/) is a purpose-built, fully managed machine learning workflow orchestration service that helps customers build, automate, and manage their entire machine learning lifecycle. For our use case, Amazon SageMaker Pipelines provides a faster path from a manual experimentation phase in notebooks to a controlled, automated ML workflow. It can be accessed via [Amazon SageMaker Studio](https://aws.amazon.com/sagemaker/studio/) for visual workflow tracking and monitoring.\n", "\n", "[Amazon SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines/) is integrated with the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/), which helps you create your workflows programmatically. A Pipeline consists of a series of interconnected steps. Each step defines an action that the Pipeline takes, and the dependencies between steps are defined by a directed acyclic graph (DAG) expressed as a JSON definition.\n", "\n", "\n", "At the time of writing, the tested version of the `sagemaker` SDK is `2.73.0` and the tested version of the `sagemaker-experiments` library is `0.1.35`." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "import subprocess\n", "subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-Uq', 'pip'])\n", "subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker==2.73.0', '-Uq'])\n", "subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker-experiments==0.1.35', '-Uq'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Set up\n", "\n", "We start by specifying the following:\n", "\n", "* The S3 bucket and prefix that you want to use for training and model data. This should be within the same region as the notebook instance, training, and hosting.\n", "* The IAM role ARN that has access to your data and training jobs.\n", "* The experiment name as the logical entity to keep our tests grouped and organized." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "import boto3\n", "import time\n", "from time import strftime\n", "\n", "import sagemaker\n", "\n", "# Set up execution role, client, session, and default bucket\n", "\n", "role = sagemaker.get_execution_role()\n", "\n", "region = sagemaker.Session().boto_region_name\n", "sm_client = boto3.client(\"sagemaker\")\n", "boto_session = boto3.Session(region_name=region)\n", "\n", "sagemaker_session = sagemaker.session.Session(\n", "    boto_session=boto_session, sagemaker_client=sm_client\n", ")\n", "\n", "bucket = sagemaker_session.default_bucket()\n", "experiment_name = 'DEMO-sagemaker-experiments-pipelines'\n", "prefix = experiment_name\n", "\n", "print(f\"bucket: {bucket}\")\n", "print(f\"region: {region}\")\n", "print(f\"role: {role}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define the pipeline\n", "Here we define configuration variables, such as instance count and instance type, as pipeline parameters so that they can be set dynamically at runtime. 
These parameters are populated when the pipeline is started at the end of the notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.execution_variables import ExecutionVariables\n", "from sagemaker.workflow.parameters import ParameterString, ParameterInteger\n", "from sagemaker.workflow.steps import CacheConfig\n", "\n", "base_job_prefix = \"pipeline-experiment-sample\"\n", "\n", "processing_instance_count = ParameterInteger(\n", "    name=\"ProcessingInstanceCount\", default_value=1\n", ")\n", "\n", "training_instance_count = ParameterInteger(\n", "    name=\"TrainingInstanceCount\", default_value=1\n", ")\n", "\n", "processing_instance_type = ParameterString(\n", "    name=\"ProcessingInstanceType\", default_value=\"ml.m5.xlarge\"\n", ")\n", "training_instance_type = ParameterString(\n", "    name=\"TrainingInstanceType\", default_value=\"ml.m5.xlarge\"\n", ")\n", "\n", "# Cache Pipeline steps to reduce execution time on subsequent executions\n", "cache_config = CacheConfig(enable_caching=True, expire_after=\"30d\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We use the `SKLearnProcessor` to run our preprocessing logic as a SageMaker Processing job. The preprocessing step runs the `california-housing-preprocessing.py` script, which splits the input dataset into training, validation, and test datasets according to a predefined ratio." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pygmentize 'california-housing-preprocessing.py'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that we include the pipeline execution ID in the S3 output path of each dataset, which makes it easier to track and monitor the experiment runs." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "\n", "from sagemaker.workflow.steps import ProcessingStep\n", "from sagemaker.workflow.functions import Join\n", "\n", "framework_version = \"0.23-1\"\n", "\n", "sklearn_processor = SKLearnProcessor(\n", "    framework_version=framework_version,\n", "    instance_type=processing_instance_type,\n", "    instance_count=processing_instance_count,\n", "    base_job_name=\"sklearn-ca-housing\",\n", "    role=role,\n", ")\n", "\n", "process_step = ProcessingStep(\n", "    name=\"ca-housing-preprocessing\",\n", "    processor=sklearn_processor,\n", "    outputs=[\n", "        ProcessingOutput(\n", "            output_name=\"train\",\n", "            source=\"/opt/ml/processing/train\",\n", "            destination=Join(\n", "                on=\"/\",\n", "                values=[\n", "                    \"s3://{}\".format(bucket),\n", "                    prefix,\n", "                    ExecutionVariables.PIPELINE_EXECUTION_ID,\n", "                    \"train\",\n", "                ],\n", "            ),\n", "        ),\n", "        ProcessingOutput(\n", "            output_name=\"validation\",\n", "            source=\"/opt/ml/processing/validation\",\n", "            destination=Join(\n", "                on=\"/\",\n", "                values=[\n", "                    \"s3://{}\".format(bucket),\n", "                    prefix,\n", "                    ExecutionVariables.PIPELINE_EXECUTION_ID,\n", "                    \"validation\",\n", "                ],\n", "            )\n", "        ),\n", "        ProcessingOutput(\n", "            output_name=\"test\",\n", "            source=\"/opt/ml/processing/test\",\n", "            destination=Join(\n", "                on=\"/\",\n", "                values=[\n", "                    \"s3://{}\".format(bucket),\n", "                    prefix,\n", "                    ExecutionVariables.PIPELINE_EXECUTION_ID,\n", "                    \"test\",\n", "                ],\n", "            )\n", "        ),\n", "    ],\n", "    code=\"california-housing-preprocessing.py\",\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We use the [SageMaker XGBoost](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) built-in algorithm for training by referencing its container image, as specified below. 
After configuring the training instance type, count, and output path, we set static values for the hyperparameters that we determined to be optimal during our manual experiments. The hyperparameter tuning step then uses the training and validation datasets to find the best model over several training jobs.\n", "\n", "Our aim is to minimize the validation RMSE, so we define `validation:rmse` as the objective metric with the objective type `Minimize`. We also define the output path for the model artifacts from the hyperparameter tuning job." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.estimator import Estimator\n", "from sagemaker.inputs import TrainingInput\n", "\n", "from sagemaker.tuner import ContinuousParameter, HyperparameterTuner\n", "from sagemaker.workflow.steps import TuningStep\n", "\n", "model_path = f\"s3://{bucket}/{prefix}/{base_job_prefix}/ca-housing-experiment-pipeline\"\n", "\n", "image_uri = sagemaker.image_uris.retrieve(\n", "    framework=\"xgboost\",\n", "    region=region,\n", "    version=\"1.2-2\",\n", "    py_version=\"py3\",\n", "    instance_type=training_instance_type,\n", ")\n", "\n", "xgb_train = Estimator(\n", "    image_uri=image_uri,\n", "    instance_type=training_instance_type,\n", "    instance_count=training_instance_count,\n", "    output_path=model_path,\n", "    base_job_name=f\"{base_job_prefix}/ca-housing-train\",\n", "    sagemaker_session=sagemaker_session,\n", "    role=role,\n", ")\n", "\n", "xgb_train.set_hyperparameters(\n", "    eval_metric=\"rmse\",\n", "    objective=\"reg:squarederror\",  # Learning objective for the training job\n", "    num_round=50,\n", "    max_depth=5,\n", "    eta=0.2,\n", "    gamma=4,\n", "    min_child_weight=6,\n", "    subsample=0.7,\n", "    verbosity=1\n", ")\n", "\n", "objective_metric_name = \"validation:rmse\"\n", "\n", "hyperparameter_ranges = {\n", "    \"lambda\": ContinuousParameter(0.01, 10, scaling_type=\"Logarithmic\")\n", "}\n", "\n", "tuner_log = HyperparameterTuner(\n", "    
xgb_train,\n", "    objective_metric_name,\n", "    hyperparameter_ranges,\n", "    max_jobs=10,\n", "    max_parallel_jobs=3,\n", "    strategy=\"Bayesian\",\n", "    objective_type=\"Minimize\",\n", ")\n", "\n", "tune_step = TuningStep(\n", "    name=\"HPTuning\",\n", "    tuner=tuner_log,\n", "    inputs={\n", "        \"train\": TrainingInput(\n", "            s3_data=process_step.properties.ProcessingOutputConfig.Outputs[\n", "                \"train\"\n", "            ].S3Output.S3Uri,\n", "            content_type=\"text/csv\",\n", "        ),\n", "        \"validation\": TrainingInput(\n", "            s3_data=process_step.properties.ProcessingOutputConfig.Outputs[\n", "                \"validation\"\n", "            ].S3Output.S3Uri,\n", "            content_type=\"text/csv\",\n", "        ),\n", "    },\n", "    cache_config=cache_config\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After tuning completes, the model artifacts from each training job are stored in S3. This step creates a model from the best artifact generated during hyperparameter tuning, that is, the one from the training job with the lowest validation RMSE." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.model import Model\n", "from sagemaker.xgboost import XGBoostPredictor\n", "from sagemaker.workflow.steps import CreateModelStep\n", "\n", "# Create a model from the top-ranked training job of the tuning step (top_k=0)\n", "model_bucket_key = f\"{bucket}/{prefix}/{base_job_prefix}/ca-housing-experiment-pipeline\"\n", "model_candidate = Model(\n", "    image_uri=image_uri,\n", "    model_data=tune_step.get_top_model_s3_uri(top_k=0, s3_bucket=model_bucket_key),\n", "    sagemaker_session=sagemaker_session,\n", "    role=role,\n", "    predictor_cls=XGBoostPredictor,\n", ")\n", "\n", "create_model_step = CreateModelStep(\n", "    name=\"CreateTopModel\",\n", "    model=model_candidate,\n", "    inputs=sagemaker.inputs.CreateModelInput(instance_type=\"ml.m4.large\"),\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we focus on integrating our pipeline with SageMaker Experiments. 
We therefore assemble the steps into a pipeline associated with an experiment name. If the experiment already exists, we just add the trial runs to it. If not, we create a new experiment as shown below." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from smexperiments.experiment import Experiment\n", "from smexperiments.trial import Trial\n", "from smexperiments.trial_component import TrialComponent\n", "\n", "# Load the experiment, or create it if it doesn't exist yet\n", "try:\n", "    demo_experiment = Experiment.load(experiment_name=experiment_name)\n", "    print(\"existing experiment loaded\")\n", "except Exception as ex:\n", "    if \"ResourceNotFound\" in str(ex):\n", "        demo_experiment = Experiment.create(\n", "            experiment_name=experiment_name,\n", "            description=\"Demo experiment\",\n", "            tags=[{'Key': 'demo-experiments', 'Value': 'demo1'}]\n", "        )\n", "        print(\"new experiment created\")\n", "    else:\n", "        # Any other error is unexpected: report it and stop here\n", "        print(f\"Unexpected {ex}, {type(ex)}\")\n", "        print(\"Don't go forward!\")\n", "        raise" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`PipelineExperimentConfig` lets you link an experiment and a trial to a specific Pipeline execution. Here, each execution gets its own trial, named after the pipeline execution ID." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.pipeline import PipelineExperimentConfig\n", "\n", "pipeline_name = \"CAHousingExperimentsPipeline\"\n", "\n", "# Pipeline experiment config: experiment name, then trial name\n", "ca_housing_experiment_config = PipelineExperimentConfig(\n", "    experiment_name,\n", "    Join(\n", "        on=\"-\",\n", "        values=[\n", "            \"pipeline-execution\",\n", "            ExecutionVariables.PIPELINE_EXECUTION_ID\n", "        ],\n", "    )\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that the experiment config object and the Pipeline steps are configured, we create a pipeline from the steps defined above and pass in the parameters." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.pipeline import Pipeline\n", "\n", "pipeline = Pipeline(\n", "    name=pipeline_name,\n", "    pipeline_experiment_config=ca_housing_experiment_config,\n", "    parameters=[\n", "        processing_instance_count,\n", "        processing_instance_type,\n", "        training_instance_count,\n", "        training_instance_type\n", "    ],\n", "    steps=[process_step, tune_step, create_model_step],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run the pipeline to create a model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline.upsert(role_arn=sagemaker.get_execution_role())\n", "execution = pipeline.start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We wait for the pipeline execution to complete. It should take about 20 minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "execution.wait()\n", "execution.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Explore the results of hyperparameter tuning\n", "Plot the metrics against the hyperparameters.\n", "\n", "First, get the hyperparameters and metrics of each trial component." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.analytics import ExperimentAnalytics\n", "from smexperiments.search_expression import Filter, Operator, SearchExpression\n", "\n", "# SM Pipeline injects the Execution ID into trial component names\n", "execution_id = execution.describe()['PipelineExecutionArn'].split('/')[-1]\n", "source_arn_filter = Filter(\n", "    name=\"TrialComponentName\", operator=Operator.CONTAINS, value=execution_id\n", ")\n", "\n", "# Keep only the trial components that come from training jobs\n", "source_type_filter = Filter(\n", "    name=\"Source.SourceType\", operator=Operator.EQUALS, value=\"SageMakerTrainingJob\"\n", ")\n", "\n", "search_expression = SearchExpression(\n", "    filters=[source_arn_filter, source_type_filter]\n", ")\n", "\n", "trial_component_analytics = ExperimentAnalytics(\n", "    sagemaker_session=sagemaker_session,\n", "    experiment_name=experiment_name,\n", "    search_expression=search_expression.to_boto()\n", ")\n", "analytic_table = trial_component_analytics.dataframe()\n", "analytic_table.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Plot the final validation RMSE of each training job against the `lambda` value it used." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ax = analytic_table.plot.scatter(\"lambda\", \"validation:rmse - Last\", grid=True)\n", "# Label each point with the numeric ID taken from the fourth '-'-separated field of the trial component name\n", "analytic_table[\"TrialComponentID\"] = [str(int(x.split('-')[3])) for x in analytic_table[\"TrialComponentName\"]]\n", "for _, v in analytic_table[[\"TrialComponentID\", \"lambda\", \"validation:rmse - Last\"]].iterrows():\n", "    ax.annotate(v.TrialComponentID, v[1:])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Clean up" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Uncomment the line in the cell below to delete the demo experiment along with its associated Trials and TrialComponents." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# demo_experiment.delete_all(action=\"--force\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Next steps\n", "\n", "As a next step, consider using these three Amazon SageMaker features (Amazon SageMaker Studio, Amazon SageMaker Experiments, and Amazon SageMaker Pipelines) in your next ML project." ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:eu-west-1:470317259841:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }