{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "import boto3\n", "import logging\n", "import sagemaker\n", "import sagemaker.session\n", "\n", "from sagemaker.estimator import Estimator\n", "from sagemaker.inputs import TrainingInput\n", "from sagemaker.model_metrics import (\n", " MetricsSource,\n", " ModelMetrics,\n", ")\n", "from sagemaker.processing import (\n", " ProcessingInput,\n", " ProcessingOutput,\n", " ScriptProcessor,\n", ")\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo\n", "from sagemaker.workflow.condition_step import (\n", " ConditionStep,\n", ")\n", "from sagemaker.workflow.functions import (\n", " JsonGet,\n", ")\n", "from sagemaker.workflow.parameters import (\n", " ParameterInteger,\n", " ParameterString,\n", ")\n", "from sagemaker.workflow.pipeline import Pipeline\n", "from sagemaker.workflow.properties import PropertyFile\n", "from sagemaker.workflow.steps import (\n", " ProcessingStep,\n", " TrainingStep,\n", ")\n", "from sagemaker.workflow.step_collections import RegisterModel\n", "\n", "from botocore.exceptions import ClientError" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "logger = logging.getLogger(__name__)\n", "\n", "\"\"\"Environment Variables\"\"\"\n", "proj_dir = \"TO_BE_DEFINED\"\n", "region= \"TO_BE_DEFINED\"\n", "model_artefact_bucket= \"TO_BE_DEFINED\"\n", "role = \"TO_BE_DEFINED\"\n", "project_name= \"TO_BE_DEFINED\"\n", "stage= \"test\"\n", "model_package_group_name=\"AbalonePackageGroup\",\n", "pipeline_name=\"AbalonePipeline\",\n", "base_job_prefix=\"Abalone\",\n", "project_id=\"SageMakerProjectId\",\n", "processing_image_uri=None\n", "training_image_uri=None\n", "inference_image_uri=None" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_session(region, default_bucket):\n", " \"\"\"Gets the sagemaker session based on the region.\n", "\n", " Args:\n", " region: the aws region to start the session\n", " default_bucket: the bucket to use for storing the artifacts\n", "\n", " Returns:\n", " `sagemaker.session.Session instance\n", " \"\"\"\n", "\n", " boto_session = boto3.Session(region_name=region)\n", "\n", " sagemaker_client = boto_session.client(\"sagemaker\")\n", " runtime_client = boto_session.client(\"sagemaker-runtime\")\n", " return sagemaker.session.Session(\n", " boto_session=boto_session,\n", " sagemaker_client=sagemaker_client,\n", " sagemaker_runtime_client=runtime_client,\n", " default_bucket=default_bucket,\n", " )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sagemaker_session = get_session(region, model_artefact_bucket)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Feature Engineering\n", "This section describes the different steps involved in feature engineering which includes loading and transforming different data sources to build the features needed for the ML Use Case" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "processing_instance_count = ParameterInteger(name=\"ProcessingInstanceCount\", default_value=1)\n", "processing_instance_type = ParameterString(name=\"ProcessingInstanceType\", default_value=\"ml.m5.xlarge\")\n", "training_instance_type = ParameterString(name=\"TrainingInstanceType\", default_value=\"ml.m5.xlarge\")\n", 
"inference_instance_type = ParameterString(name=\"InferenceInstanceType\", default_value=\"ml.m5.xlarge\")\n", "model_approval_status = ParameterString(name=\"ModelApprovalStatus\", default_value=\"PendingManualApproval\")\n", "input_data = ParameterString(\n", " name=\"InputDataUrl\",\n", " default_value=f\"s3://sagemaker-servicecatalog-seedcode-{region}/dataset/abalone-dataset.csv\",\n", ")\n", "processing_image_name = \"sagemaker-{0}-processingimagebuild\".format(project_id)\n", "training_image_name = \"sagemaker-{0}-trainingimagebuild\".format(project_id)\n", "inference_image_name = \"sagemaker-{0}-inferenceimagebuild\".format(project_id)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# processing step for feature engineering\n", "try:\n", " processing_image_uri = sagemaker_session.sagemaker_client.describe_image_version(\n", " ImageName=processing_image_name\n", " )[\"ContainerImage\"]\n", "\n", "except (sagemaker_session.sagemaker_client.exceptions.ResourceNotFound):\n", " processing_image_uri = sagemaker.image_uris.retrieve(\n", " framework=\"xgboost\",\n", " region=region,\n", " version=\"1.0-1\",\n", " py_version=\"py3\",\n", " instance_type=processing_instance_type,\n", " )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Define Script Processor\n", "script_processor = ScriptProcessor(\n", " image_uri=processing_image_uri,\n", " instance_type=processing_instance_type,\n", " instance_count=processing_instance_count,\n", " base_job_name=f\"{base_job_prefix}/sklearn-abalone-preprocess\",\n", " command=[\"python3\"],\n", " sagemaker_session=sagemaker_session,\n", " role=role,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Define ProcessingStep\n", "step_process = ProcessingStep(\n", " name=\"PreprocessAbaloneData\",\n", " processor=script_processor,\n", " outputs=[\n", " ProcessingOutput(output_name=\"train\", source=\"/opt/ml/processing/train\"),\n", " ProcessingOutput(output_name=\"validation\", source=\"/opt/ml/processing/validation\"),\n", " ProcessingOutput(output_name=\"test\", source=\"/opt/ml/processing/test\"),\n", " ],\n", " code=\"source_scripts/preprocessing/prepare_abalone_data/main.py\", # we must figure out this path to get it from step_source directory\n", " job_arguments=[\"--input-data\", input_data],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Training an XGBoost model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# training step for generating model artifacts\n", "model_path = f\"s3://{sagemaker_session.default_bucket()}/{base_job_prefix}/AbaloneTrain\"\n", "\n", "try:\n", " training_image_uri = sagemaker_session.sagemaker_client.describe_image_version(ImageName=training_image_name)[\n", " \"ContainerImage\"\n", " ]\n", "except (sagemaker_session.sagemaker_client.exceptions.ResourceNotFound):\n", " training_image_uri = sagemaker.image_uris.retrieve(\n", " framework=\"xgboost\",\n", " region=region,\n", " version=\"1.0-1\",\n", " py_version=\"py3\",\n", " instance_type=training_instance_type,\n", " )\n", "\n", "xgb_train = Estimator(\n", " image_uri=training_image_uri,\n", " instance_type=training_instance_type,\n", " instance_count=1,\n", " output_path=model_path,\n", " base_job_name=f\"{base_job_prefix}/abalone-train\",\n", " sagemaker_session=sagemaker_session,\n", " role=role,\n", ")\n", "xgb_train.set_hyperparameters(\n", 
" objective=\"reg:linear\",\n", " num_round=50,\n", " max_depth=5,\n", " eta=0.2,\n", " gamma=4,\n", " min_child_weight=6,\n", " subsample=0.7,\n", " silent=0,\n", ")\n", "step_train = TrainingStep(\n", " name=\"TrainAbaloneModel\",\n", " estimator=xgb_train,\n", " inputs={\n", " \"train\": TrainingInput(\n", " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\"train\"].S3Output.S3Uri,\n", " content_type=\"text/csv\",\n", " ),\n", " \"validation\": TrainingInput(\n", " s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\"validation\"].S3Output.S3Uri,\n", " content_type=\"text/csv\",\n", " ),\n", " },\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Evaluate the Model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# processing step for evaluation\n", "script_eval = ScriptProcessor(\n", " image_uri=training_image_uri,\n", " command=[\"python3\"],\n", " instance_type=processing_instance_type,\n", " instance_count=1,\n", " base_job_name=f\"{base_job_prefix}/script-abalone-eval\",\n", " sagemaker_session=sagemaker_session,\n", " role=role,\n", ")\n", "evaluation_report = PropertyFile(\n", " name=\"AbaloneEvaluationReport\",\n", " output_name=\"evaluation\",\n", " path=\"evaluation.json\",\n", ")\n", "step_eval = ProcessingStep(\n", " name=\"EvaluateAbaloneModel\",\n", " processor=script_eval,\n", " inputs=[\n", " ProcessingInput(\n", " source=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", " destination=\"/opt/ml/processing/model\",\n", " ),\n", " ProcessingInput(\n", " source=step_process.properties.ProcessingOutputConfig.Outputs[\"test\"].S3Output.S3Uri,\n", " destination=\"/opt/ml/processing/test\",\n", " ),\n", " ],\n", " outputs=[\n", " ProcessingOutput(output_name=\"evaluation\", source=\"/opt/ml/processing/evaluation\"),\n", " ],\n", " code=\"source_scripts/evaluate/evaluate_xgboost/main.py\",\n", " property_files=[evaluation_report],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Conditional step to push model to SageMaker Model Registry" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# register model step that will be conditionally executed\n", "model_metrics = ModelMetrics(\n", " model_statistics=MetricsSource(\n", " s3_uri=\"{}/evaluation.json\".format(\n", " step_eval.arguments[\"ProcessingOutputConfig\"][\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n", " ),\n", " content_type=\"application/json\",\n", " )\n", ")\n", "\n", "try:\n", " inference_image_uri = sagemaker_session.sagemaker_client.describe_image_version(ImageName=inference_image_name)[\n", " \"ContainerImage\"\n", " ]\n", "except (sagemaker_session.sagemaker_client.exceptions.ResourceNotFound):\n", " inference_image_uri = sagemaker.image_uris.retrieve(\n", " framework=\"xgboost\",\n", " region=region,\n", " version=\"1.0-1\",\n", " py_version=\"py3\",\n", " instance_type=inference_instance_type,\n", " )\n", "step_register = RegisterModel(\n", " name=\"RegisterAbaloneModel\",\n", " estimator=xgb_train,\n", " image_uri=inference_image_uri,\n", " model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", " content_types=[\"text/csv\"],\n", " response_types=[\"text/csv\"],\n", " inference_instances=[\"ml.t2.medium\", \"ml.m5.large\"],\n", " transform_instances=[\"ml.m5.large\"],\n", " model_package_group_name=model_package_group_name,\n", " approval_status=model_approval_status,\n", " model_metrics=model_metrics,\n", ")\n", "\n", "# 
condition step for evaluating model quality and branching execution\n", "cond_lte = ConditionLessThanOrEqualTo(\n", " left=JsonGet(\n", " step_name=step_eval.name, property_file=evaluation_report, json_path=\"regression_metrics.mse.value\"\n", " ),\n", " right=6.0,\n", ")\n", "step_cond = ConditionStep(\n", " name=\"CheckMSEAbaloneEvaluation\",\n", " conditions=[cond_lte],\n", " if_steps=[step_register],\n", " else_steps=[],\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create and run the Pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# assemble the pipeline from the parameters and steps defined above\n", "pipeline = Pipeline(\n", " name=pipeline_name,\n", " parameters=[\n", " processing_instance_type,\n", " processing_instance_count,\n", " training_instance_type,\n", " model_approval_status,\n", " input_data,\n", " ],\n", " steps=[step_process, step_train, step_eval, step_cond],\n", " sagemaker_session=sagemaker_session,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "\n", "\n", "definition = json.loads(pipeline.definition())\n", "definition" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline.upsert(role_arn=role, description=f\"{stage} pipeline for {project_name}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline.start()" ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 4 }