# Defect Detection: Image Classification - Pipeline Execution

In this notebook, we use the pipeline configured in the included Python package under `pipelines`, together with the accompanying preprocessing and training code, to automate model training. The pipeline is designed so that you can simply point it at your own image classification data and have it train a model automatically.

### Expected data format

The expected data format for image classification is .png or .jpg images sorted into "normal" and "anomalous" prefixes in S3. The `InputData` parameter of the pipeline therefore needs to point to an S3 prefix that contains two "folders" (S3 prefixes) named "normal" and "anomalous". The preprocessing script uses these paths to create a RecordIO training dataset.
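To make the expected layout concrete, here is a minimal sketch that maps S3 object keys under the `InputData` prefix to class labels. It is not part of the `pipelines` package: the function name `label_from_key` and the example keys are hypothetical, and it only assumes the "normal"/"anomalous" folder convention and the .png/.jpg extensions described above.

```python
from typing import Optional

# Hypothetical helper (not part of the pipelines package): infer the class
# label of an S3 object key from the "normal"/"anomalous" folder convention.
VALID_LABELS = ("normal", "anomalous")
VALID_EXTENSIONS = (".png", ".jpg")


def label_from_key(key: str, input_prefix: str = "") -> Optional[str]:
    """Return 'normal' or 'anomalous' for an image key, or None if it does not match."""
    if not key.lower().endswith(VALID_EXTENSIONS):
        return None  # not a .png/.jpg image
    if input_prefix and not key.startswith(input_prefix):
        return None  # outside the InputData prefix
    relative = key[len(input_prefix):].lstrip("/")
    first_folder = relative.split("/", 1)[0]
    return first_folder if first_folder in VALID_LABELS else None


# Example keys as they might appear under s3://<bucket>/<InputData prefix>/
keys = [
    "data/normal/img_001.png",
    "data/anomalous/img_042.jpg",
    "data/anomalous/notes.txt",
    "data/other/img_007.png",
]
for k in keys:
    print(k, "->", label_from_key(k, input_prefix="data/"))
```

Only the first folder below the input prefix decides the label, so images placed directly under the prefix or in any other folder are ignored.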

In [None]:
import boto3
import sagemaker
import time
import uuid
import json

iot_client = boto3.client('iot')
sts_client = boto3.client('sts')
sm_client = boto3.client('sagemaker')

# Get the account id
account_id = sts_client.get_caller_identity()["Account"]

# Project name as defined in your CloudFormation template (set this before running the notebook)
PROJECT_NAME = ''

region = boto3.Session().region_name
role = sagemaker.get_execution_role()
bucket_name = 'sm-edge-workshop-%s-%s' % (PROJECT_NAME, account_id)

# Change these to reflect your project/business name or if you want to separate ModelPackageGroup/Pipeline from the rest of your team
model_package_group_name = 'defect-detection-img-classification-%s' % PROJECT_NAME
job_prefix = 'defect-detection-img-classification'
pipeline_name = 'defect-detection-img-clf-pipeline-%s' % PROJECT_NAME

### Getting the pipeline definition

We use the `get_pipeline` function to create a pipeline DAG definition from the input provided here. This input is fixed for each pipeline you create or update; unlike the pipeline parameters, it cannot be changed per execution (see how parameters are used when starting the execution below).

In [None]:
from pipelines.image_classification.pipeline import get_pipeline

pipeline = get_pipeline(
    region=region,
    role=role,
    default_bucket=bucket_name,
    pipeline_name=pipeline_name,
    base_job_prefix=job_prefix
)

### Creating the pipeline

We create the pipeline (or update it if it already exists) from the DAG definition above.

In [None]:
pipeline.upsert(role_arn=role)

### Starting the pipeline execution

We now start the execution of the pipeline with a set of parameters, which can be changed for every execution.

In [None]:
input_data_path = 's3://%s/' % bucket_name

execution = pipeline.start(
    parameters=dict(
        InputData=input_data_path,
        TrainingInstanceType="ml.p3.2xlarge",
        ModelApprovalStatus="Approved",
        ModelPackageGroupName=model_package_group_name,
        TargetImageSize="224",
        AugmentCountAnomalous="1000"
    )
)

### Check progress

Once the execution has started, you can check its progress at any time: look at the processing and training jobs in the SageMaker console, use the built-in pipeline visualization in SageMaker Studio, or use SDK methods such as the one below.

In [None]:
execution.describe()
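Besides `execution.describe()`, the pipeline execution object in the SageMaker Python SDK also offers `list_steps()`, which returns one dictionary per step with fields such as `StepName` and `StepStatus`. The sketch below condenses such a list into one line per step. It runs against hard-coded sample data shaped like that output (the step names are made up), so it can be tried without AWS access; in the notebook you would pass `execution.list_steps()` instead.

```python
from typing import Dict, List

# Sample data shaped like the output of execution.list_steps();
# the step names here are hypothetical.
sample_steps = [
    {"StepName": "TrainModel", "StepStatus": "Executing"},
    {"StepName": "PreprocessData", "StepStatus": "Succeeded"},
]


def summarize_steps(steps: List[Dict]) -> List[str]:
    """Return 'StepName: StepStatus' lines, appending a failure reason if present."""
    lines = []
    for step in steps:
        line = f"{step['StepName']}: {step['StepStatus']}"
        if step.get("FailureReason"):
            line += f" ({step['FailureReason']})"
        lines.append(line)
    return lines


for line in summarize_steps(sample_steps):
    print(line)
```

If you prefer to block the notebook until the pipeline finishes before moving on, the same execution object also provides `execution.wait()`.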

## Preparing trained model for edge

Only proceed with this part if the training pipeline execution was successful. Here we prepare the model you just trained in the pipeline for deployment onto the edge device.

In [None]:
compilation_output_sub_folder = 'models/' + job_prefix + '/compilation-output'
edgepackaging_output_sub_folder = 'models/' + job_prefix + '/edge-packaging-output'

# S3 Location to save the model artifact after compilation
s3_compilation_output_location = 's3://{}/{}'.format(bucket_name, compilation_output_sub_folder)

# S3 Location to save the model artifact after edge packaging
s3_edgepackaging_output_location = 's3://{}/{}'.format(bucket_name, edgepackaging_output_sub_folder)

In [None]:
# Define some helper functions

def get_latest_approved_s3_model_location(client, model_package_group):
    """Returns the S3 model artifact location of the latest approved model version in a group"""
    response = client.list_model_packages(
        ModelPackageGroupName=model_package_group,
        ModelApprovalStatus='Approved'
    )
    latest_version = max(response['ModelPackageSummaryList'], key=lambda x: x['ModelPackageVersion'])
    model_package = client.describe_model_package(ModelPackageName=latest_version['ModelPackageArn'])
    return model_package['InferenceSpecification']['Containers'][0]['ModelDataUrl']

def get_latest_approved_model_version(client, model_package_group):
    """Returns the model version of the latest approved model version in a group"""
    response = client.list_model_packages(
        ModelPackageGroupName=model_package_group,
        ModelApprovalStatus='Approved'
    )
    latest_version = max(response['ModelPackageSummaryList'], key=lambda x: x['ModelPackageVersion'])
    return latest_version['ModelPackageVersion']
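Both helpers pick the newest version with `max` over `ModelPackageSummaryList`. Two caveats are worth keeping in mind: `max` raises `ValueError` on an empty list (no approved model yet), and `list_model_packages` returns results in pages (with a `NextToken`), so a large group may need more than one call. The sketch below shows a more defensive selection over already-fetched summaries; the function name `pick_latest_summary` is hypothetical and the sample data is made up.

```python
from typing import Dict, Iterable, Optional


def pick_latest_summary(summaries: Iterable[Dict]) -> Optional[Dict]:
    """Return the summary with the highest ModelPackageVersion, or None if there are none.

    Hypothetical helper: `summaries` should be the concatenated
    ModelPackageSummaryList entries from every page of list_model_packages.
    """
    latest = None
    for summary in summaries:
        if latest is None or summary["ModelPackageVersion"] > latest["ModelPackageVersion"]:
            latest = summary
    return latest


# Made-up summaries from two result pages
page_1 = [{"ModelPackageVersion": 3, "ModelPackageArn": "arn:example:3"}]
page_2 = [{"ModelPackageVersion": 7, "ModelPackageArn": "arn:example:7"},
          {"ModelPackageVersion": 5, "ModelPackageArn": "arn:example:5"}]

latest = pick_latest_summary(page_1 + page_2)
print(latest["ModelPackageVersion"] if latest else "no approved model yet")
```

Returning `None` instead of raising lets the calling cell print a clear message when the pipeline has not yet registered an approved model. The pages themselves can be collected by repeating the call with the `NextToken` from the previous response.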

### Run SageMaker Neo compilation job

In [None]:
# Retrieve some information on the model we just trained and registered in SageMaker Model Registry
s3_model_artifact_location = get_latest_approved_s3_model_location(sm_client, model_package_group_name)
print(s3_model_artifact_location)

model_name = 'img-classification'
compilation_job_name = '%s-%d' % (model_name, int(time.time()*1000))

# Let's start a compilation job for the target architecture.
# The input shape [batch, channels, height, width] must match the TargetImageSize used for training (224 here).
sm_client.create_compilation_job(
    CompilationJobName=compilation_job_name,
    RoleArn=role,
    InputConfig={
        'S3Uri': s3_model_artifact_location,
        'DataInputConfig': '{"data": [1,3,224,224]}',
        'Framework': 'MXNET'
    },
    OutputConfig={
        'S3OutputLocation': s3_compilation_output_location,
        'TargetPlatform': {'Os': 'LINUX', 'Arch': 'X86_64'}
    },
    StoppingCondition={'MaxRuntimeInSeconds': 900}
)

# Poll the status of the job until it is no longer starting or in progress
print('Started compilation job .', end='')
while True:
    resp = sm_client.describe_compilation_job(CompilationJobName=compilation_job_name)
    if resp['CompilationJobStatus'] in ['STARTING', 'INPROGRESS']:
        print('.', end='')
    else:
        print(resp['CompilationJobStatus'], compilation_job_name)
        break
    time.sleep(5)

if resp['CompilationJobStatus'] == 'COMPLETED':
    s3_compiled_model_artifact_location_fullpath = resp['ModelArtifacts']['S3ModelArtifacts']
    print(f'Compiled artifact location in S3: {s3_compiled_model_artifact_location_fullpath}')
else:
    print('Failure reason:', resp.get('FailureReason'))
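The `DataInputConfig` string tells Neo the name and shape of the model input: here a single (batch size 1) 3-channel 224x224 image, laid out as [batch, channels, height, width]. That 224 should agree with the `TargetImageSize` pipeline parameter used for training. The sketch below derives the string from the image size so the two values cannot drift apart; the helper name `make_data_input_config` is hypothetical, and it assumes the input name `data` and the layout used in the cell above.

```python
import json


def make_data_input_config(image_size: int, channels: int = 3, batch_size: int = 1,
                           input_name: str = "data") -> str:
    """Build a Neo DataInputConfig JSON string for a square image input.

    Hypothetical helper: assumes an MXNet model whose input is named 'data'
    with shape [batch, channels, height, width].
    """
    if image_size <= 0:
        raise ValueError("image_size must be positive")
    return json.dumps({input_name: [batch_size, channels, image_size, image_size]})


target_image_size = "224"  # same value as the TargetImageSize pipeline parameter
print(make_data_input_config(int(target_image_size)))
```

The resulting string differs from the hand-written one above only in whitespace, which does not change the JSON value.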

### Running the SageMaker Edge Packaging job

In [None]:
# Use a timestamp-based name for the edge packaging job
edge_packaging_job_name = '%s-%d' % (model_name, int(time.time()*1000))
model_version = str(get_latest_approved_model_version(sm_client, model_package_group_name))

# Start the edge packaging job from the compiled model
resp = sm_client.create_edge_packaging_job(
    EdgePackagingJobName=edge_packaging_job_name,
    CompilationJobName=compilation_job_name,
    ModelName=model_name,
    ModelVersion=model_version,
    RoleArn=role,
    OutputConfig={
        'S3OutputLocation': s3_edgepackaging_output_location
    }
)

# Poll the status of the job until it is no longer starting or in progress
print('Started edge packaging job .', end='')
while True:
    resp = sm_client.describe_edge_packaging_job(EdgePackagingJobName=edge_packaging_job_name)
    if resp['EdgePackagingJobStatus'] in ['STARTING', 'INPROGRESS']:
        print('.', end='')
    else:
        print(resp['EdgePackagingJobStatus'], edge_packaging_job_name)
        break
    time.sleep(5)

if resp['EdgePackagingJobStatus'] == 'COMPLETED':
    s3_packaged_model_artifact_location_fullpath = resp['ModelArtifact']
    print(f'Packaged artifact location in S3: {s3_packaged_model_artifact_location_fullpath}')
else:
    print('Status message:', resp.get('EdgePackagingJobStatusMessage'))
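The compilation and edge packaging cells repeat the same polling loop, and neither loop limits how long it waits. One way to factor this out is a generic helper that takes a `describe` callable, the name of the status field, and a timeout. This is a sketch with hypothetical names (`wait_for_job`, `fake_describe`); the stand-in `describe` function simulates a job so the helper can be exercised without AWS.

```python
import time
from typing import Callable, Dict, Iterable


def wait_for_job(describe: Callable[[], Dict], status_key: str,
                 in_progress: Iterable[str] = ("STARTING", "INPROGRESS"),
                 delay: float = 5.0, timeout: float = 1800.0) -> Dict:
    """Call `describe` until `status_key` leaves the in-progress states.

    Returns the last describe response; raises TimeoutError if the job is
    still in progress after `timeout` seconds.
    """
    in_progress = set(in_progress)
    deadline = time.monotonic() + timeout
    while True:
        resp = describe()
        if resp[status_key] not in in_progress:
            return resp
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {resp[status_key]} after {timeout} seconds")
        time.sleep(delay)


# Stand-in for sm_client.describe_compilation_job: two polls in progress, then done
_statuses = iter(["STARTING", "INPROGRESS", "COMPLETED"])


def fake_describe() -> Dict:
    return {"CompilationJobStatus": next(_statuses)}


final = wait_for_job(fake_describe, "CompilationJobStatus", delay=0.01)
print(final["CompilationJobStatus"])
```

In the notebook, the callable could be `lambda: sm_client.describe_compilation_job(CompilationJobName=compilation_job_name)` with `status_key='CompilationJobStatus'`, or the edge packaging equivalent with `'EdgePackagingJobStatus'`.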

### Running an IoT job for deployment onto the edge

In [None]:
def split_s3_path(s3_path):
    """Split an 's3://bucket/key' URI into (bucket, key)"""
    path_parts = s3_path.replace("s3://", "").split("/")
    bucket = path_parts.pop(0)
    key = "/".join(path_parts)
    return bucket, key

model_bucket, model_key = split_s3_path(s3_packaged_model_artifact_location_fullpath)
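`split_s3_path` works for well-formed `s3://bucket/key` URIs, but `replace("s3://", "")` would also strip that substring anywhere in the string and silently accept non-S3 input. A stricter variant is sketched below: it only removes the scheme at the start, rejects anything else, and uses `str.partition` to split off the bucket. The function name `split_s3_uri` and the example URI are hypothetical.

```python
from typing import Tuple


def split_s3_uri(s3_uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/path/to/key' into ('bucket', 'path/to/key')."""
    prefix = "s3://"
    if not s3_uri.startswith(prefix):
        raise ValueError(f"not an S3 URI: {s3_uri!r}")
    bucket, _, key = s3_uri[len(prefix):].partition("/")
    if not bucket:
        raise ValueError(f"missing bucket name in {s3_uri!r}")
    return bucket, key


print(split_s3_uri("s3://my-bucket/models/img-classification/model.tar.gz"))
```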

In [None]:
resp = iot_client.create_job(
    jobId=str(uuid.uuid4()),
    targets=[
        'arn:aws:iot:%s:%s:thinggroup/defect-detection-%s-group' % (region, account_id, PROJECT_NAME),
    ],
    document=json.dumps({
        'type': 'new_model',
        'model_version': model_version,
        'model_name': model_name,
        'model_package_bucket': model_bucket,
        'model_package_key': model_key
    }),
    targetSelection='SNAPSHOT'
)
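The IoT job document is plain JSON that the application on the edge device is expected to interpret; the fields `type`, `model_version`, `model_name`, `model_package_bucket` and `model_package_key` are this workshop's own convention rather than an AWS-defined schema. The sketch below builds the same document and the same thing group ARN in one place, so they can be inspected before calling `iot_client.create_job`. The function names and example values are hypothetical.

```python
import json


def thing_group_arn(region: str, account_id: str, project_name: str) -> str:
    """ARN of the thing group targeted by the job (naming follows the cell above)."""
    return f"arn:aws:iot:{region}:{account_id}:thinggroup/defect-detection-{project_name}-group"


def new_model_job_document(model_name: str, model_version: str,
                           bucket: str, key: str) -> str:
    """Build the 'new_model' job document sent to the edge devices."""
    return json.dumps({
        "type": "new_model",
        "model_version": model_version,
        "model_name": model_name,
        "model_package_bucket": bucket,
        "model_package_key": key,
    })


doc = new_model_job_document("img-classification", "3",
                             "my-bucket", "models/img-classification-3.tar.gz")
print(thing_group_arn("eu-west-1", "123456789012", "myproject"))
print(json.loads(doc)["type"])
```

With `targetSelection='SNAPSHOT'`, the job runs only on the things that are in the group when the job is created; devices added to the group later would need a new job (or a `CONTINUOUS` job) to receive the model.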