/* * FreeRTOS V202011.00 * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy of * this software and associated documentation files (the "Software"), to deal in * the Software without restriction, including without limitation the rights to * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of * the Software, and to permit persons to whom the Software is furnished to do so, * subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * * https://www.FreeRTOS.org * https://github.com/FreeRTOS * */ /* * Demo for showing use of the MQTT API using a mutually authenticated * network connection. * * The Example shown below uses MQTT APIs to create MQTT messages and send them * over the mutually authenticated network connection established with the * MQTT broker. This example is single threaded and uses statically allocated * memory. It uses QoS1 for sending to and receiving messages from the broker. * * A mutually authenticated TLS connection is used to connect to the * MQTT message broker in this example. Define democonfigMQTT_BROKER_ENDPOINT, * democonfigROOT_CA_PEM, democonfigCLIENT_CERTIFICATE_PEM, * and democonfigCLIENT_PRIVATE_KEY_PEM in demo_config.h to establish a * mutually authenticated connection. */ /* Standard includes. */ #include #include /* Kernel includes. */ #include "FreeRTOS.h" #include "task.h" /* Demo Specific configs. */ #include "demo_config.h" /* MQTT library includes. */ #include "core_mqtt.h" /* Exponential backoff retry include. */ #include "backoff_algorithm.h" /* Transport interface implementation include header for TLS. */ #include "using_mbedtls.h" /* Use 1NCE service to onboard device. */ #ifdef USE_1NCE_ZERO_TOUCH_PROVISIONING #include "1nce_zero_touch_provisioning.h" #endif /*-----------------------------------------------------------*/ /* Compile time error for undefined configs. */ #ifndef democonfigMQTT_BROKER_ENDPOINT #error "Define the config democonfigMQTT_BROKER_ENDPOINT by following the instructions in file demo_config.h." #endif #ifndef democonfigROOT_CA_PEM #error "Please define Root CA certificate of the MQTT broker(democonfigROOT_CA_PEM) in demo_config.h." #endif /* If no username is defined, then a client certificate/key is required. */ #ifndef democonfigCLIENT_USERNAME /* *!!! Please note democonfigCLIENT_PRIVATE_KEY_PEM in used for *!!! convenience of demonstration only. Production devices should *!!! store keys securely, such as within a secure element. */ #ifndef democonfigCLIENT_CERTIFICATE_PEM #error "Please define client certificate(democonfigCLIENT_CERTIFICATE_PEM) in demo_config.h." #endif #ifndef democonfigCLIENT_PRIVATE_KEY_PEM #error "Please define client private key(democonfigCLIENT_PRIVATE_KEY_PEM) in demo_config.h." #endif #else /* If a username is defined, a client password also would need to be defined for * client authentication. */ #ifndef democonfigCLIENT_PASSWORD #error "Please define client password(democonfigCLIENT_PASSWORD) in demo_config.h for client authentication based on username/password." #endif /* AWS IoT MQTT broker port needs to be 443 for client authentication based on * username/password. */ #if defined( democonfigUSE_AWS_IOT_CORE_BROKER ) && democonfigMQTT_BROKER_PORT != 443 #error "Broker port(democonfigMQTT_BROKER_PORT) should be defined as 443 in demo_config.h for client authentication based on username/password in AWS IoT Core." #endif #endif /* ifndef democonfigCLIENT_USERNAME */ /*-----------------------------------------------------------*/ /* Default values for configs. */ #ifndef democonfigCLIENT_IDENTIFIER /** * @brief The MQTT client identifier used in this example. Each client identifier * must be unique so edit as required to ensure no two clients connecting to the * same broker use the same client identifier. * * @note Appending __TIME__ to the client id string will help to create a unique * client id every time an application binary is built. Only a single instance of * this application's compiled binary may be used at a time, since the client ID * will always be the same. */ #define democonfigCLIENT_IDENTIFIER "testClient"__TIME__ #endif #ifndef democonfigMQTT_BROKER_PORT /** * @brief The port to use for the demo. */ #define democonfigMQTT_BROKER_PORT ( 8883 ) #endif /*-----------------------------------------------------------*/ /** * @brief The maximum number of retries for network operation with server. */ #define mqttexampleRETRY_MAX_ATTEMPTS ( 5U ) /** * @brief The maximum back-off delay (in milliseconds) for retrying failed operation * with server. */ #define mqttexampleRETRY_MAX_BACKOFF_DELAY_MS ( 5000U ) /** * @brief The base back-off delay (in milliseconds) to use for network operation retry * attempts. */ #define mqttexampleRETRY_BACKOFF_BASE_MS ( 500U ) /** * @brief Timeout for receiving CONNACK packet in milliseconds. */ #define mqttexampleCONNACK_RECV_TIMEOUT_MS ( 1000U ) /** * @brief The topic to subscribe and publish to in the example. * * The topic name starts with the client identifier to ensure that each demo * interacts with a unique topic name. */ #define mqttexampleTOPIC democonfigCLIENT_IDENTIFIER "/example/topic" /** * @brief The number of topic filters to subscribe. */ #define mqttexampleTOPIC_COUNT ( 1 ) /** * @brief The MQTT message published in this example. */ #define mqttexampleMESSAGE "Hello World!" /** * @brief Time in ticks to wait between each cycle of the demo implemented * by RunMQTTTask(). */ #define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS_TICKS ( pdMS_TO_TICKS( 5000U ) ) /** * @brief Timeout for MQTT_ProcessLoop in milliseconds. */ #define mqttexamplePROCESS_LOOP_TIMEOUT_MS ( 5000U ) /** * @brief Keep alive time reported to the broker while establishing * an MQTT connection. * * It is the responsibility of the Client to ensure that the interval between * Control Packets being sent does not exceed the this Keep Alive value. In the * absence of sending any other Control Packets, the Client MUST send a * PINGREQ Packet. */ #define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 60U ) /** * @brief Delay (in ticks) between consecutive cycles of MQTT publish operations in a * demo iteration. * * Note that the process loop also has a timeout, so the total time between * publishes is the sum of the two delays. */ #define mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS ( pdMS_TO_TICKS( 2000U ) ) /** * @brief Transport timeout in milliseconds for transport send and receive. */ #define mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ( 5000U ) /** * @brief ALPN (Application-Layer Protocol Negotiation) protocol name for AWS IoT MQTT. * * This will be used if democonfigMQTT_BROKER_PORT is configured as 443 for the AWS IoT MQTT broker. * Please see more details about the ALPN protocol for AWS IoT MQTT endpoint * in the link below. * https://aws.amazon.com/blogs/iot/mqtt-with-tls-client-authentication-on-port-443-why-it-is-useful-and-how-it-works/ */ #define AWS_IOT_MQTT_ALPN "\x0ex-amzn-mqtt-ca" /** * @brief This is the ALPN (Application-Layer Protocol Negotiation) string * required by AWS IoT for password-based authentication using TCP port 443. */ #define AWS_IOT_CUSTOM_AUTH_ALPN "\x04mqtt" /** * Provide default values for undefined configuration settings. */ #ifndef democonfigOS_NAME #define democonfigOS_NAME "FreeRTOS" #endif #ifndef democonfigOS_VERSION #define democonfigOS_VERSION tskKERNEL_VERSION_NUMBER #endif #ifndef democonfigHARDWARE_PLATFORM_NAME #define democonfigHARDWARE_PLATFORM_NAME "WinSim" #endif #ifndef democonfigMQTT_LIB #define democonfigMQTT_LIB "core-mqtt@1.0.0" #endif /** * @brief The MQTT metrics string expected by AWS IoT. */ #define AWS_IOT_METRICS_STRING \ "?SDK=" democonfigOS_NAME "&Version=" democonfigOS_VERSION \ "&Platform=" democonfigHARDWARE_PLATFORM_NAME "&MQTTLib=" democonfigMQTT_LIB /** * @brief The length of the MQTT metrics string expected by AWS IoT. */ #define AWS_IOT_METRICS_STRING_LENGTH ( ( uint16_t ) ( sizeof( AWS_IOT_METRICS_STRING ) - 1 ) ) #ifdef democonfigCLIENT_USERNAME /** * @brief Append the username with the metrics string if #democonfigCLIENT_USERNAME is defined. * * This is to support both metrics reporting and username/password based client * authentication by AWS IoT. */ #define CLIENT_USERNAME_WITH_METRICS democonfigCLIENT_USERNAME AWS_IOT_METRICS_STRING #endif /** * @brief Milliseconds per second. */ #define MILLISECONDS_PER_SECOND ( 1000U ) /** * @brief Milliseconds per FreeRTOS tick. */ #define MILLISECONDS_PER_TICK ( MILLISECONDS_PER_SECOND / configTICK_RATE_HZ ) /*-----------------------------------------------------------*/ /** * @brief The task used to demonstrate the MQTT API. * * @param[in] pvParameters Parameters as passed at the time of task creation. Not * used in this example. */ void RunMQTTTask( void * pvParameters ); /** * @brief Connect to MQTT broker with reconnection retries. * * If connection fails, retry is attempted after a timeout. * Timeout value will exponentially increase until maximum * timeout value is reached or the number of attempts are exhausted. * * @param[out] pxNetworkContext The parameter to return the created network context. * * @return The status of the final connection attempt. */ static TlsTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkCredentials_t * pxNetworkCredentials, NetworkContext_t * pNetworkContext ); /** * @brief Sends an MQTT Connect packet over the already connected TLS over TCP connection. * * @param[in, out] pxMQTTContext MQTT context pointer. * @param[in] xNetworkContext Network context. */ static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext, NetworkContext_t * pxNetworkContext ); /** * @brief Function to update variable #xTopicFilterContext with status * information from Subscribe ACK. Called by the event callback after processing * an incoming SUBACK packet. * * @param[in] Server response to the subscription request. */ static void prvUpdateSubAckStatus( MQTTPacketInfo_t * pxPacketInfo ); /** * @brief Subscribes to the topic as specified in mqttexampleTOPIC at the top of * this file. In the case of a Subscribe ACK failure, then subscription is * retried using an exponential backoff strategy with jitter. * * @param[in] pxMQTTContext MQTT context pointer. */ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext ); /** * @brief Publishes a message mqttexampleMESSAGE on mqttexampleTOPIC topic. * * @param[in] pxMQTTContext MQTT context pointer. */ static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext ); /** * @brief Unsubscribes from the previously subscribed topic as specified * in mqttexampleTOPIC. * * @param[in] pxMQTTContext MQTT context pointer. */ static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ); /** * @brief The timer query function provided to the MQTT context. * * @return Time in milliseconds. */ static uint32_t prvGetTimeMs( void ); /** * @brief Process a response or ack to an MQTT request (PING, PUBLISH, * SUBSCRIBE or UNSUBSCRIBE). This function processes PINGRESP, PUBACK, * SUBACK, and UNSUBACK. * * @param[in] pxIncomingPacket is a pointer to structure containing deserialized * MQTT response. * @param[in] usPacketId is the packet identifier from the ack received. */ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket, uint16_t usPacketId ); /** * @brief Process incoming Publish message. * * @param[in] pxPublishInfo is a pointer to structure containing deserialized * Publish message. */ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ); /** * @brief The application callback function for getting the incoming publishes, * incoming acks, and ping responses reported from the MQTT library. * * @param[in] pxMQTTContext MQTT context pointer. * @param[in] pxPacketInfo Packet Info pointer for the incoming packet. * @param[in] pxDeserializedInfo Deserialized information from the incoming packet. */ static void prvEventCallback( MQTTContext_t * pxMQTTContext, MQTTPacketInfo_t * pxPacketInfo, MQTTDeserializedInfo_t * pxDeserializedInfo ); extern UBaseType_t uxRand( void ); /*-----------------------------------------------------------*/ /** * @brief Static buffer used to hold MQTT messages being sent and received. */ static uint8_t ucSharedBuffer[ democonfigNETWORK_BUFFER_SIZE ]; /** * @brief Global entry time into the application to use as a reference timestamp * in the #prvGetTimeMs function. #prvGetTimeMs will always return the difference * between the current time and the global entry time. This will reduce the chances * of overflow for the 32 bit unsigned integer used for holding the timestamp. */ static uint32_t ulGlobalEntryTimeMs; /** * @brief Packet Identifier generated when Publish request was sent to the broker; * it is used to match received Publish ACK to the transmitted Publish packet. */ static uint16_t usPublishPacketIdentifier; /** * @brief Packet Identifier generated when Subscribe request was sent to the broker; * it is used to match received Subscribe ACK to the transmitted Subscribe packet. */ static uint16_t usSubscribePacketIdentifier; /** * @brief Packet Identifier generated when Unsubscribe request was sent to the broker; * it is used to match received Unsubscribe response to the transmitted Unsubscribe * request. */ static uint16_t usUnsubscribePacketIdentifier; /* A bridge to feed hard coded info or dynamic acquired info for MQTT connection. */ static char * pThingName = NULL; static char * pEndpoint = NULL; static char * pExampleTopic = NULL; static char * pRootCA = NULL; static char * pClientCert = NULL; static char * pPrvKey = NULL; /** * @brief A pair containing a topic filter and its SUBACK status. */ typedef struct topicFilterContext { const char * pcTopicFilter; MQTTSubAckStatus_t xSubAckStatus; } topicFilterContext_t; /** * @brief An array containing the context of a SUBACK; the SUBACK status * of a filter is updated when the event callback processes a SUBACK. */ static topicFilterContext_t xTopicFilterContext[ mqttexampleTOPIC_COUNT ] = { { mqttexampleTOPIC, MQTTSubAckFailure } }; /** @brief Static buffer used to hold MQTT messages being sent and received. */ static MQTTFixedBuffer_t xBuffer = { ucSharedBuffer, democonfigNETWORK_BUFFER_SIZE }; /*-----------------------------------------------------------*/ /* * @brief The Example shown below uses MQTT APIs to create MQTT messages and * send them over the mutually authenticated network connection established with the * MQTT broker. This example is single threaded and uses statically allocated * memory. It uses QoS1 for sending to and receiving messages from the broker. * * This MQTT client subscribes to the topic as specified in mqttexampleTOPIC at the * top of this file by sending a subscribe packet and then waiting for a subscribe * acknowledgment (SUBACK).This client will then publish to the same topic it * subscribed to, so it will expect all the messages it sends to the broker to be * sent back to it from the broker. */ void RunMQTTTask( void * pvParameters ) { uint32_t ulPublishCount = 0U, ulTopicCount = 0U; const uint32_t ulMaxPublishCount = 5UL; NetworkContext_t xNetworkContext = { 0 }; NetworkCredentials_t xNetworkCredentials = { 0 }; MQTTContext_t xMQTTContext = { 0 }; MQTTStatus_t xMQTTStatus; TlsTransportStatus_t xNetworkStatus; /* Remove compiler warnings about unused parameters. */ ( void ) pvParameters; /* Set the entry time of the demo application. This entry time will be used * to calculate relative time elapsed in the execution of the demo application, * by the timer utility function that is provided to the MQTT library. */ ulGlobalEntryTimeMs = prvGetTimeMs(); #ifdef USE_1NCE_ZERO_TOUCH_PROVISIONING uint8_t status = nce_onboard( &pThingName, &pEndpoint, &pExampleTopic, &pRootCA, &pClientCert, &pPrvKey ); configASSERT( status == EXIT_SUCCESS ); #else pThingName = democonfigCLIENT_IDENTIFIER; pEndpoint = democonfigMQTT_BROKER_ENDPOINT; pExampleTopic = mqttexampleTOPIC; #endif /* ifdef USE_1NCE_ZERO_TOUCH_PROVISIONING */ for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) { xTopicFilterContext[ ulTopicCount ].pcTopicFilter = pExampleTopic; } for( ; ; ) { /****************************** Connect. ******************************/ /* Attempt to establish TLS session with MQTT broker. If connection fails, * retry after a timeout. Timeout value will be exponentially increased * until the maximum number of attempts are reached or the maximum timeout * value is reached. The function returns a failure status if the TCP * connection cannot be established to the broker after the configured * number of attempts. */ xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkCredentials, &xNetworkContext ); configASSERT( xNetworkStatus == TLS_TRANSPORT_SUCCESS ); /* Sends an MQTT Connect packet over the already established TLS connection, * and waits for connection acknowledgment (CONNACK) packet. */ LogInfo( ( "Creating an MQTT connection to %s.\r\n", pEndpoint ) ); prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext ); /**************************** Subscribe. ******************************/ /* If server rejected the subscription request, attempt to resubscribe to * topic. Attempts are made according to the exponential backoff retry * strategy implemented in BackoffAlgorithm. */ prvMQTTSubscribeWithBackoffRetries( &xMQTTContext ); /****************** Publish and Keep Alive Loop. **********************/ /* Publish messages with QoS1, send and process Keep alive messages. */ for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ ) { LogInfo( ( "Publish to the MQTT topic %s.\r\n", pExampleTopic ) ); prvMQTTPublishToTopic( &xMQTTContext ); /* Process incoming publish echo, since application subscribed to the * same topic, the broker will send publish message back to the * application. */ LogInfo( ( "Attempt to receive publish message from broker.\r\n" ) ); xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); configASSERT( xMQTTStatus == MQTTSuccess ); /* Leave Connection Idle for some time. */ LogInfo( ( "Keeping Connection Idle...\r\n\r\n" ) ); vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS ); } /******************** Unsubscribe from the topic. *********************/ LogInfo( ( "Unsubscribe from the MQTT topic %s.\r\n", pExampleTopic ) ); prvMQTTUnsubscribeFromTopic( &xMQTTContext ); /* Process incoming UNSUBACK packet from the broker. */ xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); configASSERT( xMQTTStatus == MQTTSuccess ); /**************************** Disconnect. *****************************/ /* Send an MQTT Disconnect packet over the already connected TLS over * TCP connection. There is no corresponding response for the disconnect * packet. After sending disconnect, client must close the network * connection. */ LogInfo( ( "Disconnecting the MQTT connection with %s.\r\n", pEndpoint ) ); xMQTTStatus = MQTT_Disconnect( &xMQTTContext ); configASSERT( xMQTTStatus == MQTTSuccess ); /* Close the network connection. */ TLS_FreeRTOS_Disconnect( &xNetworkContext ); /* Reset SUBACK status for each topic filter after completion of * subscription request cycle. */ for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) { xTopicFilterContext[ ulTopicCount ].xSubAckStatus = MQTTSubAckFailure; } /* Wait for some time between two iterations to ensure that we do not * bombard the broker. */ LogInfo( ( "RunMQTTTask() completed an iteration successfully. " "Total free heap is %u.\r\n", xPortGetFreeHeapSize() ) ); LogInfo( ( "Demo completed successfully.\r\n" ) ); LogInfo( ( "Short delay before starting the next iteration.... \r\n\r\n" ) ); vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS_TICKS ); } } /*-----------------------------------------------------------*/ static TlsTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkCredentials_t * pxNetworkCredentials, NetworkContext_t * pxNetworkContext ) { TlsTransportStatus_t xNetworkStatus; BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess; BackoffAlgorithmContext_t xReconnectParams; uint16_t usNextRetryBackOff = 0U; #ifdef democonfigUSE_AWS_IOT_CORE_BROKER /* ALPN protocols must be a NULL-terminated list of strings. Therefore, * the first entry will contain the actual ALPN protocol string while the * second entry must remain NULL. */ char * pcAlpnProtocols[] = { NULL, NULL }; /* The ALPN string changes depending on whether username/password authentication is used. */ #ifdef democonfigCLIENT_USERNAME pcAlpnProtocols[ 0 ] = AWS_IOT_CUSTOM_AUTH_ALPN; #else pcAlpnProtocols[ 0 ] = AWS_IOT_MQTT_ALPN; #endif pxNetworkCredentials->pAlpnProtos = pcAlpnProtocols; #endif /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ pxNetworkCredentials->disableSni = democonfigDISABLE_SNI; /* Set the credentials for establishing a TLS connection. */ #ifdef USE_1NCE_ZERO_TOUCH_PROVISIONING pxNetworkCredentials->pRootCa = ( uint8_t * ) pRootCA; pxNetworkCredentials->rootCaSize = strlen( pRootCA ) + 1; #ifdef democonfigCLIENT_CERTIFICATE_PEM pxNetworkCredentials->pClientCert = ( uint8_t * ) pClientCert; pxNetworkCredentials->clientCertSize = strlen( pClientCert ) + 1; pxNetworkCredentials->pPrivateKey = ( uint8_t * ) pPrvKey; pxNetworkCredentials->privateKeySize = strlen( pPrvKey ) + 1; #endif /* #ifdef democonfigCLIENT_CERTIFICATE_PEM */ #else /* #ifdef USE_1NCE_ZERO_TOUCH_PROVISIONING */ pxNetworkCredentials->pRootCa = ( const unsigned char * ) democonfigROOT_CA_PEM; pxNetworkCredentials->rootCaSize = sizeof( democonfigROOT_CA_PEM ); #ifdef democonfigCLIENT_CERTIFICATE_PEM pxNetworkCredentials->pClientCert = ( const unsigned char * ) democonfigCLIENT_CERTIFICATE_PEM; pxNetworkCredentials->clientCertSize = sizeof( democonfigCLIENT_CERTIFICATE_PEM ); pxNetworkCredentials->pPrivateKey = ( const unsigned char * ) democonfigCLIENT_PRIVATE_KEY_PEM; pxNetworkCredentials->privateKeySize = sizeof( democonfigCLIENT_PRIVATE_KEY_PEM ); #endif /* #ifdef democonfigCLIENT_CERTIFICATE_PEM */ #endif /* #ifdef USE_1NCE_ZERO_TOUCH_PROVISIONING */ /* Initialize reconnect attempts and interval. */ BackoffAlgorithm_InitializeParams( &xReconnectParams, mqttexampleRETRY_BACKOFF_BASE_MS, mqttexampleRETRY_MAX_BACKOFF_DELAY_MS, mqttexampleRETRY_MAX_ATTEMPTS ); /* Attempt to connect to MQTT broker. If connection fails, retry after * a timeout. Timeout value will exponentially increase till maximum * attempts are reached. */ do { /* Establish a TLS session with the MQTT broker. This example connects to * the MQTT broker as specified in democonfigMQTT_BROKER_ENDPOINT and * democonfigMQTT_BROKER_PORT at the top of this file. */ LogInfo( ( "Creating a TLS connection to %s:%u.\r\n", pEndpoint, democonfigMQTT_BROKER_PORT ) ); /* Attempt to create a mutually authenticated TLS connection. */ xNetworkStatus = TLS_FreeRTOS_Connect( pxNetworkContext, pEndpoint, democonfigMQTT_BROKER_PORT, pxNetworkCredentials, mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS, mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ); if( xNetworkStatus != TLS_TRANSPORT_SUCCESS ) { /* Generate a random number and calculate backoff value (in milliseconds) for * the next connection retry. * Note: It is recommended to seed the random number generator with a device-specific * entropy source so that possibility of multiple devices retrying failed network operations * at similar intervals can be avoided. */ xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &xReconnectParams, uxRand(), &usNextRetryBackOff ); if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted ) { LogError( ( "Connection to the broker failed, all attempts exhausted." ) ); } else if( xBackoffAlgStatus == BackoffAlgorithmSuccess ) { LogWarn( ( "Connection to the broker failed. " "Retrying connection with backoff and jitter." ) ); vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) ); } } } while( ( xNetworkStatus != TLS_TRANSPORT_SUCCESS ) && ( xBackoffAlgStatus == BackoffAlgorithmSuccess ) ); return xNetworkStatus; } /*-----------------------------------------------------------*/ static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext, NetworkContext_t * pxNetworkContext ) { MQTTStatus_t xResult; MQTTConnectInfo_t xConnectInfo; bool xSessionPresent; TransportInterface_t xTransport; /*** * For readability, error handling in this function is restricted to the use of * asserts(). ***/ /* Fill in Transport Interface send and receive function pointers. */ xTransport.pNetworkContext = pxNetworkContext; xTransport.send = TLS_FreeRTOS_send; xTransport.recv = TLS_FreeRTOS_recv; /* Initialize MQTT library. */ xResult = MQTT_Init( pxMQTTContext, &xTransport, prvGetTimeMs, prvEventCallback, &xBuffer ); configASSERT( xResult == MQTTSuccess ); /* Some fields are not used in this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) ); /* Start with a clean session i.e. direct the MQTT broker to discard any * previous session data. Also, establishing a connection with clean session * will ensure that the broker does not store any data when this client * gets disconnected. */ xConnectInfo.cleanSession = true; /* The client identifier is used to uniquely identify this MQTT client to * the MQTT broker. In a production device the identifier can be something * unique, such as a device serial number. */ xConnectInfo.pClientIdentifier = pThingName; xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( pThingName ); /* Set MQTT keep-alive period. If the application does not send packets at an interval less than * the keep-alive period, the MQTT library will send PINGREQ packets. */ xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS; /* Append metrics when connecting to the AWS IoT Core broker. */ #ifdef democonfigUSE_AWS_IOT_CORE_BROKER #ifdef democonfigCLIENT_USERNAME xConnectInfo.pUserName = CLIENT_USERNAME_WITH_METRICS; xConnectInfo.userNameLength = ( uint16_t ) strlen( CLIENT_USERNAME_WITH_METRICS ); xConnectInfo.pPassword = democonfigCLIENT_PASSWORD; xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD ); #else xConnectInfo.pUserName = AWS_IOT_METRICS_STRING; xConnectInfo.userNameLength = AWS_IOT_METRICS_STRING_LENGTH; /* Password for authentication is not used. */ xConnectInfo.pPassword = NULL; xConnectInfo.passwordLength = 0U; #endif #else /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ #ifdef democonfigCLIENT_USERNAME xConnectInfo.pUserName = democonfigCLIENT_USERNAME; xConnectInfo.userNameLength = ( uint16_t ) strlen( democonfigCLIENT_USERNAME ); xConnectInfo.pPassword = democonfigCLIENT_PASSWORD; xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD ); #endif /* ifdef democonfigCLIENT_USERNAME */ #endif /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ /* Send MQTT CONNECT packet to broker. LWT is not used in this demo, so it * is passed as NULL. */ xResult = MQTT_Connect( pxMQTTContext, &xConnectInfo, NULL, mqttexampleCONNACK_RECV_TIMEOUT_MS, &xSessionPresent ); configASSERT( xResult == MQTTSuccess ); /* Successfully established and MQTT connection with the broker. */ LogInfo( ( "An MQTT connection is established with %s.", pEndpoint ) ); } /*-----------------------------------------------------------*/ static void prvUpdateSubAckStatus( MQTTPacketInfo_t * pxPacketInfo ) { MQTTStatus_t xResult = MQTTSuccess; uint8_t * pucPayload = NULL; size_t ulSize = 0; uint32_t ulTopicCount = 0U; xResult = MQTT_GetSubAckStatusCodes( pxPacketInfo, &pucPayload, &ulSize ); /* MQTT_GetSubAckStatusCodes always returns success if called with packet info * from the event callback and non-NULL parameters. */ configASSERT( xResult == MQTTSuccess ); for( ulTopicCount = 0; ulTopicCount < ulSize; ulTopicCount++ ) { xTopicFilterContext[ ulTopicCount ].xSubAckStatus = pucPayload[ ulTopicCount ]; } } /*-----------------------------------------------------------*/ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext ) { MQTTStatus_t xResult = MQTTSuccess; BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess; BackoffAlgorithmContext_t xRetryParams; uint16_t usNextRetryBackOff = 0U; MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; bool xFailedSubscribeToTopic = false; uint32_t ulTopicCount = 0U; /* Some fields not used by this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); /* Get a unique packet id. */ usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to * only one topic and uses QoS1. */ xMQTTSubscription[ 0 ].qos = MQTTQoS1; xMQTTSubscription[ 0 ].pTopicFilter = pExampleTopic; xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( pExampleTopic ); /* Initialize context for backoff retry attempts if SUBSCRIBE request fails. */ BackoffAlgorithm_InitializeParams( &xRetryParams, mqttexampleRETRY_BACKOFF_BASE_MS, mqttexampleRETRY_MAX_BACKOFF_DELAY_MS, mqttexampleRETRY_MAX_ATTEMPTS ); do { /* The client is now connected to the broker. Subscribe to the topic * as specified in mqttexampleTOPIC at the top of this file by sending a * subscribe packet then waiting for a subscribe acknowledgment (SUBACK). * This client will then publish to the same topic it subscribed to, so it * will expect all the messages it sends to the broker to be sent back to it * from the broker. This demo uses QOS0 in Subscribe, therefore, the Publish * messages received from the broker will have QOS0. */ LogInfo( ( "Attempt to subscribe to the MQTT topic %s.\r\n", pExampleTopic ) ); xResult = MQTT_Subscribe( pxMQTTContext, xMQTTSubscription, sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), usSubscribePacketIdentifier ); configASSERT( xResult == MQTTSuccess ); LogInfo( ( "SUBSCRIBE sent for topic %s to broker.\n\n", pExampleTopic ) ); /* Process incoming packet from the broker. After sending the subscribe, the * client may receive a publish before it receives a subscribe ack. Therefore, * call generic incoming packet processing function. Since this demo is * subscribing to the topic to which no one is publishing, probability of * receiving Publish message before subscribe ack is zero; but application * must be ready to receive any packet. This demo uses the generic packet * processing function everywhere to highlight this fact. */ xResult = MQTT_ProcessLoop( pxMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); configASSERT( xResult == MQTTSuccess ); /* Reset flag before checking suback responses. */ xFailedSubscribeToTopic = false; /* Check if recent subscription request has been rejected. #xTopicFilterContext is updated * in the event callback to reflect the status of the SUBACK sent by the broker. It represents * either the QoS level granted by the server upon subscription, or acknowledgement of * server rejection of the subscription request. */ for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) { if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus == MQTTSubAckFailure ) { xFailedSubscribeToTopic = true; /* Generate a random number and calculate backoff value (in milliseconds) for * the next connection retry. * Note: It is recommended to seed the random number generator with a device-specific * entropy source so that possibility of multiple devices retrying failed network operations * at similar intervals can be avoided. */ xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &xRetryParams, uxRand(), &usNextRetryBackOff ); if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted ) { LogError( ( "Server rejected subscription request. All retry attempts have exhausted. Topic=%s", xTopicFilterContext[ ulTopicCount ].pcTopicFilter ) ); } else if( xBackoffAlgStatus == BackoffAlgorithmSuccess ) { LogWarn( ( "Server rejected subscription request. Attempting to re-subscribe to topic %s.", xTopicFilterContext[ ulTopicCount ].pcTopicFilter ) ); /* Backoff before the next re-subscribe attempt. */ vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) ); } break; } } configASSERT( xBackoffAlgStatus != BackoffAlgorithmRetriesExhausted ); } while( ( xFailedSubscribeToTopic == true ) && ( xBackoffAlgStatus == BackoffAlgorithmSuccess ) ); } /*-----------------------------------------------------------*/ static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext ) { MQTTStatus_t xResult; MQTTPublishInfo_t xMQTTPublishInfo; /*** * For readability, error handling in this function is restricted to the use of * asserts(). ***/ /* Some fields are not used by this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) ); /* This demo uses QoS1. */ xMQTTPublishInfo.qos = MQTTQoS1; xMQTTPublishInfo.retain = false; xMQTTPublishInfo.pTopicName = pExampleTopic; xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( pExampleTopic ); xMQTTPublishInfo.pPayload = mqttexampleMESSAGE; xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE ); /* Get a unique packet id. */ usPublishPacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Send PUBLISH packet. Packet ID is not used for a QoS1 publish. */ xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, usPublishPacketIdentifier ); configASSERT( xResult == MQTTSuccess ); } /*-----------------------------------------------------------*/ static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ) { MQTTStatus_t xResult; MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; /* Some fields not used by this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); /* Get a unique packet id. */ usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to * only one topic and uses QoS1. */ xMQTTSubscription[ 0 ].qos = MQTTQoS1; xMQTTSubscription[ 0 ].pTopicFilter = pExampleTopic; xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( pExampleTopic ); /* Get next unique packet identifier. */ usUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Send UNSUBSCRIBE packet. */ xResult = MQTT_Unsubscribe( pxMQTTContext, xMQTTSubscription, sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), usUnsubscribePacketIdentifier ); configASSERT( xResult == MQTTSuccess ); } /*-----------------------------------------------------------*/ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket, uint16_t usPacketId ) { uint32_t ulTopicCount = 0U; switch( pxIncomingPacket->type ) { case MQTT_PACKET_TYPE_PUBACK: LogInfo( ( "PUBACK received for packet Id %u.\r\n", usPacketId ) ); /* Make sure ACK packet identifier matches with Request packet identifier. */ configASSERT( usPublishPacketIdentifier == usPacketId ); break; case MQTT_PACKET_TYPE_SUBACK: /* A SUBACK from the broker, containing the server response to our subscription request, has been received. * It contains the status code indicating server approval/rejection for the subscription to the single topic * requested. The SUBACK will be parsed to obtain the status code, and this status code will be stored in global * variable #xTopicFilterContext. */ prvUpdateSubAckStatus( pxIncomingPacket ); for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) { if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus != MQTTSubAckFailure ) { LogInfo( ( "Subscribed to the topic %s with maximum QoS %u.\r\n", xTopicFilterContext[ ulTopicCount ].pcTopicFilter, xTopicFilterContext[ ulTopicCount ].xSubAckStatus ) ); } } /* Make sure ACK packet identifier matches with Request packet identifier. */ configASSERT( usSubscribePacketIdentifier == usPacketId ); break; case MQTT_PACKET_TYPE_UNSUBACK: LogInfo( ( "Unsubscribed from the topic %s.\r\n", pExampleTopic ) ); /* Make sure ACK packet identifier matches with Request packet identifier. */ configASSERT( usUnsubscribePacketIdentifier == usPacketId ); break; case MQTT_PACKET_TYPE_PINGRESP: /* Nothing to be done from application as library handles * PINGRESP with the use of MQTT_ProcessLoop API function. */ LogWarn( ( "PINGRESP should not be handled by the application " "callback when using MQTT_ProcessLoop.\n" ) ); break; /* Any other packet type is invalid. */ default: LogWarn( ( "prvMQTTProcessResponse() called with unknown packet type:(%02X).\r\n", pxIncomingPacket->type ) ); } } /*-----------------------------------------------------------*/ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ) { configASSERT( pxPublishInfo != NULL ); /* Process incoming Publish. */ LogInfo( ( "Incoming QoS : %d\n", pxPublishInfo->qos ) ); /* Verify the received publish is for the we have subscribed to. */ if( ( pxPublishInfo->topicNameLength == strlen( pExampleTopic ) ) && ( 0 == strncmp( pExampleTopic, pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength ) ) ) { LogInfo( ( "\r\nIncoming Publish Topic Name: %.*s matches subscribed topic.\r\n" "Incoming Publish Message : %.*s\r\n", pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName, pxPublishInfo->payloadLength, ( char * ) pxPublishInfo->pPayload ) ); } else { LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.\r\n", pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName ) ); } } /*-----------------------------------------------------------*/ static void prvEventCallback( MQTTContext_t * pxMQTTContext, MQTTPacketInfo_t * pxPacketInfo, MQTTDeserializedInfo_t * pxDeserializedInfo ) { /* The MQTT context is not used for this demo. */ ( void ) pxMQTTContext; if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH ) { prvMQTTProcessIncomingPublish( pxDeserializedInfo->pPublishInfo ); } else { prvMQTTProcessResponse( pxPacketInfo, pxDeserializedInfo->packetIdentifier ); } } /*-----------------------------------------------------------*/ static uint32_t prvGetTimeMs( void ) { TickType_t xTickCount = 0; uint32_t ulTimeMs = 0UL; /* Get the current tick count. */ xTickCount = xTaskGetTickCount(); /* Convert the ticks to milliseconds. */ ulTimeMs = ( uint32_t ) xTickCount * MILLISECONDS_PER_TICK; /* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the * elapsed time in the application. */ ulTimeMs = ( uint32_t ) ( ulTimeMs - ulGlobalEntryTimeMs ); return ulTimeMs; } /*-----------------------------------------------------------*/