# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0. import argparse import time import uuid import json import os import threading import logging from concurrent.futures import Future from awscrt import io, mqtt, auth, http from awscrt.io import LogLevel from awscrt.mqtt import Connection, Client, QoS from awsiot.greengrass_discovery import DiscoveryClient, DiscoverResponse from awsiot import mqtt_connection_builder from .utils import readsensors, command_handler, get_ip, run_on_tap with open(os.path.join(os.path.dirname(__file__), './utils/config.json'), "r") as f: config = json.load(f) ''' This module uses the Message Broker for AWS IoT to send and receive messages through an MQTT connection. On startup, the device connects to the server, subscribes to a topic, and begins publishing messages to that topic. The device should receive those same messages back from the message broker, since it is subscribed to that same topic. After startup the device will call the read_sensors() function every five seconds, and send the sensor reading as an mqtt message to the topic variable defined below. The device will also listen for messages from the command_topic and when a command is received wil call command_handler() withe the payload. This composes the control loop for sending and receiving iot messages. - To change sensor outputs modify readsensors.py - To change command handling modify command_handler.py ''' logging.basicConfig(filename='/etc/energykit-embedded/logs/pubsub.log', encoding='utf-8', level=logging.DEBUG, format="%(levelname)s | %(asctime)s | %(message)s") port = 8883 certificate_path = config["certificate_path"] private_key_path = config["private_key_path"] root_ca_path = config["root_ca_path"] thing_name = config["thing_name"] topic = config["read_topic"] command_topic = config["command_topic"] count = 0 use_websocket= False proxy_host = False proxy_port = False region = config["region"] verbosity = 'Warn' mode = 'both' received_count = 0 received_all_event = threading.Event() # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0. allowed_actions = ['both', 'publish', 'subscribe'] io.init_logging(getattr(LogLevel, verbosity), 'stderr') event_loop_group = io.EventLoopGroup(1) host_resolver = io.DefaultHostResolver(event_loop_group) client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver) tls_options = io.TlsContextOptions.create_client_with_mtls_from_path( certificate_path, private_key_path ) if root_ca_path: tls_options.override_default_trust_store_from_path(None, root_ca_path) tls_context = io.ClientTlsContext(tls_options) socket_options = io.SocketOptions() logging.info('Performing greengrass discovery...') discovery_client = DiscoveryClient(client_bootstrap, socket_options, tls_context, region) resp_future = discovery_client.discover(thing_name) discover_response = resp_future.result() logging.info(discover_response) def on_connection_interupted(connection, error, **kwargs): logging.info('connection interrupted with error {}'.format(error)) # Callback when an interrupted connection is re-established. def on_connection_resumed(connection, return_code, session_present, **kwargs): logging.info("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present)) if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present: logging.info("Session did not persist. Resubscribing to existing topics...") resubscribe_future, _ = connection.resubscribe_existing_topics() # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread, # evaluate result with a callback instead. resubscribe_future.add_done_callback(on_resubscribe_complete) def on_resubscribe_complete(resubscribe_future): resubscribe_results = resubscribe_future.result() logging.info("Resubscribe results: {}".format(resubscribe_results)) for topic, qos in resubscribe_results['topics']: if qos is None: sys.exit("Server rejected resubscribe to topic: {}".format(topic)) def try_iot_endpoints(): for gg_group in discover_response.gg_groups: logging.info(gg_group) for gg_core in gg_group.cores: for connectivity_info in gg_core.connectivity: try: logging.info('Trying core {} at host {} port {}'.format(gg_core.thing_arn, connectivity_info.host_address, connectivity_info.port)) mqtt_connection = mqtt_connection_builder.mtls_from_path( endpoint=connectivity_info.host_address, port=connectivity_info.port, cert_filepath=certificate_path, pri_key_filepath=private_key_path, client_bootstrap=client_bootstrap, ca_bytes=gg_group.certificate_authorities[0].encode('utf-8'), on_connection_interrupted=on_connection_interupted, on_connection_resumed=on_connection_resumed, client_id=thing_name, clean_session=False, keep_alive_secs=30) connect_future = mqtt_connection.connect() connect_future.result() logging.info('Connected!') return mqtt_connection except Exception as e: logging.info('Connection failed with exception {}'.format(e)) continue exit('All connection attempts failed') mqtt_connection = try_iot_endpoints() if mode == 'both' or mode == 'subscribe': def on_publish(topic, payload, dup, qos, retain, **kwargs): logging.info('Publish received on topic {}'.format(topic)) logging.info(payload) def on_message_received(topic, payload, **kwargs): logging.info("Received message from topic '{}': {}".format(topic, payload)) payload = json.loads(payload) handle_command = threading.Thread(target=command_handler.handleCommand(payload), daemon=True) logging.info(f"Handled incoming command with payload {payload}") handle_command.start() handle_command.join() global received_count received_count += 1 logging.info(f"Received messages:{received_count}") # Subscribe logging.info("Subscribing to topic {}...".format(command_topic)) subscribe_future, packet_id = mqtt_connection.subscribe( topic=command_topic, qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_message_received) subscribe_result = subscribe_future.result() logging.info("Subscribed with {}".format(str(subscribe_result['qos']))) def pubsub(): publish_count = 0 logging.info(f"Publishing to topic {topic}") print(f"Publishing to topic {topic}") while True: if mode == 'both' or mode == 'publish': message = readsensors.read_sensors(0.5) # TO DO save this in memory so that it can be accessed dynamically messageJson = json.dumps(message) pub_future, _ = mqtt_connection.publish(topic, messageJson, QoS.AT_MOST_ONCE) pub_future.result() print("############################################") print(message) print("############################################") print("############################################") publish_count += 1 time.sleep(5) def pubsub_telemetry(): telemetry = threading.Thread(target=pubsub) telemetry.start()