# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.

from awsiot import mqtt5_client_builder
from awscrt import mqtt5, http
import threading
from concurrent.futures import Future
import time
import json
from utils.command_line_utils import CommandLineUtils

# Upper bound, in seconds, used for every blocking wait in this sample.
TIMEOUT = 100
topic_filter = "test/topic"

# cmdData is the arguments/input from the command line placed into a single struct for
# use in this sample. This handles all of the command line parsing, validating, etc.
# See the Utils/CommandLineUtils for more information.
cmdData = CommandLineUtils.parse_sample_input_mqtt5_pubsub()

# State shared between the client callbacks and the main flow below.
received_count = 0
received_all_event = threading.Event()
future_stopped = Future()
future_connection_success = Future()


# Callback when any publish is received
def on_publish_received(publish_packet_data):
    """Print the received publish and count it.

    Sets ``received_all_event`` once the number of received messages equals
    ``cmdData.input_count``.
    """
    publish_packet = publish_packet_data.publish_packet
    assert isinstance(publish_packet, mqtt5.PublishPacket)
    print("Received message from topic'{}':{}".format(publish_packet.topic, publish_packet.payload))
    global received_count
    received_count += 1
    if received_count == cmdData.input_count:
        received_all_event.set()


# Callback for the lifecycle event Stopped
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
    """Resolve ``future_stopped`` so the main flow can finish shutting down."""
    print("Lifecycle Stopped")
    future_stopped.set_result(lifecycle_stopped_data)


# Callback for the lifecycle event Connection Success
def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
    """Resolve ``future_connection_success`` with the connection data."""
    print("Lifecycle Connection Success")
    # A Future can only be completed once; a second set_result() raises
    # InvalidStateError. The client may report connection success more than
    # once (e.g. after a reconnect), so only the first event resolves it.
    if not future_connection_success.done():
        future_connection_success.set_result(lifecycle_connect_success_data)


# Callback for the lifecycle event Connection Failure
def on_lifecycle_connection_failure(lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData):
    """Report the exception attached to a failed connection attempt."""
    print("Lifecycle Connection Failure")
    print("Connection failed with exception:{}".format(lifecycle_connection_failure.exception))


if __name__ == '__main__':
    print("\nStarting MQTT5 PubSub Sample\n")
    message_count = cmdData.input_count
    message_topic = cmdData.input_topic
    message_string = cmdData.input_message

    # Create the proxy options if the data is present in cmdData
    proxy_options = None
    if cmdData.input_proxy_host is not None and cmdData.input_proxy_port != 0:
        proxy_options = http.HttpProxyOptions(
            host_name=cmdData.input_proxy_host,
            port=cmdData.input_proxy_port)

    # Create MQTT5 client
    client = mqtt5_client_builder.mtls_from_path(
        endpoint=cmdData.input_endpoint,
        port=cmdData.input_port,
        cert_filepath=cmdData.input_cert,
        pri_key_filepath=cmdData.input_key,
        ca_filepath=cmdData.input_ca,
        http_proxy_options=proxy_options,
        on_publish_received=on_publish_received,
        on_lifecycle_stopped=on_lifecycle_stopped,
        on_lifecycle_connection_success=on_lifecycle_connection_success,
        on_lifecycle_connection_failure=on_lifecycle_connection_failure,
        client_id=cmdData.input_clientId)
    print("MQTT5 Client Created")

    if not cmdData.input_is_ci:
        print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}'...")
    else:
        print("Connecting to endpoint with client ID")

    client.start()
    # Block until the connection-success callback resolves the future
    # (raises if that does not happen within TIMEOUT seconds).
    lifecycle_connect_success_data = future_connection_success.result(TIMEOUT)
    connack_packet = lifecycle_connect_success_data.connack_packet
    negotiated_settings = lifecycle_connect_success_data.negotiated_settings
    if not cmdData.input_is_ci:
        print(
            f"Connected to endpoint:'{cmdData.input_endpoint}' with Client ID:'{cmdData.input_clientId}' with reason_code:{repr(connack_packet.reason_code)}")

    # Subscribe
    print("Subscribing to topic '{}'...".format(message_topic))
    subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket(
        subscriptions=[mqtt5.Subscription(
            topic_filter=message_topic,
            qos=mqtt5.QoS.AT_LEAST_ONCE)]
    ))
    suback = subscribe_future.result(TIMEOUT)
    print("Subscribed with {}".format(suback.reason_codes))

    # Publish message to server desired number of times.
    # This step is skipped if message is blank.
    # This step loops forever if count was set to 0.
    if message_string:
        if message_count == 0:
            print("Sending messages until program killed")
        else:
            print("Sending {} message(s)".format(message_count))

        publish_count = 1
        while (publish_count <= message_count) or (message_count == 0):
            message = "{} [{}]".format(message_string, publish_count)
            print("Publishing message to topic '{}': {}".format(message_topic, message))
            # Send the numbered message that was just logged, so the payload
            # on the wire matches what the sample reports publishing.
            publish_future = client.publish(mqtt5.PublishPacket(
                topic=message_topic,
                payload=json.dumps(message),
                qos=mqtt5.QoS.AT_LEAST_ONCE
            ))

            publish_completion_data = publish_future.result(TIMEOUT)
            print("PubAck received with {}".format(repr(publish_completion_data.puback.reason_code)))
            time.sleep(1)
            publish_count += 1

    # Wait (at most TIMEOUT seconds) for the publish callback to signal that
    # the expected number of messages has arrived.
    received_all_event.wait(TIMEOUT)
    print("{} message(s) received.".format(received_count))

    # Unsubscribe
    print("Unsubscribing from topic '{}'".format(message_topic))
    unsubscribe_future = client.unsubscribe(unsubscribe_packet=mqtt5.UnsubscribePacket(
        topic_filters=[message_topic]))
    unsuback = unsubscribe_future.result(TIMEOUT)
    print("Unsubscribed with {}".format(unsuback.reason_codes))

    print("Stopping Client")
    client.stop()

    # Block until the stopped callback resolves the future.
    future_stopped.result(TIMEOUT)
    print("Client Stopped!")