options: parameters: author: pankovvp category: '[GRC Hier Blocks]' cmake_opt: '' comment: '' copyright: '' description: '' gen_cmake: 'On' gen_linking: dynamic generate_options: qt_gui hier_block_src_path: '.:' id: mqtt_tester max_nouts: '0' output_language: python placement: (0,0) qt_qss_theme: '' realtime_scheduling: '1' run: 'True' run_command: '{python} -u {filename}' run_options: prompt sizing_mode: fixed thread_safe_setters: '' title: mqtt_tester window_size: '' states: bus_sink: false bus_source: false bus_structure: null coordinate: [8, 12.0] rotation: 0 state: enabled blocks: - name: debug id: variable parameters: comment: '' value: 'False' states: bus_sink: false bus_source: false bus_structure: null coordinate: [1192, 28.0] rotation: 0 state: enabled - name: srs3_payload_size_bytes id: variable parameters: comment: '' value: '217' states: bus_sink: false bus_source: false bus_structure: null coordinate: [8, 148.0] rotation: 0 state: enabled - name: blocks_message_debug_0 id: blocks_message_debug parameters: affinity: '' alias: '' comment: '' states: bus_sink: false bus_source: false bus_structure: null coordinate: [360, 368.0] rotation: 180 state: enabled - name: blocks_message_strobe_0 id: blocks_message_strobe parameters: affinity: '' alias: '' comment: generate test data maxoutbuf: '0' minoutbuf: '0' msg: pmt.intern("000000000011111111110000000000111111111100000000001111111111000000000011111111110000000000111111111100000000001111111111000000000011111111110000000000111111111188888888") period: '1000' states: bus_sink: false bus_source: false bus_structure: null coordinate: [256, 236.0] rotation: 0 state: enabled - name: blocks_random_pdu_0 id: blocks_random_pdu parameters: affinity: '' alias: '' comment: '' length_modulo: '1' mask: '0xFF' maxoutbuf: '0' maxsize: srs3_payload_size_bytes minoutbuf: '0' minsize: srs3_payload_size_bytes states: bus_sink: false bus_source: false bus_structure: null coordinate: [576, 220.0] rotation: 0 state: enabled - name: mqtt_broker_credentials_name id: parameter parameters: alias: '' comment: '' hide: none label: mqtt_broker_credentials_name short_id: '' type: str value: MqttCredentials states: bus_sink: false bus_source: false bus_structure: null coordinate: [832, 20.0] rotation: 0 state: true - name: mqtt_broker_credentials_region id: parameter parameters: alias: '' comment: '' hide: none label: mqtt_broker_credentials_region short_id: '' type: str value: eu-west-1 states: bus_sink: false bus_source: false bus_structure: null coordinate: [1000, 20.0] rotation: 0 state: true - name: mqtt_broker_ip id: parameter parameters: alias: '' comment: '' hide: none label: mqtt_broker_ip short_id: '' type: str value: mqtt-broker-nlb-private-7c02e81562f22ed0.elb.eu-west-1.amazonaws.com states: bus_sink: false bus_source: false bus_structure: null coordinate: [208, 20.0] rotation: 0 state: true - name: mqtt_downlink_topic id: parameter parameters: alias: '' comment: '' hide: none label: mqtt_downlink_topic short_id: '' type: str value: downlink states: bus_sink: false bus_source: false bus_structure: null coordinate: [400, 20.0] rotation: 0 state: true - name: mqtt_pub_0 id: epy_block parameters: _source_code: "\nimport paho.mqtt.client as mqtt \nimport boto3\nimport base64\n\ from botocore.exceptions import ClientError\nimport logging\n\nimport pmt\n\ import json\nfrom datetime import datetime\nfrom gnuradio import gr\nimport\ \ numpy as np\n\nimport sys\n\n\ndef on_connect(client, userdata, flags, rc):\ \ \n if str(rc)=='0':\n gr.log.info(\"[mqtt publish] Connection successful\"\ )\n if str(rc)=='1':\n gr.log.info(\"[mqtt publish] Connection refused\ \ - incorrect protocol version\")\n if str(rc)=='2':\n gr.log.info(\"\ [mqtt publish] Connection refused - invalid client indentifier\") \n if\ \ str(rc)=='3':\n gr.log.info(\"[mqtt publish] Connection refused - server\ \ unavailable\")\n if str(rc)=='4':\n gr.log.info(\"[mqtt publish]\ \ Connection refused - bad username or password\")\n if str(rc)=='5':\n\ \ gr.log.info(\"[mqtt publish] Connection refused - not authorised\"\ )\n\ndef on_disconnect(client, userdata, rc):\n if rc !=0:\n gr.log.info(\"\ [mqtt publish] Unexpected disconnect. Attempting to reconnect.\")\n\n\ndef get_secret(secret_name,\ \ region_name):\n\n # Create a Secrets Manager client\n session = boto3.session.Session()\n\ \ client = session.client(service_name='secretsmanager', region_name=region_name)\n\ \n try:\n get_secret_value_response = client.get_secret_value(\n \ \ SecretId=secret_name\n )\n except ClientError as e:\n\ \ if e.response['Error']['Code'] == 'DecryptionFailureException':\n \ \ gr.log.info(\"[mqtt publish] Secrets Manager can't decrypt the protected\ \ secret text using the provided KMS key\")\n raise e\n elif\ \ e.response['Error']['Code'] == 'InternalServiceErrorException':\n \ \ gr.log.info(\"[mqtt publish] An error occurred on the server side.\")\n\ \ raise e\n elif e.response['Error']['Code'] == 'InvalidParameterException':\n\ \ gr.log.info(\"[mqtt publish] You provided an invalid value for\ \ a parameter.\")\n raise e\n elif e.response['Error']['Code']\ \ == 'InvalidRequestException':\n gr.log.info(\"[mqtt publish] You\ \ provided a parameter value that is not valid for the current state of the\ \ resource.\")\n raise e\n elif e.response['Error']['Code']\ \ == 'ResourceNotFoundException':\n gr.log.info(\"[mqtt publish]\ \ We can't find the resource that you asked for.\")\n raise e\n \ \ else:\n # Decrypts secret using the associated KMS CMK.\n \ \ # Depending on whether the secret is a string or binary, one of these fields\ \ will be populated.\n if 'SecretString' in get_secret_value_response:\n\ \ secret = get_secret_value_response['SecretString']\n \ \ return secret\n else:\n decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])\n\ \ return decoded_binary_secret\n\n\nclass mqtt_pub(gr.basic_block):\n\ \ \"\"\"\n docstring for block mqtt_pub\n \n make(str hostname,\ \ int port, str topic, int qos, str secret_name, str region_name, bool debug)\n\ \ >mqtt subscribe \n Publishes to topic on port with specified qos. \n\ \ Pulls credentials from secret_name/ region_name. \n debug enables logging\ \ of messages before publication on topic. \n Args:\n hostname:\n \ \ port:\n topic:\n qos:\n secret_name:\n region_name:\n\ \ debug: \n \"\"\"\n def __init__(self, hostname=\"172.31.1.87\"\ , port=1883, topic=\"downlink\", qos=1, secret_name=\"MqttCredentials\", region_name=\"\ eu-west-1\", debug=False):\n gr.sync_block.__init__(self, name=\"mqtt\ \ publish\", in_sig=None, out_sig=None)\n self.topic = topic\n \ \ self.secret_name = secret_name\n self.region_name = region_name\n\ \ self.qos = qos\n self.port = port\n self.debug = debug\n\ \n #logging.basicConfig(level=logging.DEBUG) \n #self.logger =\ \ logging.getLogger(__name__) \n \n self.client = mqtt.Client()\n\ \ #self.client.enable_logger(self.logger)\n gr.log.info(\"[mqtt\ \ publish] Getting broker credentials from AWS Secrets Manager.\")\n \ \ secret = get_secret(secret_name, region_name)\n username = secret.split(\"\ ,\")[0].split(\":\")[1].strip('\"')\n password = secret.split(\",\")[1].split(\"\ :\")[1].strip(\"}\").strip('\"')\n self.client.username_pw_set(username,\ \ password)\n gr.log.info(\"[mqtt publish] Connecting to hostname: \"\ \ + hostname + \" on port \" + str(port))\n self.client.connect_async(hostname,\ \ port)\n self.client.on_connect = on_connect\n self.client.on_disconnect\ \ = on_disconnect\n self.client.loop_start()\n\n\n #Register message\ \ ports and handler function\n self.message_port_register_in(pmt.intern('pdu_in'))\n\ \ self.set_msg_handler(pmt.intern('pdu_in'), self.handle_msg)\n\n\n \ \ def handle_msg(self, msg):\n # transform from GNU Radio polymorphic\ \ data type to numpy array\n msg = pmt.to_python(msg)[1]\n \n\ \ # serialize numpy array to preprare for json \n payload = msg.tolist()\n\ \n # create json message to send to MQTT broker\n msg_str = json.dumps({\ \ \n \"payload\" : payload,\n \"time\": datetime.strftime(datetime.utcnow(),\ \ \"%y-%m-%dT%H:%M:%SZ\")\n })\n \n if self.debug:\n \ \ gr.log.info(\"[mqtt publish] Message to publish: \" + msg_str)\n \ \ \n #print(\"[mqtt publish]\" + msg_str)\n self.client.publish(topic=self.topic,\ \ payload=msg_str, qos=self.qos)\n" affinity: '' alias: '' comment: '' debug: debug hostname: mqtt_broker_ip maxoutbuf: '0' minoutbuf: '0' port: '1883' qos: mqtt_qos region_name: mqtt_broker_credentials_region secret_name: mqtt_broker_credentials_name topic: mqtt_downlink_topic states: _io_cache: '(''mqtt publish'', ''mqtt_pub'', [(''hostname'', "''172.31.1.87''"), (''port'', ''1883''), (''topic'', "''downlink''"), (''qos'', ''1''), (''secret_name'', "''MqttCredentials''"), (''region_name'', "''eu-west-1''"), (''debug'', ''False'')], [(''pdu_in'', ''message'', 1)], [], ''\n docstring for block mqtt_pub\n \n make(str hostname, int port, str topic, int qos, str secret_name, str region_name, bool debug)\n >mqtt subscribe \n Publishes to topic on port with specified qos. \n Pulls credentials from secret_name/ region_name. \n debug enables logging of messages before publication on topic. \n Args:\n hostname:\n port:\n topic:\n qos:\n secret_name:\n region_name:\n debug: \n '', [''debug'', ''port'', ''qos'', ''region_name'', ''secret_name'', ''topic''])' bus_sink: false bus_source: false bus_structure: null coordinate: [864, 196.0] rotation: 0 state: enabled - name: mqtt_qos id: parameter parameters: alias: '' comment: '' hide: none label: mqtt_qos short_id: '' type: intx value: '1' states: bus_sink: false bus_source: false bus_structure: null coordinate: [720, 20.0] rotation: 0 state: true - name: mqtt_sub_0 id: epy_block parameters: _source_code: "\nimport paho.mqtt.client as mqtt \nimport boto3\nimport base64\n\ from botocore.exceptions import ClientError\nimport logging\n\nimport pmt\n\ import json\nfrom datetime import datetime\nfrom gnuradio import gr\nimport\ \ numpy as np\n\n\ndef get_secret(secret_name, region_name):\n\n # Create\ \ a Secrets Manager client\n session = boto3.session.Session()\n client\ \ = session.client(service_name='secretsmanager', region_name=region_name)\n\ \n try:\n get_secret_value_response = client.get_secret_value(\n \ \ SecretId=secret_name\n )\n except ClientError as e:\n\ \ if e.response['Error']['Code'] == 'DecryptionFailureException':\n \ \ gr.log.info(\"[mqtt subscribe] Secrets Manager can't decrypt the\ \ protected secret text using the provided KMS key\")\n raise e\n\ \ elif e.response['Error']['Code'] == 'InternalServiceErrorException':\n\ \ gr.log.info(\"[mqtt subscribe] An error occurred on the server\ \ side.\")\n raise e\n elif e.response['Error']['Code'] ==\ \ 'InvalidParameterException':\n gr.log.info(\"[mqtt subscribe] You\ \ provided an invalid value for a parameter.\")\n raise e\n \ \ elif e.response['Error']['Code'] == 'InvalidRequestException':\n \ \ gr.log.info(\"[mqtt subscribe] You provided a parameter value that is\ \ not valid for the current state of the resource.\")\n raise e\n\ \ elif e.response['Error']['Code'] == 'ResourceNotFoundException':\n\ \ gr.log.info(\"[mqtt subscribe] We can't find the resource that\ \ you asked for.\")\n raise e\n else:\n # Decrypts secret\ \ using the associated KMS CMK.\n # Depending on whether the secret is\ \ a string or binary, one of these fields will be populated.\n if 'SecretString'\ \ in get_secret_value_response:\n secret = get_secret_value_response['SecretString']\n\ \ return secret\n else:\n decoded_binary_secret\ \ = base64.b64decode(get_secret_value_response['SecretBinary'])\n \ \ return decoded_binary_secret\n\n\n\nclass mqtt_sub(gr.basic_block):\n \ \ \"\"\"\n docstring for block mqtt_sbu\n \n make(str hostname, int\ \ port, str topic, int qos, str secret_name, str region_name, bool debug)\n\ \ >mqtt subscribe \n Subscribes to topic on port with specified qos. \n\ \ Pulls credentials from secret_name/ region_name. \n debug enables logging\ \ of received messages on topic. \n Args:\n hostname:\n port:\n\ \ topic:\n qos:\n secret_name:\n region_name:\n debug:\ \ \n \n \"\"\"\n def __init__(self, hostname=\"172.31.1.87\", port=1883,\ \ topic=\"uplink\", qos=1, secret_name=\"MqttCredentials\", region_name=\"eu-west-1\"\ , debug=False):\n gr.sync_block.__init__(self, name=\"mqtt subscribe\"\ , in_sig=None, out_sig=None)\n self.topic = topic\n self.secret_name\ \ = secret_name\n self.region_name = region_name\n self.qos =\ \ qos\n self.port = port\n self.debug = debug\n \n \ \ #logging.basicConfig(level=logging.DEBUG) \n #self.logger = logging.getLogger(__name__)\ \ \n \n self.client = mqtt.Client()\n gr.log.info(\"[mqtt\ \ subscribe] Getting broker credentials from AWS Secrets Manager.\")\n \ \ secret = get_secret(secret_name, region_name)\n username = secret.split(\"\ ,\")[0].split(\":\")[1].strip('\"')\n password = secret.split(\",\")[1].split(\"\ :\")[1].strip(\"}\").strip('\"')\n self.client.username_pw_set(username,\ \ password)\n gr.log.info(\"[mqtt subscribe] Connecting to hostname:\ \ \" + hostname + \" on port \" + str(port))\n self.client.connect_async(hostname,\ \ port)\n self.client.on_connect = self.on_connect\n self.client.on_disconnect\ \ = self.on_disconnect\n self.client.on_message=self.on_message\n \ \ self.client.loop_start()\n\n #Register message ports \n \ \ self.message_port_register_out(pmt.intern('pdu_out'))\n \n \ \ \n\n def on_message(self, client, userdata, message):\n # decode\ \ payload data\n payload_dict = json.loads(message.payload.decode(\"\ utf-8\"))\n\n # make np array out of payload object\n payload\ \ = np.array(payload_dict[\"payload\"])\n \n # transform np array\ \ to unsigned 8 bit numbers \n payload = payload.astype(np.uint8)\n\n\ \ if self.debug:\n gr.log.info(\"[mqtt subscribe] Received\ \ payload: \" + str(payload))\n #print(\"[mqtt subscribe] \" , payload)\n\ \ # tranform np array to GNU Radio polymorphic data type\n pmt_payload\ \ = pmt.to_pmt(payload)\n \n # send PDU on pdu_out port. Insert\ \ empty dict for PDU metadata\n self.message_port_pub(pmt.intern(\"pdu_out\"\ ), pmt.cons(pmt.make_dict(), pmt_payload))\n\n\n def on_connect(self, client,\ \ userdata, flags, rc): \n if str(rc)=='0':\n gr.log.info(\"\ [mqtt subscribe] Connection successful\")\n\n # Subscribing in on_connect()\ \ means that if we lose the connection and \n # reconnect the subscriptions\ \ will be renewed\n gr.log.info(\"[mqtt subscribe] Subscribing to\ \ topic: \" + self.topic)\n client.subscribe(topic=self.topic, qos=self.qos)\n\ \ if str(rc)=='1':\n gr.log.info(\"[mqtt subscribe] Connection\ \ refused - incorrect protocol version\")\n if str(rc)=='2':\n \ \ gr.log.info(\"[mqtt subscribe] Connection refused - invalid client indentifier\"\ ) \n if str(rc)=='3':\n gr.log.info(\"[mqtt subscribe] Connection\ \ refused - server unavailable\")\n if str(rc)=='4':\n gr.log.info(\"\ [mqtt subscribe] Connection refused - bad username or password\")\n if\ \ str(rc)=='5':\n gr.log.info(\"[mqtt subscribe] Connection refused\ \ - not authorised\")\n\n def on_disconnect(self, client, userdata, rc):\n\ \ if rc !=0:\n gr.log.info(\"[mqtt subscribe] Unexpected disconnect.\ \ Attempting to reconnect.\")\n\n\n \n" affinity: '' alias: '' comment: '' debug: debug hostname: mqtt_broker_ip maxoutbuf: '0' minoutbuf: '0' port: '1883' qos: mqtt_qos region_name: mqtt_broker_credentials_region secret_name: mqtt_broker_credentials_name topic: mqtt_downlink_topic states: _io_cache: '(''mqtt subscribe'', ''mqtt_sub'', [(''hostname'', "''172.31.1.87''"), (''port'', ''1883''), (''topic'', "''uplink''"), (''qos'', ''1''), (''secret_name'', "''MqttCredentials''"), (''region_name'', "''eu-west-1''"), (''debug'', ''False'')], [], [(''pdu_out'', ''message'', 1)], ''\n docstring for block mqtt_sbu\n \n make(str hostname, int port, str topic, int qos, str secret_name, str region_name, bool debug)\n >mqtt subscribe \n Subscribes to topic on port with specified qos. \n Pulls credentials from secret_name/ region_name. \n debug enables logging of received messages on topic. \n Args:\n hostname:\n port:\n topic:\n qos:\n secret_name:\n region_name:\n debug: \n \n '', [''debug'', ''port'', ''qos'', ''region_name'', ''secret_name'', ''topic''])' bus_sink: false bus_source: false bus_structure: null coordinate: [696, 380.0] rotation: 180 state: enabled - name: mqtt_uplink_topic id: parameter parameters: alias: '' comment: '' hide: none label: mqtt_uplink_topic short_id: '' type: str value: uplink states: bus_sink: false bus_source: false bus_structure: null coordinate: [560, 20.0] rotation: 0 state: true connections: - [blocks_message_strobe_0, strobe, blocks_random_pdu_0, generate] - [blocks_random_pdu_0, pdus, mqtt_pub_0, pdu_in] - [mqtt_sub_0, pdu_out, blocks_message_debug_0, print_pdu] metadata: file_format: 1