/* * FreeRTOS V202107.00 * Copyright (C) 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy of * this software and associated documentation files (the "Software"), to deal in * the Software without restriction, including without limitation the rights to * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of * the Software, and to permit persons to whom the Software is furnished to do so, * subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * * http://www.FreeRTOS.org * http://aws.amazon.com/freertos * */ /* * Demo for showing use of the MQTT API using a mutually authenticated * network connection. * * The Example shown below uses MQTT APIs to create MQTT messages and send them * over the mutually authenticated network connection established with the * MQTT broker. This example is single threaded and uses statically allocated * memory. It uses QoS1 for sending to and receiving messages from the broker. * * A mutually authenticated TLS connection is used to connect to the * MQTT message broker in this example. Define democonfigMQTT_BROKER_ENDPOINT * and democonfigROOT_CA_PEM, in mqtt_demo_mutual_auth_config.h, and the client * private key and certificate, in aws_clientcredential_keys.h, to establish a * mutually authenticated connection. */ /** * @file mqtt_demo_mutual_auth.c * @brief Demonstrates usage of the MQTT library. */ /* Standard includes. */ #include #include #include /* Demo Specific configs. */ #include "demo_config.h" #include "core_pkcs11_config.h" /* Kernel includes. */ #include "FreeRTOS.h" #include "task.h" /* MQTT library includes. */ #include "core_mqtt.h" /* Retry utilities include. */ #include "backoff_algorithm.h" /* Include PKCS11 helpers header. */ #include "pkcs11_helpers.h" /* Transport interface implementation include header for TLS. */ #include "transport_interface_api.h" /*------------- Demo configurations -------------------------*/ /** * @brief The root CA certificate belonging to the broker. */ #ifndef democonfigROOT_CA_PEM #define democonfigROOT_CA_PEM NULL #define democonfigROOT_CA_LENGTH 0 #else #define democonfigROOT_CA_LENGTH sizeof( democonfigROOT_CA_PEM ) #endif /** * @brief The maximum number of times to run the subscribe publish loop in this * demo. */ #ifndef democonfigMQTT_MAX_DEMO_COUNT #define democonfigMQTT_MAX_DEMO_COUNT ( 3 ) #endif /*-----------------------------------------------------------*/ /** * @brief The maximum number of retries for network operation with server. */ #define RETRY_MAX_ATTEMPTS ( 5U ) /** * @brief The maximum back-off delay (in milliseconds) for retrying failed operation * with server. */ #define RETRY_MAX_BACKOFF_DELAY_MS ( 5000U ) /** * @brief The base back-off delay (in milliseconds) to use for network operation retry * attempts. */ #define RETRY_BACKOFF_BASE_MS ( 500U ) /** * @brief Timeout for receiving CONNACK packet in milliseconds. */ #define mqttexampleCONNACK_RECV_TIMEOUT_MS ( 1000U ) /** * @brief The topic to subscribe and publish to in the example. * * The topic name starts with the client identifier to ensure that each demo * interacts with a unique topic name. */ #define mqttexampleTOPIC democonfigCLIENT_IDENTIFIER "/example/topic" /** * @brief The number of topic filters to subscribe. */ #define mqttexampleTOPIC_COUNT ( 1 ) /** * @brief The MQTT message published in this example. */ #define mqttexampleMESSAGE "Hello World!" /** * @brief Time in ticks to wait between each cycle of the demo implemented * by RunCoreMqttMutualAuthDemo(). */ #define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS_TICKS ( pdMS_TO_TICKS( 5000U ) ) /** * @brief Timeout for MQTT_ProcessLoop in milliseconds. */ #define mqttexamplePROCESS_LOOP_TIMEOUT_MS ( 700U ) /** * @brief The maximum number of times to call MQTT_ProcessLoop() when polling * for a specific packet from the broker. */ #define MQTT_PROCESS_LOOP_PACKET_WAIT_COUNT_MAX ( 4U ) /** * @brief Keep alive time reported to the broker while establishing * an MQTT connection. * * It is the responsibility of the Client to ensure that the interval between * Control Packets being sent does not exceed the this Keep Alive value. In the * absence of sending any other Control Packets, the Client MUST send a * PINGREQ Packet. */ #define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 60U ) /** * @brief Delay (in ticks) between consecutive cycles of MQTT publish operations in a * demo iteration. * * Note that the process loop also has a timeout, so the total time between * publishes is the sum of the two delays. */ #define mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS ( pdMS_TO_TICKS( 2000U ) ) /** * @brief Transport timeout in milliseconds for transport send and receive. */ #define mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ( 500U ) /** * @brief Milliseconds per second. */ #define MILLISECONDS_PER_SECOND ( 1000U ) /** * @brief Milliseconds per FreeRTOS tick. */ #define MILLISECONDS_PER_TICK ( MILLISECONDS_PER_SECOND / configTICK_RATE_HZ ) /*-----------------------------------------------------------*/ /** * @brief Calculate and perform an exponential backoff with jitter delay for * the next retry attempt of a failed network operation with the server. * * The function generates a random number, calculates the next backoff period * with the generated random number, and performs the backoff delay operation if the * number of retries have not exhausted. * * @note The PKCS11 module is used to generate the random number as it allows access * to a True Random Number Generator (TRNG) if the vendor platform supports it. * It is recommended to seed the random number generator with a device-specific entropy * source so that probability of collisions from devices in connection retries is mitigated. * * @note The backoff period is calculated using the backoffAlgorithm library. * * @param[in, out] pxRetryAttempts The context to use for backoff period calculation * with the backoffAlgorithm library. * * @return pdPASS if calculating the backoff period was successful; otherwise pdFAIL * if there was failure in random number generation OR all retry attempts had exhausted. */ static BaseType_t prvBackoffForRetry( BackoffAlgorithmContext_t * pxRetryParams ); /** * @brief Connect to MQTT broker with reconnection retries. * * If connection fails, retry is attempted after a timeout. * Timeout value will exponentially increase until maximum * timeout value is reached or the number of attempts are exhausted. * * @param[out] pxNetworkContext The output parameter to return the created network context. * * @return pdFAIL on failure; pdPASS on successful TLS+TCP network connection. */ static BaseType_t prvConnectToServerWithBackoffRetries( NetworkContext_t * pNetworkContext ); /** * @brief Sends an MQTT Connect packet over the already connected TLS over TCP connection. * * @param[in, out] pxMQTTContext MQTT context pointer. * @param[in] xNetworkContext Network context. * * @return pdFAIL on failure; pdPASS on successful MQTT connection. */ static BaseType_t prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext, NetworkContext_t * pxNetworkContext ); /** * @brief Function to update variable #xTopicFilterContext with status * information from Subscribe ACK. Called by the event callback after processing * an incoming SUBACK packet. * * @param[in] pxPacketInfo Server response to the subscription request. */ static void prvUpdateSubAckStatus( MQTTPacketInfo_t * pxPacketInfo ); /** * @brief Subscribes to the topic as specified in mqttexampleTOPIC at the top of * this file. In the case of a Subscribe ACK failure, then subscription is * retried using an exponential backoff strategy with jitter. * * @param[in] pxMQTTContext MQTT context pointer. * * @return pdFAIL on failure; pdPASS on successful SUBSCRIBE request. */ static BaseType_t prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext ); /** * @brief Publishes a message mqttexampleMESSAGE on mqttexampleTOPIC topic. * * @param[in] pxMQTTContext MQTT context pointer. * * @return pdFAIL on failure; pdPASS on successful PUBLISH operation. */ static BaseType_t prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext ); /** * @brief Unsubscribes from the previously subscribed topic as specified * in mqttexampleTOPIC. * * @param[in] pxMQTTContext MQTT context pointer. * * @return pdFAIL on failure; pdPASS on successful UNSUBSCRIBE request. */ static BaseType_t prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ); /** * @brief The timer query function provided to the MQTT context. * * @return Time in milliseconds. */ static uint32_t prvGetTimeMs( void ); /** * @brief Process a response or ack to an MQTT request (PING, PUBLISH, * SUBSCRIBE or UNSUBSCRIBE). This function processes PINGRESP, PUBACK, * SUBACK, and UNSUBACK. * * @param[in] pxIncomingPacket is a pointer to structure containing deserialized * MQTT response. * @param[in] usPacketId is the packet identifier from the ack received. */ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket, uint16_t usPacketId ); /** * @brief Process incoming Publish message. * * @param[in] pxPublishInfo is a pointer to structure containing deserialized * Publish message. */ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ); /** * @brief The application callback function for getting the incoming publishes, * incoming acks, and ping responses reported from the MQTT library. * * @param[in] pxMQTTContext MQTT context pointer. * @param[in] pxPacketInfo Packet Info pointer for the incoming packet. * @param[in] pxDeserializedInfo Deserialized information from the incoming packet. */ static void prvEventCallback( MQTTContext_t * pxMQTTContext, MQTTPacketInfo_t * pxPacketInfo, MQTTDeserializedInfo_t * pxDeserializedInfo ); /** * @brief Helper function to wait for a specific incoming packet from the * broker. * * @param[in] pxMQTTContext MQTT context pointer. * @param[in] usPacketType Packet type to wait for. * * @return The return status from call to #MQTT_ProcessLoop API. */ static MQTTStatus_t prvWaitForPacket( MQTTContext_t * pxMQTTContext, uint16_t usPacketType ); /*-----------------------------------------------------------*/ /** * @brief Static buffer used to hold MQTT messages being sent and received. */ static uint8_t ucSharedBuffer[ democonfigNETWORK_BUFFER_SIZE ]; /** * @brief Global entry time into the application to use as a reference timestamp * in the #prvGetTimeMs function. #prvGetTimeMs will always return the difference * between the current time and the global entry time. This will reduce the chances * of overflow for the 32 bit unsigned integer used for holding the timestamp. */ static uint32_t ulGlobalEntryTimeMs; /** * @brief Packet Identifier generated when Publish request was sent to the broker; * it is used to match received Publish ACK to the transmitted Publish packet. */ static uint16_t usPublishPacketIdentifier; /** * @brief Packet Identifier generated when Subscribe request was sent to the broker; * it is used to match received Subscribe ACK to the transmitted Subscribe packet. */ static uint16_t usSubscribePacketIdentifier; /** * @brief Packet Identifier generated when Unsubscribe request was sent to the broker; * it is used to match received Unsubscribe response to the transmitted Unsubscribe * request. */ static uint16_t usUnsubscribePacketIdentifier; /** * @brief MQTT packet type received from the MQTT broker. * * @note Only on receiving incoming PUBLISH, SUBACK, and UNSUBACK, this * variable is updated. For MQTT packets PUBACK and PINGRESP, the variable is * not updated since there is no need to specifically wait for it in this demo. * A single variable suffices as this demo uses single task and requests one operation * (of PUBLISH, SUBSCRIBE, UNSUBSCRIBE) at a time before expecting response from * the broker. Hence it is not possible to receive multiple packets of type PUBLISH, * SUBACK, and UNSUBACK in a single call of #prvWaitForPacket. * For a multi task application, consider a different method to wait for the packet, if needed. */ static uint16_t usPacketTypeReceived = 0U; /** * @brief A pair containing a topic filter and its SUBACK status. */ typedef struct topicFilterContext { const char * pcTopicFilter; MQTTSubAckStatus_t xSubAckStatus; } topicFilterContext_t; /** * @brief An array containing the context of a SUBACK; the SUBACK status * of a filter is updated when the event callback processes a SUBACK. */ static topicFilterContext_t xTopicFilterContext[ mqttexampleTOPIC_COUNT ] = { { mqttexampleTOPIC, MQTTSubAckFailure } }; /** @brief Static buffer used to hold MQTT messages being sent and received. */ static MQTTFixedBuffer_t xBuffer = { ucSharedBuffer, democonfigNETWORK_BUFFER_SIZE }; /*-----------------------------------------------------------*/ /* * @brief The example shown below uses MQTT APIs to create MQTT messages and * send them over the mutually authenticated network connection established with the * MQTT broker. This example is single threaded and uses statically allocated * memory. It uses QoS1 for sending to and receiving messages from the broker. * * This MQTT client subscribes to the topic as specified in mqttexampleTOPIC at the * top of this file by sending a subscribe packet and then waiting for a subscribe * acknowledgment (SUBACK).This client will then publish to the same topic it * subscribed to, so it will expect all the messages it sends to the broker to be * sent back to it from the broker. * * This example runs for democonfigMQTT_MAX_DEMO_COUNT, if the * connection to the broker goes down, the code tries to reconnect to the broker * with an exponential backoff mechanism. */ void vCoreMQTTMutualAuthDemoTask( void * pvParam ) { uint32_t ulPublishCount = 0U, ulTopicCount = 0U; const uint32_t ulMaxPublishCount = 5UL; NetworkContext_t xNetworkContext = { 0 }; MQTTContext_t xMQTTContext = { 0 }; MQTTStatus_t xMQTTStatus; uint32_t ulDemoRunCount = 0UL, ulDemoSuccessCount = 0UL; TransportStatus_t xNetworkStatus; BaseType_t xIsConnectionEstablished = pdFALSE; BaseType_t xDemoStatus = pdFAIL; ( void ) pvParam; /* Set the entry time of the demo application. This entry time will be used * to calculate relative time elapsed in the execution of the demo application, * by the timer utility function that is provided to the MQTT library. */ ulGlobalEntryTimeMs = prvGetTimeMs(); for( ulDemoRunCount = 0UL; ( ulDemoRunCount < democonfigMQTT_MAX_DEMO_COUNT ); ulDemoRunCount++ ) { /****************************** Connect. ******************************/ /* Attempt to establish TLS session with MQTT broker. If connection fails, * retry after a timeout. Timeout value will be exponentially increased until * the maximum number of attempts are reached or the maximum timeout value is reached. * The function returns a failure status if the TLS over TCP connection cannot be established * to the broker after the configured number of attempts. */ xDemoStatus = prvConnectToServerWithBackoffRetries( &xNetworkContext ); if( xDemoStatus == pdPASS ) { /* Set a flag indicating a TLS connection exists. This is done to * disconnect if the loop exits before disconnection happens. */ xIsConnectionEstablished = pdTRUE; /* Sends an MQTT Connect packet over the already established TLS connection, * and waits for connection acknowledgment (CONNACK) packet. */ LogInfo( ( "Creating an MQTT connection to %s.", democonfigMQTT_BROKER_ENDPOINT ) ); xDemoStatus = prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext ); } /**************************** Subscribe. ******************************/ if( xDemoStatus == pdPASS ) { /* If server rejected the subscription request, attempt to resubscribe to topic. * Attempts are made according to the exponential backoff retry strategy * implemented in backoff_algorithm. */ xDemoStatus = prvMQTTSubscribeWithBackoffRetries( &xMQTTContext ); } /**************************** Publish and Keep Alive Loop. ******************************/ /* Publish messages with QoS1, send and process Keep alive messages. */ for( ulPublishCount = 0; ( ( xDemoStatus == pdPASS ) && ( ulPublishCount < ulMaxPublishCount ) ); ulPublishCount++ ) { LogInfo( ( "Publish to the MQTT topic %s.", mqttexampleTOPIC ) ); xDemoStatus = prvMQTTPublishToTopic( &xMQTTContext ); if( xDemoStatus == pdPASS ) { /* Process incoming publish echo, since application subscribed to the same * topic, the broker will send publish message back to the application. * #prvWaitForPacket will try to receive an incoming PUBLISH packet from broker. * Please note that PUBACK for the outgoing PUBLISH may also be received before * receiving an incoming PUBLISH. */ LogInfo( ( "Attempt to receive publish message from broker." ) ); xMQTTStatus = prvWaitForPacket( &xMQTTContext, MQTT_PACKET_TYPE_PUBLISH ); if( xMQTTStatus != MQTTSuccess ) { xDemoStatus = pdFAIL; } } /* Leave Connection Idle for some time. */ LogInfo( ( "Keeping Connection Idle..." ) ); vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS ); } /************************ Unsubscribe from the topic. **************************/ if( xDemoStatus == pdPASS ) { LogInfo( ( "Unsubscribe from the MQTT topic %s.", mqttexampleTOPIC ) ); xDemoStatus = prvMQTTUnsubscribeFromTopic( &xMQTTContext ); } if( xDemoStatus == pdPASS ) { /* Process incoming UNSUBACK packet from the broker. */ xMQTTStatus = prvWaitForPacket( &xMQTTContext, MQTT_PACKET_TYPE_UNSUBACK ); if( xMQTTStatus != MQTTSuccess ) { xDemoStatus = pdFAIL; } } /**************************** Disconnect. ******************************/ if( xDemoStatus == pdPASS ) { /* Send an MQTT Disconnect packet over the already connected TLS over TCP connection. * There is no corresponding response for the disconnect packet. After sending * disconnect, client must close the network connection. */ LogInfo( ( "Disconnecting the MQTT connection with %s.", democonfigMQTT_BROKER_ENDPOINT ) ); xMQTTStatus = MQTT_Disconnect( &xMQTTContext ); } /* We will always close the network connection, even if an error may have occurred during * demo execution, to clean up the system resources that it may have consumed. */ if( xIsConnectionEstablished == pdTRUE ) { /* Close the network connection. */ xNetworkStatus = Transport_Disconnect( &xNetworkContext ); if( xNetworkStatus != TRANSPORT_STATUS_SUCCESS ) { xDemoStatus = pdFAIL; LogError( ( "Transport_Disconnect() failed to close the network connection. " "StatusCode=%d.", ( int ) xNetworkStatus ) ); } } /* Reset SUBACK status for each topic filter after completion of subscription request cycle. */ for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) { xTopicFilterContext[ ulTopicCount ].xSubAckStatus = MQTTSubAckFailure; } if( xDemoStatus == pdPASS ) { /* Wait for some time between two iterations to ensure that we do not * bombard the broker. */ LogInfo( ( "Demo completed an iteration successfully." ) ); LogInfo( ( "Demo iteration %lu completed successfully.", ( ulDemoRunCount + 1UL ) ) ); /* Update success count. */ ulDemoSuccessCount++; } else { /* Demo loop will be repeated for democonfigMQTT_MAX_DEMO_COUNT * times even if current loop resulted in a failure. */ LogInfo( ( "Demo failed at iteration %lu.", ( ulDemoRunCount + 1UL ) ) ); } LogInfo( ( "Short delay before starting the next iteration.... " ) ); vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS_TICKS ); } /* Demo run is considered successful if more than half of * #democonfigMQTT_MAX_DEMO_COUNT is successful. */ if( ulDemoSuccessCount > ( democonfigMQTT_MAX_DEMO_COUNT / 2 ) ) { xDemoStatus = pdPASS; LogInfo( ( "Demo run is successful with %lu successful loops out of total %lu loops.", ( ulDemoSuccessCount ), democonfigMQTT_MAX_DEMO_COUNT ) ); } else { xDemoStatus = pdFAIL; LogInfo( ( "Demo run failed with %lu failed loops out of total %lu loops." " RequiredSuccessCounts=%lu.", ( democonfigMQTT_MAX_DEMO_COUNT - ulDemoSuccessCount ), democonfigMQTT_MAX_DEMO_COUNT, ( ( democonfigMQTT_MAX_DEMO_COUNT / 2 ) + 1 ) ) ); } } /*-----------------------------------------------------------*/ static BaseType_t prvBackoffForRetry( BackoffAlgorithmContext_t * pxRetryParams ) { BaseType_t xReturnStatus = pdFAIL; uint16_t usNextRetryBackOff = 0U; BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess; /** * To calculate the backoff period for the next retry attempt, we will * generate a random number to provide to the backoffAlgorithm library. * * Note: The PKCS11 module is used to generate the random number as it allows access * to a True Random Number Generator (TRNG) if the vendor platform supports it. * It is recommended to use a random number generator seeded with a device-specific * entropy source so that probability of collisions from devices in connection retries * is mitigated. */ uint32_t ulRandomNum = 0; if( xPkcs11GenerateRandomNumber( ( uint8_t * ) &ulRandomNum, sizeof( ulRandomNum ) ) == pdPASS ) { /* Get back-off value (in milliseconds) for the next retry attempt. */ xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( pxRetryParams, ulRandomNum, &usNextRetryBackOff ); if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted ) { LogError( ( "All retry attempts have exhausted. Operation will not be retried" ) ); } else if( xBackoffAlgStatus == BackoffAlgorithmSuccess ) { /* Perform the backoff delay. */ vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) ); xReturnStatus = pdPASS; LogInfo( ( "Retry attempt %lu out of maximum retry attempts %lu.", ( pxRetryParams->attemptsDone + 1 ), pxRetryParams->maxRetryAttempts ) ); } } else { LogError( ( "Unable to retry operation with broker: Random number generation failed" ) ); } return xReturnStatus; } /*-----------------------------------------------------------*/ static BaseType_t prvConnectToServerWithBackoffRetries( NetworkContext_t * pxNetworkContext ) { ServerInfo_t xServerInfo = { 0 }; TLSParams_t xTLSParams = { 0 }; TransportStatus_t xNetworkStatus = TRANSPORT_STATUS_SUCCESS; BackoffAlgorithmContext_t xReconnectParams; BaseType_t xBackoffStatus = pdFALSE; /* Set the credentials for establishing a TLS connection. */ /* Initializer server information. */ xServerInfo.pHostName = democonfigMQTT_BROKER_ENDPOINT; xServerInfo.hostNameLength = strlen( democonfigMQTT_BROKER_ENDPOINT ); xServerInfo.port = democonfigMQTT_BROKER_PORT; /* Configure credentials for TLS mutual authenticated session. */ xTLSParams.pAlpnProtos = NULL; xTLSParams.maxFragmentLength = 0; xTLSParams.disableSni = false; xTLSParams.pRootCa = democonfigROOT_CA_PEM; xTLSParams.rootCaSize = democonfigROOT_CA_LENGTH; xTLSParams.pPrivateKeyLabel = pkcs11configLABEL_DEVICE_PRIVATE_KEY_FOR_TLS; xTLSParams.pClientCertLabel = pkcs11configLABEL_DEVICE_CERTIFICATE_FOR_TLS; xTLSParams.pLoginPIN = configPKCS11_DEFAULT_USER_PIN; /* Initialize reconnect attempts and interval. */ BackoffAlgorithm_InitializeParams( &xReconnectParams, RETRY_BACKOFF_BASE_MS, RETRY_MAX_BACKOFF_DELAY_MS, RETRY_MAX_ATTEMPTS ); /* Attempt to connect to MQTT broker. If connection fails, retry after * a timeout. Timeout value will exponentially increase till maximum * attempts are reached. */ do { /* Establish a TLS session with the MQTT broker. This example connects to * the MQTT broker as specified in democonfigMQTT_BROKER_ENDPOINT and * democonfigMQTT_BROKER_PORT at the top of this file. */ LogInfo( ( "Creating a TLS connection to %s:%u.", democonfigMQTT_BROKER_ENDPOINT, democonfigMQTT_BROKER_PORT ) ); /* Attempt to create a mutually authenticated TLS connection. */ xNetworkStatus = Transport_Connect( pxNetworkContext, &xServerInfo, &xTLSParams, mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS, mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ); if( xNetworkStatus != TRANSPORT_STATUS_SUCCESS ) { LogWarn( ( "Connection to the broker failed with error %d. Attempting connection retry after backoff delay.", xNetworkStatus ) ); /* As the connection attempt failed, we will retry the connection after an * exponential backoff with jitter delay. */ /* Calculate the backoff period for the next retry attempt and perform the wait operation. */ xBackoffStatus = prvBackoffForRetry( &xReconnectParams ); } } while( ( xNetworkStatus != TRANSPORT_STATUS_SUCCESS ) && ( xBackoffStatus == pdPASS ) ); return ( xNetworkStatus == TRANSPORT_STATUS_SUCCESS ) ? pdPASS : pdFAIL; } /*-----------------------------------------------------------*/ static BaseType_t prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext, NetworkContext_t * pxNetworkContext ) { MQTTStatus_t xResult; MQTTConnectInfo_t xConnectInfo; bool xSessionPresent; TransportInterface_t xTransport; BaseType_t xStatus = pdFAIL; /* Fill in Transport Interface send and receive function pointers. */ xTransport.pNetworkContext = pxNetworkContext; xTransport.send = Transport_Send; xTransport.recv = Transport_Recv; /* Initialize MQTT library. */ xResult = MQTT_Init( pxMQTTContext, &xTransport, prvGetTimeMs, prvEventCallback, &xBuffer ); configASSERT( xResult == MQTTSuccess ); /* Some fields are not used in this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) ); /* Start with a clean session i.e. direct the MQTT broker to discard any * previous session data. Also, establishing a connection with clean session * will ensure that the broker does not store any data when this client * gets disconnected. */ xConnectInfo.cleanSession = true; /* The client identifier is used to uniquely identify this MQTT client to * the MQTT broker. In a production device the identifier can be something * unique, such as a device serial number. */ xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER; xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER ); /* Use the metrics string as username to report the OS and MQTT client version * metrics to AWS IoT. */ xConnectInfo.pUserName = democonfigMQTT_USERNAME; xConnectInfo.userNameLength = democonfigMQTT_USERNAME_LENGTH; /* Set MQTT keep-alive period. If the application does not send packets at an interval less than * the keep-alive period, the MQTT library will send PINGREQ packets. */ xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS; /* Send MQTT CONNECT packet to broker. LWT is not used in this demo, so it * is passed as NULL. */ xResult = MQTT_Connect( pxMQTTContext, &xConnectInfo, NULL, mqttexampleCONNACK_RECV_TIMEOUT_MS, &xSessionPresent ); if( xResult != MQTTSuccess ) { LogError( ( "Failed to establish MQTT connection: Server=%s, MQTTStatus=%s", democonfigMQTT_BROKER_ENDPOINT, MQTT_Status_strerror( xResult ) ) ); } else { /* Successfully established and MQTT connection with the broker. */ LogInfo( ( "An MQTT connection is established with %s.", democonfigMQTT_BROKER_ENDPOINT ) ); xStatus = pdPASS; } return xStatus; } /*-----------------------------------------------------------*/ static void prvUpdateSubAckStatus( MQTTPacketInfo_t * pxPacketInfo ) { MQTTStatus_t xResult = MQTTSuccess; uint8_t * pucPayload = NULL; size_t ulSize = 0; uint32_t ulTopicCount = 0U; xResult = MQTT_GetSubAckStatusCodes( pxPacketInfo, &pucPayload, &ulSize ); /* MQTT_GetSubAckStatusCodes always returns success if called with packet info * from the event callback and non-NULL parameters. */ configASSERT( xResult == MQTTSuccess ); for( ulTopicCount = 0; ulTopicCount < ulSize; ulTopicCount++ ) { xTopicFilterContext[ ulTopicCount ].xSubAckStatus = pucPayload[ ulTopicCount ]; } } /*-----------------------------------------------------------*/ static BaseType_t prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext ) { MQTTStatus_t xResult = MQTTSuccess; BackoffAlgorithmContext_t xRetryParams; BaseType_t xBackoffStatus = pdFAIL; MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; BaseType_t xFailedSubscribeToTopic = pdFALSE; uint32_t ulTopicCount = 0U; BaseType_t xStatus = pdFAIL; /* Some fields not used by this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); /* Get a unique packet id. */ usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to * only one topic and uses QoS1. */ xMQTTSubscription[ 0 ].qos = MQTTQoS1; xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); /* Initialize retry attempts and interval. */ BackoffAlgorithm_InitializeParams( &xRetryParams, RETRY_BACKOFF_BASE_MS, RETRY_MAX_BACKOFF_DELAY_MS, RETRY_MAX_ATTEMPTS ); do { /* The client is now connected to the broker. Subscribe to the topic * as specified in mqttexampleTOPIC at the top of this file by sending a * subscribe packet then waiting for a subscribe acknowledgment (SUBACK). * This client will then publish to the same topic it subscribed to, so it * will expect all the messages it sends to the broker to be sent back to it * from the broker. This demo uses QOS0 in Subscribe, therefore, the Publish * messages received from the broker will have QOS0. */ LogInfo( ( "Attempt to subscribe to the MQTT topic %s.", mqttexampleTOPIC ) ); xResult = MQTT_Subscribe( pxMQTTContext, xMQTTSubscription, sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), usSubscribePacketIdentifier ); if( xResult != MQTTSuccess ) { LogError( ( "Failed to SUBSCRIBE to MQTT topic %s. Error=%s", mqttexampleTOPIC, MQTT_Status_strerror( xResult ) ) ); } else { xStatus = pdPASS; LogInfo( ( "SUBSCRIBE sent for topic %s to broker.", mqttexampleTOPIC ) ); /* Process incoming packet from the broker. After sending the subscribe, the * client may receive a publish before it receives a subscribe ack. Therefore, * call generic incoming packet processing function. Since this demo is * subscribing to the topic to which no one is publishing, probability of * receiving Publish message before subscribe ack is zero; but application * must be ready to receive any packet. This demo uses the generic packet * processing function everywhere to highlight this fact. */ xResult = prvWaitForPacket( pxMQTTContext, MQTT_PACKET_TYPE_SUBACK ); if( xResult != MQTTSuccess ) { xStatus = pdFAIL; } } if( xStatus == pdPASS ) { /* Reset flag before checking suback responses. */ xFailedSubscribeToTopic = pdFALSE; /* Check if recent subscription request has been rejected. #xTopicFilterContext is updated * in the event callback to reflect the status of the SUBACK sent by the broker. It represents * either the QoS level granted by the server upon subscription, or acknowledgement of * server rejection of the subscription request. */ for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) { if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus == MQTTSubAckFailure ) { xFailedSubscribeToTopic = pdTRUE; /* As the subscribe attempt failed, we will retry the connection after an * exponential backoff with jitter delay. */ /* Retry subscribe after exponential back-off. */ LogWarn( ( "Server rejected subscription request. Attempting to re-subscribe to topic %s.", xTopicFilterContext[ ulTopicCount ].pcTopicFilter ) ); xBackoffStatus = prvBackoffForRetry( &xRetryParams ); break; } } } } while( ( xFailedSubscribeToTopic == pdTRUE ) && ( xBackoffStatus == pdPASS ) ); return xStatus; } /*-----------------------------------------------------------*/ static BaseType_t prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext ) { MQTTStatus_t xResult; MQTTPublishInfo_t xMQTTPublishInfo; BaseType_t xStatus = pdPASS; /* Some fields are not used by this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) ); /* This demo uses QoS1. */ xMQTTPublishInfo.qos = MQTTQoS1; xMQTTPublishInfo.retain = false; xMQTTPublishInfo.pTopicName = mqttexampleTOPIC; xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC ); xMQTTPublishInfo.pPayload = mqttexampleMESSAGE; xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE ); /* Get a unique packet id. */ usPublishPacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Send PUBLISH packet. Packet ID is not used for a QoS1 publish. */ xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, usPublishPacketIdentifier ); if( xResult != MQTTSuccess ) { xStatus = pdFAIL; LogError( ( "Failed to send PUBLISH message to broker: Topic=%s, Error=%s", mqttexampleTOPIC, MQTT_Status_strerror( xResult ) ) ); } return xStatus; } /*-----------------------------------------------------------*/ static BaseType_t prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ) { MQTTStatus_t xResult; MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; BaseType_t xStatus = pdPASS; /* Some fields not used by this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); /* Get a unique packet id. */ usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to * only one topic and uses QoS1. */ xMQTTSubscription[ 0 ].qos = MQTTQoS1; xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); /* Get next unique packet identifier. */ usUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Send UNSUBSCRIBE packet. */ xResult = MQTT_Unsubscribe( pxMQTTContext, xMQTTSubscription, sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), usUnsubscribePacketIdentifier ); if( xResult != MQTTSuccess ) { xStatus = pdFAIL; LogError( ( "Failed to send UNSUBSCRIBE request to broker: TopicFilter=%s, Error=%s", mqttexampleTOPIC, MQTT_Status_strerror( xResult ) ) ); } return xStatus; } /*-----------------------------------------------------------*/ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket, uint16_t usPacketId ) { uint32_t ulTopicCount = 0U; switch( pxIncomingPacket->type ) { case MQTT_PACKET_TYPE_PUBACK: LogInfo( ( "PUBACK received for packet Id %u.", usPacketId ) ); /* Make sure ACK packet identifier matches with Request packet identifier. */ configASSERT( usPublishPacketIdentifier == usPacketId ); break; case MQTT_PACKET_TYPE_SUBACK: /* Update the packet type received to SUBACK. */ usPacketTypeReceived = MQTT_PACKET_TYPE_SUBACK; /* A SUBACK from the broker, containing the server response to our subscription request, has been received. * It contains the status code indicating server approval/rejection for the subscription to the single topic * requested. The SUBACK will be parsed to obtain the status code, and this status code will be stored in global * variable #xTopicFilterContext. */ prvUpdateSubAckStatus( pxIncomingPacket ); for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) { if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus != MQTTSubAckFailure ) { LogInfo( ( "Subscribed to the topic %s with maximum QoS %u.", xTopicFilterContext[ ulTopicCount ].pcTopicFilter, xTopicFilterContext[ ulTopicCount ].xSubAckStatus ) ); } } /* Make sure ACK packet identifier matches with Request packet identifier. */ configASSERT( usSubscribePacketIdentifier == usPacketId ); break; case MQTT_PACKET_TYPE_UNSUBACK: LogInfo( ( "Unsubscribed from the topic %s.", mqttexampleTOPIC ) ); /* Update the packet type received to UNSUBACK. */ usPacketTypeReceived = MQTT_PACKET_TYPE_UNSUBACK; /* Make sure ACK packet identifier matches with Request packet identifier. */ configASSERT( usUnsubscribePacketIdentifier == usPacketId ); break; case MQTT_PACKET_TYPE_PINGRESP: LogInfo( ( "Ping Response successfully received." ) ); break; /* Any other packet type is invalid. */ default: LogWarn( ( "prvMQTTProcessResponse() called with unknown packet type:(%02X).", pxIncomingPacket->type ) ); } } /*-----------------------------------------------------------*/ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ) { configASSERT( pxPublishInfo != NULL ); /* Set the global for indicating that an incoming publish is received. */ usPacketTypeReceived = MQTT_PACKET_TYPE_PUBLISH; /* Process incoming Publish. */ LogInfo( ( "Incoming QoS : %d\n", pxPublishInfo->qos ) ); /* Verify the received publish is for the we have subscribed to. */ if( ( pxPublishInfo->topicNameLength == strlen( mqttexampleTOPIC ) ) && ( 0 == strncmp( mqttexampleTOPIC, pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength ) ) ) { LogInfo( ( "Incoming Publish Topic Name: %.*s matches subscribed topic." "Incoming Publish Message : %.*s", pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName, pxPublishInfo->payloadLength, pxPublishInfo->pPayload ) ); } else { LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.", pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName ) ); } } /*-----------------------------------------------------------*/ static void prvEventCallback( MQTTContext_t * pxMQTTContext, MQTTPacketInfo_t * pxPacketInfo, MQTTDeserializedInfo_t * pxDeserializedInfo ) { /* The MQTT context is not used for this demo. */ ( void ) pxMQTTContext; if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH ) { prvMQTTProcessIncomingPublish( pxDeserializedInfo->pPublishInfo ); } else { prvMQTTProcessResponse( pxPacketInfo, pxDeserializedInfo->packetIdentifier ); } } /*-----------------------------------------------------------*/ static uint32_t prvGetTimeMs( void ) { TickType_t xTickCount = 0; uint32_t ulTimeMs = 0UL; /* Get the current tick count. */ xTickCount = xTaskGetTickCount(); /* Convert the ticks to milliseconds. */ ulTimeMs = ( uint32_t ) xTickCount * MILLISECONDS_PER_TICK; /* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the * elapsed time in the application. */ ulTimeMs = ( uint32_t ) ( ulTimeMs - ulGlobalEntryTimeMs ); return ulTimeMs; } /*-----------------------------------------------------------*/ static MQTTStatus_t prvWaitForPacket( MQTTContext_t * pxMQTTContext, uint16_t usPacketType ) { uint8_t ucCount = 0U; MQTTStatus_t xMQTTStatus = MQTTSuccess; /* Reset the packet type received. */ usPacketTypeReceived = 0U; while( ( usPacketTypeReceived != usPacketType ) && ( ucCount++ < MQTT_PROCESS_LOOP_PACKET_WAIT_COUNT_MAX ) && ( xMQTTStatus == MQTTSuccess ) ) { /* Event callback will set #usPacketTypeReceived when receiving appropriate packet. This * will wait for at most mqttexamplePROCESS_LOOP_TIMEOUT_MS. */ xMQTTStatus = MQTT_ProcessLoop( pxMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); LogInfo(("Packet type received %d, expected %d, status %d", usPacketTypeReceived, usPacketType, xMQTTStatus )); } if( ( xMQTTStatus != MQTTSuccess ) || ( usPacketTypeReceived != usPacketType ) ) { LogError( ( "MQTT_ProcessLoop failed to receive packet: Packet type=%02X, LoopDuration=%u, Status=%s", usPacketType, ( mqttexamplePROCESS_LOOP_TIMEOUT_MS * ucCount ), MQTT_Status_strerror( xMQTTStatus ) ) ); } return xMQTTStatus; } /*-----------------------------------------------------------*/