/* * AWS IoT Device SDK for Embedded C 202211.00 * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy of * this software and associated documentation files (the "Software"), to deal in * the Software without restriction, including without limitation the rights to * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of * the Software, and to permit persons to whom the Software is furnished to do so, * subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ /* * Demo for showing the use of MQTT APIs to establish an MQTT session, * subscribe to a topic, publish to a topic, receive incoming publishes, * unsubscribe from a topic and disconnect the MQTT session. * * The example is single threaded and uses statically allocated memory; * it uses QOS2 and therefore implements a retransmission mechanism * for Publish messages. Retransmission of publish messages are attempted * when a MQTT connection is established with a session that was already * present. All the outgoing publish messages waiting to receive PUBREC * are resend in this demo. In order to support retransmission all the outgoing * publishes are stored until a PUBREC is received. */ /* Standard includes. */ #include #include #include #include /* POSIX includes. 
*/ #include /* Include Demo Config as the first non-system header. */ #include "demo_config.h" /* MQTT API headers. */ #include "core_mqtt.h" #include "core_mqtt_state.h" /* OpenSSL sockets transport implementation. */ #include "openssl_posix.h" /*Include backoff algorithm header for retry logic.*/ #include "backoff_algorithm.h" /* Clock for timer. */ #include "clock.h" /** * These configuration settings are required to run the basic TLS demo. * Throw compilation error if the below configs are not defined. */ #ifndef BROKER_ENDPOINT #error "Please define an MQTT broker endpoint, BROKER_ENDPOINT, in demo_config.h." #endif #ifndef ROOT_CA_CERT_PATH #error "Please define path to Root CA certificate of the MQTT broker, ROOT_CA_CERT_PATH, in demo_config.h." #endif #ifndef CLIENT_IDENTIFIER #error "Please define a unique CLIENT_IDENTIFIER." #endif /** * Provide default values for undefined configuration settings. */ #ifndef BROKER_PORT #define BROKER_PORT ( 8883 ) #endif #ifndef NETWORK_BUFFER_SIZE #define NETWORK_BUFFER_SIZE ( 1024U ) #endif /** * @brief Length of MQTT server host name. */ #define BROKER_ENDPOINT_LENGTH ( ( uint16_t ) ( sizeof( BROKER_ENDPOINT ) - 1 ) ) /** * @brief Length of client identifier. */ #define CLIENT_IDENTIFIER_LENGTH ( ( uint16_t ) ( sizeof( CLIENT_IDENTIFIER ) - 1 ) ) /** * @brief The maximum number of retries for connecting to server. */ #define CONNECTION_RETRY_MAX_ATTEMPTS ( 5U ) /** * @brief The maximum back-off delay (in milliseconds) for retrying connection to server. */ #define CONNECTION_RETRY_MAX_BACKOFF_DELAY_MS ( 5000U ) /** * @brief The base back-off delay (in milliseconds) to use for connection retry attempts. */ #define CONNECTION_RETRY_BACKOFF_BASE_MS ( 500U ) /** * @brief Timeout for receiving CONNACK packet in milli seconds. */ #define CONNACK_RECV_TIMEOUT_MS ( 1000U ) /** * @brief The topic to subscribe and publish to in the example. 
* * The topic name starts with the client identifier to ensure that each demo * interacts with a unique topic name. */ #define MQTT_EXAMPLE_TOPIC CLIENT_IDENTIFIER "/example/topic" /** * @brief Length of client MQTT topic. */ #define MQTT_EXAMPLE_TOPIC_LENGTH ( ( uint16_t ) ( sizeof( MQTT_EXAMPLE_TOPIC ) - 1 ) ) /** * @brief The MQTT message published in this example. */ #define MQTT_EXAMPLE_MESSAGE "Hello World!" /** * @brief The length of the MQTT message published in this example. */ #define MQTT_EXAMPLE_MESSAGE_LENGTH ( ( uint16_t ) ( sizeof( MQTT_EXAMPLE_MESSAGE ) - 1 ) ) /** * @brief Maximum number of outgoing publishes maintained in the application * until an ack is received from the broker. */ #define MAX_OUTGOING_PUBLISHES ( 5U ) /** * @brief Invalid packet identifier for the MQTT packets. Zero is always an * invalid packet identifier as per MQTT 3.1.1 spec. */ #define MQTT_PACKET_ID_INVALID ( ( uint16_t ) 0U ) /** * @brief Timeout for MQTT_ProcessLoop function in milliseconds. */ #define MQTT_PROCESS_LOOP_TIMEOUT_MS ( 500U ) /** * @brief The maximum time interval in seconds which is allowed to elapse * between two Control Packets. * * It is the responsibility of the Client to ensure that the interval between * Control Packets being sent does not exceed this Keep Alive value. In the * absence of sending any other Control Packets, the Client MUST send a * PINGREQ Packet. */ #define MQTT_KEEP_ALIVE_INTERVAL_SECONDS ( 60U ) /** * @brief Delay between MQTT publishes in seconds. */ #define DELAY_BETWEEN_PUBLISHES_SECONDS ( 1U ) /** * @brief Number of PUBLISH messages sent per iteration. */ #define MQTT_PUBLISH_COUNT_PER_LOOP ( 5U ) /** * @brief Delay in seconds between two iterations of subscribePublishLoop(). */ #define MQTT_SUBPUB_LOOP_DELAY_SECONDS ( 5U ) /** * @brief Transport timeout in milliseconds for transport send and receive. 
*/ #define TRANSPORT_SEND_RECV_TIMEOUT_MS ( 1000 ) /** * @brief The length of the outgoing publish records array used by the coreMQTT * library to track QoS > 0 packet ACKS for outgoing publishes. */ #define OUTGOING_PUBLISH_RECORD_LEN ( 10U ) /** * @brief The length of the incoming publish records array used by the coreMQTT * library to track QoS > 0 packet ACKS for incoming publishes. */ #define INCOMING_PUBLISH_RECORD_LEN ( 10U ) /*-----------------------------------------------------------*/ /** * @brief Structure to keep the MQTT publish packets until an ack is received * for QoS2 publishes. */ typedef struct PublishPackets { /** * @brief Packet identifier of the publish packet. */ uint16_t packetId; /** * @brief Publish info of the publish packet. */ MQTTPublishInfo_t pubInfo; } PublishPackets_t; /*-----------------------------------------------------------*/ /** * @brief Packet Identifier updated when an ACK packet is received. * * It is used to match an expected ACK for a transmitted packet. */ static uint16_t globalAckPacketIdentifier = 0U; /** * @brief Packet Identifier generated when Subscribe request was sent to the broker; * it is used to match received Subscribe ACK to the transmitted subscribe. */ static uint16_t globalSubscribePacketIdentifier = 0U; /** * @brief Packet Identifier generated when Unsubscribe request was sent to the broker; * it is used to match received Unsubscribe ACK to the transmitted unsubscribe * request. */ static uint16_t globalUnsubscribePacketIdentifier = 0U; /** * @brief Array to keep the outgoing publish messages. * These stored outgoing publish messages are kept until a successful ack * is received. */ static PublishPackets_t outgoingPublishPackets[ MAX_OUTGOING_PUBLISHES ] = { 0 }; /** * @brief Array to keep subscription topics. * Used to re-subscribe to topics that failed initial subscription attempts. 
*/ static MQTTSubscribeInfo_t pGlobalSubscriptionList[ 1 ]; /** * @brief The network buffer must remain valid for the lifetime of the MQTT context. */ static uint8_t buffer[ NETWORK_BUFFER_SIZE ]; /** * @brief Status of latest Subscribe ACK; * it is updated every time the callback function processes a Subscribe ACK * and accounts for subscription to a single topic. */ static MQTTSubAckStatus_t globalSubAckStatus = MQTTSubAckFailure; /** * @brief Array to track the outgoing publish records for outgoing publishes * with QoS > 0. * * This is passed into #MQTT_InitStatefulQoS to allow for QoS > 0. * */ static MQTTPubAckInfo_t pOutgoingPublishRecords[ OUTGOING_PUBLISH_RECORD_LEN ]; /** * @brief Array to track the incoming publish records for incoming publishes * with QoS > 0. * * This is passed into #MQTT_InitStatefulQoS to allow for QoS > 0. * */ static MQTTPubAckInfo_t pIncomingPublishRecords[ INCOMING_PUBLISH_RECORD_LEN ]; /*-----------------------------------------------------------*/ /* Each compilation unit must define the NetworkContext struct. */ struct NetworkContext { OpensslParams_t * pParams; }; /*-----------------------------------------------------------*/ /** * @brief The random number generator to use for exponential backoff with * jitter retry logic. * * @return The generated random number. */ static uint32_t generateRandomNumber(); /** * @brief Connect to MQTT broker with reconnection retries. * * If connection fails, retry is attempted after a timeout. * Timeout value will exponentially increase until maximum * timeout value is reached or the number of attempts are exhausted. * * @param[out] pNetworkContext The output parameter to return the created network context. * @param[out] pMqttContext The output to return the created MQTT context. * @param[in,out] pClientSessionPresent Pointer to flag indicating if an * MQTT session is present in the client. * @param[out] pBrokerSessionPresent Session was already present in the broker or not. 
* Session present response is obtained from the CONNACK from broker. * * @return EXIT_FAILURE on failure; EXIT_SUCCESS on successful connection. */ static int connectToServerWithBackoffRetries( NetworkContext_t * pNetworkContext, MQTTContext_t * pMqttContext, bool * pClientSessionPresent, bool * pBrokerSessionPresent ); /** * @brief A function that uses the passed MQTT connection to * subscribe to a topic, publish to the same topic * MQTT_PUBLISH_COUNT_PER_LOOP number of times, and verify if it * receives the Publish message back. * * @param[in] pMqttContext MQTT context pointer. * * @return EXIT_FAILURE on failure; EXIT_SUCCESS on success. */ static int subscribePublishLoop( MQTTContext_t * pMqttContext ); /** * @brief The function to handle the incoming publishes. * * @param[in] pPublishInfo Pointer to publish info of the incoming publish. * @param[in] packetIdentifier Packet identifier of the incoming publish. */ static void handleIncomingPublish( MQTTPublishInfo_t * pPublishInfo, uint16_t packetIdentifier ); /** * @brief The application callback function for getting the incoming publish * and incoming acks reported from MQTT library. * * @param[in] pMqttContext MQTT context pointer. * @param[in] pPacketInfo Packet Info pointer for the incoming packet. * @param[in] pDeserializedInfo Deserialized information from the incoming packet. */ static void eventCallback( MQTTContext_t * pMqttContext, MQTTPacketInfo_t * pPacketInfo, MQTTDeserializedInfo_t * pDeserializedInfo ); /** * @brief Initializes the MQTT library. * * @param[in] pMqttContext MQTT context pointer. * @param[in] pNetworkContext The network context pointer. * * @return EXIT_SUCCESS if the MQTT library is initialized; * EXIT_FAILURE otherwise. */ static int initializeMqtt( MQTTContext_t * pMqttContext, NetworkContext_t * pNetworkContext ); /** * @brief Sends an MQTT CONNECT packet over the already connected TCP socket. * * @param[in] pMqttContext MQTT context pointer. 
* @param[in] createCleanSession Creates a new MQTT session if true. * If false, tries to establish the existing session if there was session * already present in broker. * @param[out] pSessionPresent Session was already present in the broker or not. * Session present response is obtained from the CONNACK from broker. * * @return EXIT_SUCCESS if an MQTT session is established; * EXIT_FAILURE otherwise. */ static int establishMqttSession( MQTTContext_t * pMqttContext, bool createCleanSession, bool * pSessionPresent ); /** * @brief Close an MQTT session by sending MQTT DISCONNECT. * * @param[in] pMqttContext MQTT context pointer. * * @return EXIT_SUCCESS if DISCONNECT was successfully sent; * EXIT_FAILURE otherwise. */ static int disconnectMqttSession( MQTTContext_t * pMqttContext ); /** * @brief Sends an MQTT SUBSCRIBE to subscribe to #MQTT_EXAMPLE_TOPIC * defined at the top of the file. * * @param[in] pMqttContext MQTT context pointer. * * @return EXIT_SUCCESS if SUBSCRIBE was successfully sent; * EXIT_FAILURE otherwise. */ static int subscribeToTopic( MQTTContext_t * pMqttContext ); /** * @brief Sends an MQTT UNSUBSCRIBE to unsubscribe from * #MQTT_EXAMPLE_TOPIC defined at the top of the file. * * @param[in] pMqttContext MQTT context pointer. * * @return EXIT_SUCCESS if UNSUBSCRIBE was successfully sent; * EXIT_FAILURE otherwise. */ static int unsubscribeFromTopic( MQTTContext_t * pMqttContext ); /** * @brief Sends an MQTT PUBLISH to #MQTT_EXAMPLE_TOPIC defined at * the top of the file. * * @param[in] pMqttContext MQTT context pointer. * * @return EXIT_SUCCESS if PUBLISH was successfully sent; * EXIT_FAILURE otherwise. */ static int publishToTopic( MQTTContext_t * pMqttContext ); /** * @brief Function to get the free index at which an outgoing publish * can be stored. * * @param[out] pIndex The output parameter to return the index at which an * outgoing publish message can be stored. 
* * @return EXIT_FAILURE if no more publishes can be stored; * EXIT_SUCCESS if an index to store the next outgoing publish is obtained. */ static int getNextFreeIndexForOutgoingPublishes( uint8_t * pIndex ); /** * @brief Function to clean up an outgoing publish at given index from the * #outgoingPublishPackets array. * * @param[in] index The index at which a publish message has to be cleaned up. */ static void cleanupOutgoingPublishAt( uint8_t index ); /** * @brief Function to clean up all the outgoing publishes maintained in the * array. */ static void cleanupOutgoingPublishes( void ); /** * @brief Function to clean up the publish packet with the given packet id. * * @param[in] packetId Packet identifier of the packet to be cleaned up from * the array. */ static void cleanupOutgoingPublishWithPacketID( uint16_t packetId ); /** * @brief Function to resend the publishes if a session is re-established with * the broker. This function handles the resending of the QoS2 publish packets, * which are maintained locally. * * @param[in] pMqttContext MQTT context pointer. * * @return EXIT_SUCCESS if every publish pending a resend was sent; * EXIT_FAILURE otherwise. */ static int handlePublishResend( MQTTContext_t * pMqttContext ); /** * @brief Function to update variable globalSubAckStatus with status * information from Subscribe ACK. Called by eventCallback when a SUBACK * is received. * * @param[in] pPacketInfo Server response to the subscription request. */ static void updateSubAckStatus( MQTTPacketInfo_t * pPacketInfo ); /** * @brief Function to handle resubscription of topics on Subscribe * ACK failure. Uses an exponential backoff strategy with jitter. * * @param[in] pMqttContext MQTT context pointer. * * @return EXIT_SUCCESS if the subscription was accepted by the broker; * EXIT_FAILURE otherwise. */ static int handleResubscribe( MQTTContext_t * pMqttContext ); /** * @brief Wait for an expected ACK packet to be received. * * This function handles waiting for an expected ACK packet by calling * #MQTT_ProcessLoop and waiting for #eventCallback to set the global ACK * packet identifier to the expected ACK packet identifier. 
* * @param[in] pMqttContext MQTT context pointer. * @param[in] usPacketIdentifier Packet identifier for expected ACK packet. * @param[in] ulTimeout Maximum duration to wait for expected ACK packet. * * @return EXIT_SUCCESS if the expected ACK packet was received, EXIT_FAILURE * otherwise. */ static int waitForPacketAck( MQTTContext_t * pMqttContext, uint16_t usPacketIdentifier, uint32_t ulTimeout ); /** * @brief Call #MQTT_ProcessLoop in a loop for the duration of a timeout or * #MQTT_ProcessLoop returns a failure. * * @param[in] pMqttContext MQTT context pointer. * @param[in] ulTimeoutMs Duration to call #MQTT_ProcessLoop for. * * @return Returns the return value of the last call to #MQTT_ProcessLoop. */ static MQTTStatus_t processLoopWithTimeout( MQTTContext_t * pMqttContext, uint32_t ulTimeoutMs ); /*-----------------------------------------------------------*/ static uint32_t generateRandomNumber() { return( rand() ); } /*-----------------------------------------------------------*/ static int connectToServerWithBackoffRetries( NetworkContext_t * pNetworkContext, MQTTContext_t * pMqttContext, bool * pClientSessionPresent, bool * pBrokerSessionPresent ) { int returnStatus = EXIT_FAILURE; BackoffAlgorithmStatus_t backoffAlgStatus = BackoffAlgorithmSuccess; OpensslStatus_t opensslStatus = OPENSSL_SUCCESS; BackoffAlgorithmContext_t reconnectParams; ServerInfo_t serverInfo; OpensslCredentials_t opensslCredentials; uint16_t nextRetryBackOff; bool createCleanSession; /* Initialize information to connect to the MQTT broker. */ serverInfo.pHostName = BROKER_ENDPOINT; serverInfo.hostNameLength = BROKER_ENDPOINT_LENGTH; serverInfo.port = BROKER_PORT; /* Initialize credentials for establishing TLS session. 
*/ memset( &opensslCredentials, 0, sizeof( OpensslCredentials_t ) ); opensslCredentials.pRootCaPath = ROOT_CA_CERT_PATH; opensslCredentials.sniHostName = BROKER_ENDPOINT; /* Initialize reconnect attempts and interval */ BackoffAlgorithm_InitializeParams( &reconnectParams, CONNECTION_RETRY_BACKOFF_BASE_MS, CONNECTION_RETRY_MAX_BACKOFF_DELAY_MS, CONNECTION_RETRY_MAX_ATTEMPTS ); /* Attempt to connect to MQTT broker. If connection fails, retry after * a timeout. Timeout value will exponentially increase till maximum * attempts are reached. */ do { /* Establish a TLS session with the MQTT broker. This example connects * to the MQTT broker as specified in BROKER_ENDPOINT and BROKER_PORT at * the top of this file. */ LogInfo( ( "Establishing a TLS session to %.*s:%d.", BROKER_ENDPOINT_LENGTH, BROKER_ENDPOINT, BROKER_PORT ) ); opensslStatus = Openssl_Connect( pNetworkContext, &serverInfo, &opensslCredentials, TRANSPORT_SEND_RECV_TIMEOUT_MS, TRANSPORT_SEND_RECV_TIMEOUT_MS ); if( opensslStatus == OPENSSL_SUCCESS ) { /* A clean MQTT session needs to be created, if there is no session saved * in this MQTT client. */ createCleanSession = ( *pClientSessionPresent == true ) ? false : true; /* Sends an MQTT Connect packet using the established TLS session, * then waits for connection acknowledgment (CONNACK) packet. */ returnStatus = establishMqttSession( pMqttContext, createCleanSession, pBrokerSessionPresent ); if( returnStatus == EXIT_FAILURE ) { /* End TLS session, then close TCP connection. */ ( void ) Openssl_Disconnect( pNetworkContext ); } } if( returnStatus == EXIT_FAILURE ) { /* Generate a random number and get back-off value (in milliseconds) for the next connection retry. */ backoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &reconnectParams, generateRandomNumber(), &nextRetryBackOff ); if( backoffAlgStatus == BackoffAlgorithmRetriesExhausted ) { LogError( ( "Connection to the broker failed, all attempts exhausted." 
) ); returnStatus = EXIT_FAILURE; } else if( backoffAlgStatus == BackoffAlgorithmSuccess ) { LogWarn( ( "Connection to the broker failed. Retrying connection " "after %hu ms backoff.", ( unsigned short ) nextRetryBackOff ) ); Clock_SleepMs( nextRetryBackOff ); } } } while( ( returnStatus == EXIT_FAILURE ) && ( backoffAlgStatus == BackoffAlgorithmSuccess ) ); return returnStatus; } /*-----------------------------------------------------------*/ static int getNextFreeIndexForOutgoingPublishes( uint8_t * pIndex ) { int returnStatus = EXIT_FAILURE; uint8_t index = 0; assert( outgoingPublishPackets != NULL ); assert( pIndex != NULL ); for( index = 0; index < MAX_OUTGOING_PUBLISHES; index++ ) { /* A free index is marked by invalid packet id. * Check if the the index has a free slot. */ if( outgoingPublishPackets[ index ].packetId == MQTT_PACKET_ID_INVALID ) { returnStatus = EXIT_SUCCESS; /* Copy the available index into the output param. */ *pIndex = index; break; } } /* Copy the available index into the output param. */ *pIndex = index; return returnStatus; } /*-----------------------------------------------------------*/ static void cleanupOutgoingPublishAt( uint8_t index ) { assert( outgoingPublishPackets != NULL ); assert( index < MAX_OUTGOING_PUBLISHES ); /* Clear the outgoing publish packet. */ ( void ) memset( &( outgoingPublishPackets[ index ] ), 0x00, sizeof( outgoingPublishPackets[ index ] ) ); } /*-----------------------------------------------------------*/ static void cleanupOutgoingPublishes( void ) { assert( outgoingPublishPackets != NULL ); /* Clean up all the outgoing publish packets. */ ( void ) memset( outgoingPublishPackets, 0x00, sizeof( outgoingPublishPackets ) ); } /*-----------------------------------------------------------*/ static void cleanupOutgoingPublishWithPacketID( uint16_t packetId ) { uint8_t index = 0; assert( outgoingPublishPackets != NULL ); assert( packetId != MQTT_PACKET_ID_INVALID ); /* Clean up all the saved outgoing publishes. 
*/ for( ; index < MAX_OUTGOING_PUBLISHES; index++ ) { if( outgoingPublishPackets[ index ].packetId == packetId ) { cleanupOutgoingPublishAt( index ); LogInfo( ( "Cleaned up outgoing publish packet with packet id %u.\n\n", packetId ) ); break; } } } /*-----------------------------------------------------------*/ static int handlePublishResend( MQTTContext_t * pMqttContext ) { int returnStatus = EXIT_SUCCESS; MQTTStatus_t mqttStatus = MQTTSuccess; uint8_t index = 0U; MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER; uint16_t packetIdToResend = MQTT_PACKET_ID_INVALID; bool foundPacketId = false; assert( pMqttContext != NULL ); assert( outgoingPublishPackets != NULL ); /* MQTT_PublishToResend() provides a packet ID of the next PUBLISH packet * that should be resent. In accordance with the MQTT v3.1.1 spec, * MQTT_PublishToResend() preserves the ordering of when the original * PUBLISH packets were sent. The outgoingPublishPackets array is searched * through for the associated packet ID. If the application requires * increased efficiency in the look up of the packet ID, then a hashmap of * packetId key and PublishPacket_t values may be used instead. 
*/ packetIdToResend = MQTT_PublishToResend( pMqttContext, &cursor ); while( packetIdToResend != MQTT_PACKET_ID_INVALID ) { foundPacketId = false; for( index = 0U; index < MAX_OUTGOING_PUBLISHES; index++ ) { if( outgoingPublishPackets[ index ].packetId == packetIdToResend ) { foundPacketId = true; outgoingPublishPackets[ index ].pubInfo.dup = true; LogInfo( ( "Sending duplicate PUBLISH with packet id %u.", outgoingPublishPackets[ index ].packetId ) ); mqttStatus = MQTT_Publish( pMqttContext, &outgoingPublishPackets[ index ].pubInfo, outgoingPublishPackets[ index ].packetId ); if( mqttStatus != MQTTSuccess ) { LogError( ( "Sending duplicate PUBLISH for packet id %u " " failed with status %s.", outgoingPublishPackets[ index ].packetId, MQTT_Status_strerror( mqttStatus ) ) ); returnStatus = EXIT_FAILURE; break; } else { LogInfo( ( "Sent duplicate PUBLISH successfully for packet id %u.\n\n", outgoingPublishPackets[ index ].packetId ) ); } } } if( foundPacketId == false ) { LogError( ( "Packet id %u requires resend, but was not found in " "outgoingPublishPackets.", packetIdToResend ) ); returnStatus = EXIT_FAILURE; break; } else { /* Get the next packetID to be resent. */ packetIdToResend = MQTT_PublishToResend( pMqttContext, &cursor ); } } return returnStatus; } /*-----------------------------------------------------------*/ static void handleIncomingPublish( MQTTPublishInfo_t * pPublishInfo, uint16_t packetIdentifier ) { assert( pPublishInfo != NULL ); ( void ) packetIdentifier; /* Process incoming Publish. */ LogInfo( ( "Incoming QOS : %d.", pPublishInfo->qos ) ); /* Verify the received publish is for the topic we have subscribed to. 
*/ if( ( pPublishInfo->topicNameLength == MQTT_EXAMPLE_TOPIC_LENGTH ) && ( 0 == strncmp( MQTT_EXAMPLE_TOPIC, pPublishInfo->pTopicName, pPublishInfo->topicNameLength ) ) ) { LogInfo( ( "Incoming Publish Topic Name: %.*s matches subscribed topic.\n" "Incoming Publish message Packet Id is %u.\n" "Incoming Publish Message : %.*s.\n\n", pPublishInfo->topicNameLength, pPublishInfo->pTopicName, packetIdentifier, ( int ) pPublishInfo->payloadLength, ( const char * ) pPublishInfo->pPayload ) ); } else { LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.", pPublishInfo->topicNameLength, pPublishInfo->pTopicName ) ); } } /*-----------------------------------------------------------*/ static void updateSubAckStatus( MQTTPacketInfo_t * pPacketInfo ) { uint8_t * pPayload = NULL; size_t pSize = 0; MQTTStatus_t mqttStatus = MQTT_GetSubAckStatusCodes( pPacketInfo, &pPayload, &pSize ); /* MQTT_GetSubAckStatusCodes always returns success if called with packet info * from the event callback and non-NULL parameters. */ assert( mqttStatus == MQTTSuccess ); /* Suppress unused variable warning when asserts are disabled in build. */ ( void ) mqttStatus; /* Demo only subscribes to one topic, so only one status code is returned. */ globalSubAckStatus = ( MQTTSubAckStatus_t ) pPayload[ 0 ]; } /*-----------------------------------------------------------*/ static int handleResubscribe( MQTTContext_t * pMqttContext ) { int returnStatus = EXIT_SUCCESS; MQTTStatus_t mqttStatus = MQTTSuccess; BackoffAlgorithmStatus_t backoffAlgStatus = BackoffAlgorithmSuccess; BackoffAlgorithmContext_t retryParams; uint16_t nextRetryBackOff = 0U; assert( pMqttContext != NULL ); /* Initialize retry attempts and interval. */ BackoffAlgorithm_InitializeParams( &retryParams, CONNECTION_RETRY_BACKOFF_BASE_MS, CONNECTION_RETRY_MAX_BACKOFF_DELAY_MS, CONNECTION_RETRY_MAX_ATTEMPTS ); do { /* Send SUBSCRIBE packet. 
* Note: reusing the value specified in globalSubscribePacketIdentifier is acceptable here * because this function is entered only after the receipt of a SUBACK, at which point * its associated packet id is free to use. */ mqttStatus = MQTT_Subscribe( pMqttContext, pGlobalSubscriptionList, sizeof( pGlobalSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ), globalSubscribePacketIdentifier ); if( mqttStatus != MQTTSuccess ) { LogError( ( "Failed to send SUBSCRIBE packet to broker with error = %s.", MQTT_Status_strerror( mqttStatus ) ) ); returnStatus = EXIT_FAILURE; break; } LogInfo( ( "SUBSCRIBE sent for topic %.*s to broker.\n\n", MQTT_EXAMPLE_TOPIC_LENGTH, MQTT_EXAMPLE_TOPIC ) ); /* Process incoming packet. */ returnStatus = waitForPacketAck( pMqttContext, globalSubscribePacketIdentifier, MQTT_PROCESS_LOOP_TIMEOUT_MS ); if( returnStatus == EXIT_FAILURE ) { break; } /* Check if recent subscription request has been rejected. globalSubAckStatus is updated * in eventCallback to reflect the status of the SUBACK sent by the broker. It represents * either the QoS level granted by the server upon subscription, or acknowledgement of * server rejection of the subscription request. */ if( globalSubAckStatus == MQTTSubAckFailure ) { /* Generate a random number and get back-off value (in milliseconds) for the next re-subscribe attempt. */ backoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &retryParams, generateRandomNumber(), &nextRetryBackOff ); if( backoffAlgStatus == BackoffAlgorithmRetriesExhausted ) { LogError( ( "Server rejected subscription request, all attempts exhausted." ) ); returnStatus = EXIT_FAILURE; } else if( backoffAlgStatus == BackoffAlgorithmSuccess ) { LogWarn( ( "Server rejected subscription request. 
Retrying " "connection after %hu ms backoff.", ( unsigned short ) nextRetryBackOff ) ); Clock_SleepMs( nextRetryBackOff ); } } } while( ( globalSubAckStatus == MQTTSubAckFailure ) && ( backoffAlgStatus == BackoffAlgorithmSuccess ) ); return returnStatus; } /*-----------------------------------------------------------*/ static void eventCallback( MQTTContext_t * pMqttContext, MQTTPacketInfo_t * pPacketInfo, MQTTDeserializedInfo_t * pDeserializedInfo ) { uint16_t packetIdentifier; assert( pMqttContext != NULL ); assert( pPacketInfo != NULL ); assert( pDeserializedInfo != NULL ); /* Suppress unused parameter warning when asserts are disabled in build. */ ( void ) pMqttContext; packetIdentifier = pDeserializedInfo->packetIdentifier; /* Handle incoming publish. The lower 4 bits of the publish packet * type is used for the dup, QoS, and retain flags. Hence masking * out the lower bits to check if the packet is publish. */ if( ( pPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH ) { assert( pDeserializedInfo->pPublishInfo != NULL ); /* Handle incoming publish. */ handleIncomingPublish( pDeserializedInfo->pPublishInfo, packetIdentifier ); } else { /* Handle other packets. */ switch( pPacketInfo->type ) { case MQTT_PACKET_TYPE_SUBACK: /* A SUBACK from the broker, containing the server response to our subscription request, has been received. * It contains the status code indicating server approval/rejection for the subscription to the single topic * requested. The SUBACK will be parsed to obtain the status code, and this status code will be stored in global * variable globalSubAckStatus. */ updateSubAckStatus( pPacketInfo ); /* Check status of the subscription request. If globalSubAckStatus does not indicate * server refusal of the request (MQTTSubAckFailure), it contains the QoS level granted * by the server, indicating a successful subscription attempt. */ if( globalSubAckStatus != MQTTSubAckFailure ) { LogInfo( ( "Subscribed to the topic %.*s. 
with maximum QoS %u.\n\n", MQTT_EXAMPLE_TOPIC_LENGTH, MQTT_EXAMPLE_TOPIC, globalSubAckStatus ) ); } /* Make sure ACK packet identifier matches with Request packet identifier. */ assert( globalSubscribePacketIdentifier == packetIdentifier ); /* Update the global ACK packet identifier. */ globalAckPacketIdentifier = packetIdentifier; break; case MQTT_PACKET_TYPE_UNSUBACK: LogInfo( ( "Unsubscribed from the topic %.*s.\n\n", MQTT_EXAMPLE_TOPIC_LENGTH, MQTT_EXAMPLE_TOPIC ) ); /* Make sure ACK packet identifier matches with Request packet identifier. */ assert( globalUnsubscribePacketIdentifier == packetIdentifier ); /* Update the global ACK packet identifier. */ globalAckPacketIdentifier = packetIdentifier; break; case MQTT_PACKET_TYPE_PINGRESP: /* Nothing to be done from application as library handles * PINGRESP. */ LogWarn( ( "PINGRESP should not be handled by the application " "callback when using MQTT_ProcessLoop.\n\n" ) ); break; case MQTT_PACKET_TYPE_PUBREC: LogInfo( ( "PUBREC received for packet id %u.\n\n", packetIdentifier ) ); /* Cleanup publish packet when a PUBREC is received. */ cleanupOutgoingPublishWithPacketID( packetIdentifier ); break; case MQTT_PACKET_TYPE_PUBREL: /* Nothing to be done from application as library handles * PUBREL. */ LogInfo( ( "PUBREL received for packet id %u.\n\n", packetIdentifier ) ); break; case MQTT_PACKET_TYPE_PUBCOMP: /* Nothing to be done from application as library handles * PUBCOMP. */ LogInfo( ( "PUBCOMP received for packet id %u.\n\n", packetIdentifier ) ); break; /* Any other packet type is invalid. */ default: LogError( ( "Unknown packet type received:(%02x).\n\n", pPacketInfo->type ) ); } } } /*-----------------------------------------------------------*/

/**
 * @brief Send an MQTT CONNECT packet to the broker through MQTT_Connect.
 *
 * @param[in] pMqttContext MQTT context handed to MQTT_Connect.
 * @param[in] createCleanSession Value copied into the clean-session flag of
 * the CONNECT packet.
 * @param[out] pSessionPresent Forwarded to MQTT_Connect, which uses it to
 * report whether the broker resumed an existing session.
 *
 * @return EXIT_SUCCESS if MQTT_Connect returned MQTTSuccess;
 * EXIT_FAILURE otherwise.
 */
static int establishMqttSession( MQTTContext_t * pMqttContext,
                                 bool createCleanSession,
                                 bool * pSessionPresent )
{
    int returnStatus = EXIT_SUCCESS;
    MQTTStatus_t mqttStatus;

    /* Zero-initialize so that any member not assigned below never carries
     * stack garbage into MQTT_Connect. */
    MQTTConnectInfo_t connectInfo = { 0 };

    assert( pMqttContext != NULL );
    assert( pSessionPresent != NULL );

    /* Establish MQTT session by sending a CONNECT packet. */

    /* If #createCleanSession is true, start with a clean session
     * i.e. direct the MQTT broker to discard any previous session data.
     * If #createCleanSession is false, directs the broker to attempt to
     * reestablish a session which was already present. */
    connectInfo.cleanSession = createCleanSession;

    /* The client identifier is used to uniquely identify this MQTT client to
     * the MQTT broker. In a production device the identifier can be something
     * unique, such as a device serial number. */
    connectInfo.pClientIdentifier = CLIENT_IDENTIFIER;
    connectInfo.clientIdentifierLength = CLIENT_IDENTIFIER_LENGTH;

    /* The maximum time interval in seconds which is allowed to elapse
     * between two Control Packets.
     * It is the responsibility of the Client to ensure that the interval between
     * Control Packets being sent does not exceed this Keep Alive value. In the
     * absence of sending any other Control Packets, the Client MUST send a
     * PINGREQ Packet. */
    connectInfo.keepAliveSeconds = MQTT_KEEP_ALIVE_INTERVAL_SECONDS;

    /* Username and password for authentication. Not used in this demo. */
    connectInfo.pUserName = NULL;
    connectInfo.userNameLength = 0U;
    connectInfo.pPassword = NULL;
    connectInfo.passwordLength = 0U;

    /* Send MQTT CONNECT packet to broker. No Last Will is supplied (NULL). */
    mqttStatus = MQTT_Connect( pMqttContext,
                               &connectInfo,
                               NULL,
                               CONNACK_RECV_TIMEOUT_MS,
                               pSessionPresent );

    if( mqttStatus != MQTTSuccess )
    {
        returnStatus = EXIT_FAILURE;
        LogError( ( "Connection with MQTT broker failed with status %s.",
                    MQTT_Status_strerror( mqttStatus ) ) );
    }
    else
    {
        LogInfo( ( "MQTT connection successfully established with broker.\n\n" ) );
    }

    return returnStatus;
}
/*-----------------------------------------------------------*/

/**
 * @brief Send an MQTT DISCONNECT packet through MQTT_Disconnect.
 *
 * @param[in] pMqttContext MQTT context handed to MQTT_Disconnect.
 *
 * @return EXIT_SUCCESS if MQTT_Disconnect returned MQTTSuccess;
 * EXIT_FAILURE otherwise.
 */
static int disconnectMqttSession( MQTTContext_t * pMqttContext )
{
    MQTTStatus_t mqttStatus = MQTTSuccess;
    int returnStatus = EXIT_SUCCESS;

    assert( pMqttContext != NULL );

    /* Send DISCONNECT. */
    mqttStatus = MQTT_Disconnect( pMqttContext );

    if( mqttStatus != MQTTSuccess )
    {
        LogError( ( "Sending MQTT DISCONNECT failed with status=%s.",
                    MQTT_Status_strerror( mqttStatus ) ) );
        returnStatus = EXIT_FAILURE;
    }

    return returnStatus;
}
/*-----------------------------------------------------------*/

/**
 * @brief Send a SUBSCRIBE packet for MQTT_EXAMPLE_TOPIC at QoS2.
 *
 * Rebuilds pGlobalSubscriptionList and records the packet identifier used in
 * globalSubscribePacketIdentifier so that the SUBACK can be matched later.
 *
 * @param[in] pMqttContext MQTT context handed to MQTT_Subscribe.
 *
 * @return EXIT_SUCCESS if MQTT_Subscribe returned MQTTSuccess;
 * EXIT_FAILURE otherwise.
 */
static int subscribeToTopic( MQTTContext_t * pMqttContext )
{
    int returnStatus = EXIT_SUCCESS;
    MQTTStatus_t mqttStatus;

    assert( pMqttContext != NULL );

    /* Start with everything at 0. */
    ( void ) memset( ( void * ) pGlobalSubscriptionList, 0x00, sizeof( pGlobalSubscriptionList ) );

    /* This example subscribes to only one topic and uses QOS2. */
    pGlobalSubscriptionList[ 0 ].qos = MQTTQoS2;
    pGlobalSubscriptionList[ 0 ].pTopicFilter = MQTT_EXAMPLE_TOPIC;
    pGlobalSubscriptionList[ 0 ].topicFilterLength = MQTT_EXAMPLE_TOPIC_LENGTH;

    /* Generate packet identifier for the SUBSCRIBE packet. */
    globalSubscribePacketIdentifier = MQTT_GetPacketId( pMqttContext );

    /* Send SUBSCRIBE packet. */
    mqttStatus = MQTT_Subscribe( pMqttContext,
                                 pGlobalSubscriptionList,
                                 sizeof( pGlobalSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
                                 globalSubscribePacketIdentifier );

    if( mqttStatus != MQTTSuccess )
    {
        LogError( ( "Failed to send SUBSCRIBE packet to broker with error = %s.",
                    MQTT_Status_strerror( mqttStatus ) ) );
        returnStatus = EXIT_FAILURE;
    }
    else
    {
        LogInfo( ( "SUBSCRIBE sent for topic %.*s to broker.\n\n",
                   MQTT_EXAMPLE_TOPIC_LENGTH,
                   MQTT_EXAMPLE_TOPIC ) );
    }

    return returnStatus;
}
/*-----------------------------------------------------------*/

/**
 * @brief Send an UNSUBSCRIBE packet for MQTT_EXAMPLE_TOPIC.
 *
 * Rebuilds pGlobalSubscriptionList and records the packet identifier used in
 * globalUnsubscribePacketIdentifier so that the UNSUBACK can be matched later.
 *
 * @param[in] pMqttContext MQTT context handed to MQTT_Unsubscribe.
 *
 * @return EXIT_SUCCESS if MQTT_Unsubscribe returned MQTTSuccess;
 * EXIT_FAILURE otherwise.
 */
static int unsubscribeFromTopic( MQTTContext_t * pMqttContext )
{
    int returnStatus = EXIT_SUCCESS;
    MQTTStatus_t mqttStatus;

    assert( pMqttContext != NULL );

    /* Start with everything at 0. */
    ( void ) memset( ( void * ) pGlobalSubscriptionList, 0x00, sizeof( pGlobalSubscriptionList ) );

    /* This example subscribes to and unsubscribes from only one topic
     * and uses QOS2. */
    pGlobalSubscriptionList[ 0 ].qos = MQTTQoS2;
    pGlobalSubscriptionList[ 0 ].pTopicFilter = MQTT_EXAMPLE_TOPIC;
    pGlobalSubscriptionList[ 0 ].topicFilterLength = MQTT_EXAMPLE_TOPIC_LENGTH;

    /* Generate packet identifier for the UNSUBSCRIBE packet. */
    globalUnsubscribePacketIdentifier = MQTT_GetPacketId( pMqttContext );

    /* Send UNSUBSCRIBE packet. */
    mqttStatus = MQTT_Unsubscribe( pMqttContext,
                                   pGlobalSubscriptionList,
                                   sizeof( pGlobalSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
                                   globalUnsubscribePacketIdentifier );

    if( mqttStatus != MQTTSuccess )
    {
        LogError( ( "Failed to send UNSUBSCRIBE packet to broker with error = %s.",
                    MQTT_Status_strerror( mqttStatus ) ) );
        returnStatus = EXIT_FAILURE;
    }
    else
    {
        LogInfo( ( "UNSUBSCRIBE sent for topic %.*s to broker.\n\n",
                   MQTT_EXAMPLE_TOPIC_LENGTH,
                   MQTT_EXAMPLE_TOPIC ) );
    }

    return returnStatus;
}
/*-----------------------------------------------------------*/

/**
 * @brief Publish MQTT_EXAMPLE_MESSAGE to MQTT_EXAMPLE_TOPIC at QoS2.
 *
 * The publish is written into a free entry of outgoingPublishPackets before
 * it is sent. If MQTT_Publish fails, that entry is released again with
 * cleanupOutgoingPublishAt.
 *
 * @param[in] pMqttContext MQTT context handed to MQTT_Publish.
 *
 * @return EXIT_SUCCESS if a free entry was found and MQTT_Publish returned
 * MQTTSuccess; EXIT_FAILURE otherwise.
 */
static int publishToTopic( MQTTContext_t * pMqttContext )
{
    int returnStatus = EXIT_SUCCESS;
    MQTTStatus_t mqttStatus = MQTTSuccess;
    uint8_t publishIndex = MAX_OUTGOING_PUBLISHES;

    assert( pMqttContext != NULL );

    /* Get the next free index for the outgoing publish. All QoS2 outgoing
     * publishes are stored until a PUBREC is received. These messages are
     * stored for supporting a resend if a network connection is broken before
     * receiving a PUBREC. */
    returnStatus = getNextFreeIndexForOutgoingPublishes( &publishIndex );

    if( returnStatus == EXIT_FAILURE )
    {
        LogError( ( "Unable to find a free spot for outgoing PUBLISH message.\n\n" ) );
    }
    else
    {
        /* This example publishes to only one topic and uses QOS2. */
        outgoingPublishPackets[ publishIndex ].pubInfo.qos = MQTTQoS2;
        outgoingPublishPackets[ publishIndex ].pubInfo.pTopicName = MQTT_EXAMPLE_TOPIC;
        outgoingPublishPackets[ publishIndex ].pubInfo.topicNameLength = MQTT_EXAMPLE_TOPIC_LENGTH;
        outgoingPublishPackets[ publishIndex ].pubInfo.pPayload = MQTT_EXAMPLE_MESSAGE;
        outgoingPublishPackets[ publishIndex ].pubInfo.payloadLength = MQTT_EXAMPLE_MESSAGE_LENGTH;

        /* Get a new packet id. */
        outgoingPublishPackets[ publishIndex ].packetId = MQTT_GetPacketId( pMqttContext );

        /* Send PUBLISH packet. */
        mqttStatus = MQTT_Publish( pMqttContext,
                                   &outgoingPublishPackets[ publishIndex ].pubInfo,
                                   outgoingPublishPackets[ publishIndex ].packetId );

        if( mqttStatus != MQTTSuccess )
        {
            LogError( ( "Failed to send PUBLISH packet to broker with error = %s.",
                        MQTT_Status_strerror( mqttStatus ) ) );

            /* The packet never went out, so do not keep it for resend. */
            cleanupOutgoingPublishAt( publishIndex );
            returnStatus = EXIT_FAILURE;
        }
        else
        {
            LogInfo( ( "PUBLISH sent for topic %.*s to broker with packet ID %u.\n\n",
                       MQTT_EXAMPLE_TOPIC_LENGTH,
                       MQTT_EXAMPLE_TOPIC,
                       outgoingPublishPackets[ publishIndex ].packetId ) );
        }
    }

    return returnStatus;
}
/*-----------------------------------------------------------*/

/**
 * @brief Initialize the MQTT context with the OpenSSL transport, the demo's
 * network buffer, the clock function and the event callback, then enable
 * stateful QoS tracking.
 *
 * @param[in,out] pMqttContext MQTT context to initialize.
 * @param[in] pNetworkContext Network context stored in the transport
 * interface and later passed to Openssl_Send / Openssl_Recv.
 *
 * @return EXIT_SUCCESS if both MQTT_Init and MQTT_InitStatefulQoS returned
 * MQTTSuccess; EXIT_FAILURE otherwise.
 */
static int initializeMqtt( MQTTContext_t * pMqttContext,
                           NetworkContext_t * pNetworkContext )
{
    int returnStatus = EXIT_SUCCESS;
    MQTTStatus_t mqttStatus;
    MQTTFixedBuffer_t networkBuffer;
    TransportInterface_t transport = { NULL };

    assert( pMqttContext != NULL );
    assert( pNetworkContext != NULL );

    /* Fill in TransportInterface send and receive function pointers.
     * For this demo, TCP sockets are used to send and receive data
     * from network. Network context is SSL context for OpenSSL.*/
    transport.pNetworkContext = pNetworkContext;
    transport.send = Openssl_Send;
    transport.recv = Openssl_Recv;
    transport.writev = NULL;

    /* Fill the values for network buffer. */
    networkBuffer.pBuffer = buffer;
    networkBuffer.size = NETWORK_BUFFER_SIZE;

    /* Initialize MQTT library. */
    mqttStatus = MQTT_Init( pMqttContext,
                            &transport,
                            Clock_GetTimeMs,
                            eventCallback,
                            &networkBuffer );

    if( mqttStatus != MQTTSuccess )
    {
        returnStatus = EXIT_FAILURE;
        LogError( ( "MQTT_Init failed: Status = %s.", MQTT_Status_strerror( mqttStatus ) ) );
    }
    else
    {
        /* Register the record arrays the library uses to track QoS > 0
         * publishes in both directions. */
        mqttStatus = MQTT_InitStatefulQoS( pMqttContext,
                                           pOutgoingPublishRecords,
                                           OUTGOING_PUBLISH_RECORD_LEN,
                                           pIncomingPublishRecords,
                                           INCOMING_PUBLISH_RECORD_LEN );

        if( mqttStatus != MQTTSuccess )
        {
            returnStatus = EXIT_FAILURE;
            LogError( ( "MQTT_InitStatefulQoS failed: Status = %s.", MQTT_Status_strerror( mqttStatus ) ) );
        }
    }

    return returnStatus;
}
/*-----------------------------------------------------------*/

/**
 * @brief Run one subscribe / publish / unsubscribe / disconnect cycle on an
 * already connected MQTT session.
 *
 * Steps, each executed only while the running status is EXIT_SUCCESS:
 * subscribe, wait for the SUBACK (re-subscribing through handleResubscribe if
 * the broker rejected it), publish MQTT_PUBLISH_COUNT_PER_LOOP messages while
 * servicing the connection, unsubscribe and wait for the UNSUBACK. An MQTT
 * DISCONNECT is always attempted at the end, whatever the running status is.
 *
 * @param[in] pMqttContext MQTT context of the connected session.
 *
 * @return EXIT_SUCCESS if every step, including the final disconnect,
 * succeeded; EXIT_FAILURE otherwise.
 */
static int subscribePublishLoop( MQTTContext_t * pMqttContext )
{
    int returnStatus = EXIT_SUCCESS;
    MQTTStatus_t mqttStatus = MQTTSuccess;
    uint32_t publishCount = 0;
    const uint32_t maxPublishCount = MQTT_PUBLISH_COUNT_PER_LOOP;

    assert( pMqttContext != NULL );

    /* The client is now connected to the broker. Subscribe to the topic
     * as specified in MQTT_EXAMPLE_TOPIC at the top of this file by sending a
     * subscribe packet. This client will then publish to the same topic it
     * subscribed to, so it will expect all the messages it sends to the broker
     * to be sent back to it from the broker. This demo uses QOS2 in Subscribe,
     * therefore, the Publish messages received from the broker will have QOS2. */
    LogInfo( ( "Subscribing to the MQTT topic %.*s.",
               MQTT_EXAMPLE_TOPIC_LENGTH,
               MQTT_EXAMPLE_TOPIC ) );
    returnStatus = subscribeToTopic( pMqttContext );

    if( returnStatus == EXIT_SUCCESS )
    {
        /* Process incoming packet from the broker. Acknowledgment for subscription
         * ( SUBACK ) will be received here. However after sending the subscribe, the
         * client may receive a publish before it receives a subscribe ack. Since this
         * demo is subscribing to the topic to which no one is publishing, probability
         * of receiving publish message before subscribe ack is zero; but application
         * must be ready to receive any packet. This demo uses MQTT_ProcessLoop to
         * receive packet from network. */
        returnStatus = waitForPacketAck( pMqttContext,
                                         globalSubscribePacketIdentifier,
                                         MQTT_PROCESS_LOOP_TIMEOUT_MS );
    }

    /* Check if recent subscription request has been rejected. globalSubAckStatus is updated
     * in eventCallback to reflect the status of the SUBACK sent by the broker. */
    if( ( returnStatus == EXIT_SUCCESS ) && ( globalSubAckStatus == MQTTSubAckFailure ) )
    {
        /* If server rejected the subscription request, attempt to resubscribe to topic.
         * Attempts are made according to the exponential backoff retry strategy
         * implemented in retryUtils. */
        LogInfo( ( "Server rejected initial subscription request. Attempting to re-subscribe to topic %.*s.",
                   MQTT_EXAMPLE_TOPIC_LENGTH,
                   MQTT_EXAMPLE_TOPIC ) );
        returnStatus = handleResubscribe( pMqttContext );
    }

    if( returnStatus == EXIT_SUCCESS )
    {
        /* Publish messages with QOS2, receive incoming messages and
         * send keep alive messages. */
        for( publishCount = 0; publishCount < maxPublishCount; publishCount++ )
        {
            LogInfo( ( "Sending Publish to the MQTT topic %.*s.",
                       MQTT_EXAMPLE_TOPIC_LENGTH,
                       MQTT_EXAMPLE_TOPIC ) );
            returnStatus = publishToTopic( pMqttContext );

            /* publishToTopic has already logged the cause. Leave the loop now
             * so that the failure is not overwritten by the next iteration
             * and the demo proceeds straight to the disconnect below. */
            if( returnStatus != EXIT_SUCCESS )
            {
                break;
            }

            /* Calling MQTT_ProcessLoop to process incoming publish echo, since
             * application subscribed to the same topic the broker will send
             * publish message back to the application. This function also
             * sends ping request to broker if MQTT_KEEP_ALIVE_INTERVAL_SECONDS
             * has expired since the last MQTT packet sent and receive
             * ping responses. */
            mqttStatus = processLoopWithTimeout( pMqttContext, MQTT_PROCESS_LOOP_TIMEOUT_MS );

            /* For any error in #MQTT_ProcessLoop, exit the loop and disconnect
             * from the broker. */
            if( ( mqttStatus != MQTTSuccess ) && ( mqttStatus != MQTTNeedMoreBytes ) )
            {
                LogError( ( "MQTT_ProcessLoop returned with status = %s.",
                            MQTT_Status_strerror( mqttStatus ) ) );
                returnStatus = EXIT_FAILURE;
                break;
            }

            LogInfo( ( "Delay before continuing to next iteration.\n\n" ) );

            /* Leave connection idle for some time. */
            sleep( DELAY_BETWEEN_PUBLISHES_SECONDS );
        }
    }

    if( returnStatus == EXIT_SUCCESS )
    {
        /* Unsubscribe from the topic. */
        LogInfo( ( "Unsubscribing from the MQTT topic %.*s.",
                   MQTT_EXAMPLE_TOPIC_LENGTH,
                   MQTT_EXAMPLE_TOPIC ) );
        returnStatus = unsubscribeFromTopic( pMqttContext );
    }

    if( returnStatus == EXIT_SUCCESS )
    {
        /* Process Incoming UNSUBACK packet from the broker. */
        returnStatus = waitForPacketAck( pMqttContext,
                                         globalUnsubscribePacketIdentifier,
                                         MQTT_PROCESS_LOOP_TIMEOUT_MS );
    }

    /* Send an MQTT Disconnect packet over the already connected TCP socket.
     * There is no corresponding response for the disconnect packet. After sending
     * disconnect, client must close the network connection. */
    LogInfo( ( "Disconnecting the MQTT connection with %.*s.",
               BROKER_ENDPOINT_LENGTH,
               BROKER_ENDPOINT ) );

    if( returnStatus == EXIT_FAILURE )
    {
        /* Returned status is not used to update the local status as there
         * were failures in demo execution. */
        ( void ) disconnectMqttSession( pMqttContext );
    }
    else
    {
        returnStatus = disconnectMqttSession( pMqttContext );
    }

    /* Reset global SUBACK status variable after completion of subscription request cycle. */
    globalSubAckStatus = MQTTSubAckFailure;

    return returnStatus;
}
/*-----------------------------------------------------------*/

/**
 * @brief Call MQTT_ProcessLoop repeatedly until the expected acknowledgment
 * arrives, the timeout elapses, or MQTT_ProcessLoop reports an error.
 *
 * The acknowledgment is detected through globalAckPacketIdentifier, which is
 * cleared on entry and set by the event callback.
 *
 * @param[in] pMqttContext MQTT context; its getTime member supplies the
 * timestamps used for the timeout.
 * @param[in] usPacketIdentifier Packet identifier of the request whose
 * acknowledgment is awaited.
 * @param[in] ulTimeout Maximum time to wait, in the units returned by
 * pMqttContext->getTime().
 *
 * @return EXIT_SUCCESS if the matching acknowledgment was seen and the last
 * MQTT_ProcessLoop status was MQTTSuccess or MQTTNeedMoreBytes;
 * EXIT_FAILURE otherwise.
 */
static int waitForPacketAck( MQTTContext_t * pMqttContext,
                             uint16_t usPacketIdentifier,
                             uint32_t ulTimeout )
{
    uint32_t ulMqttProcessLoopEntryTime;
    uint32_t ulCurrentTime;
    MQTTStatus_t eMqttStatus = MQTTSuccess;
    int returnStatus = EXIT_FAILURE;

    assert( pMqttContext != NULL );

    /* Reset the ACK packet identifier being received. */
    globalAckPacketIdentifier = 0U;

    ulCurrentTime = pMqttContext->getTime();
    ulMqttProcessLoopEntryTime = ulCurrentTime;

    /* Call MQTT_ProcessLoop multiple times until the expected packet ACK
     * is received, a timeout happens, or MQTT_ProcessLoop fails.
     *
     * The timeout is checked as "elapsed < timeout" using unsigned
     * subtraction instead of comparing against "entry + timeout": the latter
     * overflows when the 32-bit timestamp is close to its maximum, which made
     * the loop exit immediately without processing any packet. */
    while( ( globalAckPacketIdentifier != usPacketIdentifier ) &&
           ( ( ulCurrentTime - ulMqttProcessLoopEntryTime ) < ulTimeout ) &&
           ( ( eMqttStatus == MQTTSuccess ) || ( eMqttStatus == MQTTNeedMoreBytes ) ) )
    {
        /* Event callback will set #globalAckPacketIdentifier when receiving
         * appropriate packet. */
        eMqttStatus = MQTT_ProcessLoop( pMqttContext );
        ulCurrentTime = pMqttContext->getTime();
    }

    if( ( ( eMqttStatus != MQTTSuccess ) && ( eMqttStatus != MQTTNeedMoreBytes ) ) ||
        ( globalAckPacketIdentifier != usPacketIdentifier ) )
    {
        LogError( ( "MQTT_ProcessLoop failed to receive ACK packet: Expected ACK Packet ID=%02X, LoopDuration=%u, Status=%s",
                    usPacketIdentifier,
                    ( ulCurrentTime - ulMqttProcessLoopEntryTime ),
                    MQTT_Status_strerror( eMqttStatus ) ) );
    }
    else
    {
        returnStatus = EXIT_SUCCESS;
    }

    return returnStatus;
}
/*-----------------------------------------------------------*/

/**
 * @brief Call MQTT_ProcessLoop repeatedly until the timeout elapses or
 * MQTT_ProcessLoop returns a status other than MQTTSuccess or
 * MQTTNeedMoreBytes.
 *
 * @param[in] pMqttContext MQTT context; its getTime member supplies the
 * timestamps used for the timeout.
 * @param[in] ulTimeoutMs Time to keep processing, in the units returned by
 * pMqttContext->getTime().
 *
 * @return The status of the last MQTT_ProcessLoop call, or MQTTSuccess if
 * @p ulTimeoutMs is 0 and the loop never ran.
 */
static MQTTStatus_t processLoopWithTimeout( MQTTContext_t * pMqttContext,
                                            uint32_t ulTimeoutMs )
{
    uint32_t ulMqttProcessLoopEntryTime;
    uint32_t ulCurrentTime;
    MQTTStatus_t eMqttStatus = MQTTSuccess;

    assert( pMqttContext != NULL );

    ulCurrentTime = pMqttContext->getTime();
    ulMqttProcessLoopEntryTime = ulCurrentTime;

    /* Call MQTT_ProcessLoop multiple times until a timeout happens, or
     * MQTT_ProcessLoop fails. Elapsed time is computed with unsigned
     * subtraction so the check stays correct when the 32-bit timestamp
     * wraps around during the wait. */
    while( ( ( ulCurrentTime - ulMqttProcessLoopEntryTime ) < ulTimeoutMs ) &&
           ( ( eMqttStatus == MQTTSuccess ) || ( eMqttStatus == MQTTNeedMoreBytes ) ) )
    {
        eMqttStatus = MQTT_ProcessLoop( pMqttContext );
        ulCurrentTime = pMqttContext->getTime();
    }

    return eMqttStatus;
}
/*-----------------------------------------------------------*/

/**
 * @brief Entry point of demo.
 *
 * The example shown below uses MQTT APIs to send and receive MQTT packets
 * over the TLS connection established using OpenSSL.
 *
 * The example is single threaded and uses statically allocated memory;
 * it uses QOS2 and therefore implements a retransmission mechanism
 * for Publish messages. Retransmission of publish messages are attempted
 * when a MQTT connection is established with a session that was already
 * present. All the outgoing publish messages waiting to receive PUBREC
 * are resent in this demo. In order to support retransmission all the outgoing
 * publishes are stored until a PUBREC is received.
 *
 * @param[in] argc Unused.
 * @param[in] argv Unused.
 *
 * @return EXIT_FAILURE if initializeMqtt fails. Once initialization succeeds
 * the connect / run / disconnect cycle repeats forever and the function does
 * not return.
 */
int main( int argc,
          char ** argv )
{
    int returnStatus = EXIT_SUCCESS;
    MQTTContext_t mqttContext = { 0 };
    NetworkContext_t networkContext = { 0 };
    OpensslParams_t opensslParams = { 0 };
    bool clientSessionPresent = false, brokerSessionPresent = false;
    struct timespec tp;

    ( void ) argc;
    ( void ) argv;

    /* Set the pParams member of the network context with desired transport. */
    networkContext.pParams = &opensslParams;

    /* Seed pseudo random number generator (provided by ISO C standard library) for
     * use by retry utils library when retrying failed network operations. */

    /* Get current time to seed pseudo random number generator. */
    ( void ) clock_gettime( CLOCK_REALTIME, &tp );

    /* Seed pseudo random number generator with nanoseconds. The cast makes
     * the long -> unsigned int narrowing explicit. */
    srand( ( unsigned int ) tp.tv_nsec );

    /* Initialize MQTT library. Initialization of the MQTT library needs to be
     * done only once in this demo. */
    returnStatus = initializeMqtt( &mqttContext, &networkContext );

    if( returnStatus == EXIT_SUCCESS )
    {
        for( ; ; )
        {
            /* Attempt to connect to the MQTT broker. If connection fails, retry after
             * a timeout. Timeout value will be exponentially increased till the maximum
             * attempts are reached or maximum timeout value is reached. The function
             * returns EXIT_FAILURE if the TCP connection cannot be established to
             * broker after configured number of attempts. */
            returnStatus = connectToServerWithBackoffRetries( &networkContext,
                                                              &mqttContext,
                                                              &clientSessionPresent,
                                                              &brokerSessionPresent );

            if( returnStatus == EXIT_FAILURE )
            {
                /* Log error to indicate connection failure after all
                 * reconnect attempts are over. */
                LogError( ( "Failed to connect to MQTT broker %.*s.",
                            BROKER_ENDPOINT_LENGTH,
                            BROKER_ENDPOINT ) );
            }
            else
            {
                /* Update the flag to indicate that an MQTT client session is saved.
                 * Once this flag is set, MQTT connect in the following iterations of
                 * this demo will be attempted without requesting for a clean session. */
                clientSessionPresent = true;

                /* Check if session is present and if there are any outgoing publishes
                 * that need to resend. This is only valid if the broker is
                 * re-establishing a session which was already present. */
                if( brokerSessionPresent == true )
                {
                    LogInfo( ( "An MQTT session with broker is re-established. "
                               "Resending unacked publishes." ) );

                    /* Handle all the resend of publish messages. */
                    returnStatus = handlePublishResend( &mqttContext );
                }
                else
                {
                    LogInfo( ( "A clean MQTT connection is established."
                               " Cleaning up all the stored outgoing publishes.\n\n" ) );

                    /* Clean up the outgoing publishes waiting for ack as this new
                     * connection doesn't re-establish an existing session. */
                    cleanupOutgoingPublishes();
                }

                /* If TLS session is established, execute Subscribe/Publish loop.
                 * Skip it when the resend above failed so that the failure is
                 * reported for this iteration instead of being overwritten. */
                if( returnStatus == EXIT_SUCCESS )
                {
                    returnStatus = subscribePublishLoop( &mqttContext );
                }

                /* End TLS session, then close TCP connection. */
                ( void ) Openssl_Disconnect( &networkContext );
            }

            if( returnStatus == EXIT_SUCCESS )
            {
                /* Log message indicating an iteration completed successfully. */
                LogInfo( ( "Demo completed successfully." ) );
            }

            LogInfo( ( "Short delay before starting the next iteration ....\n " ) );
            sleep( MQTT_SUBPUB_LOOP_DELAY_SECONDS );
        }
    }

    return returnStatus;
}
/*-----------------------------------------------------------*/