# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.

from concurrent.futures import Future
import sys
import threading
from time import sleep
import traceback
from uuid import uuid4

from awscrt import mqtt, http
from awsiot import iotshadow, mqtt_connection_builder

from utils.command_line_utils import CommandLineUtils

# - Overview -
# This sample uses the AWS IoT Device Shadow Service to keep a property in
# sync between device and server. Imagine a light whose color may be changed
# through an app, or set by a local user.
#
# - Instructions -
# Once connected, type a value in the terminal and press Enter to update
# the property's "reported" value. The sample also responds when the "desired"
# value changes on the server. To observe this, edit the Shadow document in
# the AWS Console and set a new "desired" value.
#
# - Detail -
# On startup, the sample requests the shadow document to learn the property's
# initial state. The sample also subscribes to "delta" events from the server,
# which are sent when a property's "desired" value differs from its "reported"
# value. When the sample learns of a new desired value, that value is changed
# on the device and an update is sent to the server with the new "reported"
# value.

# cmdData is the arguments/input from the command line placed into a single struct for
# use in this sample. This handles all of the command line parsing, validating, etc.
# See the Utils/CommandLineUtils for more information.
cmdData = CommandLineUtils.parse_sample_input_shadow()

# Using globals to simplify sample code
is_sample_done = threading.Event()

mqtt_connection = None
shadow_thing_name = cmdData.input_thing_name
shadow_property = cmdData.input_shadow_property

SHADOW_VALUE_DEFAULT = "off"


class LockedData:
    """State shared between the shadow callbacks, the input thread and exit().

    Every access to these attributes in this sample is made while holding
    ``lock``.
    """

    def __init__(self):
        self.lock = threading.Lock()
        # Last value applied locally for the shadow property (None until one is set).
        self.shadow_value = None
        # Set by exit() so that the disconnect is only started once.
        self.disconnect_called = False
        # Client tokens of requests published by this session whose
        # accepted/rejected response has not been handled yet.
        self.request_tokens = set()


locked_data = LockedData()


# Function for gracefully quitting this sample
def exit(msg_or_exception):
    """Print why the sample is stopping and start the MQTT disconnect once.

    The name shadows the ``exit`` builtin; it is kept because every callback
    in this sample calls it by this name.

    Args:
        msg_or_exception: An ``Exception`` (printed together with its
            traceback) or any other object (printed as a plain message).
    """
    if isinstance(msg_or_exception, Exception):
        print("Exiting sample due to exception.")
        # Take the traceback from the exception object itself rather than
        # sys.exc_info(), which is only meaningful while an ``except`` block
        # for this very exception is still active.
        traceback.print_exception(
            msg_or_exception.__class__,
            msg_or_exception,
            msg_or_exception.__traceback__)
    else:
        print("Exiting sample:", msg_or_exception)

    with locked_data.lock:
        if not locked_data.disconnect_called:
            print("Disconnecting...")
            locked_data.disconnect_called = True
            future = mqtt_connection.disconnect()
            future.add_done_callback(on_disconnected)


def on_disconnected(disconnect_future):
    # type: (Future) -> None
    """Done-callback for the disconnect future; unblocks the main thread."""
    print("Disconnected.")

    # Signal that sample is finished
    is_sample_done.set()


def on_get_shadow_accepted(response):
    # type: (iotshadow.GetShadowResponse) -> None
    """Handle an accepted "get shadow" response.

    Responses whose client token was not issued by this session are ignored,
    as is the whole response when a local value has already been set. The
    delta value is preferred, then the reported value; if neither holds the
    property, the default value is published.
    """
    try:
        with locked_data.lock:
            # check that this is a response to a request from this session
            try:
                locked_data.request_tokens.remove(response.client_token)
            except KeyError:
                print("Ignoring get_shadow_accepted message due to unexpected token.")
                return

            print("Finished getting initial shadow state.")
            if locked_data.shadow_value is not None:
                print(" Ignoring initial query because a delta event has already been received.")
                return

        # The lock is released here: change_shadow_value() and
        # set_local_value_due_to_initial_query() acquire it themselves.
        if response.state:
            if response.state.delta:
                value = response.state.delta.get(shadow_property)
                # Compare against None so that falsy but valid values
                # (0, False, "") are not mistaken for a missing property.
                if value is not None:
                    print(" Shadow contains delta value '{}'.".format(value))
                    change_shadow_value(value)
                    return

            if response.state.reported:
                value = response.state.reported.get(shadow_property)
                if value is not None:
                    print(" Shadow contains reported value '{}'.".format(value))
                    set_local_value_due_to_initial_query(value)
                    return

        print(" Shadow document lacks '{}' property. Setting defaults...".format(shadow_property))
        change_shadow_value(SHADOW_VALUE_DEFAULT)
        return

    except Exception as e:
        exit(e)


def on_get_shadow_rejected(error):
    # type: (iotshadow.ErrorResponse) -> None
    """Handle a rejected "get shadow" request.

    A 404 code leads to the default value being published; any other code
    stops the sample.
    """
    try:
        # check that this is a response to a request from this session
        with locked_data.lock:
            try:
                locked_data.request_tokens.remove(error.client_token)
            except KeyError:
                print("Ignoring get_shadow_rejected message due to unexpected token.")
                return

        if error.code == 404:
            print("Thing has no shadow document. Creating with defaults...")
            change_shadow_value(SHADOW_VALUE_DEFAULT)
        else:
            exit("Get request was rejected. code:{} message:'{}'".format(
                error.code, error.message))

    except Exception as e:
        exit(e)


def on_shadow_delta_updated(delta):
    # type: (iotshadow.ShadowDeltaUpdatedEvent) -> None
    """Handle a shadow delta event by applying the new desired value locally.

    A property that is present with value ``None`` is treated as deleted and
    reset to the default.
    """
    try:
        print("Received shadow delta event.")
        if delta.state and (shadow_property in delta.state):
            value = delta.state[shadow_property]
            if value is None:
                print(" Delta reports that '{}' was deleted. Resetting defaults...".format(shadow_property))
                change_shadow_value(SHADOW_VALUE_DEFAULT)
                return
            else:
                print(" Delta reports that desired value is '{}'. Changing local value...".format(value))
                if delta.client_token is not None:
                    print(" ClientToken is: " + delta.client_token)
                change_shadow_value(value)
        else:
            print(" Delta did not report a change in '{}'".format(shadow_property))

    except Exception as e:
        exit(e)


def on_publish_update_shadow(future):
    # type: (Future) -> None
    """Done-callback for an update publish; stops the sample if it failed."""
    try:
        future.result()
        print("Update request published.")
    except Exception as e:
        print("Failed to publish update request.")
        exit(e)


def on_update_shadow_accepted(response):
    # type: (iotshadow.UpdateShadowResponse) -> None
    """Handle an accepted "update shadow" response and report the new value.

    Responses whose client token was not issued by this session are ignored.
    """
    try:
        # check that this is a response to a request from this session
        with locked_data.lock:
            try:
                locked_data.request_tokens.remove(response.client_token)
            except KeyError:
                print("Ignoring update_shadow_accepted message due to unexpected token.")
                return

        try:
            if response.state.reported is not None:
                if shadow_property in response.state.reported:
                    print("Finished updating reported shadow value to '{}'.".format(
                        response.state.reported[shadow_property]))  # type: ignore
                else:
                    print("Could not find shadow property with name: '{}'.".format(shadow_property))  # type: ignore
            else:
                print("Shadow states cleared.")  # when the shadow states are cleared, reported and desired are set to None
            print("Enter desired value: ")  # remind user they can input new values
        except (AttributeError, TypeError):
            # Only a malformed response (e.g. no state, or a non-container
            # "reported") means the property is missing. Anything else is
            # left to the outer handler, which prints the real traceback
            # instead of being swallowed along with KeyboardInterrupt/SystemExit.
            exit("Updated shadow is missing the target property")

    except Exception as e:
        exit(e)


def on_update_shadow_rejected(error):
    # type: (iotshadow.ErrorResponse) -> None
    """Handle a rejected "update shadow" request by stopping the sample.

    Responses whose client token was not issued by this session are ignored.
    """
    try:
        # check that this is a response to a request from this session
        with locked_data.lock:
            try:
                locked_data.request_tokens.remove(error.client_token)
            except KeyError:
                print("Ignoring update_shadow_rejected message due to unexpected token.")
                return

        exit("Update request was rejected. code:{} message:'{}'".format(
            error.code, error.message))

    except Exception as e:
        exit(e)


def set_local_value_due_to_initial_query(reported_value):
    """Store the value learned from the initial query without publishing an update."""
    with locked_data.lock:
        locked_data.shadow_value = reported_value
    print("Enter desired value: ")  # remind user they can input new values


def change_shadow_value(value):
    """Set the local value and publish it as both "reported" and "desired".

    Does nothing beyond printing when ``value`` equals the current local
    value. Two string values are special: "clear_shadow" publishes an update
    with both states set to None, and "none" publishes None for this single
    property. This function acquires ``locked_data.lock`` itself, so it must
    not be called while that lock is held.

    Args:
        value: The new value for the shadow property.
    """
    with locked_data.lock:
        if locked_data.shadow_value == value:
            print("Local value is already '{}'.".format(value))
            print("Enter desired value: ")  # remind user they can input new values
            return

        print("Changed local shadow value to '{}'.".format(value))
        locked_data.shadow_value = value

        print("Updating reported shadow value to '{}'...".format(value))

        # use a unique token so we can correlate this "request" message to
        # any "response" messages received on the /accepted and /rejected topics
        token = str(uuid4())

        # if the value is "clear shadow" then send a UpdateShadowRequest with None
        # for both reported and desired to clear the shadow document completely.
        if value == "clear_shadow":
            tmp_state = iotshadow.ShadowState(
                reported=None,
                desired=None,
                reported_is_nullable=True,
                desired_is_nullable=True)
            request = iotshadow.UpdateShadowRequest(
                thing_name=shadow_thing_name,
                state=tmp_state,
                client_token=token,
            )
        # Otherwise, send a normal update request
        else:
            # if the value is "none" then set it to a Python none object to
            # clear the individual shadow property
            if value == "none":
                value = None

            request = iotshadow.UpdateShadowRequest(
                thing_name=shadow_thing_name,
                state=iotshadow.ShadowState(
                    reported={shadow_property: value},
                    desired={shadow_property: value},
                ),
                client_token=token,
            )

        future = shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE)

        # Registered while the lock is still held, so a response handler
        # (which takes the same lock) cannot look the token up too early.
        locked_data.request_tokens.add(token)

        future.add_done_callback(on_publish_update_shadow)


def user_input_thread_fn():
    """Feed new shadow values into the sample until it is told to stop.

    Outside CI, values are read from the terminal ('exit' or 'quit' stops the
    sample). In CI, five generated values are sent one second apart.
    """
    # If we are not in CI, then take terminal input
    if not cmdData.input_is_ci:
        while True:
            try:
                # Read user input
                new_value = input()

                # If user wants to quit sample, then quit.
                # Otherwise change the shadow value.
                if new_value in ('exit', 'quit'):
                    exit("User has quit")
                    break
                change_shadow_value(new_value)

            except Exception as e:
                print("Exception on input thread.")
                exit(e)
                break
    # Otherwise, send shadow updates automatically
    else:
        try:
            for messages_sent in range(5):
                cli_input = "Shadow_Value_" + str(messages_sent)
                change_shadow_value(cli_input)
                sleep(1)
            exit("CI has quit")
        except Exception as e:
            print("Exception on input thread (CI)")
            exit(e)


if __name__ == '__main__':
    # Create the proxy options if the data is present in cmdData
    proxy_options = None
    if cmdData.input_proxy_host is not None and cmdData.input_proxy_port != 0:
        proxy_options = http.HttpProxyOptions(
            host_name=cmdData.input_proxy_host,
            port=cmdData.input_proxy_port)

    # Create a MQTT connection from the command line data
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=cmdData.input_endpoint,
        port=cmdData.input_port,
        cert_filepath=cmdData.input_cert,
        pri_key_filepath=cmdData.input_key,
        ca_filepath=cmdData.input_ca,
        client_id=cmdData.input_clientId,
        clean_session=False,
        keep_alive_secs=30,
        http_proxy_options=proxy_options)

    if not cmdData.input_is_ci:
        print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}'...")
    else:
        print("Connecting to endpoint with client ID")

    connected_future = mqtt_connection.connect()

    shadow_client = iotshadow.IotShadowClient(mqtt_connection)

    # Wait for connection to be fully established.
    # Note that it's not necessary to wait, commands issued to the
    # mqtt_connection before its fully connected will simply be queued.
    # But this sample waits here so it's obvious when a connection
    # fails or succeeds.
    connected_future.result()
    print("Connected!")

    try:
        # Subscribe to necessary topics.
        # Note that it **is** important to wait for "accepted/rejected" subscriptions
        # to succeed before publishing the corresponding "request".
        print("Subscribing to Update responses...")
        update_accepted_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_accepted(
            request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=shadow_thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_update_shadow_accepted)

        update_rejected_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_rejected(
            request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=shadow_thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_update_shadow_rejected)

        # Wait for subscriptions to succeed
        update_accepted_subscribed_future.result()
        update_rejected_subscribed_future.result()

        print("Subscribing to Get responses...")
        get_accepted_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_accepted(
            request=iotshadow.GetShadowSubscriptionRequest(thing_name=shadow_thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_get_shadow_accepted)

        get_rejected_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_rejected(
            request=iotshadow.GetShadowSubscriptionRequest(thing_name=shadow_thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_get_shadow_rejected)

        # Wait for subscriptions to succeed
        get_accepted_subscribed_future.result()
        get_rejected_subscribed_future.result()

        print("Subscribing to Delta events...")
        delta_subscribed_future, _ = shadow_client.subscribe_to_shadow_delta_updated_events(
            request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name=shadow_thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_shadow_delta_updated)

        # Wait for subscription to succeed
        delta_subscribed_future.result()

        # The rest of the sample runs asynchronously.

        # Issue request for shadow's current state.
        # The response will be received by the on_get_shadow_accepted() callback
        print("Requesting current shadow state...")

        with locked_data.lock:
            # use a unique token so we can correlate this "request" message to
            # any "response" messages received on the /accepted and /rejected topics
            token = str(uuid4())

            publish_get_future = shadow_client.publish_get_shadow(
                request=iotshadow.GetShadowRequest(thing_name=shadow_thing_name, client_token=token),
                qos=mqtt.QoS.AT_LEAST_ONCE)

            locked_data.request_tokens.add(token)

        # Ensure that publish succeeds
        publish_get_future.result()

        # Launch thread to handle user input.
        # A "daemon" thread won't prevent the program from shutting down.
        print("Launching thread to read user input...")
        user_input_thread = threading.Thread(target=user_input_thread_fn, name='user_input_thread')
        user_input_thread.daemon = True
        user_input_thread.start()

    except Exception as e:
        exit(e)

    # Wait for the sample to finish (user types 'quit', or an error occurs)
    is_sample_done.wait()