# Using SageMaker pipelines for MLOps workflows

This notebook contains end-to-end code to construct and execute a secure MLOps pipeline in your data science environment. It contains all the necessary code in one place. You can use and modify this code for your experiments and tests.
  


In [58]:
if False:
    !pip install --disable-pip-version-check -q sagemaker==2.47.1

In [None]:
# Show the version of the `python` executable that the shell resolves.
!python --version

In [60]:
if False:
    !pip install -U sagemaker

In [None]:
import boto3
import sagemaker
import sagemaker.session
import json

# Report the version of the SageMaker SDK that was actually imported.
print(f"SageMaker version: {sagemaker.__version__}")

In [30]:
# Low-level boto3 clients shared by the cells below.
sm = boto3.client("sagemaker")  # SageMaker API (projects, domains, tags)
ssm = boto3.client("ssm")  # Systems Manager API (parameter lookups)

def get_environment(project_name, ssm_params):
    """Collect environment settings for a SageMaker project into one flat dict.

    The function looks up the SageMaker domain that created ``project_name``,
    flattens the domain description (the content of ``DefaultUserSettings`` is
    merged into the top level), adds the ``EnvironmentName`` and
    ``EnvironmentType`` tags of the domain, and finally resolves the requested
    SSM parameters.

    Uses the module-level ``sm`` (SageMaker) and ``ssm`` (Systems Manager)
    boto3 clients.

    Args:
        project_name: Name of an existing SageMaker project.
        ssm_params: Iterable of dicts with the keys ``"VariableName"`` (key to
            set in the result) and ``"ParameterName"`` (suffix of the SSM
            parameter; the full name read is
            ``<EnvironmentName>-<EnvironmentType>-<ParameterName>``).

    Returns:
        dict: The flattened domain description, the environment tags that are
        present on the domain, and one entry per requested SSM parameter. A
        parameter that cannot be read is set to ``""`` and a warning is printed.

    Raises:
        KeyError: If the project/domain description lacks ``CreatedBy``,
            ``DomainId``, ``DefaultUserSettings`` or ``DomainArn``, or if an
            entry of ``ssm_params`` has no ``"VariableName"`` key.
    """
    domain_id = sm.describe_project(ProjectName=project_name)["CreatedBy"]["DomainId"]
    domain = sm.describe_domain(DomainId=domain_id)

    # Drop the response metadata and timestamp fields (presumably so that the
    # result can be dumped as JSON by the caller). `pop` with a default keeps
    # this from failing when a field is absent from the response.
    for key in ("ResponseMetadata", "CreationTime", "LastModifiedTime"):
        domain.pop(key, None)

    # Hoist the default user settings (e.g. the execution role) to the top level.
    user_settings = domain.pop("DefaultUserSettings")
    environment = {**domain, **user_settings}

    # Only the two tags that identify the environment are of interest.
    wanted_tags = ("EnvironmentName", "EnvironmentType")
    domain_tags = sm.list_tags(ResourceArn=environment["DomainArn"])["Tags"]
    environment.update(
        {tag["Key"]: tag["Value"] for tag in domain_tags if tag["Key"] in wanted_tags}
    )

    for param in ssm_params:
        variable = param["VariableName"]
        try:
            parameter_name = f"{environment['EnvironmentName']}-{environment['EnvironmentType']}-{param['ParameterName']}"
            environment[variable] = ssm.get_parameter(Name=parameter_name)["Parameter"]["Value"]
        except Exception as error:
            # Best effort: a parameter that cannot be read becomes an empty string,
            # but the reason is reported instead of being silently discarded.
            # `Exception` (not a bare `except`) lets KeyboardInterrupt/SystemExit through.
            print(f"Warning: could not load SSM parameter for '{variable}': {error!r}")
            environment[variable] = ""

    return environment


<div class="alert alert-info"> 💡 <strong> Get environment variables </strong>

Set the <b>`project_name`</b> to the name of the current SageMaker project.
Various environment data is loaded and shown:
</div>

In [None]:
# Set `project_name` to the name of your SageMaker project before running this cell.
project_name = "<PROJECT NAME>"
if project_name == "<PROJECT NAME>":
    # Fail early with a clear message instead of an opaque API error.
    raise ValueError("Replace '<PROJECT NAME>' with the name of your SageMaker project.")
project_id = sm.describe_project(ProjectName=project_name)["ProjectId"]

# Environment values to load dynamically from the SSM parameter store:
# "VariableName" is the key set in `env_data`, "ParameterName" is the suffix of
# the SSM parameter name.
ssm_parameters = [
    {"VariableName": "DataBucketName", "ParameterName": "data-bucket-name"},
    {"VariableName": "ModelBucketName", "ParameterName": "model-bucket-name"},
    {"VariableName": "S3KmsKeyId", "ParameterName": "kms-s3-key-arn"},
    {"VariableName": "EbsKmsKeyArn", "ParameterName": "kms-ebs-key-arn"},
    {"VariableName": "PipelineExecutionRole", "ParameterName": "sm-pipeline-execution-role-arn"},
]

env_data = get_environment(project_name=project_name, ssm_params=ssm_parameters)
# `default=str` keeps this diagnostic print from failing if the environment
# data contains a value that is not natively JSON-serializable.
print(f"Environment data:\n{json.dumps(env_data, indent=2, default=str)}")

In [None]:
from pipelines.abalone.pipeline import get_session

# Resolve the region once and reuse it everywhere below.
region = boto3.Session().region_name

# Session built by the project helper from the region and the data bucket name
# (presumably the bucket becomes the session's default bucket — confirm in
# pipelines/abalone/pipeline.py).
sagemaker_session = get_session(region, env_data["DataBucketName"])

# IAM roles: one for the pipeline itself; processing and training jobs both use
# the execution role taken from the environment data.
pipeline_role = env_data["PipelineExecutionRole"]
processing_role = env_data["ExecutionRole"]
training_role = env_data["ExecutionRole"]

# S3 buckets and KMS keys used by the jobs.
data_bucket = sagemaker_session.default_bucket()
model_bucket = env_data["ModelBucketName"]
ebs_kms_id = env_data["EbsKmsKeyArn"]
s3_kms_id = env_data["S3KmsKeyId"]

print(f"SageMaker version: {sagemaker.__version__}")
print(f"Region: {region}")

# Change these to reflect your project/business name
model_package_group_name = f"{project_name}-{project_id}"
pipeline_name = f"{project_name}-{project_id}"

In [33]:
import os

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.condition_step import (
    ConditionStep
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.network import NetworkConfig

# Directory holding the pipeline scripts referenced below (preprocess.py, evaluate.py).
BASE_DIR="./pipelines/abalone/"

In [34]:
 # parameters for pipeline execution
# Instance settings for the processing jobs.
processing_instance_count = ParameterInteger(
    default_value=1,
    name="ProcessingInstanceCount",
)
processing_instance_type = ParameterString(
    default_value="ml.m5.xlarge",
    name="ProcessingInstanceType",
)

# Instance type for the training job.
training_instance_type = ParameterString(
    default_value="ml.m5.xlarge",
    name="TrainingInstanceType",
)

# Approval status assigned to the model package when it is registered.
model_approval_status = ParameterString(
    default_value="PendingManualApproval",
    name="ModelApprovalStatus",
)

# S3 location of the raw dataset; defaults to a path in the session's default bucket.
input_data = ParameterString(
    default_value=f"s3://{sagemaker_session.default_bucket()}/datasets/abalone-dataset.csv",
    name="InputDataUrl",
)

In [35]:
# Network settings shared by the processing jobs: subnets and security groups
# come from the environment data, and inter-container traffic is encrypted.
network_config = NetworkConfig(
    subnets=env_data["SubnetIds"],
    security_group_ids=env_data["SecurityGroups"],
    encrypt_inter_container_traffic=True,
    enable_network_isolation=False,
)

In [36]:
# Common prefix for job names and for the model output path.
base_job_prefix = "Abalone"

# Processor for the feature-engineering step; volumes and outputs are encrypted
# with the environment's KMS keys.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=processing_role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
    network_config=network_config,
    volume_kms_key=ebs_kms_id,
    output_kms_key=s3_kms_id,
    sagemaker_session=sagemaker_session,
)

In [37]:
 # Preprocessing step: runs preprocess.py on the SKLearn processor and exposes
 # one processing output per data split.
 step_process = ProcessingStep(
     name="PreprocessAbaloneData",
     processor=sklearn_processor,
     code=os.path.join(BASE_DIR, "preprocess.py"),
     job_arguments=["--input-data", input_data],
     outputs=[
         ProcessingOutput(output_name=split_name, source=container_path)
         for split_name, container_path in (
             ("train", "/opt/ml/processing/train"),
             ("validation", "/opt/ml/processing/validation"),
             ("test", "/opt/ml/processing/test"),
         )
     ],
 )

In [38]:
# Where the training job writes its model artifacts.
model_path = f"s3://{model_bucket}/{base_job_prefix}/AbaloneTrain"

# XGBoost container image; also reused by the evaluation processor.
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    version="1.0-1",
    py_version="py3",
    region=region,
    instance_type=training_instance_type,
)

# Estimator for the training step. It uses the same subnets and security groups
# as `network_config` and encrypts volumes and output with the environment's KMS keys.
xgb_train = Estimator(
    image_uri=image_uri,
    role=training_role,
    instance_count=1,
    instance_type=training_instance_type,
    base_job_name=f"{base_job_prefix}/abalone-train",
    output_path=model_path,
    volume_kms_key=ebs_kms_id,
    output_kms_key=s3_kms_id,
    subnets=network_config.subnets,
    security_group_ids=network_config.security_group_ids,
    enable_network_isolation=False,
    encrypt_inter_container_traffic=True,
    sagemaker_session=sagemaker_session,
)

# Hyperparameters for the XGBoost regression job.
xgb_train.set_hyperparameters(
    objective="reg:linear", num_round=50,
    max_depth=5, eta=0.2,
    gamma=4, min_child_weight=6,
    subsample=0.7, silent=0,
)

In [39]:
# Training step: one CSV input channel per preprocessing output of the same name.
step_train = TrainingStep(
    name="TrainAbaloneModel",
    estimator=xgb_train,
    inputs={
        channel: TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[channel].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("train", "validation")
    },
)

In [40]:
# Processor for the evaluation step: runs a Python script inside the same
# container image that is used for training.
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    role=processing_role,
    instance_count=1,
    instance_type=processing_instance_type,
    base_job_name=f"{base_job_prefix}/script-abalone-eval",
    network_config=network_config,
    volume_kms_key=ebs_kms_id,
    output_kms_key=s3_kms_id,
    sagemaker_session=sagemaker_session,
)

In [41]:
# Property file describing evaluation.json in the "evaluation" processing output;
# it is attached to the evaluation step and read by the condition step.
evaluation_report = PropertyFile(
    name="AbaloneEvaluationReport",
    path="evaluation.json",
    output_name="evaluation",
)

In [42]:
# Evaluation step: runs evaluate.py with the trained model artifacts and the
# "test" split as inputs, and publishes the "evaluation" output.
step_eval = ProcessingStep(
    name="EvaluateAbaloneModel",
    processor=script_eval,
    code=os.path.join(BASE_DIR, "evaluate.py"),
    inputs=[
        # Model artifacts produced by the training step.
        ProcessingInput(
            destination="/opt/ml/processing/model",
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        ),
        # Held-out test split produced by the preprocessing step.
        ProcessingInput(
            destination="/opt/ml/processing/test",
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        ),
    ],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/evaluation", output_name="evaluation"),
    ],
    property_files=[evaluation_report],
)

In [43]:
# register model step that will be conditionally executed
# Metrics attached to the registered model: they point at evaluation.json under
# the S3 destination of the evaluation step's first (index 0) processing output.
# NOTE(review): the hard-coded index assumes "evaluation" stays the first output
# of `step_eval` — keep the two in sync.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)

In [44]:
# Subnets and security groups of `network_config`, keyed the way the model
# registration step receives them via `vpc_config_override`.
vpc_config = dict(
    Subnets=network_config.subnets,
    SecurityGroupIds=network_config.security_group_ids,
)

In [45]:
"""
There is a bug in the RegisterModel implementation.
The RegisterModel step is implemented in the SDK as two steps, a _RepackModelStep and a _RegisterModelStep. 
The _RepackModelStep runs a SKLearn training step in order to repack the model.tar.gz to include any custom inference code in the archive. 
The _RegisterModelStep then registers the repacked model.

The problem is that the _RepackModelStep does not propagate VPC configuration from the Estimator object:
https://github.com/aws/sagemaker-python-sdk/blob/cdb633b3ab02398c3b77f5ecd2c03cdf41049c78/src/sagemaker/workflow/_utils.py#L88

This causes an AccessDenied exception because the repack job cannot access the S3 bucket (the bucket policy blocks all access that does not come through the VPC endpoint).

An issue has been opened against the SageMaker Python SDK: https://github.com/aws/sagemaker-python-sdk/issues/2302
"""

# Model registration step; it is only run when the condition step selects it.
# The VPC settings are passed explicitly through `vpc_config_override`.
step_register = RegisterModel(
    name="RegisterAbaloneModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    model_metrics=model_metrics,
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    vpc_config_override=vpc_config,
)

In [None]:
# Inspect the VPC configuration held by the estimator.
xgb_train.get_vpc_config()

In [51]:
# Quality gate: the condition holds when the MSE read from the evaluation
# report (JSON path regression_metrics.mse.value) is at most 6.0.
cond_lte = ConditionLessThanOrEqualTo(
    right=6.0,
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
)

# Register the model when the condition holds; otherwise do nothing.
step_cond = ConditionStep(
    name="CheckMSEAbaloneEvaluation",
    conditions=[cond_lte],
    else_steps=[],
    if_steps=[step_register],
)

In [52]:
# Assemble the pipeline from its runtime parameters and its steps.
pipeline = Pipeline(
    name=pipeline_name,
    sagemaker_session=sagemaker_session,
    steps=[step_process, step_train, step_eval, step_cond],
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
    ],
)

In [None]:
# Create the pipeline, or update it if it already exists, using the pipeline execution role.
pipeline.upsert(role_arn=pipeline_role)

In [None]:
# Pretty-print the pipeline definition (a JSON document) with sorted keys.
parsed = json.loads(pipeline.definition())
print(json.dumps(parsed, indent=2, sort_keys=True))

The following line starts the pipeline execution. In this specific example it runs for about 13 minutes.

In [55]:
# Start a pipeline execution with the default parameter values.
execution = pipeline.start()

In [None]:
# Show the current description of the pipeline execution.
execution.describe()

In [57]:
# Block until the pipeline execution finishes.
execution.wait()

## Clean up

### Delete SageMaker project
This deletes the SageMaker project together with its associated CloudFormation stack and CodeCommit repository.

In [None]:
# WARNING: destructive. The `delete_project` call is evaluated while the message
# is formatted, so running this cell deletes the project and prints the API response.
print(f"Deleting project {project_name}:{sm.delete_project(ProjectName=project_name)}")

### Delete project S3 bucket 
This removes all files from the project S3 bucket and then deletes the bucket itself.

In [None]:
# WARNING: destructive. Removes the project bucket; `--force` deletes its objects first.
!aws s3 rb s3://sm-mlops-cp-{project_name}-{project_id} --force

## Release resources

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<!-- Hidden button bound to the "kernelmenu:shutdown" command; the script below clicks it. -->
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
// Click the first command button found in the document to trigger the kernel shutdown.
try {
    // Declared with `const` so the variable does not leak into the page's global scope.
    const els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp: nothing to do if the button is missing or cannot be clicked.
}    
</script>