import os

import boto3
import sagemaker
import sagemaker.session
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.metadata_properties import MetadataProperties
from sagemaker.processing import (
    ProcessingOutput,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)

# Directory holding this module; the preprocessing script is located relative to it.
BASE_DIR = os.path.dirname(os.path.realpath(__file__))


def get_session(region, default_bucket):
    """Create a SageMaker session bound to ``region``.

    Args:
        region: AWS region name used to create the underlying boto3 session.
        default_bucket: Value forwarded as the session's ``default_bucket``.

    Returns:
        A ``sagemaker.session.Session`` wired to "sagemaker" and
        "sagemaker-runtime" clients created from the same boto3 session.
    """
    aws_session = boto3.Session(region_name=region)
    return sagemaker.session.Session(
        boto_session=aws_session,
        sagemaker_client=aws_session.client("sagemaker"),
        sagemaker_runtime_client=aws_session.client("sagemaker-runtime"),
        default_bucket=default_bucket,
    )


def get_pipeline(
    region,
    model_package_group_name,
    pipeline_name,
    base_job_prefix,
    commit_id,
    role_arn,
    default_bucket=None,
):
    """Assemble the preprocess -> train -> register pipeline.

    Args:
        region: AWS region used for the session, the default input data URL
            and the XGBoost image lookup.
        model_package_group_name: Model package group the model is registered
            into; also used as the experiment name.
        pipeline_name: Name given to the resulting pipeline.
        base_job_prefix: Prefix for the processing/training base job names and
            for the model output path in the session's default bucket.
        commit_id: Recorded in the model package description and metadata, and
            used as the experiment trial name.
        role_arn: Role used by the processor and estimator. When ``None``, it
            is looked up with ``sagemaker.session.get_execution_role``.
        default_bucket: Forwarded to ``get_session``.

    Returns:
        A ``Pipeline`` with the steps in the order process, train, register.
    """
    session = get_session(region, default_bucket)
    if role_arn is None:
        role_arn = sagemaker.session.get_execution_role(session)

    # Parameters that can be overridden per pipeline execution.
    processing_instance_count = ParameterInteger(
        name="ProcessingInstanceCount",
        default_value=1,
    )
    processing_instance_type = ParameterString(
        name="ProcessingInstanceType",
        default_value="ml.m5.xlarge",
    )
    training_instance_type = ParameterString(
        name="TrainingInstanceType",
        default_value="ml.m5.xlarge",
    )
    model_approval_status = ParameterString(
        name="ModelApprovalStatus",
        default_value="PendingManualApproval",
    )
    input_data = ParameterString(
        name="InputDataUrl",
        default_value=f"s3://sagemaker-servicecatalog-seedcode-{region}/dataset/abalone-dataset.csv",
    )

    # Feature-engineering step: runs preprocess.py in an SKLearn processor.
    preprocessor = SKLearnProcessor(
        framework_version="0.23-1",
        instance_type=processing_instance_type,
        instance_count=processing_instance_count,
        base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
        sagemaker_session=session,
        role=role_arn,
    )
    output_locations = (
        ("train", "/opt/ml/processing/train"),
        ("validation", "/opt/ml/processing/validation"),
        ("test", "/opt/ml/processing/test"),
    )
    preprocess_step = ProcessingStep(
        name="PreprocessAbaloneData",
        processor=preprocessor,
        outputs=[
            ProcessingOutput(output_name=channel, source=container_path)
            for channel, container_path in output_locations
        ],
        code=os.path.join(BASE_DIR, "preprocess.py"),
        job_arguments=["--input-data", input_data],
    )

    # Training step: XGBoost estimator producing the model artifacts.
    artifact_prefix = f"s3://{session.default_bucket()}/{base_job_prefix}/AbaloneTrain"
    xgboost_image = sagemaker.image_uris.retrieve(
        framework="xgboost",
        region=region,
        version="1.0-1",
        py_version="py3",
        instance_type=training_instance_type,
    )
    estimator = Estimator(
        image_uri=xgboost_image,
        instance_type=training_instance_type,
        instance_count=1,
        output_path=artifact_prefix,
        base_job_name=f"{base_job_prefix}/abalone-train",
        sagemaker_session=session,
        role=role_arn,
    )
    hyperparameters = {
        "objective": "reg:linear",
        "num_round": 1,
        "max_depth": 5,
        "eta": 0.2,
        "gamma": 4,
        "min_child_weight": 6,
        "subsample": 0.7,
        "silent": 0,
    }
    estimator.set_hyperparameters(**hyperparameters)

    # The train/validation channels read the matching outputs of the processing step.
    processed_outputs = preprocess_step.properties.ProcessingOutputConfig.Outputs
    train_step = TrainingStep(
        name="TrainAbaloneModel",
        estimator=estimator,
        inputs={
            channel: TrainingInput(
                s3_data=processed_outputs[channel].S3Output.S3Uri,
                content_type="text/csv",
            )
            for channel in ("train", "validation")
        },
    )

    # Registration step: publishes the trained artifacts to the model package group.
    register_step = RegisterModel(
        name="RegisterAbaloneModel",
        estimator=estimator,
        model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.t2.medium", "ml.m5.large"],
        transform_instances=["ml.m5.large"],
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        description=f"commit_id={commit_id}",
        metadata_properties=MetadataProperties(commit_id=commit_id),
    )

    # Pipeline instance tying parameters, steps and experiment tracking together.
    experiment_config = PipelineExperimentConfig(
        experiment_name=model_package_group_name,
        trial_name=commit_id,
    )
    return Pipeline(
        name=pipeline_name,
        parameters=[
            processing_instance_type,
            processing_instance_count,
            training_instance_type,
            model_approval_status,
            input_data,
        ],
        steps=[preprocess_step, train_step, register_step],
        sagemaker_session=session,
        pipeline_experiment_config=experiment_config,
    )