{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Using SageMaker pipelines for MLOps workflows\n",
    "\n",
    "This notebook contains end-to-end code to construct and execute a secure MLOps pipeline in your data science environment. It contains all the necessary code in one place. You can use and modify this code for your experiments and tests.\n",
    " \n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional: change False to True to install the pinned SageMaker SDK version.\n",
    "# %pip (not !pip) installs into the environment of the running kernel.\n",
    "if False:\n",
    "    %pip install --disable-pip-version-check -q sagemaker==2.47.1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!python --version"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional: change False to True to upgrade the SageMaker SDK to the latest version.\n",
    "if False:\n",
    "    %pip install -U sagemaker"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "import sagemaker\n",
    "import sagemaker.session\n",
    "import json\n",
    "\n",
    "print(f\"SageMaker version: {sagemaker.__version__}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sm = boto3.client(\"sagemaker\")\n",
    "ssm = boto3.client(\"ssm\")\n",
    "\n",
    "\n",
    "def get_environment(project_name, ssm_params):\n",
    "    \"\"\"Collect the environment settings of a SageMaker project in one dict.\n",
    "\n",
    "    The result merges, in this order (later entries win on key clashes):\n",
    "      1. the `describe_domain` response of the domain that created the project,\n",
    "         without `ResponseMetadata`, `CreationTime` and `LastModifiedTime`;\n",
    "      2. the `DefaultUserSettings` of that domain, flattened into the top level;\n",
    "      3. the `EnvironmentName` and `EnvironmentType` tags of the domain;\n",
    "      4. one entry per item of `ssm_params`, read from the SSM parameter named\n",
    "         `{EnvironmentName}-{EnvironmentType}-{ParameterName}`.\n",
    "\n",
    "    Args:\n",
    "        project_name: name of the SageMaker project.\n",
    "        ssm_params: list of dicts with the keys `VariableName` (key to set in\n",
    "            the result) and `ParameterName` (suffix of the SSM parameter name).\n",
    "\n",
    "    Returns:\n",
    "        dict with the merged settings. Loading SSM parameters is best effort:\n",
    "        a parameter that cannot be read is set to an empty string and a\n",
    "        warning with the reason is printed.\n",
    "    \"\"\"\n",
    "    domain_id = sm.describe_project(ProjectName=project_name)[\"CreatedBy\"][\"DomainId\"]\n",
    "    domain = sm.describe_domain(DomainId=domain_id)\n",
    "\n",
    "    # Drop the fields that are not environment settings\n",
    "    # (presumably also so that the result can be dumped as JSON - TODO confirm)\n",
    "    for key in (\"ResponseMetadata\", \"CreationTime\", \"LastModifiedTime\"):\n",
    "        domain.pop(key, None)\n",
    "\n",
    "    # Flatten the default user settings into the top level\n",
    "    user_settings = domain.pop(\"DefaultUserSettings\")\n",
    "    env = {**domain, **user_settings}\n",
    "\n",
    "    # The environment name and type are stored as tags on the domain\n",
    "    tags = sm.list_tags(ResourceArn=env[\"DomainArn\"])[\"Tags\"]\n",
    "    env.update(\n",
    "        {t[\"Key\"]: t[\"Value\"] for t in tags if t[\"Key\"] in (\"EnvironmentName\", \"EnvironmentType\")}\n",
    "    )\n",
    "\n",
    "    for param in ssm_params:\n",
    "        variable = param[\"VariableName\"]\n",
    "        try:\n",
    "            name = f\"{env['EnvironmentName']}-{env['EnvironmentType']}-{param['ParameterName']}\"\n",
    "            env[variable] = ssm.get_parameter(Name=name)[\"Parameter\"][\"Value\"]\n",
    "        except Exception as error:\n",
    "            # Best effort: keep loading the other parameters, but report the\n",
    "            # reason instead of silently returning an empty value\n",
    "            print(f\"WARNING: could not load SSM parameter for {variable!r}: {error!r}\")\n",
    "            env[variable] = \"\"\n",
    "\n",
    "    return env\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "
💡 Get environment variables \n",
    "\n",
    "Set `project_name` in the next cell to the name of the current SageMaker project.\n",
    "The environment data of that project is then loaded and shown:\n",
    ""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Set this to the name of the current SageMaker project to set up the environment\n",
    "project_name = \"\"\n",
    "if not project_name:\n",
    "    raise ValueError(\"Set project_name to the name of your SageMaker project before running this cell\")\n",
    "\n",
    "project_id = sm.describe_project(ProjectName=project_name)['ProjectId']\n",
    "\n",
    "# Dynamically load environmental SSM parameters - provide the list of the variables to load from SSM parameter store\n",
    "ssm_parameters = [\n",
    "    {\"VariableName\":\"DataBucketName\", \"ParameterName\":\"data-bucket-name\"},\n",
    "    {\"VariableName\":\"ModelBucketName\", \"ParameterName\":\"model-bucket-name\"},\n",
    "    {\"VariableName\":\"S3KmsKeyId\", \"ParameterName\":\"kms-s3-key-arn\"},\n",
    "    {\"VariableName\":\"EbsKmsKeyArn\", \"ParameterName\":\"kms-ebs-key-arn\"},\n",
    "    {\"VariableName\":\"PipelineExecutionRole\", \"ParameterName\":\"sm-pipeline-execution-role-arn\"},\n",
    "]\n",
    "\n",
    "env_data = get_environment(project_name=project_name, ssm_params=ssm_parameters)\n",
    "print(f\"Environment data:\\n{json.dumps(env_data, indent=2)}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pipelines.abalone.pipeline import get_session\n",
    "\n",
    "# Resolve the region once and reuse it for the session and the image lookup\n",
    "region = boto3.Session().region_name\n",
    "sagemaker_session = get_session(region, env_data[\"DataBucketName\"])\n",
    "\n",
    "# Roles, buckets and KMS keys come from the environment data loaded above\n",
    "pipeline_role = env_data[\"PipelineExecutionRole\"]\n",
    "processing_role = env_data[\"ExecutionRole\"]\n",
    "training_role = env_data[\"ExecutionRole\"]\n",
    "data_bucket = sagemaker_session.default_bucket()\n",
    "model_bucket = env_data[\"ModelBucketName\"]\n",
    "ebs_kms_id = env_data[\"EbsKmsKeyArn\"]\n",
    "s3_kms_id = env_data[\"S3KmsKeyId\"]\n",
    "\n",
    "print(f\"SageMaker version: {sagemaker.__version__}\")\n",
    "print(f\"Region: {region}\")\n",
    "\n",
    "# Change these to reflect your project/business name\n",
    "model_package_group_name = f\"{project_name}-{project_id}\"\n",
    "pipeline_name = f\"{project_name}-{project_id}\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "from sagemaker.estimator import Estimator\n",
    "from sagemaker.inputs import TrainingInput\n",
    "from sagemaker.model_metrics import (\n",
    "    MetricsSource,\n",
    "    ModelMetrics,\n",
    ")\n",
    "from sagemaker.processing import (\n",
    "    ProcessingInput,\n",
    "    ProcessingOutput,\n",
    "    ScriptProcessor,\n",
    ")\n",
    "from sagemaker.sklearn.processing import SKLearnProcessor\n",
    "from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo\n",
    "from sagemaker.workflow.functions import JsonGet\n",
    "from sagemaker.workflow.condition_step import (\n",
    "    ConditionStep\n",
    ")\n",
    "from sagemaker.workflow.parameters import (\n",
    "    ParameterInteger,\n",
    "    ParameterString,\n",
    ")\n",
    "from sagemaker.workflow.pipeline import Pipeline\n",
    "from sagemaker.workflow.properties import PropertyFile\n",
    "from sagemaker.workflow.steps import (\n",
    "    ProcessingStep,\n",
    "    TrainingStep,\n",
    ")\n",
    "from sagemaker.workflow.step_collections import RegisterModel\n",
    "from sagemaker.network import NetworkConfig\n",
    "\n",
    "BASE_DIR=\"./pipelines/abalone/\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# parameters for pipeline execution\n",
    "processing_instance_count = ParameterInteger(name=\"ProcessingInstanceCount\", default_value=1)\n",
    "processing_instance_type = ParameterString(\n",
    "    name=\"ProcessingInstanceType\", default_value=\"ml.m5.xlarge\"\n",
    ")\n",
    "training_instance_type = ParameterString(\n",
    "    name=\"TrainingInstanceType\", default_value=\"ml.m5.xlarge\"\n",
    ")\n",
    "model_approval_status = ParameterString(\n",
    "    name=\"ModelApprovalStatus\", default_value=\"PendingManualApproval\"\n",
    ")\n",
    "input_data = ParameterString(\n",
    "    name=\"InputDataUrl\",\n",
    "    default_value=f\"s3://{sagemaker_session.default_bucket()}/datasets/abalone-dataset.csv\",\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# network settings shared by the processing jobs and reused for training:\n",
    "# the security groups and subnets come from the environment data\n",
    "network_config = NetworkConfig(\n",
    "    enable_network_isolation=False,\n",
    "    security_group_ids=env_data[\"SecurityGroups\"],\n",
    "    subnets=env_data[\"SubnetIds\"],\n",
    "    encrypt_inter_container_traffic=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "base_job_prefix=\"Abalone\"\n",
    "\n",
    "# processing step for feature engineering\n",
    "sklearn_processor = SKLearnProcessor(\n",
    "    framework_version=\"0.23-1\",\n",
    "    instance_type=processing_instance_type,\n",
    "    instance_count=processing_instance_count,\n",
    "    base_job_name=f\"{base_job_prefix}/sklearn-abalone-preprocess\",\n",
    "    sagemaker_session=sagemaker_session,\n",
    "    role=processing_role,\n",
    "    network_config=network_config,\n",
    "    volume_kms_key=ebs_kms_id,\n",
    "    output_kms_key=s3_kms_id\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# the step publishes the train, validation and test outputs of preprocess.py\n",
    "step_process = ProcessingStep(\n",
    "    name=\"PreprocessAbaloneData\",\n",
    "    processor=sklearn_processor,\n",
    "    outputs=[\n",
    "        ProcessingOutput(output_name=\"train\", source=\"/opt/ml/processing/train\"),\n",
    "        ProcessingOutput(output_name=\"validation\", source=\"/opt/ml/processing/validation\"),\n",
    "        ProcessingOutput(output_name=\"test\", source=\"/opt/ml/processing/test\"),\n",
    "    ],\n",
    "    code=os.path.join(BASE_DIR, \"preprocess.py\"),\n",
    "    job_arguments=[\"--input-data\", input_data],\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# training step for generating model artifacts\n",
    "model_path = f\"s3://{model_bucket}/{base_job_prefix}/AbaloneTrain\"\n",
    "image_uri = sagemaker.image_uris.retrieve(\n",
    "    framework=\"xgboost\",\n",
    "    region=region,\n",
    "    version=\"1.0-1\",\n",
    "    py_version=\"py3\",\n",
    "    instance_type=training_instance_type,\n",
    ")\n",
    "xgb_train = Estimator(\n",
    "    image_uri=image_uri,\n",
    "    instance_type=training_instance_type,\n",
    "    instance_count=1,\n",
    "    output_path=model_path,\n",
    "    base_job_name=f\"{base_job_prefix}/abalone-train\",\n",
    "    sagemaker_session=sagemaker_session,\n",
    "    role=training_role,\n",
    "    subnets=network_config.subnets,\n",
    "    security_group_ids=network_config.security_group_ids,\n",
    "    encrypt_inter_container_traffic=True,\n",
    "    enable_network_isolation=False,\n",
    "    volume_kms_key=ebs_kms_id,\n",
    "    output_kms_key=s3_kms_id\n",
    ")\n",
    "xgb_train.set_hyperparameters(\n",
    "    # \"reg:squarederror\" is the current name of the deprecated \"reg:linear\" objective\n",
    "    objective=\"reg:squarederror\",\n",
    "    num_round=50,\n",
    "    max_depth=5,\n",
    "    eta=0.2,\n",
    "    gamma=4,\n",
    "    min_child_weight=6,\n",
    "    subsample=0.7,\n",
    "    silent=0,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# the training channels read the outputs of the preprocessing step\n",
    "step_train = TrainingStep(\n",
    "    name=\"TrainAbaloneModel\",\n",
    "    estimator=xgb_train,\n",
    "    inputs={\n",
    "        \"train\": TrainingInput(\n",
    "            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n",
    "                \"train\"\n",
    "            ].S3Output.S3Uri,\n",
    "            content_type=\"text/csv\",\n",
    "        ),\n",
    "        \"validation\": TrainingInput(\n",
    "            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n",
    "                \"validation\"\n",
    "            ].S3Output.S3Uri,\n",
    "            content_type=\"text/csv\",\n",
    "        ),\n",
    "    },\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# processing step for evaluation\n",
    "script_eval = ScriptProcessor(\n",
    "    image_uri=image_uri,\n",
    "    command=[\"python3\"],\n",
    "    instance_type=processing_instance_type,\n",
    "    instance_count=1,\n",
    "    base_job_name=f\"{base_job_prefix}/script-abalone-eval\",\n",
    "    sagemaker_session=sagemaker_session,\n",
    "    role=processing_role,\n",
    "    network_config=network_config,\n",
    "    volume_kms_key=ebs_kms_id,\n",
    "    output_kms_key=s3_kms_id\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# property file that exposes evaluation.json of the \"evaluation\" output to the pipeline\n",
    "evaluation_report = PropertyFile(\n",
    "    name=\"AbaloneEvaluationReport\",\n",
    "    output_name=\"evaluation\",\n",
    "    path=\"evaluation.json\",\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count":
null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# the evaluation step reads the trained model and the test split\n",
    "step_eval = ProcessingStep(\n",
    "    name=\"EvaluateAbaloneModel\",\n",
    "    processor=script_eval,\n",
    "    inputs=[\n",
    "        ProcessingInput(\n",
    "            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n",
    "            destination=\"/opt/ml/processing/model\",\n",
    "        ),\n",
    "        ProcessingInput(\n",
    "            source=step_process.properties.ProcessingOutputConfig.Outputs[\n",
    "                \"test\"\n",
    "            ].S3Output.S3Uri,\n",
    "            destination=\"/opt/ml/processing/test\",\n",
    "        ),\n",
    "    ],\n",
    "    outputs=[\n",
    "        ProcessingOutput(output_name=\"evaluation\", source=\"/opt/ml/processing/evaluation\"),\n",
    "    ],\n",
    "    code=os.path.join(BASE_DIR, \"evaluate.py\"),\n",
    "    property_files=[evaluation_report],\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# register model step that will be conditionally executed\n",
    "model_metrics = ModelMetrics(\n",
    "    model_statistics=MetricsSource(\n",
    "        s3_uri=\"{}/evaluation.json\".format(\n",
    "            step_eval.arguments[\"ProcessingOutputConfig\"][\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n",
    "        ),\n",
    "        content_type=\"application/json\"\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# VPC settings passed explicitly to the register step below\n",
    "vpc_config = {\n",
    "    \"Subnets\":network_config.subnets,\n",
    "    \"SecurityGroupIds\":network_config.security_group_ids\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# There is a bug in the RegisterModel implementation.\n",
    "# The RegisterModel step is implemented in the SDK as two steps, a _RepackModelStep and a _RegisterModelStep.\n",
    "# The _RepackModelStep runs a SKLearn training step in order to repack the model.tar.gz to include any custom inference code in the archive.\n",
    "# The _RegisterModelStep then registers the repacked model.\n",
    "#\n",
    "# The problem is that the _RepackModelStep does not propagate the VPC configuration from the Estimator object:\n",
    "# https://github.com/aws/sagemaker-python-sdk/blob/cdb633b3ab02398c3b77f5ecd2c03cdf41049c78/src/sagemaker/workflow/_utils.py#L88\n",
    "#\n",
    "# This causes an AccessDenied exception because the repacker cannot access the S3 bucket (all access which is not via a VPC endpoint is blocked by the bucket policy).\n",
    "#\n",
    "# The issue is opened against the SageMaker module: https://github.com/aws/sagemaker-python-sdk/issues/2302\n",
    "\n",
    "step_register = RegisterModel(\n",
    "    name=\"RegisterAbaloneModel\",\n",
    "    estimator=xgb_train,\n",
    "    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n",
    "    content_types=[\"text/csv\"],\n",
    "    response_types=[\"text/csv\"],\n",
    "    inference_instances=[\"ml.t2.medium\", \"ml.m5.large\"],\n",
    "    transform_instances=[\"ml.m5.large\"],\n",
    "    model_package_group_name=model_package_group_name,\n",
    "    approval_status=model_approval_status,\n",
    "    model_metrics=model_metrics,\n",
    "    # the VPC settings are passed explicitly (presumably the workaround for the issue above)\n",
    "    vpc_config_override=vpc_config\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "xgb_train.get_vpc_config()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# condition step for evaluating model quality and branching execution\n",
    "cond_lte = ConditionLessThanOrEqualTo(\n",
    "    left=JsonGet(\n",
    "        step_name=step_eval.name,\n",
    "        property_file=evaluation_report,\n",
    "        json_path=\"regression_metrics.mse.value\"\n",
    "    ),\n",
    "    right=6.0,\n",
    ")\n",
    "# the model is registered only if the condition holds; otherwise nothing else runs\n",
    "step_cond = ConditionStep(\n",
    "    name=\"CheckMSEAbaloneEvaluation\",\n",
    "    conditions=[cond_lte],\n",
    "    if_steps=[step_register],\n",
    "    else_steps=[],\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# pipeline instance\n",
    "pipeline = Pipeline(\n",
    "    name=pipeline_name,\n",
    "    parameters=[\n",
    "        processing_instance_type,\n",
    "        processing_instance_count,\n",
    "        training_instance_type,\n",
    "        model_approval_status,\n",
    "        input_data,\n",
    "    ],\n",
    "    steps=[step_process, step_train, step_eval, step_cond],\n",
    "    sagemaker_session=sagemaker_session,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline.upsert(role_arn=pipeline_role)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "parsed = json.loads(pipeline.definition())\n",
    "print(json.dumps(parsed, indent=2, sort_keys=True))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following line starts the pipeline execution. In this specific example it runs for about 13 minutes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "execution = pipeline.start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "execution.describe()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "execution.wait()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Clean up"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Delete SageMaker project\n",
    "This will delete the associated CloudFormation stack and CodeCommit repository.\n",
    "\n",
    "The clean-up cells are destructive, so they only act after you set `delete_project_resources = True` in the next cell."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Safety switch: keeps 'Run All' from deleting the project and its bucket.\n",
    "# Set to True to confirm the clean-up.\n",
    "delete_project_resources = False\n",
    "\n",
    "if delete_project_resources:\n",
    "    response = sm.delete_project(ProjectName=project_name)\n",
    "    print(f\"Deleting project {project_name}:{response}\")\n",
    "else:\n",
    "    print(\"Skipped: set delete_project_resources = True to delete the project\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Delete project S3 bucket \n",
    "This will remove all files and the S3 bucket"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if delete_project_resources:\n",
    "    !aws s3 rb s3://sm-mlops-cp-{project_name}-{project_id} --force\n",
    "else:\n",
    "    print(\"Skipped: set delete_project_resources = True to delete the project bucket\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Release resources"
   ]
  },
  {
   "cell_type": "code",
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%html\n", "\n", "

Shutting down your kernel for this notebook to release resources.

\n", "\n", " \n", "" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }