# Build and Register an MXNet Image Classification Model via SageMaker Pipelines

In [None]:
!pip install --upgrade pip
!pip install --upgrade sagemaker

## 1. Create SageMaker session and client

In [None]:
import os
import logging
import boto3
import sagemaker

# Create a single boto3 session and reuse it everywhere, so the SageMaker session
# and the low-level client all resolve the same credentials and region.
boto_session = boto3.Session()
region = boto_session.region_name

# High-level SageMaker session, its default S3 bucket, and the notebook's execution role.
sess = sagemaker.Session(boto_session=boto_session)
bucket = sess.default_bucket()
role = sagemaker.get_execution_role(sagemaker_session=sess)

# Low-level SageMaker API client, later handed to the PipelineSession.
sm_client = boto_session.client(service_name="sagemaker", region_name=region)

## 2. Define SageMaker Pipeline parameters

In [None]:
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat
)


# Names under which the pipeline, its S3 artifacts and the registered model are filed.
pipeline_name = "MXNetImageClassificationPipeline"
prefix = "mxnet-image-classification-pipeline"
model_package_group_name = "MXNet-Image-Classification"  # Model name in model registry

# A PipelineSession is used for every step so that the steps do not start running
# before the pipeline itself is ready.
sm_pipeline_session = PipelineSession(
    boto_session=boto_session,
    sagemaker_client=sm_client,
    default_bucket=bucket,
)

###### TODO ######
input_img_data_s3_uri = "s3://TODO"  # e.g. "s3://my-image-bucket"

# --- Data location -----------------------------------------------------------
input_data = ParameterString(name="InputData", default_value=input_img_data_s3_uri)

# --- Compute resources -------------------------------------------------------
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.t3.medium",
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.p3.2xlarge",
)

# --- Dataset split (fractions of the full image set) -------------------------
train_split_percentage = ParameterFloat(name="TrainSplitPercentage", default_value=0.75)
validation_split_percentage = ParameterFloat(name="ValidationSplitPercentage", default_value=0.10)
test_split_percentage = ParameterFloat(name="TestSplitPercentage", default_value=0.15)

## 3. Define image preprocessing step

In this ML workflow step, you will be converting the raw .jpg image files in the S3 input bucket to the Apache MXNet RecordIO format, which is the recommended input format for the Amazon SageMaker image classification algorithm. Upon completion, the newly generated .rec files will be split into train, validation, and test sets and uploaded back to the original S3 input bucket under a newly created `recordIO` folder. This step relies on the `preprocess.py` script found in the `scripts` directory.

In [None]:
from sagemaker.mxnet import MXNetProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


from sagemaker.workflow.functions import Join

# Processor that runs scripts/preprocess.py inside the MXNet framework container.
mxnet_processor_preprocess = MXNetProcessor(
    framework_version="1.8.0",
    py_version="py37",
    # NOTE(review): instance type and count are still bound to the parameters' default
    # values at definition time, so overriding them at execution has no effect.
    # Confirm MXNetProcessor accepts pipeline variables here before passing the
    # parameter objects directly.
    instance_type=processing_instance_type.default_value,
    instance_count=processing_instance_count.default_value,
    base_job_name=f"{prefix}/preprocess-image-data",
    sagemaker_session=sm_pipeline_session,
    role=role,
)

# Raw images are read from the location given by the InputData pipeline parameter.
processing_inputs = [
    ProcessingInput(
        input_name="input_img_data",
        source=input_data,
        destination="/opt/ml/processing/input/data/",
    )
]

# One output per dataset split, written to <InputData>/recordIO/<split>.
# The destination is built with Join from the InputData parameter itself (not its
# default value) so that outputs follow the same location the inputs are read from
# when InputData is overridden for an execution.
processing_outputs = [
    ProcessingOutput(
        output_name=split,
        source=f"/opt/ml/processing/{split}",
        destination=Join(on="/", values=[input_data, "recordIO", split]),
    )
    for split in ("train", "validation", "test")
]

# Script arguments are passed as pipeline variables so that values supplied at
# execution time are honoured; float parameters are rendered as strings with
# to_string() because container arguments are strings.
step_args = mxnet_processor_preprocess.run(
    code="./scripts/preprocess.py",
    inputs=processing_inputs,
    outputs=processing_outputs,
    arguments=[
        "--input-s3-bucket",
        input_data,
        "--train-split-percentage",
        train_split_percentage.to_string(),
        "--validation-split-percentage",
        validation_split_percentage.to_string(),
        "--test-split-percentage",
        test_split_percentage.to_string(),
    ],
)

step_preprocess = ProcessingStep(
    name="Preprocess-Image-Data",
    step_args=step_args,
)

## 4. Define model training step

In this ML workflow step, you will train an MXNet image classification model using the train and validation .rec files that were created in the previous step. For more information regarding the specific image classification algorithm, refer to [Image Classification - MXNet](https://docs.aws.amazon.com/sagemaker/latest/dg/image-classification.html) from the Amazon SageMaker documentation.

In [None]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


def _recordio_channel(output_name):
    """Return a RecordIO training channel fed by the named output of the preprocessing step."""
    return TrainingInput(
        s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs[output_name].S3Output.S3Uri,
        distribution="FullyReplicated",
        content_type="application/x-recordio",
        s3_data_type="S3Prefix",
    )


# Container image of the image classification algorithm for the current region.
image_uri = sagemaker.image_uris.retrieve(
    framework="image-classification",
    region=region,
)

# Trained model artifacts are written below this S3 prefix.
model_output_path = f"s3://{bucket}/{prefix}/model-output"

mxnet_train = Estimator(
    image_uri=image_uri,
    role=role,
    sagemaker_session=sm_pipeline_session,
    instance_type=training_instance_type,
    instance_count=1,
    volume_size=50,
    max_run=360000,
    output_path=model_output_path,
)

# Feel free to edit these model hyperparameters based on domain expertise
# For more information regarding image classification hyperparameters, refer to https://docs.aws.amazon.com/sagemaker/latest/dg/IC-Hyperparameter.html
hyperparameters = {
    "use_pretrained_model": 1,
    "image_shape": "3,224,224",
    "num_classes": 2,
    "num_training_samples": 750,  ### TODO ### (total number of images uploaded * train_split_percentage) (1000 * 0.75)
    "learning_rate": 0.1,
    "mini_batch_size": 25,
    "epochs": 30,
    "early_stopping": True,
}
mxnet_train.set_hyperparameters(**hyperparameters)

# Channels consumed by the training job: the train and validation splits
# produced by the preprocessing step.
train_data = _recordio_channel("train")
validation_data = _recordio_channel("validation")

step_args = mxnet_train.fit(
    inputs={"train": train_data, "validation": validation_data},
    logs=True,
)

step_train = TrainingStep(
    name="Train-Image-Classification-Model",
    step_args=step_args,
)

## 5. Define model evaluation step

In this ML workflow step, you will be evaluating the trained model (from the previous step) on the test .rec file. Specifically, you will be measuring the accuracy and F1 score on the test set. This step relies on the `evaluate.py` script found in the `scripts` directory.

In [None]:
from sagemaker.workflow.properties import PropertyFile

# Property file exposing evaluation.json (from the "evaluation" output below) to
# later steps of the pipeline.
evaluation_report = PropertyFile(
    name="Image-Classification-Model-Evaluation-Report",
    output_name="evaluation",
    path="evaluation.json",
)

# Processor that runs scripts/evaluate.py; instance type and count are taken from
# the default values of the corresponding pipeline parameters.
mxnet_processor_eval = MXNetProcessor(
    role=role,
    sagemaker_session=sm_pipeline_session,
    framework_version="1.8.0",
    py_version="py37",
    instance_type=processing_instance_type.default_value,
    instance_count=processing_instance_count.default_value,
    base_job_name=f"{prefix}/model-evaluation",
)

# The evaluation job receives the trained model artifacts and the test split.
model_artifacts_input = ProcessingInput(
    source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    destination="/opt/ml/processing/model",
)
test_data_input = ProcessingInput(
    source=step_preprocess.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
    destination="/opt/ml/processing/test",
)
processing_inputs = [model_artifacts_input, test_data_input]

# Whatever the script writes to /opt/ml/processing/evaluation is uploaded to S3.
processing_outputs = [
    ProcessingOutput(
        output_name="evaluation",
        source="/opt/ml/processing/evaluation",
        destination=f"s3://{bucket}/{prefix}/model-evaluation",
    )
]

step_args = mxnet_processor_eval.run(
    code="./scripts/evaluate.py",
    inputs=processing_inputs,
    outputs=processing_outputs,
)

step_eval = ProcessingStep(
    name="Evaluate-Image-Classification-Model",
    step_args=step_args,
    property_files=[evaluation_report],
)

## 6. Define model registering step

In this ML workflow step, you will register the trained model in the SageMaker Model Registry, but only if its accuracy on the test set is greater than or equal to 70%.



In [None]:
from sagemaker.model import Model
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


# Minimum test-set accuracy (inclusive) required for the model to be registered.
accuracy_threshold = 0.70

# Attach the evaluation report produced by the evaluation step to the model package.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

# Model built from the training step's artifacts and the algorithm's container image.
model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sm_pipeline_session,
    role=role,
)

# The pipeline is trained on .jpg images, so JPEG payloads are advertised alongside
# PNG; the image classification algorithm documents image/png, image/jpeg and
# application/x-image as supported inference content types.
step_args = model.register(
    content_types=["image/png", "image/jpeg", "application/x-image"],
    response_types=["application/json"],
    inference_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
)

step_register = ModelStep(
    name="Register-Image-Classification-Model",
    step_args=step_args,
)

# Condition: the accuracy read from the evaluation report is greater than or equal to
# the threshold (0.70).
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.accuracy.value",
    ),
    right=accuracy_threshold,
)

# This step wraps 'step_register': the model is registered only when the condition
# holds (accuracy >= 0.70); otherwise nothing further is executed.
step_cond = ConditionStep(
    name="Check-Accuracy-Image-Classification-Model",
    conditions=[cond_gte],
    if_steps=[step_register],
    else_steps=[],
)

## 7. Create SageMaker Pipeline

In this step, you define a SageMaker pipeline encompassing all the above ML workflow steps.

In [None]:
from sagemaker.workflow.pipeline import Pipeline
import json


# Parameters declared on the pipeline.
pipeline_parameters = [
    input_data,
    processing_instance_count,
    processing_instance_type,
    training_instance_type,
    train_split_percentage,
    validation_split_percentage,
    test_split_percentage,
]

# Top-level steps; the registration step is not listed on its own because it is
# nested inside the condition step's if-branch.
pipeline_steps = [step_preprocess, step_train, step_eval, step_cond]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
    sagemaker_session=sm_pipeline_session,
)

## 8. Start SageMaker Pipeline execution

In [None]:
# Submit the pipeline definition to the SageMaker Pipelines service to create a pipeline if it doesn't exist, or update the pipeline if it does
pipeline.upsert(role_arn=role)

# Start a pipeline execution
execution = pipeline.start()