# Warmup - Testing a minimalist setup of SageMaker Edge Manager

**SageMaker Studio Kernel**: Data Science

In this example, you'll run **SageMaker Edge Agent** as a (local) background process and invoke its API using Python3. For that, we're going to:
 - prepare an ML model;
 - download a trained model;
 - compile the ML model with SageMaker Neo for Linux x86_64;
 - create a deployment package using SageMaker Edge Manager;
 - download/unpack the deployment package;
 - download/unpack a package with the IoT certificates, required by the agent; 
 - download/unpack **SageMaker Edge Agent** for Linux x86_64;
 - generate the protobuf/grpc stubs (.py scripts) - with these files we will send requests via unix:// sockets to the agent; 
 - using some helper functions, we're going to interact with the agent and do some tests.

The following diagram shows the resources required to run this experiment. It also helps you understand how the agent works and how to interact with it.
![Pipeline](../imgs/EdgeManagerWorkshop_MinimalistArchitecture.png)

### Installing some required libraries

In [None]:
!apt-get -y update && apt-get -y install build-essential procps
!pip install -U numpy sysv_ipc boto3 grpcio-tools grpcio protobuf sagemaker

In [None]:
import boto3
import tarfile
import os
import stat
import io
import time
import sagemaker

In [None]:
project_name='<>'

sm_client = boto3.client('sagemaker')

role = sagemaker.get_execution_role()
project_id = sm_client.describe_project(ProjectName=project_name)['ProjectId']
bucket_name = 'sagemaker-wind-turbine-farm-%s' % project_id
model_version = '1.0'
model_name = 'WindTurbineAnomalyDetection'
prefix='wind_turbine_anomaly'

!aws s3 cp s3://aws-ml-blog/artifacts/monitor-manage-anomaly-detection-model-wind-turbine-fleet-sagemaker-neo/model.tar.gz s3://$bucket_name/trained_model/

agent_config_package_prefix = 'wind_turbine_agent/config.tgz'
agent_version = '1.20210512.96da6cc'
agent_pkg_bucket = 'sagemaker-edge-release-store-us-west-2-linux-x64'

### We need to prepare the ML model to test with the agent
First let's compile the ML model with SageMaker Neo

In [None]:
compilation_job_name = 'wind-turbine-anomaly-%d' % int(time.time()*1000)
sm_client.create_compilation_job(
    CompilationJobName=compilation_job_name,
    RoleArn=role,
    InputConfig={
        'S3Uri': 's3://%s/trained_model/model.tar.gz' % bucket_name,
        'DataInputConfig': '{"input0":[1,6,10,10]}',
        'Framework': 'PYTORCH'
    },
    OutputConfig={
        'S3OutputLocation': 's3://%s/wind_turbine/optimized/' % bucket_name,
        'TargetPlatform': { 'Os': 'LINUX', 'Arch': 'X86_64' }
    },
    StoppingCondition={ 'MaxRuntimeInSeconds': 900 }
)
# Poll the job until it leaves the STARTING/INPROGRESS states
while True:
    resp = sm_client.describe_compilation_job(CompilationJobName=compilation_job_name)
    if resp['CompilationJobStatus'] in ['STARTING', 'INPROGRESS']:
        print('Running...')
    else:
        print(resp['CompilationJobStatus'], compilation_job_name)
        break
    time.sleep(5)
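
The polling loop above runs until the job leaves the busy states, with no upper bound on how long it waits. As a minimal sketch (not part of the workshop code), the same pattern can be wrapped in a helper with a timeout. The names `wait_for_job`, `describe`, and `fake_responses` are hypothetical, and the canned responses stand in for `sm_client.describe_compilation_job` so the example runs without AWS access.

```python
import time

def wait_for_job(describe, status_key, busy_states=('STARTING', 'INPROGRESS'),
                 poll_seconds=5, timeout_seconds=900, sleep=time.sleep):
    """Poll describe() until the job leaves busy_states or the timeout expires."""
    waited = 0
    while True:
        status = describe()[status_key]
        if status not in busy_states:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError('job still %s after %ds' % (status, waited))
        sleep(poll_seconds)
        waited += poll_seconds

# Hypothetical stand-in for sm_client.describe_compilation_job(...)
fake_responses = iter(
    {'CompilationJobStatus': s} for s in ('STARTING', 'INPROGRESS', 'COMPLETED')
)
final_status = wait_for_job(lambda: next(fake_responses), 'CompilationJobStatus',
                            sleep=lambda seconds: None)  # skip real sleeping in the demo
print(final_status)
```

With the real client, `describe` would be something like `lambda: sm_client.describe_compilation_job(CompilationJobName=compilation_job_name)`, and the same helper could serve the edge packaging job by passing `'EdgePackagingJobStatus'` as the key.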

### Now we need to create a deployment package with SageMaker Edge Manager
This step takes the output of the compilation job, signs the model, and prepares a .tar.gz package that the agent can load.

In [None]:
edge_packaging_job_name='wind-turbine-anomaly-%d' % int(time.time()*1000)
resp = sm_client.create_edge_packaging_job(
    EdgePackagingJobName=edge_packaging_job_name,
    CompilationJobName=compilation_job_name,
    ModelName=model_name,
    ModelVersion=model_version,
    RoleArn=role,
    OutputConfig={
        'S3OutputLocation': 's3://%s/%s/model/' % (bucket_name, prefix)
    }
)
# Poll the job until it leaves the STARTING/INPROGRESS states
while True:
    resp = sm_client.describe_edge_packaging_job(EdgePackagingJobName=edge_packaging_job_name)
    if resp['EdgePackagingJobStatus'] in ['STARTING', 'INPROGRESS']:
        print('Running...')
    else:
        # Report the packaging job name (not the compilation job name)
        print(resp['EdgePackagingJobStatus'], edge_packaging_job_name)
        break
    time.sleep(5)

### Prepare the edge device
 1. First download the deployment package that contains the IoT + CA certificates and the configuration file of the SageMaker Edge Agent. 
 2. Then, download the SageMaker Edge Manager package and complete the deployment process.
 
 > You can see all the artifacts that will be loaded/executed by the virtual Edge Device in **agent/**

In [None]:
if not os.path.isdir('agent'):
    s3_client = boto3.client('s3')

    # Get the configuration package with certificates and config files
    with io.BytesIO() as file:
        s3_client.download_fileobj(bucket_name, agent_config_package_prefix, file)
        file.seek(0)
        # Extract the files
        tar = tarfile.open(fileobj=file)
        tar.extractall('.')
        tar.close()

    # Download and install SageMaker Edge Manager
    agent_pkg_key = 'Releases/%s/%s.tgz' % (agent_version, agent_version)
    # get the agent package
    with io.BytesIO() as file:
        s3_client.download_fileobj(agent_pkg_bucket, agent_pkg_key, file)
        file.seek(0)
        # Extract the files
        tar = tarfile.open(fileobj=file)
        tar.extractall('agent')
        tar.close()
        # Adjust the permissions
        os.chmod('agent/bin/sagemaker_edge_agent_binary', stat.S_IXUSR|stat.S_IWUSR|stat.S_IXGRP|stat.S_IWGRP)
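
The cell above downloads each package into an in-memory buffer and extracts it from there. The sketch below reproduces that pattern without S3, and adds a path check before extracting. The helper names `extract_tar_bytes` and `make_demo_archive`, and the file `agent/conf/demo.json`, are made up for illustration; the check is an optional extra, not something the workshop code does.

```python
import io
import os
import tarfile
import tempfile

def extract_tar_bytes(data, dest):
    """Extract a tar archive held in memory into dest, rejecting unsafe paths."""
    dest_root = os.path.realpath(dest)
    with tarfile.open(fileobj=io.BytesIO(data)) as tar:
        for member in tar.getmembers():
            target = os.path.realpath(os.path.join(dest_root, member.name))
            # Refuse members that would land outside the destination directory
            if os.path.commonpath([dest_root, target]) != dest_root:
                raise ValueError('unsafe path in archive: %s' % member.name)
        tar.extractall(dest_root)

def make_demo_archive(name='agent/conf/demo.json', payload=b'{"demo": true}'):
    """Build a small .tar.gz in memory (stand-in for the bytes downloaded from S3)."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode='w:gz') as tar:
        info = tarfile.TarInfo(name)
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))
    return buf.getvalue()

with tempfile.TemporaryDirectory() as tmp:
    extract_tar_bytes(make_demo_archive(), tmp)
    extracted = os.path.isfile(os.path.join(tmp, 'agent', 'conf', 'demo.json'))
print(extracted)
```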

In [None]:
# by using protoc, we can generate stubs (client api) for connecting to the agent and invoking its API
!python3 -m grpc_tools.protoc --proto_path=agent/docs/api --python_out=app/ --grpc_python_out=app/ agent/docs/api/agent.proto

In [None]:
s3_url = 's3://%s/%s/model/%s-%s.tar.gz' % (bucket_name, prefix, model_name, model_version)
!mkdir -p agent/model/dev/$model_name/$model_version
!aws s3 cp $s3_url /tmp/model.tar.gz
!tar -xzvf /tmp/model.tar.gz -C agent/model/dev/$model_name/$model_version

### SageMaker Edge Agent - local directory structure
```
agent
└───certificates
│ └───root
│ │ <>.pem # CA certificate used by Edge Manager to sign the model
│ │
│ └───iot
│ edge_device_<>_cert.pem # IoT certificate
│ edge_device_<>_key.pem # IoT private key
│ edge_device_<>_pub.pem # IoT public key
│ ...
│ 
└───conf
│ config_edge_device_<>.json # Edge Manager config file
│ ...
│
└───model 
│ └───<>
│ └───<>
│ └───<> # Artifacts from the Edge Manager model package
│ sagemaker_edge_manifest
│ ...
│
└───logs
│ agent<>.log # Logs collected by the local application
│ ...
app
 agent_pb2_grpc.py # grpc stubs generated by protoc
 agent_pb2.py # agent stubs generated by protoc
 ...
```

#### SageMaker Edge Agent (device0) config file

In [None]:
!pygmentize agent/conf/config_edge_device_0.json

### Alright. You have all the resources/libraries required for the experiments
Let's get started

In [None]:
import sys
sys.path.insert(1, 'app')
import numpy as np
import subprocess
import grpc
import time
import os
import uuid

# Loading the stubs - agent python client
import agent_pb2 as agent
import agent_pb2_grpc as agent_grpc

#### The next cell will run the agent as a Linux process (in background)

In [None]:
channel_path='/tmp/agent_dev'
if os.path.exists(channel_path): os.remove(channel_path)
cmd = './agent/bin/sagemaker_edge_agent_binary -c agent/conf/config_edge_device_0.json -a %s' % channel_path
print(cmd)
logs = open("agent/logs/agent0.log", "+w")
proc = subprocess.Popen(cmd.split(' '), stdout=logs)
time.sleep(2)
!ps aux --cols 300|grep sagemaker_edge_agent
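
The cell above gives the agent a fixed two seconds to start. A possible alternative, sketched here under the assumption that the agent creates the socket file at `channel_path` once it is ready, is to poll for that path with a timeout. `wait_for_path` is a hypothetical helper, and the child process below is only a stand-in that creates a file after a short delay.

```python
import os
import subprocess
import sys
import tempfile
import time

def wait_for_path(path, timeout_seconds=10.0, poll_seconds=0.1):
    """Return True once path exists, or False if it does not appear in time."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if os.path.exists(path):
            return True
        time.sleep(poll_seconds)
    return os.path.exists(path)

with tempfile.TemporaryDirectory() as tmp:
    marker = os.path.join(tmp, 'agent_dev')
    # Stand-in for the agent: create the marker file, then keep running
    child_code = ("import sys, time; time.sleep(0.3); "
                  "open(sys.argv[1], 'w').close(); time.sleep(30)")
    proc_demo = subprocess.Popen([sys.executable, '-c', child_code, marker])
    try:
        ready = wait_for_path(marker, timeout_seconds=10)
        alive = proc_demo.poll() is None  # None means the process is still running
    finally:
        proc_demo.terminate()
        proc_demo.wait()
print(ready, alive)
```

Checking `poll()` as well as the path helps distinguish "not ready yet" from "the process already exited", which a fixed sleep cannot tell apart.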

In [None]:
# Connecting to the agent
channel = grpc.insecure_channel('unix://%s' % channel_path )
client = agent_grpc.AgentStub(channel)
model_name='WindTurbineAnomalyDetection'
model_path='agent/model/dev/WindTurbineAnomalyDetection/1.0'

### Helper functions
These functions wrap the gRPC calls:
 - create a request
 - invoke the api
 - process the response and return

In [None]:
def list_models(cli):
    resp = cli.ListModels(agent.ListModelsRequest())
    return {m.name:{'in': m.input_tensor_metadatas, 'out': m.output_tensor_metadatas} for m in resp.models}

In [None]:
def load_model(cli, model_name, model_path):
    """ Load a new model into the Edge Agent if not loaded yet"""
    try:
        req = agent.LoadModelRequest()
        req.url = model_path
        req.name = model_name
        return cli.LoadModel(req)
    except Exception as e:
        print(e)
        return None

In [None]:
def unload_model(cli, model_name):
    """ UnLoad model from the Edge Agent"""
    try:
        req = agent.UnLoadModelRequest()
        req.name = model_name
        resp = cli.UnLoadModel(req)
        return resp
    except Exception as e:
        print(e)
        return None

In [None]:
def predict(cli, model_name, x, shm=False):
    """
    Invokes the model and get the predictions
    """
    try:
        model_map = list_models(cli)
        if model_map.get(model_name) is None:
            raise Exception('Model %s not loaded' % model_name)
        # Create a request
        req = agent.PredictRequest()
        req.name = model_name
        # Then load the data into a temp Tensor
        tensor = agent.Tensor()
        meta = model_map[model_name]['in'][0]
        tensor.tensor_metadata.name = meta.name
        tensor.tensor_metadata.data_type = meta.data_type
        for s in meta.shape: tensor.tensor_metadata.shape.append(s)

        if shm:
            # x is the id of a shared memory segment that holds the payload
            tensor.shared_memory_handle.offset = 0
            tensor.shared_memory_handle.segment_id = x
        else:
            # x is a numpy array sent inline as raw float32 bytes
            tensor.byte_data = x.astype(np.float32).tobytes()

        req.tensors.append(tensor)

        # Invoke the model
        resp = cli.Predict(req)

        # Parse the output
        meta = model_map[model_name]['out'][0]
        tensor = resp.tensors[0]
        data = np.frombuffer(tensor.byte_data, dtype=np.float32)
        return data.reshape(tensor.tensor_metadata.shape)
    except Exception as e:
        print(e)
        return None
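
`predict` sends the input as raw float32 bytes and rebuilds the output from raw bytes plus the shape in the tensor metadata. The sketch below shows that round-trip using only the standard library `array` module as a stand-in for `tobytes()`/`np.frombuffer`. The names `to_float32_bytes`, `from_float32_bytes`, and `demo_shape` are hypothetical.

```python
from array import array

def to_float32_bytes(values):
    """Pack a flat sequence of numbers as native-endian float32 bytes."""
    return array('f', values).tobytes()

def from_float32_bytes(data):
    """Unpack native-endian float32 bytes back into a flat list of floats."""
    out = array('f')
    out.frombytes(data)
    return list(out)

demo_shape = (1, 6, 10, 10)  # same shape the notebook passes to the model
n_items = 1
for dim in demo_shape:
    n_items *= dim

# Multiples of 0.5 are exactly representable in float32, so the round-trip is lossless
flat_values = [i * 0.5 for i in range(n_items)]
payload_bytes = to_float32_bytes(flat_values)
restored = from_float32_bytes(payload_bytes)

print(len(payload_bytes), restored == flat_values)
```

The bytes carry no shape information on their own, which is why the shape travels separately in `tensor_metadata`.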

In [None]:
def create_tensor(x, tensor_name):
    if (x.dtype != np.float32):
        raise Exception( "It only supports numpy float32 arrays for this tensor" )
    tensor = agent.Tensor()
    tensor.tensor_metadata.name = tensor_name.encode('utf-8')
    tensor.tensor_metadata.data_type = agent.FLOAT32
    for s in x.shape: tensor.tensor_metadata.shape.append(s)
    tensor.byte_data = x.tobytes()
    return tensor

In [None]:
def capture_data(cli, model_name, input_tensor, output_tensor):
    try:
        req = agent.CaptureDataRequest()
        req.model_name = model_name
        req.capture_id = str(uuid.uuid4())
        req.input_tensors.append( create_tensor(input_tensor, 'input') )
        req.output_tensors.append( create_tensor(output_tensor, 'output') )
        resp = cli.CaptureData(req)
    except Exception as e:
        print(e)

In [None]:
def write_to_shm(sm, payload):
    if sm.attached: sm.detach()
    # set mode read/write
    sm.mode = 0o0600
    sm.attach()
    sm.write(payload.astype(np.float32).tobytes())
    # set mode read only
    sm.mode = 0o0400

### Loading and listing models

In [None]:
## Loading a model in the agent
load_model(client, model_name, model_path)

In [None]:
## List the loaded models
list_models(client)

### Running some predictions

In [None]:
import numpy as np

shape=(1,6,10,10)
payload_size=4 # float32
for i in shape: payload_size *= i
x = np.random.rand(*shape).astype(np.float32)

In [None]:
y = predict(client, model_name, x)
capture_data(client, model_name, x, y)
print(y.shape)

### Shared Memory Predictions
Protobuf/gRPC does a good job of transporting data from the client to the server. However, if the payload is larger than 4MB, performance decreases, so shared memory is a good alternative for large inputs.

In this example you'll see how to load the payload to a reserved space in the device's shared memory. 
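
As a rough sketch of how you might decide between the two transports, the snippet below computes the payload size for a float32 tensor and compares it with the 4MB figure mentioned above. The helper names and the second example shape are hypothetical, and the comparison ignores the small overhead of the rest of the request message.

```python
import math

SIZE_LIMIT_BYTES = 4 * 1024 * 1024  # the 4MB figure from the text above

def payload_nbytes(shape, itemsize=4):
    """Number of bytes needed for a tensor of this shape (4 bytes per float32)."""
    return math.prod(shape) * itemsize

def choose_transport(shape, limit=SIZE_LIMIT_BYTES):
    """Pick 'shared_memory' for payloads above the limit, else inline bytes."""
    return 'shared_memory' if payload_nbytes(shape) > limit else 'grpc_bytes'

small_shape = (1, 6, 10, 10)      # the input used in this notebook
large_shape = (1, 3, 1024, 1024)  # hypothetical image-sized input

print(payload_nbytes(small_shape), choose_transport(small_shape))
print(payload_nbytes(large_shape), choose_transport(large_shape))
```

The input used in this notebook is far below the limit, so the shared memory cells that follow are a demonstration of the mechanism rather than a necessity for this model.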

In [None]:
import sysv_ipc as ipc
key=42
sm=None
## create/reserve some space in the device's shared memory
try:
    # attach to the segment if it already exists
    sm = ipc.SharedMemory(key, mode=0o600, size = payload_size)
except ipc.ExistentialError as e:
    # otherwise create it
    sm = ipc.SharedMemory(key, flags=ipc.IPC_CREX, size = payload_size)

In [None]:
write_to_shm(sm, x)
y = predict(client, model_name, sm.id, True)
capture_data(client, model_name, x, y)
print(y.shape)

### Stress test to compare 'normal' predictions vs shared memory predictions

In [None]:
n_runs = 1000
start_time=time.time()
for i in range(n_runs):
    y = predict(client, model_name, x)
# divide by the number of iterations to get the average time per prediction
print('Average time per normal prediction: %fs' % ((time.time()-start_time)/n_runs))
start_time=time.time()
for i in range(n_runs):
    write_to_shm(sm, x)
    y = predict(client, model_name, sm.id, True)
print('Average time per shared memory prediction: %fs' % ((time.time()-start_time)/n_runs))
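
The two timing loops follow the same pattern, so they can be factored into a small helper that always divides by the number of runs it actually executed. This is only a sketch: `average_latency` is a hypothetical name, and the lambda below is a stand-in for a call such as `predict(client, model_name, x)`.

```python
import time

def average_latency(fn, n_runs=1000):
    """Call fn() n_runs times and return the mean wall-clock seconds per call."""
    start = time.perf_counter()
    for _ in range(n_runs):
        fn()
    return (time.perf_counter() - start) / n_runs

# Stand-in workload so the example runs without the agent
calls = []
avg_seconds = average_latency(lambda: calls.append(1), n_runs=100)
print('calls: %d, average: %fs' % (len(calls), avg_seconds))
```

`time.perf_counter()` is used here because it is a monotonic, high-resolution clock intended for measuring short durations.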

### Retrieve Captured Data from S3

In [None]:
import json
import boto3
import io
import base64
from datetime import datetime
import re

pattern = r'Tensor\[(\w+); (\d+), (\d+), (\d+), (\d+)\]'
s3_client = boto3.client('s3')
config_file = json.load(open('agent/conf/config_edge_device_0.json', 'r'))

device_fleet_name = config_file['sagemaker_edge_core_device_fleet_name']
bucket_name = config_file['sagemaker_edge_provider_s3_bucket_name']
bucket_prefix = config_file['sagemaker_edge_core_folder_prefix']

s3_prefix = '%s/%s/%s/%s/' % (bucket_prefix, device_fleet_name, model_name, datetime.now().strftime('%Y/%m/%d/%H') )

logs = s3_client.list_objects(Bucket=bucket_name, Prefix=s3_prefix)
if logs.get('Contents') is not None:
    with io.BytesIO() as f:
        s3_client.download_fileobj(bucket_name, logs['Contents'][0]['Key'], f)
        f.seek(0)
        # each log is saved as a Json lines file
        try:
            log = json.loads(f.read())

            inputs = log['deviceFleetInputs'][0]
            outputs = log['deviceFleetOutputs'][0]

            # convert the data content back to bytes from base64
            input_data = base64.b64decode(inputs['data'])
            output_data = base64.b64decode(outputs['data'])

            # get the input/output shapes
            m=re.match(pattern, inputs['observedContentType'])
            input_shape = [int(m.group(i)) for i in range(2,6)]
            m=re.match(pattern, outputs['observedContentType'])
            output_shape = [int(m.group(i)) for i in range(2,6)]

            # rebuild a numpy array with the stored data in the correct shape
            x = np.frombuffer(input_data, dtype=np.float32).reshape(input_shape)
            y = np.frombuffer(output_data, dtype=np.float32).reshape(output_shape)

            print(inputs['observedContentType'], inputs['name'], x.shape)
            print(outputs['observedContentType'], outputs['name'], y.shape)
        except Exception as e:
            print('Try to run capture_data a few more times')
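
The regular expression above expects exactly four dimensions. As a hedged sketch, the parsing step can be generalized to any rank and tested offline. The sample content type string and sample data below are made up to match the shape of `pattern`; they are not taken from a real capture file, and `parse_tensor_type` is a hypothetical helper.

```python
import base64
import re
from array import array

def parse_tensor_type(content_type):
    """Split a 'Tensor[<dtype>; d0, d1, ...]' string into (dtype, shape)."""
    m = re.match(r'Tensor\[(\w+); ([\d, ]+)\]', content_type)
    if m is None:
        raise ValueError('unexpected content type: %r' % content_type)
    return m.group(1), [int(dim) for dim in m.group(2).split(',')]

# Hypothetical sample record, shaped like the fields read in the cell above
sample_type = 'Tensor[float32; 1, 2, 2, 1]'
sample_data = base64.b64encode(array('f', [0.0, 0.5, 1.0, 1.5]).tobytes()).decode('ascii')

dtype_name, tensor_shape = parse_tensor_type(sample_type)

# Decode the base64 payload back into float32 values
decoded = array('f')
decoded.frombytes(base64.b64decode(sample_data))

print(dtype_name, tensor_shape, list(decoded))
```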

## Unload the model and kill the process

In [None]:
unload_model(client, model_name)

In [None]:
proc.kill()

In [None]:
if sm is not None:
    sm.detach()
    sm.remove()
    sm = None

In [None]:
!rm -rf agent/model/dev/*

Now that you know how SageMaker Edge Manager and its agent work, it's time to build an end-to-end solution for ML@Edge.

You can start exercise #1: visualizing the wind turbine data

 > [Exercise 01](01%20-%20Data%20Visualization.ipynb)