'''This module contains the available sensors for the sim app''' import logging import rospy import numpy as np from sensor_msgs.msg import Image as sensor_image from sensor_msgs.msg import LaserScan from PIL import Image from markov.sensors.utils import get_observation_space, get_front_camera_embedders, \ get_left_camera_embedders, get_stereo_camera_embedders, \ get_lidar_embedders, get_observation_embedder from markov.sensors.sensor_interface import SensorInterface, LidarInterface from markov.environments.constants import TRAINING_IMAGE_SIZE from markov.architecture.constants import Input from markov.log_handler.deepracer_exceptions import GenericRolloutException, GenericError from markov import utils from markov.log_handler.logger import Logger from markov.log_handler.constants import SIMAPP_SIMULATION_WORKER_EXCEPTION LOGGER = Logger(__name__, logging.INFO).get_logger() class SensorFactory(object): '''This class implements a sensot factory and is used to create sensors per agent. ''' @staticmethod def create_sensor(racecar_name, sensor_type, config_dict): '''Factory method for creating sensors type - String containing the desired sensor type kwargs - Meta data, usually containing the topics to subscribe to, the concrete sensor classes are responsible for checking the topics. ''' if sensor_type == Input.CAMERA.value: return Camera(racecar_name) elif sensor_type == Input.LEFT_CAMERA.value: return LeftCamera(racecar_name) elif sensor_type == Input.STEREO.value: return DualCamera(racecar_name) elif sensor_type == Input.LIDAR.value: return Lidar(racecar_name) elif sensor_type == Input.SECTOR_LIDAR.value: return SectorLidar(racecar_name) elif sensor_type == Input.DISCRETIZED_SECTOR_LIDAR.value: return DiscretizedSectorLidar(racecar_name, config_dict) elif sensor_type == Input.OBSERVATION.value: return Observation(racecar_name) else: raise GenericRolloutException("Unknown sensor") class Camera(SensorInterface): '''Single camera sensor''' def __init__(self, racecar_name, timeout=120.0): """Camera sensor constructor Args: racecar_name (str): racecar name timeout (float): time limit for buffer get method until error out. The reason why using 120.0 as default value for sensor is because virtual event dynamic spawning for sensor can take up to 60.0 seconds to be alive. """ self.image_buffer = utils.DoubleBuffer() rospy.Subscriber('/{}/camera/zed/rgb/image_rect_color'.format(racecar_name), sensor_image, self._camera_cb_) self.raw_data = None self.sensor_type = Input.CAMERA.value self.timeout = timeout def get_observation_space(self): try: return get_observation_space(Input.CAMERA.value) except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def get_state(self, block=True): try: # Make sure the first image is the starting image image_data = self.image_buffer.get(block=block, timeout=self.timeout) # Read the image and resize to get the state image = Image.frombytes('RGB', (image_data.width, image_data.height), image_data.data, 'raw', 'RGB', 0, 1) image = image.resize(TRAINING_IMAGE_SIZE, resample=2) self.raw_data = image_data return {Input.CAMERA.value: np.array(image)} except utils.DoubleBuffer.Empty: return {} except Exception as ex: raise GenericRolloutException("Unable to set state: {}".format(ex)) def reset(self): self.image_buffer.clear() def get_input_embedders(self, network_type): try: return get_front_camera_embedders(network_type) except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def _camera_cb_(self, data): ''' Callback for the single camera, this is triggered by ROS data - Image data from the gazebo plugin, it is a sensor message ''' try: self.image_buffer.put(data) except Exception as ex: LOGGER.info("Unable to retrieve frame: %s", ex) class Observation(SensorInterface): '''Single camera sensor that is compatible with simapp v1''' def __init__(self, racecar_name, timeout=120.0): """Observation sensor constructor Args: racecar_name (str): racecar name timeout (float): time limit for buffer get method until error out. The reason why using 120.0 as default value for sensor is because virtual event dynamic spawning for sensor can take up to 60.0 seconds to be alive. """ self.image_buffer = utils.DoubleBuffer() rospy.Subscriber('/{}/camera/zed/rgb/image_rect_color'.format(racecar_name), sensor_image, self._camera_cb_) self.sensor_type = Input.OBSERVATION.value self.raw_data = None self.timeout = timeout def get_observation_space(self): try: return get_observation_space(Input.OBSERVATION.value) except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def get_state(self, block=True): try: # Make sure the first image is the starting image image_data = self.image_buffer.get(block=block, timeout=self.timeout) # Read the image and resize to get the state image = Image.frombytes('RGB', (image_data.width, image_data.height), image_data.data, 'raw', 'RGB', 0, 1) image = image.resize(TRAINING_IMAGE_SIZE, resample=2) self.raw_data = image_data return {Input.OBSERVATION.value: np.array(image)} except utils.DoubleBuffer.Empty: return {} except Exception as ex: raise GenericRolloutException("Unable to set state: {}".format(ex)) def reset(self): self.image_buffer.clear() def get_input_embedders(self, network_type): try: return get_observation_embedder() except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def _camera_cb_(self, data): ''' Callback for the single camera, this is triggered by ROS data - Image data from the gazebo plugin, it is a sensor message ''' try: self.image_buffer.put(data) except Exception as ex: LOGGER.info("Unable to retrieve frame: %s", ex) class LeftCamera(SensorInterface): '''This class is specific to left camera's only, it used the same topic as the camera class but has a different observation space. If this changes in the future this class should be updated. ''' def __init__(self, racecar_name, timeout=120.0): """LeftCamera sensor constructor Args: racecar_name (str): racecar name timeout (float): time limit for buffer get method until error out. The reason why using 120.0 as default value for sensor is because virtual event dynamic spawning for sensor can take up to 60.0 seconds to be alive. """ self.image_buffer = utils.DoubleBuffer() rospy.Subscriber('/{}/camera/zed/rgb/image_rect_color'.format(racecar_name), sensor_image, self._camera_cb_) self.sensor_type = Input.LEFT_CAMERA.value self.raw_data = None self.timeout = timeout def get_observation_space(self): try: return get_observation_space(Input.LEFT_CAMERA.value) except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def get_state(self, block=True): try: # Make sure the first image is the starting image image_data = self.image_buffer.get(block=block, timeout=self.timeout) # Read the image and resize to get the state image = Image.frombytes('RGB', (image_data.width, image_data.height), image_data.data, 'raw', 'RGB', 0, 1) image = image.resize(TRAINING_IMAGE_SIZE, resample=2) self.raw_data = image_data return {Input.LEFT_CAMERA.value: np.array(image)} except utils.DoubleBuffer.Empty: return {} except Exception as ex: raise GenericRolloutException("Unable to set state: {}".format(ex)) def reset(self): self.image_buffer.clear() def get_input_embedders(self, network_type): try: return get_left_camera_embedders(network_type) except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def _camera_cb_(self, data): ''' Callback for the single camera, this is triggered by ROS data - Image data from the gazebo plugin, it is a sensor message ''' try: self.image_buffer.put(data) except Exception as ex: LOGGER.info("Unable to retrieve frame: %s", ex) class DualCamera(SensorInterface): '''This class handles the data for dual cameras''' def __init__(self, racecar_name, timeout=120.0): """DualCamera sensor constructor Args: racecar_name (str): racecar name timeout (float): time limit for buffer get method until error out. The reason why using 120.0 as default value for sensor is because virtual event dynamic spawning for sensor can take up to 60.0 seconds to be alive. """ # Queue used to maintain image consumption synchronicity self.image_buffer_left = utils.DoubleBuffer() self.image_buffer_right = utils.DoubleBuffer() # Set up the subscribers rospy.Subscriber('/{}/camera/zed/rgb/image_rect_color'.format(racecar_name), sensor_image, self._left_camera_cb_) rospy.Subscriber('/{}/camera/zed_right/rgb/image_rect_color_right'.format(racecar_name), sensor_image, self._right_camera_cb_) self.sensor_type = Input.STEREO.value self.timeout = timeout def get_observation_space(self): try: return get_observation_space(Input.STEREO.value) except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def get_state(self, block=True): try: image_data = self.image_buffer_left.get(block=block, timeout=self.timeout) left_img = Image.frombytes('RGB', (image_data.width, image_data.height), image_data.data, 'raw', 'RGB', 0, 1) left_img = left_img.resize(TRAINING_IMAGE_SIZE, resample=2).convert('L') image_data = self.image_buffer_right.get(block=block, timeout=self.timeout) right_img = Image.frombytes('RGB', (image_data.width, image_data.height), image_data.data, 'raw', 'RGB', 0, 1) right_img = right_img.resize(TRAINING_IMAGE_SIZE, resample=2).convert('L') return {Input.STEREO.value: np.array(np.stack((left_img, right_img), axis=2))} except utils.DoubleBuffer.Empty: return {} except Exception as ex: raise GenericRolloutException("Unable to set state: {}".format(ex)) def reset(self): self.image_buffer_left.clear() self.image_buffer_right.clear() def get_input_embedders(self, network_type): try: return get_stereo_camera_embedders(network_type) except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def _left_camera_cb_(self, data): ''' Callback for the left camera, this is triggered by ROS data - Image data from the gazebo plugin, it is a sensor message ''' try: self.image_buffer_left.put(data) except Exception as ex: LOGGER.info("Unable to retrieve frame: %s", ex) def _right_camera_cb_(self, data): ''' Callback for the right camera, this is triggered by ROS data - Image data from the gazebo plugin, it is a sensor message ''' try: self.image_buffer_right.put(data) except Exception as ex: LOGGER.info("Unable to retrieve frame: %s", ex) class Lidar(LidarInterface): '''This class handles the data collection for lidar''' def __init__(self, racecar_name, timeout=120.0): """Lidar sensor constructor Args: racecar_name (str): racecar name timeout (float): time limit for buffer get method until error out. The reason why using 120.0 as default value for sensor is because virtual event dynamic spawning for sensor can take up to 60.0 seconds to be alive. """ self.data_buffer = utils.DoubleBuffer(clear_data_on_get=False) rospy.Subscriber('/{}/scan'.format(racecar_name), LaserScan, self._scan_cb) self.sensor_type = Input.LIDAR.value self.timeout = timeout def get_observation_space(self): try: return get_observation_space(self.sensor_type) except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def get_state(self, block=True): try: return {self.sensor_type: self.data_buffer.get(block=block, timeout=self.timeout)} except utils.DoubleBuffer.Empty: # For Lidar, we always call non-blocking get_state instead of # block-waiting for new state, and the expectation is # we always have outdated state data to be read in the worst case as # we don't clear the data on get for DoubleBuffer (Refer to __init__). # Thus, the expectation is utils.DoubleBuffer.Empty will be never raised. # However, there can be an edge case for first call of get_state as DoubleBuffer may be # Empty, in such case, this may cause issue for the inference due to # incompatible input to NN. Thus, we should get sensor data with blocking if # DoubleBuffer.Empty is raised. return {self.sensor_type: self.data_buffer.get(block=True, timeout=self.timeout)} except Exception as ex: raise GenericRolloutException("Unable to set state: {}".format(ex)) def reset(self): self.data_buffer.clear() def get_input_embedders(self, network_type): try: return get_lidar_embedders(network_type, self.sensor_type) except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def _scan_cb(self, data): try: self.data_buffer.put(np.array(data.ranges)) except Exception as ex: LOGGER.info("Unable to retrieve state: %s", ex) class SectorLidar(LidarInterface): '''This class handles the data collection for sector lidar''' def __init__(self, racecar_name, timeout=120.0): """SectorLidar sensor constructor Args: racecar_name (str): racecar name timeout (float): time limit for buffer get method until error out. The reason why using 120.0 as default value for sensor is because virtual event dynamic spawning for sensor can take up to 60.0 seconds to be alive. """ self.data_buffer = utils.DoubleBuffer(clear_data_on_get=False) rospy.Subscriber('/{}/scan'.format(racecar_name), LaserScan, self._scan_cb) self.sensor_type = Input.SECTOR_LIDAR.value self.timeout = timeout def get_observation_space(self): try: return get_observation_space(self.sensor_type) except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def get_state(self, block=True): try: return {self.sensor_type: self.data_buffer.get(block=block, timeout=self.timeout)} except utils.DoubleBuffer.Empty: # For Lidar, we always call non-blocking get_state instead of # block-waiting for new state, and the expectation is # we always have outdated state data to be read in the worst case as # we don't clear the data on get for DoubleBuffer (Refer to __init__). # Thus, the expectation is utils.DoubleBuffer.Empty will be never raised. # However, there can be an edge case for first call of get_state as DoubleBuffer may be # Empty, in such case, this may cause issue for the inference due to # incompatible input to NN. Thus, we should get sensor data with blocking if # DoubleBuffer.Empty is raised. return {self.sensor_type: self.data_buffer.get(block=True, timeout=self.timeout)} except Exception as ex: raise GenericRolloutException("Unable to set state: {}".format(ex)) def reset(self): self.data_buffer.clear() def get_input_embedders(self, network_type): try: return get_lidar_embedders(network_type, self.sensor_type) except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def _scan_cb(self, data): try: self.data_buffer.put(np.array(data.ranges)) except Exception as ex: LOGGER.info("Unable to retrieve state: %s", ex) class DiscretizedSectorLidar(LidarInterface): '''This class handles the data collection for sector lidar''' def __init__(self, racecar_name, config, timeout=120.0): """DiscretizedSectorLidar sensor constructor Args: racecar_name (str): racecar name timeout (float): time limit for buffer get method until error out. The reason why using 120.0 as default value for sensor is because virtual event dynamic spawning for sensor can take up to 60.0 seconds to be alive. """ self.data_buffer = utils.DoubleBuffer(clear_data_on_get=False) rospy.Subscriber('/{}/scan'.format(racecar_name), LaserScan, self._scan_cb) self.sensor_type = Input.DISCRETIZED_SECTOR_LIDAR.value self.model_metadata = config["model_metadata"] self.timeout = timeout def get_observation_space(self): try: return get_observation_space(self.sensor_type, self.model_metadata) except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def get_state(self, block=True): try: return {self.sensor_type: self.data_buffer.get(block=block, timeout=self.timeout)} except utils.DoubleBuffer.Empty: # For Lidar, we always call non-blocking get_state instead of # block-waiting for new state, and the expectation is # we always have outdated state data to be read in the worst case as # we don't clear the data on get for DoubleBuffer (Refer to __init__). # Thus, the expectation is utils.DoubleBuffer.Empty will be never raised. # However, there can be an edge case for first call of get_state as DoubleBuffer may be # Empty, in such case, this may cause issue for the inference due to # incompatible input to NN. Thus, we should get sensor data with blocking if # DoubleBuffer.Empty is raised. return {self.sensor_type: self.data_buffer.get(block=True, timeout=self.timeout)} except Exception as ex: raise GenericRolloutException("Unable to set state: {}".format(ex)) def reset(self): self.data_buffer.clear() def get_input_embedders(self, network_type): try: return get_lidar_embedders(network_type, self.sensor_type) except GenericError as ex: ex.log_except_and_exit(SIMAPP_SIMULATION_WORKER_EXCEPTION) except Exception as ex: raise GenericRolloutException('{}'.format(ex)) def _scan_cb(self, data): try: self.data_buffer.put(np.array(data.ranges)) except Exception as ex: LOGGER.info("Unable to retrieve state: %s", ex)