#***************************************************** # * # Copyright 2019 Amazon.com, Inc. or its affiliates. * # All Rights Reserved. * # * #***************************************************** from threading import Thread, Event import os import json import numpy as np import awscam import cv2 import greengrasssdk import mo from utils import LocalDisplay # Create an IoT client for sending to messages to the cloud. client = greengrasssdk.client('iot-data') iot_topic = '$aws/things/{}/infer'.format(os.environ['AWS_IOT_THING_NAME']) def inRestrictedSection(ImShape = None, R1 = None, restricted_region = None, kclass = None, score = None, threshold = None): statement = 'Person Not Detected in Restricted Zone' if (kclass == 1) and (score > threshold): Im1 = np.zeros((ImShape[0],ImShape[1],3), np.int32) cv2.fillPoly(Im1, [R1], 255) Im2 = np.zeros((ImShape[0],ImShape[1],3), np.int32) if restricted_region is None: restricted_region = np.array([[0,ImShape[0]],[ImShape[1],ImShape[0]],[ImShape[1],0], [0,0]], np.int32) cv2.fillPoly(Im2, [restricted_region], 255) Im = Im1 * Im2 if np.sum(np.greater(Im, 0))>0: statement = 'Person Detected in Restricted Zone' else: statement = statement return statement def infinite_infer_run(): """ Entry point of the lambda function""" try: # This object detection model is implemented as single shot detector (ssd), since # the number of labels is small we create a dictionary that will help us convert # the machine labels to human readable labels. model_type = 'ssd' with open('labels.txt', 'r') as labels_file: output_map = [class_label for class_label in labels_file] # Create a local display instance that will dump the image bytes to a FIFO # file that the image can be rendered locally. local_display = LocalDisplay('480p') local_display.start() # The height and width of the training set images INPUT_W = 300 INPUT_H = 300 client.publish(topic=iot_topic, payload='Optimizing model...') ret, model_path = mo.optimize('deploy_ssd_resnet50_300', INPUT_W, INPUT_H) # Load the model onto the GPU. client.publish(topic=iot_topic, payload='Loading model...') model = awscam.Model(model_path, {'GPU': 1}) client.publish(topic=iot_topic, payload='Object detection model loaded') # Set the threshold for detection detection_threshold = 0.20 # Do inference until the lambda is killed. while True: # Get a frame from the video stream ret, frame = awscam.getLastFrame() if not ret: raise Exception('Failed to get frame from the stream') # Resize frame to the same size as the training set. frame_resize = cv2.resize(frame, (INPUT_H, INPUT_W)) # Run the images through the inference engine and parse the results using # the parser API, note it is possible to get the output of doInference # and do the parsing manually, but since it is a ssd model, # a simple API is provided. parsed_inference_results = model.parseResult(model_type, model.doInference(frame_resize)) # Compute the scale in order to draw bounding boxes on the full resolution # image. yscale = float(frame.shape[0]) / float(INPUT_H) xscale = float(frame.shape[1]) / float(INPUT_W) # Dictionary to be filled with labels and probabilities for MQTT cloud_output = {} # Get the detected objects and probabilities #If you want to add custom region of restricted region, add coordinates of the vertices of the region #coordinates are defined by X-axis,Y-axis and can be clockwise or counter-clockwise. restricted_region = None #restricted_region = np.array([[0,200],[100,200],[100,0], [10,10]], np.int32) for obj in parsed_inference_results[model_type]: if obj['prob'] > detection_threshold: # Add bounding boxes to full resolution frame xmin = int(xscale * obj['xmin']) ymin = int(yscale * obj['ymin']) xmax = int(xscale * obj['xmax']) ymax = int(yscale * obj['ymax']) # See https://docs.opencv.org/3.4.1/d6/d6e/group__imgproc__draw.html # for more information about the cv2.rectangle method. # Method signature: image, point1, point2, color, and tickness. cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (255, 165, 20), 10) R1 = np.array([[xmin,ymin],[xmax,ymin],[xmax,ymax], [xmin,ymax]], np.int32) pedestrian_safety_alarm = inRestrictedSection(frame.shape,R1 = R1, restricted_region= restricted_region, kclass = obj['label'], score = obj['prob'], threshold=detection_threshold) # Amount to offset the label/probability text above the bounding box. text_offset = 15 # See https://docs.opencv.org/3.4.1/d6/d6e/group__imgproc__draw.html # for more information about the cv2.putText method. # Method signature: image, text, origin, font face, font scale, color, # and tickness cv2.putText(frame, "{}: {:.2f}%".format(output_map[obj['label']], obj['prob'] * 100), (xmin, ymin-text_offset), cv2.FONT_HERSHEY_SIMPLEX, 2.5, (255, 165, 20), 6) # Store label and probability to send to cloud if pedestrian_safety_alarm == 'Person Detected in Restricted Zone': cloud_output[output_map[obj['label']]] = obj['prob']*100 else: cloud_output[output_map[obj['label']]] = 0.0 # Set the next frame in the local display stream. local_display.set_frame_data(frame) # Send results to the cloud client.publish(topic=iot_topic, payload=json.dumps(cloud_output)) except Exception as ex: client.publish(topic=iot_topic, payload='Error in object detection lambda: {}'.format(ex)) infinite_infer_run() # This is a dummy handler and will not be invoked # Instead the code above will be executed in an infinite loop for our example def lambda_handler(event, context): return