import dlr import time import cv2 import base64 import mxnet as mx import numpy as np from mxnet import gluon from gluoncv import utils from matplotlib import pyplot as plt class SagemakerNeoEvaluator(object): def __init__(self, test_image_path, raw_model_params_full_path, raw_model_symbol_full_path, neo_optimized_model_root_dir, short_size=416): """ constructor of sagemaker neo evaluator :param test_image_path: full path of test image with RGB channels :param raw_model_params_full_path: full path of original object detector model's parameters :param raw_model_symbol_full_path: full path of original object detector model's symbol :param neo_optimized_model_root_dir: the directory of Sagemaker Neo optimized model, the .so, .params, .json, manifest, .meta files are stored in this directory, all these files are compiled and generated by Sagemaker Neo :param short_size: resized short size of object detector's input, default value is 416, the longer side will be resized according to image's original width/height ratio """ # load test image self._test_image_path = test_image_path self._test_image_base64 = self.get_base64_encoding(full_path=self._test_image_path) # mxnet model without Sagemaker neo optimization self._raw_model_params_full_path = raw_model_params_full_path self._raw_model_symbol_full_path = raw_model_symbol_full_path # optimized model root directory with Sagemaker neo self._neo_optimized_model_root_dir = neo_optimized_model_root_dir self._short_size = short_size self._height, self._width, self._channels = -1, -1, -1 # load detector model (without Sagemaker neo optimization) self._human_body_detector_without_neo_optimization = gluon.nn.SymbolBlock.imports( symbol_file=self._raw_model_symbol_full_path, input_names=['data'], param_file=self._raw_model_params_full_path, ctx=mx.gpu() ) # load Sagemaker neo optimized model self._human_body_detector_with_neo_optimization = dlr.DLRModel(self._neo_optimized_model_root_dir, 'gpu', 0) def get_model_input(self): """ prepare model input data: decode image data from base64 string, and resize it to 1x3x416x624 shape (for example, the original image shape is 1280x1920x3, then resize the shorter side to 416, the longer side will be 624 pixels). In addition, image normalization is performed before feed into inference engine. :return: input for original object detector, input for sagemaker neo optimized object detector """ img = mx.img.imdecode(base64.b64decode(self._test_image_base64)) self._height, self._width, self._channels = img.shape[0], img.shape[1], img.shape[2] mean, std = (0.485, 0.456, 0.406), (0.229, 0.224, 0.225) resized_img = mx.image.resize_short(img, size=self._short_size) resized_rescaled_img = mx.nd.image.to_tensor(resized_img) resized_rescaled_normalized_img = mx.nd.image.normalize(resized_rescaled_img, mean=mean, std=std) raw_model_input = resized_rescaled_normalized_img.expand_dims(0) if mx.context.num_gpus() != 0: raw_model_input = raw_model_input.copyto(mx.gpu()) neo_model_input = resized_rescaled_normalized_img.asnumpy() return raw_model_input, neo_model_input @staticmethod def get_base64_encoding(full_path): """ encode the input image as base64 string :param full_path: full path of test image :return: """ with open(full_path, "rb") as f: data = f.read() image_base64_enc = base64.b64encode(data) image_base64_enc = str(image_base64_enc, 'utf-8') return image_base64_enc def eval_without_neo(self, x): """ inference based on original object detector, which is not optimized by Sagemaker Neo :param x: input image data for object detector :return: a dictionary which contains all detected information, such as bounding boxes, confidences, etc. """ # inference class_ids, mx_scores, mx_bounding_boxes = self._human_body_detector_without_neo_optimization(x) # post-process response = self.post_process(class_ids=class_ids, mx_scores=mx_scores, mx_bounding_boxes=mx_bounding_boxes) return response def eval_with_neo(self, x): """ inference based on Sagemaker Neo optimized object detector :param x: input image data for object detector :return: a dictionary which contains all detected information, such as bounding boxes, confidences, etc. """ # inference class_ids, mx_scores, mx_bounding_boxes = self._human_body_detector_with_neo_optimization.run(input_values=x) # post-process response = self.post_process(class_ids=class_ids, mx_scores=mx_scores, mx_bounding_boxes=mx_bounding_boxes) return response def post_process(self, class_ids, mx_scores, mx_bounding_boxes): """ rescale the bounding boxes back to its original space :param class_ids: ids of detected objects :param mx_scores: confidences of bounding boxes :param mx_bounding_boxes: bounding boxes of detected objects :return: detection response (dictionary) """ if not isinstance(class_ids, np.ndarray): class_ids = class_ids.asnumpy() if not isinstance(mx_scores, np.ndarray): mx_scores = mx_scores.asnumpy() if not isinstance(mx_bounding_boxes, np.ndarray): mx_bounding_boxes = mx_bounding_boxes.asnumpy() # rescale detection results back to original image size scale_ratio = self._short_size / self._height if self._height < self._width else self._short_size / self._width bbox_coords, bbox_scores = list(), list() for index, bbox in enumerate(mx_bounding_boxes[0]): prob = float(mx_scores[0][index][0]) if prob < 0.0: continue [x_min, y_min, x_max, y_max] = bbox x_min = int(x_min / scale_ratio) y_min = int(y_min / scale_ratio) x_max = int(x_max / scale_ratio) y_max = int(y_max / scale_ratio) bbox_coords.append([x_min, y_min, x_max, y_max]) bbox_scores.append([prob]) body = { 'width': self._width, 'height': self._height, 'channels': self._channels, 'bbox_scores': bbox_scores, # shape = (N, 1) 'bbox_coords': bbox_coords, # shape = (N, 4) } return body @staticmethod def visualize(full_path, bbox_coords, bbox_scores, label_name, save_path): bbox_coords = np.array(bbox_coords) bbox_scores = np.array(bbox_scores) image = cv2.imread(full_path, cv2.IMREAD_COLOR) image = image[:, :, ::-1] class_ids = np.zeros(shape=(bbox_scores.shape[0],)) ax = utils.viz.plot_bbox(image, bbox_coords, bbox_scores, class_ids, class_names=[label_name], thresh=0.25) plt.axis('off') plt.savefig(save_path, dpi=200, bbox_inches='tight', pad_inches=0) def evaluate_inference_speed(self, repeat_times, neo_optimized=True): """ object detector's inference speed evaluation :param repeat_times: repeating times for test :param neo_optimized: whether to using sagemaker neo optimized model, boolean variable :return: None """ image_data_for_raw_model, image_data_for_neo_optimized_model = self.get_model_input() _, channels, height, width = image_data_for_raw_model.shape if not neo_optimized: # warmup for i in range(100): _ = self.eval_without_neo(x=image_data_for_raw_model) # inference test t_start = time.time() for i in range(repeat_times): _ = self.eval_without_neo(x=image_data_for_raw_model) t_end = time.time() print('[NEO Optimization Disabled] Time Cost per Frame (input size = 1x{}x{}x{}) = {} ms'.format( channels, height, width, 1000 * (t_end - t_start) / repeat_times)) else: # warmup for i in range(100): _ = self.eval_with_neo(x=image_data_for_neo_optimized_model) t_start = time.time() for i in range(repeat_times): _ = self.eval_with_neo(x=image_data_for_neo_optimized_model) t_end = time.time() print('[NEO Optimization Enabled] Time Cost per Frame (input size = 1x{}x{}x{}) = {} ms'.format( channels, height, width, 1000 * (t_end - t_start) / repeat_times)) def inference_with_visualization(self, neo_optimized=True): """ object detector inference and dump the image with bounding boxes and confidences plotted :return: None """ image_data_for_raw_model, image_data_for_neo_optimized_model = self.get_model_input() if not neo_optimized: response = self.eval_without_neo(x=image_data_for_raw_model) save_path = 'body_det_vis.jpg' else: response = self.eval_with_neo(x=image_data_for_neo_optimized_model) save_path = 'body_det_vis_with_neo.jpg' # save image with bounding boxes plotted self.visualize( full_path=self._test_image_path, bbox_coords=response['bbox_coords'], bbox_scores=response['bbox_scores'], label_name='Person', save_path=save_path) if __name__ == '__main__': evaluator = SagemakerNeoEvaluator( test_image_path='./test_1280x1920x3.jpg', raw_model_params_full_path='./models/human_body_detector/body_detector_yolo3_mobilenet1.0_coco-0000.params', raw_model_symbol_full_path='./models/human_body_detector/body_detector_yolo3_mobilenet1.0_coco-symbol.json', neo_optimized_model_root_dir='./models/human_body_detector_neo/' ) evaluator.evaluate_inference_speed(repeat_times=200, neo_optimized=False) evaluator.evaluate_inference_speed(repeat_times=200, neo_optimized=True) evaluator.inference_with_visualization(neo_optimized=False) evaluator.inference_with_visualization(neo_optimized=True)