{ "cells": [ { "cell_type": "markdown", "source": [ "# Defect Detection: Image Classification - Pipeline Execution\n", "\n", "In this notebook, we will use the pipeline configured in the included python package under `pipelines` together with the defined code for preprocessing and training to automate the model training. It is easy to use such that you can simple drop in whatever input data for image classification you want and have it train a model automatically.\n", "\n", "### Expected data format\n", "\n", "The expected data format for image classification is .png or .jpg images sorted into a \"normal\" or \"anomalous\" prefix in S3. Thus, the `InputData` parameter of the pipeline needs to point to an S3 prefix which contains \"folders\" (S3 prefixes\") named \"normal\" and \"anomalous\". These paths will be used by the preprocessing script to create a RecordIO training data set." ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "import boto3\n", "import sagemaker\n", "import time\n", "import uuid\n", "import json\n", "\n", "iot_client = boto3.client('iot')\n", "sts_client = boto3.client('sts')\n", "sm_client = boto3.client('sagemaker')\n", "\n", "# Get the account id\n", "account_id = sts_client.get_caller_identity()[\"Account\"]\n", "\n", "# Project Name as defined in your CloudFormation template\n", "PROJECT_NAME = ''\n", "\n", "region = boto3.Session().region_name\n", "role = sagemaker.get_execution_role()\n", "bucket_name = 'sm-edge-workshop-%s-%s' % (PROJECT_NAME, account_id)\n", "\n", "# Change these to reflect your project/business name or if you want to separate ModelPackageGroup/Pipeline from the rest of your team\n", "model_package_group_name = 'defect-detection-img-classification-%s' % PROJECT_NAME\n", "job_prefix = 'defect-detection-img-classification'\n", "pipeline_name = 'defect-detection-img-clf-pipeline-%s' % PROJECT_NAME" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "### Getting the pipeline definition\n", "\n", "We use the `get_pipeline` method to create a pipeline DAG definition with our provided input. The input provided here is fixed for each pipeline you create or update, you cannot change these parameters with each execution (see usage of parameters in the cell below)." ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "from pipelines.image_classification.pipeline import get_pipeline\n", "\n", "pipeline = get_pipeline(\n", " region=region,\n", " role=role,\n", " default_bucket=bucket_name,\n", " pipeline_name=pipeline_name,\n", " base_job_prefix=job_prefix\n", ")" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "### Creating the pipeline\n", "\n", "We create the pipeline (or update it in case it exists) with the previously defined DAG definition." ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "pipeline.upsert(role_arn=role)" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "### Starting the pipeline execution\n", "\n", "We now start the exeuction of the pipeline with a given set of parameters which we can alter for every execution." ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "input_data_path = 's3://%s/' % bucket_name\n", "\n", "execution = pipeline.start(\n", " parameters=dict(\n", " InputData=input_data_path,\n", " TrainingInstanceType=\"ml.p3.2xlarge\",\n", " ModelApprovalStatus=\"Approved\",\n", " ModelPackageGroupName=model_package_group_name,\n", " TargetImageSize=\"224\",\n", " AugmentCountAnomalous=\"1000\"\n", " )\n", ")" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "### Check progress\n", "\n", "After execution started, you can always check the progress of your pipeline execution either by looking at the processing and training jobs in the SageMaker Console, using the built-in SageMaker Studio Pipeline visualization tools or using SDK methods like below." ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "execution.describe()" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "## Preparing trained model for edge\n", "\n", "Please proceed here only, if the execution of the training pipeline as successful. In this part of the workshop, we will prepare the model which you just trained in the pipeline for the deployment onto the edge device." ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "compilation_output_sub_folder = 'models/' + job_prefix + '/compilation-output'\n", "edgepackaging_output_sub_folder = 'models/' + job_prefix + '/edge-packaging-output'\n", "\n", "# S3 Location to save the model artifact after compilation\n", "s3_compilation_output_location = 's3://{}/{}'.format(bucket_name, compilation_output_sub_folder)\n", "\n", "# S3 Location to save the model artifact after edge packaging\n", "s3_edgepackaging_output_location = 's3://{}/{}'.format(bucket_name, edgepackaging_output_sub_folder)" ], "outputs": [], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "# Define some helper functions\n", "\n", "def get_latest_approved_s3_model_location(client, model_package_group):\n", " \"\"\"Returns the model location of the latest approved model version in a group\"\"\"\n", " response = client.list_model_packages(\n", " ModelPackageGroupName=model_package_group_name,\n", " ModelApprovalStatus='Approved'\n", " )\n", " latest_version = max(response['ModelPackageSummaryList'], key=lambda x:x['ModelPackageVersion'])\n", " model_artifact_location = sm_client.describe_model_package(ModelPackageName=latest_version['ModelPackageArn'])['InferenceSpecification']['Containers'][0]['ModelDataUrl']\n", " return model_artifact_location\n", "\n", "def get_latest_approved_model_version(client, model_package_group):\n", " \"\"\"Returns the model version of the latest approved model version in a group\"\"\"\n", " response = client.list_model_packages(\n", " ModelPackageGroupName=model_package_group_name,\n", " ModelApprovalStatus='Approved'\n", " )\n", " latest_version = max(response['ModelPackageSummaryList'], key=lambda x:x['ModelPackageVersion'])\n", " return latest_version['ModelPackageVersion']" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "### Run SageMaker Neo compilation job" ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "# Retrieve some information on the model we just trained and registered in SageMaker Model Registry\n", "s3_model_artifact_location = get_latest_approved_s3_model_location(sm_client, model_package_group_name)\n", "print(s3_model_artifact_location)\n", "\n", "model_name = 'img-classification'\n", "compilation_job_name = '%s-%d' % (model_name, int(time.time()*1000))\n", "\n", "# Lets start a compilation job for the target architecture\n", "sm_client.create_compilation_job(\n", " CompilationJobName=compilation_job_name,\n", " RoleArn=role,\n", " InputConfig={\n", " 'S3Uri': s3_model_artifact_location,\n", " 'DataInputConfig': '{\"data\": [1,3,224,224]}',\n", " 'Framework': 'MXNET'\n", " },\n", " OutputConfig={\n", " 'S3OutputLocation': s3_compilation_output_location,\n", " 'TargetPlatform': {'Os': 'LINUX', 'Arch': 'X86_64'}\n", " },\n", " StoppingCondition={ 'MaxRuntimeInSeconds': 900 }\n", ")\n", "\n", "# Poll the status of the job\n", "print('Started compilation job .', end='')\n", "while True:\n", " resp = sm_client.describe_compilation_job(CompilationJobName=compilation_job_name)\n", " if resp['CompilationJobStatus'] in ['STARTING', 'INPROGRESS']:\n", " print('.', end='')\n", " else:\n", " print(resp['CompilationJobStatus'], compilation_job_name)\n", " break\n", " time.sleep(5)\n", " \n", "if resp['CompilationJobStatus'] == 'COMPLETED':\n", " s3_compiled_model_artifact_location_fullpath = resp['ModelArtifacts']['S3ModelArtifacts']\n", " print(f'Compiled artifact location in S3: {s3_compiled_model_artifact_location_fullpath}')" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "### Running the SageMaker Edge Packaging job" ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "# Run the edge packaging job\n", "edge_packaging_job_name='%s-%d' % (model_name, int(time.time()*1000))\n", "model_version=str(get_latest_approved_model_version(sm_client, model_package_group_name))\n", "\n", "# Start the edge packaging job\n", "resp = sm_client.create_edge_packaging_job(\n", " EdgePackagingJobName=edge_packaging_job_name,\n", " CompilationJobName=compilation_job_name,\n", " ModelName=model_name,\n", " ModelVersion=model_version,\n", " RoleArn=role,\n", " OutputConfig={\n", " 'S3OutputLocation': s3_edgepackaging_output_location\n", " }\n", ")\n", "\n", "# Poll the status of the job\n", "print('Started edge packaging job .', end='')\n", "while True:\n", " resp = sm_client.describe_edge_packaging_job(EdgePackagingJobName=edge_packaging_job_name)\n", " if resp['EdgePackagingJobStatus'] in ['STARTING', 'INPROGRESS']:\n", " print('.', end='')\n", " else:\n", " print(resp['EdgePackagingJobStatus'], compilation_job_name)\n", " break\n", " time.sleep(5)\n", " \n", "if resp['EdgePackagingJobStatus'] == 'COMPLETED':\n", " s3_packaged_model_artifact_location_fullpath = resp['ModelArtifact']\n", " print(f'Packaged artifact location in S3: {s3_packaged_model_artifact_location_fullpath}')" ], "outputs": [], "metadata": {} }, { "cell_type": "markdown", "source": [ "### Running IoT Job for deplyoment onto the edge" ], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "def split_s3_path(s3_path):\n", " path_parts=s3_path.replace(\"s3://\",\"\").split(\"/\")\n", " bucket=path_parts.pop(0)\n", " key=\"/\".join(path_parts)\n", " return bucket, key\n", "\n", "model_bucket, model_key = split_s3_path(s3_packaged_model_artifact_location_fullpath)" ], "outputs": [], "metadata": {} }, { "cell_type": "code", "execution_count": null, "source": [ "resp = iot_client.create_job(\n", " jobId=str(uuid.uuid4()),\n", " targets=[\n", " 'arn:aws:iot:%s:%s:thinggroup/defect-detection-%s-group' % (region, account_id, PROJECT_NAME), \n", " ],\n", " document=json.dumps({\n", " 'type': 'new_model',\n", " 'model_version': model_version,\n", " 'model_name': model_name,\n", " 'model_package_bucket': model_bucket,\n", " 'model_package_key': model_key\n", " }),\n", " targetSelection='SNAPSHOT'\n", ")" ], "outputs": [], "metadata": {} } ], "metadata": { "instance_type": "ml.t3.medium", "interpreter": { "hash": "dca0ade3e726a953b501b15e8e990130d2b7799f14cfd9f4271676035ebe5511" }, "kernelspec": { "display_name": "Python 3.7.6 64-bit ('base': conda)", "name": "python3" }, "language_info": { "name": "python", "version": "" } }, "nbformat": 4, "nbformat_minor": 4 }