{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Warmup - Testing a minimalist setup of SageMaker Edge Manager\n", "\n", "**SageMaker Studio Kernel**: Data Science\n", "\n", "In this example, you'll run **SageMaker Edge Agent** as a (local) background process and invoke it's API using Python3. For that, we're going to:\n", " - prepare a ML model\n", " - download a trained model;\n", " - compile the ML model with SageMaker Neo for Linux x86_64;\n", " - create a deployment package using SageMaker Edge Manager;\n", " - download/unpack the deployment package;\n", " - download/unpack a package with the IoT certificates, required by the agent; \n", " - download/unpack **SageMaker Edge Agent** for Linux x86_64;\n", " - generate the protobuf/grpc stubs (.py scripts) - with these files we will send requests via unix:// sockets to the agent; \n", " - using some helper functions, we're going to interact with the agent and do some tests.\n", "\n", "The following diagram shows the resources, required to run this experiment and understand how the agent works and how to interact with it. \n", "![Pipeline](../imgs/EdgeManagerWorkshop_MinimalistArchitecture.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Installing some required libraries" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!apt-get -y update && apt-get -y install build-essential procps\n", "!pip install -U numpy sysv_ipc boto3 grpcio-tools grpcio protobuf sagemaker" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import tarfile\n", "import os\n", "import stat\n", "import io\n", "import time\n", "import sagemaker" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "project_name='<>'\n", "\n", "sm_client = boto3.client('sagemaker')\n", "\n", "role = sagemaker.get_execution_role()\n", "project_id = sm_client.describe_project(ProjectName=project_name)['ProjectId']\n", "bucket_name = 'sagemaker-wind-turbine-farm-%s' % project_id\n", "model_version = '1.0'\n", "model_name = 'WindTurbineAnomalyDetection'\n", "prefix='wind_turbine_anomaly'\n", "\n", "!aws s3 cp s3://aws-ml-blog/artifacts/monitor-manage-anomaly-detection-model-wind-turbine-fleet-sagemaker-neo/model.tar.gz s3://$bucket_name/trained_model/\n", "\n", "agent_config_package_prefix = 'wind_turbine_agent/config.tgz'\n", "agent_version = '1.20210512.96da6cc'\n", "agent_pkg_bucket = 'sagemaker-edge-release-store-us-west-2-linux-x64'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### We need to prepare the ML model to test with the agent\n", "First let's compile the ML model with SageMaker Neo" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "compilation_job_name = 'wind-turbine-anomaly-%d' % int(time.time()*1000)\n", "sm_client.create_compilation_job(\n", " CompilationJobName=compilation_job_name,\n", " RoleArn=role,\n", " InputConfig={\n", " 'S3Uri': 's3://%s/trained_model/model.tar.gz' % bucket_name,\n", " 'DataInputConfig': '{\"input0\":[1,6,10,10]}',\n", " 'Framework': 'PYTORCH'\n", " },\n", " OutputConfig={\n", " 'S3OutputLocation': 's3://%s/wind_turbine/optimized/' % bucket_name,\n", " 'TargetPlatform': { 'Os': 'LINUX', 'Arch': 'X86_64' }\n", " },\n", " StoppingCondition={ 'MaxRuntimeInSeconds': 900 } \n", ")\n", "while True:\n", " resp = sm_client.describe_compilation_job(CompilationJobName=compilation_job_name) \n", " if resp['CompilationJobStatus'] in ['STARTING', 'INPROGRESS']:\n", " print('Running...')\n", " else:\n", " print(resp['CompilationJobStatus'], compilation_job_name)\n", " break\n", " time.sleep(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Now we need to create a deployment package with SageMaker Edge Manager\n", "This process will get the compilation job, sign the model and prepare a .tar.gz package that can be interpreted by the agent." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "edge_packaging_job_name='wind-turbine-anomaly-%d' % int(time.time()*1000)\n", "resp = sm_client.create_edge_packaging_job(\n", " EdgePackagingJobName=edge_packaging_job_name,\n", " CompilationJobName=compilation_job_name,\n", " ModelName=model_name,\n", " ModelVersion=model_version,\n", " RoleArn=role,\n", " OutputConfig={\n", " 'S3OutputLocation': 's3://%s/%s/model/' % (bucket_name, prefix)\n", " }\n", ")\n", "while True:\n", " resp = sm_client.describe_edge_packaging_job(EdgePackagingJobName=edge_packaging_job_name) \n", " if resp['EdgePackagingJobStatus'] in ['STARTING', 'INPROGRESS']:\n", " print('Running...')\n", " else:\n", " print(resp['EdgePackagingJobStatus'], compilation_job_name) \n", " break\n", " time.sleep(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepare the edge device\n", " 1. First download the deployment package that contains the IoT + CA certificates and the configuration file of the SageMaker Edge Agent. \n", " 2. Then, download the SageMaker Edge Manager package and complete the deployment process.\n", " \n", " > You can see all the artifacts that will be loaded/executed by the virtual Edge Device in **agent/**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not os.path.isdir('agent'):\n", " s3_client = boto3.client('s3')\n", "\n", " # Get the configuration package with certificates and config files\n", " with io.BytesIO() as file:\n", " s3_client.download_fileobj(bucket_name, agent_config_package_prefix, file)\n", " file.seek(0)\n", " # Extract the files\n", " tar = tarfile.open(fileobj=file)\n", " tar.extractall('.')\n", " tar.close() \n", "\n", " # Download and install SageMaker Edge Manager\n", " agent_pkg_key = 'Releases/%s/%s.tgz' % (agent_version, agent_version)\n", " # get the agent package\n", " with io.BytesIO() as file:\n", " s3_client.download_fileobj(agent_pkg_bucket, agent_pkg_key, file)\n", " file.seek(0)\n", " # Extract the files\n", " tar = tarfile.open(fileobj=file)\n", " tar.extractall('agent')\n", " tar.close()\n", " # Adjust the permissions\n", " os.chmod('agent/bin/sagemaker_edge_agent_binary', stat.S_IXUSR|stat.S_IWUSR|stat.S_IXGRP|stat.S_IWGRP)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# by using protoc, we can generate stubs (client api) for connecting to the agent and invoking its API\n", "!python3 -m grpc_tools.protoc --proto_path=agent/docs/api --python_out=app/ --grpc_python_out=app/ agent/docs/api/agent.proto" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_url = 's3://%s/%s/model/%s-%s.tar.gz' % (bucket_name, prefix, model_name, model_version)\n", "!mkdir -p agent/model/dev/$model_name/$model_version\n", "!aws s3 cp $s3_url /tmp/model.tar.gz\n", "!tar -xzvf /tmp/model.tar.gz -C agent/model/dev/$model_name/$model_version" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### SageMaker Edge Agent - local directory structure\n", "```\n", "agent\n", "└───certificates\n", "│ └───root\n", "│ │ <>.pem # CA certificate used by Edge Manager to sign the model\n", "│ │\n", "│ └───iot\n", "│ edge_device_<>_cert.pem # IoT certificate\n", "│ edge_device_<>_key.pem # IoT private key\n", "│ edge_device_<>_pub.pem # IoT public key\n", "│ ...\n", "│ \n", "└───conf\n", "│ config_edge_device_<>.json # Edge Manager config file\n", "│ ...\n", "│\n", "└───model \n", "│ └───<>\n", "│ └───<>\n", "│ └───<> # Artifacts from the Edge Manager model package\n", "│ sagemaker_edge_manifest\n", "│ ...\n", "│\n", "└───logs\n", "│ agent<>.log # Logs collected by the local application\n", "│ ...\n", "app\n", " agent_pb2_grpc.py # grpc stubs generated by protoc\n", " agent_pb2.py # agent stubs generated by protoc\n", " ...\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### SageMaker Edge Agent (device0) config file" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pygmentize agent/conf/config_edge_device_0.json" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Alright. You have all the resources/libraries required for the experiments\n", "Let's get started" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "sys.path.insert(1, 'app')\n", "import numpy as np\n", "import subprocess\n", "import grpc\n", "import time\n", "import os\n", "import uuid\n", "\n", "# Loading the stubs - agent python client\n", "import agent_pb2 as agent\n", "import agent_pb2_grpc as agent_grpc" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### The next cell will run the agent as a Linux process (in background)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "channel_path='/tmp/agent_dev'\n", "if os.path.exists(channel_path): os.remove(channel_path)\n", "cmd = './agent/bin/sagemaker_edge_agent_binary -c agent/conf/config_edge_device_0.json -a %s' % channel_path\n", "print(cmd)\n", "logs = open(\"agent/logs/agent0.log\", \"+w\")\n", "proc = subprocess.Popen(cmd.split(' '), stdout=logs)\n", "time.sleep(2)\n", "!ps aux --cols 300|grep sagemaker_edge_agent" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Connecting to the agent\n", "channel = grpc.insecure_channel('unix://%s' % channel_path )\n", "client = agent_grpc.AgentStub(channel)\n", "model_name='WindTurbineAnomalyDetection'\n", "model_path='agent/model/dev/WindTurbineAnomalyDetection/1.0'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Helper functions\n", "These functions will wrap the GRPC calls: \n", " - create a request\n", " - invoke the api\n", " - process the response and return" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def list_models(cli):\n", " resp = cli.ListModels(agent.ListModelsRequest())\n", " return {m.name:{'in': m.input_tensor_metadatas, 'out': m.output_tensor_metadatas} for m in resp.models}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def load_model(cli, model_name, model_path):\n", " \"\"\" Load a new model into the Edge Agent if not loaded yet\"\"\"\n", " try:\n", " req = agent.LoadModelRequest()\n", " req.url = model_path\n", " req.name = model_name\n", " return cli.LoadModel(req) \n", " except Exception as e:\n", " print(e) \n", " return None" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def unload_model(cli, model_name):\n", " \"\"\" UnLoad model from the Edge Agent\"\"\"\n", " try:\n", " req = agent.UnLoadModelRequest()\n", " req.name = model_name\n", " resp = cli.UnLoadModel(req)\n", " return resp\n", " except Exception as e:\n", " print(e)\n", " return None" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def predict(cli, model_name, x, shm=False):\n", " \"\"\"\n", " Invokes the model and get the predictions\n", " \"\"\"\n", " try:\n", " model_map = list_models(cli)\n", " if model_map.get(model_name) is None:\n", " raise Exception('Model %s not loaded' % model_name)\n", " # Create a request\n", " req = agent.PredictRequest()\n", " req.name = model_name\n", " # Then load the data into a temp Tensor\n", " tensor = agent.Tensor()\n", " meta = model_map[model_name]['in'][0]\n", " tensor.tensor_metadata.name = meta.name\n", " tensor.tensor_metadata.data_type = meta.data_type\n", " for s in meta.shape: tensor.tensor_metadata.shape.append(s)\n", " \n", " if shm:\n", " tensor.shared_memory_handle.offset = 0\n", " tensor.shared_memory_handle.segment_id = x\n", " else:\n", " tensor.byte_data = x.astype(np.float32).tobytes()\n", "\n", " req.tensors.append(tensor)\n", "\n", " # Invoke the model\n", " resp = cli.Predict(req)\n", "\n", " # Parse the output\n", " meta = model_map[model_name]['out'][0]\n", " tensor = resp.tensors[0]\n", " data = np.frombuffer(tensor.byte_data, dtype=np.float32)\n", " return data.reshape(tensor.tensor_metadata.shape)\n", " except Exception as e:\n", " print(e) \n", " return None" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_tensor(x, tensor_name):\n", " if (x.dtype != np.float32):\n", " raise Exception( \"It only supports numpy float32 arrays for this tensor\" ) \n", " tensor = agent.Tensor() \n", " tensor.tensor_metadata.name = tensor_name.encode('utf-8')\n", " tensor.tensor_metadata.data_type = agent.FLOAT32\n", " for s in x.shape: tensor.tensor_metadata.shape.append(s)\n", " tensor.byte_data = x.tobytes()\n", " return tensor" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def capture_data(cli, model_name, input_tensor, output_tensor):\n", " try:\n", " req = agent.CaptureDataRequest()\n", " req.model_name = model_name\n", " req.capture_id = str(uuid.uuid4())\n", " req.input_tensors.append( create_tensor(input_tensor, 'input') )\n", " req.output_tensors.append( create_tensor(output_tensor, 'output') )\n", " resp = cli.CaptureData(req)\n", " except Exception as e: \n", " print(e)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def write_to_shm(sm, payload):\n", " if sm.attached: sm.detach()\n", " # set mode read/write\n", " sm.mode = 0o0600\n", " sm.attach()\n", " sm.write(payload.astype(np.float32).tobytes())\n", " # set mode read only\n", " sm.mode = 0o0400" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Loading and listing models" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## Loading a model in the agent\n", "load_model(client, model_name, model_path)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## List the loaded models\n", "list_models(client)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Running some predictions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "\n", "shape=(1,6,10,10)\n", "payload_size=4 # float32\n", "for i in shape: payload_size *= i\n", "x = np.random.rand(*shape).astype(np.float32)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "y = predict(client, model_name, x)\n", "capture_data(client, model_name, x, y)\n", "print(y.shape)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Shared Memory Predictions\n", "Protobuf/grpc does a good job by transporting the data from the client to the server. However, if this payload is larger than 4MB the performance decreases. So, using shared memory is a great alternative.\n", "\n", "In this example you'll see how to load the payload to a reserved space in the device's shared memory. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sysv_ipc as ipc\n", "key=42\n", "sm=None\n", "## create/reserve some space in the device's shared memory\n", "try:\n", " sm = ipc.SharedMemory(key, mode=0o600, size = payload_size)\n", "except ipc.ExistentialError as e:\n", " sm = ipc.SharedMemory(key, flags=ipc.IPC_CREX, size = payload_size)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "write_to_shm(sm, x)\n", "y = predict(client, model_name, sm.id, True)\n", "capture_data(client, model_name, x, y)\n", "print(y.shape)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Stress test to compare 'normal' predictions vs shared memory predictions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "start_time=time.time()\n", "for i in range(1000):\n", " y = predict(client, model_name, x)\n", "print('Elapsed time normal prediction: %fs' % ((time.time()-start_time)/200))\n", "start_time=time.time()\n", "for i in range(1000):\n", " write_to_shm(sm, x)\n", " y = predict(client, model_name, sm.id, True)\n", "print('Elapsed time shared memory prediction: %fs' % ((time.time()-start_time)/200))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Retrieve Captured Data from S3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "import boto3\n", "import io\n", "import base64\n", "from datetime import datetime\n", "import re\n", "\n", "pattern = r'Tensor\\[(\\w+); (\\d+), (\\d+), (\\d+), (\\d+)\\]'\n", "s3_client = boto3.client('s3')\n", "config_file = json.load(open('agent/conf/config_edge_device_0.json', 'r'))\n", "\n", "device_fleet_name = config_file['sagemaker_edge_core_device_fleet_name']\n", "bucket_name = config_file['sagemaker_edge_provider_s3_bucket_name']\n", "bucket_prefix = config_file['sagemaker_edge_core_folder_prefix']\n", "\n", "s3_prefix = '%s/%s/%s/%s/' % (bucket_prefix, device_fleet_name, model_name, datetime.now().strftime('%Y/%m/%d/%H') )\n", "\n", "logs = s3_client.list_objects(Bucket=bucket_name, Prefix=s3_prefix)\n", "if logs.get('Contents') is not None: \n", " with io.BytesIO() as f:\n", " s3_client.download_fileobj(bucket_name, logs['Contents'][0]['Key'], f)\n", " f.seek(0)\n", " # each log is saved as a Json lines file\n", " try:\n", " log = json.loads(f.read())\n", "\n", " inputs = log['deviceFleetInputs'][0]\n", " outputs = log['deviceFleetOutputs'][0]\n", "\n", " # convert the data content back to bytes from base64\n", " input_data = base64.b64decode(inputs['data']) \n", " output_data = base64.b64decode(outputs['data'])\n", "\n", " # get the input/output shapes\n", " m=re.match(pattern, inputs['observedContentType'])\n", " input_shape = [int(m.group(i)) for i in range(2,6)]\n", " m=re.match(pattern, outputs['observedContentType'])\n", " output_shape = [int(m.group(i)) for i in range(2,6)]\n", "\n", " # rebuild a numpy array with the stored data in the correct shape\n", " x = np.frombuffer(input_data, dtype=np.float32).reshape(input_shape)\n", " y = np.frombuffer(output_data, dtype=np.float32).reshape(output_shape)\n", "\n", " print(inputs['observedContentType'], inputs['name'], x.shape)\n", " print(outputs['observedContentType'], outputs['name'], y.shape)\n", " except Exception as e:\n", " print('Try to run capture_data a few more times')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Unload the model and kill the process" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "unload_model(client, model_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "proc.kill()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if sm is not None:\n", " sm.detach()\n", " sm.remove()\n", " sm = None" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!rm -rf agent/model/dev/*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that you know how SageMaker Edge Manager/Agent works, it's time to build an end-to-end solution for ML@Edge\n", "\n", "You can start exercise #1: visualizing the wind turbine data\n", "\n", " > [Exercise 01](01%20-%20Data%20Visualization.ipynb)" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }