# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# SPDX-License-Identifier: MIT-0
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify,
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Example workflow pipeline script for abalone pipeline.

                                               . -RegisterModel
                                              .
    Process-> Train -> Evaluate -> Condition .
                                              .
                                               . -(stop)

Implements a get_pipeline(**kwargs) method.
"""
import logging
import os
from typing import Optional

import boto3
import sagemaker
import sagemaker.session

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
)
from sagemaker.workflow.functions import (
    JsonGet,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.workflow.step_collections import RegisterModel

from botocore.exceptions import ClientError
from sagemaker.network import NetworkConfig

# BASE_DIR = os.path.dirname(os.path.realpath(__file__))

logger = logging.getLogger(__name__)


def get_session(region: str, default_bucket: Optional[str]) -> sagemaker.session.Session:
    """Gets the sagemaker session based on the region.

    Builds a boto3 session pinned to ``region`` and creates both the
    ``sagemaker`` and ``sagemaker-runtime`` clients from it, so every call made
    through the returned session targets the same region.

    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        `sagemaker.session.Session` instance
    """
    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")

    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_runtime_client=runtime_client,
        default_bucket=default_bucket,
    )


def get_pipeline(
    region: str,
    role: Optional[str] = None,
    default_bucket: Optional[str] = None,
    bucket_kms_id: Optional[str] = None,
    model_package_group_name: str = "AbalonePackageGroup",
    pipeline_name: str = "AbalonePipeline",
    base_job_prefix: str = "Abalone",
    project_id: str = "SageMakerProjectId",
    git_hash: str = "",
    ecr_repo_uri: str = "",
    default_input_data: str = "",
) -> Pipeline:
    """Gets a SageMaker ML Pipeline instance working on abalone data.

    The pipeline runs four steps in order: preprocess (R script), train,
    evaluate (R script) and a condition step that registers the model only
    when the evaluation report's ``regression_metrics.rmse.value`` is less
    than or equal to 6.0.

    Args:
        region: AWS region to create and run the pipeline.
        role: IAM role to create and run steps and pipeline. When None, it is
            resolved with `sagemaker.session.get_execution_role`.
        default_bucket: the bucket to use for storing the artifacts. When not
            provided, the session's default bucket is used for the training
            output path.
        bucket_kms_id: KMS key id passed as `output_kms_key` to the processing
            jobs and the training job.
        model_package_group_name: model package group the model is registered in.
        pipeline_name: name of the pipeline.
        base_job_prefix: prefix used for the job base names and for the
            training output path.
        project_id: accepted for interface compatibility; not used by this
            function.
        git_hash: the hash id of the current commit. Used to determine which
            docker image version to use
        ecr_repo_uri: uri of the ECR repository used by this project
        default_input_data: s3 location with data to be used by pipeline;
            becomes the default of the `InputDataUrl` pipeline parameter.

    Returns:
        an instance of a pipeline
    """
    sagemaker_session = get_session(region, default_bucket)
    if role is None:
        role = sagemaker.session.get_execution_role(sagemaker_session)

    # parameters for pipeline execution
    processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
    processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
    training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
    model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
    input_data = ParameterString(
        name="InputDataUrl",
        default_value=default_input_data,
    )

    # Image tags are derived from the commit hash so a pipeline definition is
    # tied to the container images built from the same commit.
    processing_image_uri = f"{ecr_repo_uri}:processing-{git_hash}"
    training_image_uri = f"{ecr_repo_uri}:training-{git_hash}"
    # NOTE(review): inference reuses the "training-" tag; presumably the same
    # container is used for serving -- confirm this is intentional.
    inference_image_uri = f"{ecr_repo_uri}:training-{git_hash}"

    # network_config = NetworkConfig(
    #     enable_network_isolation=True,
    #     security_group_ids=security_group_ids,
    #     subnets=subnets,
    #     encrypt_inter_container_traffic=True,
    # )

    # processing step for feature engineering
    script_processor = ScriptProcessor(
        image_uri=processing_image_uri,
        instance_type=processing_instance_type,
        instance_count=processing_instance_count,
        base_job_name=f"{base_job_prefix}/byoc-abalone-preprocess",
        command=["Rscript"],
        sagemaker_session=sagemaker_session,
        role=role,
        output_kms_key=bucket_kms_id,
    )
    step_process = ProcessingStep(
        name="PreprocessAbaloneData",
        processor=script_processor,
        inputs=[ProcessingInput(source=input_data, destination="/opt/ml/processing/input")],
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train"),
            ProcessingOutput(output_name="validation", source="/opt/ml/processing/output/validation"),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test"),
        ],
        code="source_scripts/preprocessing/prepare_abalone_data/preprocessing.R",
        # we must figure out this path to get it from step_source directory
    )

    # training step for generating model artifacts
    # `default_bucket` defaults to None; interpolating it directly would give
    # "s3://None/...", so fall back to the bucket the session resolves.
    artifact_bucket = default_bucket or sagemaker_session.default_bucket()
    model_path = f"s3://{artifact_bucket}/{base_job_prefix}/AbaloneTrain"

    train_estimator = Estimator(
        image_uri=training_image_uri,
        instance_type=training_instance_type,
        instance_count=1,
        output_path=model_path,
        base_job_name=f"{base_job_prefix}/abalone-train",
        sagemaker_session=sagemaker_session,
        role=role,
        output_kms_key=bucket_kms_id,
        source_dir="source_scripts/training/",
        entry_point="train.R",
        metric_definitions=[{"Name": "rmse-validation", "Regex": "Calculated validation RMSE: ([0-9.]+);.*$"}],
    )

    # The training channels are wired to the outputs of the preprocessing step.
    processing_outputs = step_process.properties.ProcessingOutputConfig.Outputs
    step_train = TrainingStep(
        name="TrainAbaloneModel",
        estimator=train_estimator,
        inputs={
            "train": TrainingInput(
                s3_data=processing_outputs["train"].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=processing_outputs["validation"].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
    )

    # processing step for evaluation
    script_eval = ScriptProcessor(
        image_uri=training_image_uri,
        command=["Rscript"],
        instance_type=processing_instance_type,
        instance_count=1,
        base_job_name=f"{base_job_prefix}/script-abalone-eval",
        sagemaker_session=sagemaker_session,
        role=role,
        output_kms_key=bucket_kms_id,
    )
    # Exposes evaluation.json from the "evaluation" output so the condition
    # step can read values from it with JsonGet.
    evaluation_report = PropertyFile(
        name="AbaloneEvaluationReport",
        output_name="evaluation",
        path="evaluation.json",
    )
    step_eval = ProcessingStep(
        name="EvaluateAbaloneModel",
        processor=script_eval,
        inputs=[
            ProcessingInput(
                source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                destination="/opt/ml/processing/model",
            ),
            ProcessingInput(
                source=processing_outputs["test"].S3Output.S3Uri,
                destination="/opt/ml/processing/test",
            ),
        ],
        outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
        ],
        code="source_scripts/evaluate/evaluation.R",
        property_files=[evaluation_report],
    )

    # register model step that will be conditionally executed
    # Index 0 is the single "evaluation" output declared on step_eval above.
    evaluation_s3_uri = step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri=f"{evaluation_s3_uri}/evaluation.json",
            content_type="application/json",
        )
    )

    step_register = RegisterModel(
        name="RegisterAbaloneModel",
        estimator=train_estimator,
        image_uri=inference_image_uri,
        model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        content_types=["application/json"],
        response_types=["application/json"],
        inference_instances=["ml.t2.medium", "ml.m5.large"],
        transform_instances=["ml.m5.large"],
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        model_metrics=model_metrics,
    )

    # condition step for evaluating model quality and branching execution
    cond_lte = ConditionLessThanOrEqualTo(
        left=JsonGet(
            step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.rmse.value"
        ),
        right=6.0,
    )
    # With an empty else branch the pipeline simply stops when the RMSE
    # threshold is not met.
    step_cond = ConditionStep(
        name="CheckMSEAbaloneEvaluation",
        conditions=[cond_lte],
        if_steps=[step_register],
        else_steps=[],
    )

    # pipeline instance
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            processing_instance_type,
            processing_instance_count,
            training_instance_type,
            model_approval_status,
            input_data,
        ],
        steps=[step_process, step_train, step_eval, step_cond],
        sagemaker_session=sagemaker_session,
    )
    return pipeline