{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# End-to-end example of managing the lifecycle of ML models deployed on the edge using SageMaker Edge Manager\n", "\n", "**SageMaker Studio Kernel**: Data Science\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Contents \n", "\n", "* Use Case\n", "* Workflow\n", "* Setup\n", "* Building and Deploying the ML Model\n", "* Running the fleet of Virtual Wind Turbines and Edge Devices\n", "* Cleanup\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use Case\n", "\n", "The challenge we're trying to address here is to detect anomalies in the components of a Wind Turbine. Each wind turbine has many sensors that read data such as:\n", " - Internal & external temperature\n", " - Wind speed\n", " - Rotor speed\n", " - Air pressure\n", " - Voltage (or current) in the generator\n", " - Vibration in the GearBox (using an IMU -> Accelerometer + Gyroscope)\n", "\n", "So, depending on the types of anomalies we want to detect, we need to select one or more features and then prepare a dataset that 'explains' the anomalies. We are interested in three types of anomalies:\n", " - Rotor speed (when the rotor is not at the expected speed)\n", " - Produced voltage (when the generator is not producing the expected voltage)\n", " - Gearbox vibration (when the vibration of the gearbox is far from the expected level)\n", " \n", "All three anomalies (or violations) depend on many variables while the turbine is working. To address that, let's use an ML model called an [Autoencoder](https://en.wikipedia.org/wiki/Autoencoder), with correlated features. This model is unsupervised. It learns a latent representation of the dataset and tries to predict (regression) the same tensor given as input. The strategy then is to use a dataset collected from a normal turbine (without anomalies). The model will then learn **'what is a normal turbine'**. 
When the sensor readings of a malfunctioning turbine are used as input, the model will not be able to rebuild the input: its prediction will have a high reconstruction error, which is detected as an anomaly.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Workflow" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "In this example, you will create a robust end-to-end solution that manages the lifecycle of ML models deployed to a wind turbine fleet to detect anomalies in its operation using SageMaker Edge Manager.\n", "\n", " - Prepare an ML model\n", "     - download a pre-trained model;\n", "     - compile the ML model with SageMaker Neo for Linux x86_64;\n", "     - create a deployment package using SageMaker Edge Manager;\n", "     - download/unpack the deployment package;\n", " - Download/unpack a package with the IoT certificates required by the agent; \n", " - Download/unpack **SageMaker Edge Agent** for Linux x86_64;\n", " - Generate the protobuf/grpc stubs (.py scripts) - with these files we will send requests via unix:// sockets to the agent; \n", " - Using some helper functions, we're going to interact with the agent and do some tests.\n", "\n", "The following diagram shows the resources required to run this experiment and helps explain how the agent works and how to interact with it. 
\n", "![Pipeline](../imgs/EdgeManagerWorkshop_MinimalistArchitecture.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1 - Setup " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Installing some required libraries" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!apt-get -y update && apt-get -y install build-essential procps\n", "!pip install --quiet -U numpy sysv_ipc boto3 grpcio-tools grpcio protobuf sagemaker\n", "!pip install --quiet -U matplotlib==3.4.1 seaborn==0.11.1\n", "!pip install --quiet -U grpcio-tools grpcio protobuf\n", "!pip install --quiet paho-mqtt\n", "!pip install --quiet ipywidgets" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import tarfile\n", "import os\n", "import stat\n", "import io\n", "import time\n", "import sagemaker\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "from datetime import datetime\n", "import numpy as np\n", "import glob" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Let's take a look at the dataset and its features\n", "Download the dataset " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "%config InlineBackend.figure_format='retina'\n", "\n", "!mkdir -p data\n", "!curl https://aws-ml-blog.s3.amazonaws.com/artifacts/monitor-manage-anomaly-detection-model-wind-turbine-fleet-sagemaker-neo/dataset_wind_turbine.csv.gz -o data/dataset_wind.csv.gz\n", " \n", "parser = lambda date: datetime.strptime(date, '%Y-%m-%dT%H:%M:%S.%f+00:00')\n", "df = pd.read_csv('data/dataset_wind.csv.gz', compression=\"gzip\", sep=',', low_memory=False, parse_dates=[ 'eventTime'], date_parser=parser)\n", "\n", "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Features:\n", " - **nanoId**: id of the edge device that collected the data\n", " - **turbineId**: id 
of the turbine that produced this data\n", " - **arduino_timestamp**: timestamp of the arduino that was operating this turbine\n", " - **nanoFreemem**: amount of free memory in bytes\n", " - **eventTime**: timestamp of the row\n", " - **rps**: rotation of the rotor in Rotations Per Second\n", " - **voltage**: voltage produced by the generator in millivolts\n", " - **qw, qx, qy, qz**: quaternion angular acceleration\n", " - **gx, gy, gz**: gravity acceleration\n", " - **ax, ay, az**: linear acceleration\n", " - **gearboxtemp**: internal temperature\n", " - **ambtemp**: external temperature\n", " - **humidity**: air humidity\n", " - **pressure**: air pressure\n", " - **gas**: air quality\n", " - **wind_speed_rps**: wind speed in Rotations Per Second" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2 - Deploying the pre-built ML Model\n", "\n", "\n", "In this section you will:\n", "\n", " - Compile/optimize your pre-trained model for your edge device (Linux x86_64) using [SageMaker Neo](https://docs.aws.amazon.com/sagemaker/latest/dg/neo.html)\n", " - Create a deployment package with a signed model + the runtime used by SageMaker Edge Agent to load and invoke the optimized model\n", " - Deploy the package using IoT Jobs\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "project_name='wind-turbine-farm'\n", "\n", "s3_client = boto3.client('s3')\n", "sm_client = boto3.client('sagemaker')\n", "\n", "project_id = sm_client.describe_project(ProjectName=project_name)['ProjectId']\n", "bucket_name = 'sagemaker-wind-turbine-farm-%s' % project_id\n", "\n", "prefix='wind_turbine_anomaly'\n", "sagemaker_session=sagemaker.Session(default_bucket=bucket_name)\n", "role = sagemaker.get_execution_role()\n", "print('Project name: %s' % project_name)\n", "print('Project id: %s' % project_id)\n", "print('Bucket name: %s' % bucket_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 
Compiling/Packaging/Deploying our ML model to our edge devices\n", "\n", "Invoke SageMaker Neo to compile the pre-trained model. To learn how this model was trained, please refer to the training notebook [here](https://github.com/aws-samples/amazon-sagemaker-edge-manager-workshop/tree/main/lab/02-Training). \n", "\n", "First, upload the pre-trained model to the S3 bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Upload the pre-trained model; the context manager closes the file afterwards\n", "with open(\"model/model.tar.gz\", \"rb\") as model_file:\n", "    boto3.Session().resource(\"s3\").Bucket(bucket_name).Object('model/model.tar.gz').upload_fileobj(model_file)\n", "print(\"Model successfully uploaded!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The next cell compiles the model for the target hardware and OS with the SageMaker Neo service. The compiled model package also includes the [deep learning runtime](https://github.com/neo-ai/neo-ai-dlr)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "compilation_job_name = 'wind-turbine-anomaly-%d' % int(time.time()*1000)\n", "sm_client.create_compilation_job(\n", "    CompilationJobName=compilation_job_name,\n", "    RoleArn=role,\n", "    InputConfig={\n", "        'S3Uri': 's3://%s/model/model.tar.gz' % sagemaker_session.default_bucket(),\n", "        'DataInputConfig': '{\"input0\":[1,6,10,10]}',\n", "        'Framework': 'PYTORCH'\n", "    },\n", "    OutputConfig={\n", "        'S3OutputLocation': 's3://%s/wind_turbine/optimized/' % sagemaker_session.default_bucket(), \n", "        'TargetPlatform': { 'Os': 'LINUX', 'Arch': 'X86_64' }\n", "    },\n", "    StoppingCondition={ 'MaxRuntimeInSeconds': 900 }\n", ")\n", "# Poll every 5 seconds until the job leaves the STARTING/INPROGRESS states\n", "while True:\n", "    resp = sm_client.describe_compilation_job(CompilationJobName=compilation_job_name) \n", "    if resp['CompilationJobStatus'] in ['STARTING', 'INPROGRESS']:\n", "        print('Running...')\n", "    else:\n", "        print(resp['CompilationJobStatus'], compilation_job_name)\n", "        break\n", "    time.sleep(5)" ] }, { "cell_type": "markdown", "metadata": 
{}, "source": [ "### Building the Deployment Package with SageMaker Edge Manager\n", "SageMaker Edge Manager will sign the model and create a deployment package with:\n", " - The optimized model\n", " - Model Metadata" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "model_version = '1.0'\n", "model_name = 'WindTurbineAnomalyDetection'\n", "edge_packaging_job_name='wind-turbine-anomaly-%d' % int(time.time()*1000)\n", "resp = sm_client.create_edge_packaging_job(\n", "    EdgePackagingJobName=edge_packaging_job_name,\n", "    CompilationJobName=compilation_job_name,\n", "    ModelName=model_name,\n", "    ModelVersion=model_version,\n", "    RoleArn=role,\n", "    OutputConfig={\n", "        'S3OutputLocation': 's3://%s/%s/model/' % (bucket_name, prefix)\n", "    }\n", ")\n", "# Poll every 5 seconds until the job leaves the STARTING/INPROGRESS states\n", "while True:\n", "    resp = sm_client.describe_edge_packaging_job(EdgePackagingJobName=edge_packaging_job_name) \n", "    if resp['EdgePackagingJobStatus'] in ['STARTING', 'INPROGRESS']:\n", "        print('Running...')\n", "    else:\n", "        # Report the packaging job name (not the compilation job name)\n", "        print(resp['EdgePackagingJobStatus'], edge_packaging_job_name)\n", "        break\n", "    time.sleep(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Deploy the package\n", "Using IoT Jobs, we will notify the Python application running on the edge devices. 
The application will:\n", " - Download the deployment package\n", " - Unpack it\n", " - Load the new model (unloading previous versions, if any)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import json\n", "import sagemaker\n", "import uuid\n", "\n", "iot_client = boto3.client('iot')\n", "sts_client = boto3.client('sts')\n", "\n", "model_version = '1.0'\n", "model_name = 'WindTurbineAnomalyDetection'\n", "sagemaker_session=sagemaker.Session()\n", "region_name = sagemaker_session.boto_session.region_name\n", "account_id = sts_client.get_caller_identity()[\"Account\"]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "resp = iot_client.create_job(\n", "    jobId=str(uuid.uuid4()),\n", "    targets=[\n", "        'arn:aws:iot:%s:%s:thinggroup/WindTurbineFarm-%s' % (region_name, account_id, project_id), \n", "    ],\n", "    document=json.dumps({\n", "        'type': 'new_model',\n", "        'model_version': model_version,\n", "        'model_name': model_name,\n", "        'model_package_bucket': bucket_name,\n", "        'model_package_key': \"%s/model/%s-%s.tar.gz\" % (prefix, model_name, model_version) \n", "    }),\n", "    targetSelection='SNAPSHOT'\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Alright! Now, the deployment process will start on the connected edge devices!\n", "\n", "## Step 3 - Running the fleet of Virtual Wind Turbines and Edge Devices" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this section you will run a local application written in Python3 that simulates 5 Wind Turbines and 5 edge devices. The SageMaker Edge Agent is deployed on the edge devices.\n", "\n", "Here you'll be the **Wind Turbine Farm Operator**. You can visualize the data flowing from the sensors to the ML Model and analyze the anomalies. 
Also, you'll be able to inject noise (pressing some buttons) in the data to simulate potential anomalies with the equipment.\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
[Figures: ARCHITECTURE; PYTHON CLASS STRUCTURE in DEMO]
\n", "\n", "The components of the application are:\n", " - Simulator:\n", "     - [Simulator](app/simulator.py): Program that launches the virtual wind turbines and the edge devices. It uses Python Threads to run all 10 processes\n", "     - [Wind Farm](app/windfarm.py): This is the application that runs on the edge device. It is responsible for reading the sensors, invoking the ML model and analyzing the anomalies \n", " - Edge Application:\n", "     - [Turbine](app/turbine.py): Virtual Wind Turbine. It reads the raw data collected from the 3D Printed Mini Turbine and streams it as a circular buffer. It also has a graphical representation in **IPython Widgets** that is rendered by the Simulator/Dashboard.\n", "     - [Over The Air](app/ota.py): This is a module integrated with **IoT Jobs**. In the previous exercise you created an IoT job to deploy the model. This module gets the job document, processes it, deploys the model to each edge device and loads it via SageMaker Edge Manager.\n", "     - [Edge client](app/edgeagentclient.py): An abstraction layer on top of the **generated stubs** (proto compilation). It makes it easy to integrate **Wind Farm** with the SageMaker Edge Agent" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "agent_config_package_prefix = 'wind_turbine_agent/config.tgz'\n", "agent_version = '1.20210512.96da6cc'\n", "agent_pkg_bucket = 'sagemaker-edge-release-store-us-west-2-linux-x64'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepare the edge devices\n", " 1. First download the deployment package that contains the IoT + CA certificates and the configuration file of the SageMaker Edge Agent. \n", " 2. 
Then, download the SageMaker Edge Manager package and complete the deployment process.\n", " \n", " > You can see all the artifacts that will be loaded/executed by the virtual Edge Device in **agent/**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not os.path.isdir('agent'):\n", "    s3_client = boto3.client('s3')\n", "\n", "    # Get the configuration package with certificates and config files\n", "    with io.BytesIO() as file:\n", "        s3_client.download_fileobj(bucket_name, agent_config_package_prefix, file)\n", "        file.seek(0)\n", "        # Extract the files\n", "        tar = tarfile.open(fileobj=file)\n", "        tar.extractall('.')\n", "        tar.close() \n", "\n", "    # Download and install SageMaker Edge Manager\n", "    agent_pkg_key = 'Releases/%s/%s.tgz' % (agent_version, agent_version)\n", "    # get the agent package\n", "    with io.BytesIO() as file:\n", "        s3_client.download_fileobj(agent_pkg_bucket, agent_pkg_key, file)\n", "        file.seek(0)\n", "        # Extract the files\n", "        tar = tarfile.open(fileobj=file)\n", "        tar.extractall('agent')\n", "        tar.close()\n", "    # Adjust the permissions\n", "    os.chmod('agent/bin/sagemaker_edge_agent_binary', stat.S_IXUSR|stat.S_IWUSR|stat.S_IXGRP|stat.S_IWGRP)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Finally, create the SageMaker Edge Agent client stubs using the protobuf compiler" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "SageMaker Edge Manager exposes a [gRPC API](https://grpc.io/docs/what-is-grpc/introduction/) to processes on the device. To use the gRPC API in your language of choice, use the protobuf file `agent.proto` (the definition file for the gRPC interface) to generate stubs in that language. Our example is written in Python, so the next cell generates the Python Edge Manager gRPC stubs." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!python3 -m grpc_tools.protoc --proto_path=agent/docs/api --python_out=app/ --grpc_python_out=app/ agent/docs/api/agent.proto" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### SageMaker Edge Agent - local directory structure\n", "```\n", "agent\n", "└───certificates\n", "│   └───root\n", "│   │       <>.pem # CA certificate used by Edge Manager to sign the model\n", "│   │\n", "│   └───iot\n", "│           edge_device_<>_cert.pem # IoT certificate\n", "│           edge_device_<>_key.pem # IoT private key\n", "│           edge_device_<>_pub.pem # IoT public key\n", "│           ...\n", "│\n", "└───conf\n", "│       config_edge_device_<>.json # Edge Manager config file\n", "│       ...\n", "│\n", "└───model\n", "│   └───<>\n", "│       └───<>\n", "│           └───<> # Artifacts from the Edge Manager model package\n", "│               sagemaker_edge_manifest\n", "│               ...\n", "│\n", "└───logs\n", "│       agent<>.log # Logs collected by the local application\n", "│       ...\n", "app\n", "    agent_pb2_grpc.py # grpc stubs generated by protoc\n", "    agent_pb2.py # agent stubs generated by protoc\n", "    ...\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Simulating The Wind Turbine Farm\n", "Now it's time to run our simulator and start playing with the turbines, the agents and the anomalies\n", " > After clicking on **Start**, each turbine will start buffering some data. 
It takes a few seconds, but after this process completes, the application runs in real time \n", " > Try pressing some buttons while the simulation is running to inject noise into the data and see some anomalies \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "sys.path.insert(1, 'app')\n", "import windfarm\n", "import edgeagentclient\n", "import turbine\n", "import simulator\n", "import ota\n", "import boto3\n", "from importlib import reload\n", "\n", "reload(simulator)\n", "reload(turbine)\n", "reload(edgeagentclient)\n", "reload(windfarm)\n", "reload(ota)\n", "\n", "# If there is an existing simulator running, halt it\n", "try:\n", "    farm.halt()\n", "except Exception:\n", "    # No farm is running yet (e.g. first execution of this cell)\n", "    pass\n", "\n", "iot_client = boto3.client('iot')\n", "\n", "mqtt_host=iot_client.describe_endpoint(endpointType='iot:Data-ATS')['endpointAddress']\n", "mqtt_port=8883\n", "\n", "!mkdir -p agent/logs && rm -f agent/logs/*\n", "simulator = simulator.WindTurbineFarmSimulator(5)\n", "simulator.start()\n", "\n", "farm = windfarm.WindTurbineFarm(simulator, mqtt_host, mqtt_port)\n", "farm.start()\n", "\n", "simulator.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " > If you want to experiment with the deployment process while the wind farm is running, go back to Step 2 and replace the variable **model_version** with the constant (string) '2.0' in the JSON document used by the IoT Job. Then, create a new IoT Job to simulate how to deploy new versions of the model. Come back to this exercise to see the results." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", "    farm.halt()\n", "except Exception:\n", "    pass\n", "\n", "print(\"Done\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cleanup\n", "Run the next cell only if you have already finished exploring/hacking the content of the workshop. 
\n", "This code will delete all the resources created so far, including the **SageMaker Project** you've created" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# import boto3\n", "# import time\n", "# from shutil import rmtree\n", "\n", "# iot_client = boto3.client('iot')\n", "# sm_client = boto3.client('sagemaker')\n", "# s3_resource = boto3.resource('s3')\n", "\n", "# policy_name='WindTurbineFarmPolicy-%s' % project_id\n", "# thing_group_name='WindTurbineFarm-%s' % project_id\n", "# fleet_name='wind-turbine-farm-%s' % project_id\n", "\n", "# # Delete all files from the S3 Bucket\n", "# s3_resource.Bucket(bucket_name).objects.all().delete()\n", "\n", "# # now deregister the devices from the fleet\n", "# resp = sm_client.list_devices(DeviceFleetName=fleet_name)\n", "# devices = [d['DeviceName'] for d in resp['DeviceSummaries']]\n", "# if len(devices) > 0:\n", "#     sm_client.deregister_devices(DeviceFleetName=fleet_name, DeviceNames=devices)\n", "\n", "# # now detach the certificates from the things and from the policy\n", "# for i,cert_arn in enumerate(iot_client.list_targets_for_policy(policyName=policy_name)['targets']):\n", "#     for t in iot_client.list_principal_things(principal=cert_arn)['things']:\n", "#         iot_client.detach_thing_principal(thingName=t, principal=cert_arn)\n", "#     iot_client.detach_policy(policyName=policy_name, target=cert_arn)\n", "#     certificateId = cert_arn.split('/')[-1]\n", "\n", "# iot_client.delete_role_alias(roleAlias='SageMakerEdge-%s' % fleet_name)\n", "# iot_client.delete_thing_group(thingGroupName=thing_group_name)\n", "\n", "# if os.path.isdir('agent'): rmtree('agent')\n", "# sm_client.delete_project(ProjectName=project_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Mission Complete! 
" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-west-2:236514542706:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }