/* * AWS IoT Device SDK for Embedded C 202211.00 * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy of * this software and associated documentation files (the "Software"), to deal in * the Software without restriction, including without limitation the rights to * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of * the Software, and to permit persons to whom the Software is furnished to do so, * subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ /* * Demo for showing the use of MQTT APIs to establish an MQTT session, * subscribe to a topic, publish to a topic, receive incoming publishes, * unsubscribe from a topic and disconnect the MQTT session. * * This demo shows an example of connecting to a local Greengrass core using * local auth, and using the MQTT broker provided by its Moquette component. */ #include #include #include #include #include /* Logging configuration for the Demo. */ #define LIBRARY_LOG_NAME "DEMO" #define LIBRARY_LOG_LEVEL LOG_INFO #include "logging_stack.h" #include "demo_config.h" /* coreMQTT API headers. */ #include "core_mqtt.h" #include "core_mqtt_serializer.h" #include "core_mqtt_state.h" /* OpenSSL sockets transport implementation. */ #include "openssl_posix.h" /*Include backoff algorithm header for retry logic.*/ #include "backoff_algorithm.h" /* Clock for timer. */ #include "clock.h" /* These configuration settings are required to run the mutual auth demo. * Throw compilation error if the below configs are not defined. */ #ifndef GREENGRASS_ADDRESS #error "Please define the Greengrass IP (GREENGRASS_ADDRESS) in demo_config.h." #endif #ifndef ROOT_CA_CERT_PATH #error "Please define path to the Greengrass Root CA certificate (ROOT_CA_CERT_PATH) in demo_config.h." #endif #ifndef THING_NAME #error "Please define the device Thing name (THING_NAME) in demo_config.h." #endif #ifndef CLIENT_CERT_PATH #error "Please define path to client certificate (CLIENT_CERT_PATH) in demo_config.h." #endif #ifndef CLIENT_PRIVATE_KEY_PATH #error "Please define path to client private key (CLIENT_PRIVATE_KEY_PATH) in demo_config.h." #endif /** * @brief Length of the Greengrass IP. */ #define GREENGRASS_ADDRESS_LENGTH ( ( uint16_t ) ( sizeof( GREENGRASS_ADDRESS ) - 1 ) ) /** * @brief Length of the Thing name. */ #define THING_NAME_LENGTH ( ( uint16_t ) ( sizeof( THING_NAME ) - 1 ) ) /** * @brief The topic to subscribe and publish to in the example. */ #define MQTT_EXAMPLE_TOPIC "/example/topic" /** * @brief Length of client MQTT topic. */ #define MQTT_EXAMPLE_TOPIC_LENGTH ( ( uint16_t ) ( sizeof( MQTT_EXAMPLE_TOPIC ) - 1 ) ) /** * @brief The MQTT message published in this example. */ #define MQTT_EXAMPLE_MESSAGE "Hello World!" /** * @brief The length of the MQTT message published in this example. */ #define MQTT_EXAMPLE_MESSAGE_LENGTH ( ( uint16_t ) ( sizeof( MQTT_EXAMPLE_MESSAGE ) - 1 ) ) /** * @brief Size of the network buffer for MQTT packets. */ #define NETWORK_BUFFER_SIZE ( 1024U ) /** * @brief Timeout for receiving CONNACK packet in milli seconds. */ #define CONNACK_RECV_TIMEOUT_MS ( 1000U ) /** * @brief Maximum number of outgoing publishes maintained in the application * until an ack is received from the broker. */ #define MAX_OUTGOING_PUBLISHES ( 5U ) /** * @brief Invalid packet identifier for the MQTT packets. Zero is always an * invalid packet identifier as per MQTT 3.1.1 spec. */ #define MQTT_PACKET_ID_INVALID ( ( uint16_t ) 0U ) /** * @brief Timeout for MQTT_ProcessLoop function in milliseconds. */ #define MQTT_PROCESS_LOOP_TIMEOUT_MS ( 500U ) /** * @brief The maximum time interval in seconds which is allowed to elapse * between two Control Packets. * * It is the responsibility of the Client to ensure that the interval between * Control Packets being sent does not exceed the this Keep Alive value. In the * absence of sending any other Control Packets, the Client MUST send a * PINGREQ Packet. */ #define MQTT_KEEP_ALIVE_INTERVAL_SECONDS ( 60U ) /** * @brief Delay between MQTT publishes in seconds. */ #define DELAY_BETWEEN_PUBLISHES_SECONDS ( 1U ) /** * @brief Number of PUBLISH messages sent per iteration. */ #define MQTT_PUBLISH_COUNT_PER_LOOP ( 5U ) /** * @brief Delay in seconds between two iterations of subscribePublishLoop(). */ #define MQTT_SUBPUB_LOOP_DELAY_SECONDS ( 5U ) /** * @brief Transport timeout in milliseconds for transport send and receive. */ #define TRANSPORT_SEND_RECV_TIMEOUT_MS ( 500 ) /*-----------------------------------------------------------*/ /** * @brief Status of latest Subscribe ACK; * it is updated every time the callback function processes a Subscribe ACK * and accounts for subscription to a single topic. */ static MQTTSubAckStatus_t globalSubAckStatus = MQTTSubAckFailure; /*-----------------------------------------------------------*/ /* Each compilation unit must define the NetworkContext struct. */ struct NetworkContext { OpensslParams_t * pParams; }; /*-----------------------------------------------------------*/ /** * @brief Connect to MQTT broker with reconnection retries. * * If connection fails, retry is attempted after a timeout. * Timeout value will exponentially increase until maximum * timeout value is reached or the number of attempts are exhausted. * * @param[out] pNetworkContext The output parameter to return the created network context. * @param[out] pMqttContext The output to return the created MQTT context. * * @return EXIT_FAILURE on failure; EXIT_SUCCESS on successful connection. */ static int connectToServerWithBackoffRetries( NetworkContext_t * pNetworkContext, MQTTContext_t * pMqttContext ); /** * @brief A function that uses the passed MQTT connection to * subscribe to a topic, publish to the same topic * MQTT_PUBLISH_COUNT_PER_LOOP number of times, and verify if it * receives the Publish message back. * * @param[in] pMqttContext MQTT context pointer. * * @return EXIT_FAILURE on failure; EXIT_SUCCESS on success. */ static int subscribePublishLoop( MQTTContext_t * pMqttContext ); /** * @brief The function to handle the incoming publishes. * * @param[in] pPublishInfo Pointer to publish info of the incoming publish. * @param[in] packetIdentifier Packet identifier of the incoming publish. */ static void handleIncomingPublish( MQTTPublishInfo_t * pPublishInfo, uint16_t packetIdentifier ); /** * @brief The application callback function for getting the incoming publish * and incoming acks reported from MQTT library. * * @param[in] pMqttContext MQTT context pointer. * @param[in] pPacketInfo Packet Info pointer for the incoming packet. * @param[in] pDeserializedInfo Deserialized information from the incoming packet. */ static void eventCallback( MQTTContext_t * pMqttContext, MQTTPacketInfo_t * pPacketInfo, MQTTDeserializedInfo_t * pDeserializedInfo ); /** * @brief Initializes the MQTT library. * * @param[in] pMqttContext MQTT context pointer. * @param[in] pNetworkContext The network context pointer. * * @return EXIT_SUCCESS if the MQTT library is initialized; * EXIT_FAILURE otherwise. */ static int initializeMqtt( MQTTContext_t * pMqttContext, NetworkContext_t * pNetworkContext ); /** * @brief Sends an MQTT CONNECT packet over the already connected TCP socket. * * @param[in] pMqttContext MQTT context pointer. * * @return EXIT_SUCCESS if an MQTT session is established; * EXIT_FAILURE otherwise. */ static int establishMqttSession( MQTTContext_t * pMqttContext ); /** * @brief Close an MQTT session by sending MQTT DISCONNECT. * * @param[in] pMqttContext MQTT context pointer. * * @return EXIT_SUCCESS if DISCONNECT was successfully sent; * EXIT_FAILURE otherwise. */ static int disconnectMqttSession( MQTTContext_t * pMqttContext ); /** * @brief Sends an MQTT SUBSCRIBE to subscribe to #MQTT_EXAMPLE_TOPIC * defined at the top of the file. * * @param[in] pMqttContext MQTT context pointer. * * @return EXIT_SUCCESS if SUBSCRIBE was successfully sent; * EXIT_FAILURE otherwise. */ static int subscribeToTopic( MQTTContext_t * pMqttContext ); /** * @brief Sends an MQTT UNSUBSCRIBE to unsubscribe from * #MQTT_EXAMPLE_TOPIC defined at the top of the file. * * @param[in] pMqttContext MQTT context pointer. * * @return EXIT_SUCCESS if UNSUBSCRIBE was successfully sent; * EXIT_FAILURE otherwise. */ static int unsubscribeFromTopic( MQTTContext_t * pMqttContext ); /** * @brief Sends an MQTT PUBLISH to #MQTT_EXAMPLE_TOPIC defined at * the top of the file. * * @param[in] pMqttContext MQTT context pointer. * * @return EXIT_SUCCESS if PUBLISH was successfully sent; * EXIT_FAILURE otherwise. */ static int publishToTopic( MQTTContext_t * pMqttContext ); /** * @brief Function to update variable globalSubAckStatus with status * information from Subscribe ACK. Called by eventCallback after processing * incoming subscribe echo. * * @param[in] Server response to the subscription request. */ static void updateSubAckStatus( MQTTPacketInfo_t * pPacketInfo ); /*-----------------------------------------------------------*/ static int connectToServerWithBackoffRetries( NetworkContext_t * pNetworkContext, MQTTContext_t * pMqttContext ) { int returnStatus = EXIT_FAILURE; BackoffAlgorithmStatus_t backoffAlgStatus = BackoffAlgorithmSuccess; OpensslStatus_t opensslStatus = OPENSSL_SUCCESS; BackoffAlgorithmContext_t reconnectParams; ServerInfo_t serverInfo; OpensslCredentials_t opensslCredentials = { 0 }; uint16_t nextRetryBackOff; /* Initialize information to connect to the MQTT broker. */ serverInfo.pHostName = GREENGRASS_ADDRESS; serverInfo.hostNameLength = GREENGRASS_ADDRESS_LENGTH; serverInfo.port = MQTT_PORT; /* Initialize credentials for establishing TLS session. */ opensslCredentials.pRootCaPath = ROOT_CA_CERT_PATH; opensslCredentials.pClientCertPath = CLIENT_CERT_PATH; opensslCredentials.pPrivateKeyPath = CLIENT_PRIVATE_KEY_PATH; /* Initialize reconnect attempts and interval */ BackoffAlgorithm_InitializeParams( &reconnectParams, 500U, /* Backoff base ms */ 5000U, /* Max backoff delay ms */ 5U ); /* Max attempts */ /* Attempt to connect to the core's broker. If connection fails, retry after * backoff period. */ do { LogInfo( ( "Establishing a TLS session to %.*s:%d.", GREENGRASS_ADDRESS_LENGTH, GREENGRASS_ADDRESS, MQTT_PORT ) ); opensslStatus = Openssl_Connect( pNetworkContext, &serverInfo, &opensslCredentials, TRANSPORT_SEND_RECV_TIMEOUT_MS, TRANSPORT_SEND_RECV_TIMEOUT_MS ); if( opensslStatus == OPENSSL_SUCCESS ) { /* Sends an MQTT Connect packet using the established TLS session, * then waits for connection acknowledgment (CONNACK) packet. */ returnStatus = establishMqttSession( pMqttContext ); if( returnStatus == EXIT_FAILURE ) { /* End TLS session, then close TCP connection. */ ( void ) Openssl_Disconnect( pNetworkContext ); } } if( returnStatus == EXIT_FAILURE ) { /* Generate a random number and get back-off value (in milliseconds) for the next connection retry. */ backoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &reconnectParams, rand(), &nextRetryBackOff ); if( backoffAlgStatus == BackoffAlgorithmRetriesExhausted ) { LogError( ( "Connection to the broker failed, all attempts exhausted." ) ); returnStatus = EXIT_FAILURE; } else if( backoffAlgStatus == BackoffAlgorithmSuccess ) { LogWarn( ( "Connection to the broker failed. Retrying connection " "after %hu ms backoff.", ( unsigned short ) nextRetryBackOff ) ); Clock_SleepMs( nextRetryBackOff ); } } } while( ( returnStatus == EXIT_FAILURE ) && ( backoffAlgStatus == BackoffAlgorithmSuccess ) ); return returnStatus; } /*-----------------------------------------------------------*/ static void handleIncomingPublish( MQTTPublishInfo_t * pPublishInfo, uint16_t packetIdentifier ) { assert( pPublishInfo != NULL ); /* Process incoming Publish. */ LogInfo( ( "Incoming QoS : %d.", pPublishInfo->qos ) ); /* Verify the received publish is for the topic we have subscribed to. */ if( ( pPublishInfo->topicNameLength == MQTT_EXAMPLE_TOPIC_LENGTH ) && ( 0 == strncmp( MQTT_EXAMPLE_TOPIC, pPublishInfo->pTopicName, pPublishInfo->topicNameLength ) ) ) { LogInfo( ( "Incoming Publish Topic Name: %.*s matches subscribed topic.\n" "Incoming Publish message Packet Id is %u.\n" "Incoming Publish Message : %.*s.\n\n", pPublishInfo->topicNameLength, pPublishInfo->pTopicName, packetIdentifier, ( int ) pPublishInfo->payloadLength, ( const char * ) pPublishInfo->pPayload ) ); } else { LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.", pPublishInfo->topicNameLength, pPublishInfo->pTopicName ) ); } } /*-----------------------------------------------------------*/ static void updateSubAckStatus( MQTTPacketInfo_t * pPacketInfo ) { uint8_t * pPayload = NULL; size_t pSize = 0; ( void ) MQTT_GetSubAckStatusCodes( pPacketInfo, &pPayload, &pSize ); /* Demo only subscribes to one topic, so only one status code is returned. */ globalSubAckStatus = ( MQTTSubAckStatus_t ) pPayload[ 0 ]; } /*-----------------------------------------------------------*/ static void eventCallback( MQTTContext_t * pMqttContext, MQTTPacketInfo_t * pPacketInfo, MQTTDeserializedInfo_t * pDeserializedInfo ) { uint16_t packetId; assert( pMqttContext != NULL ); assert( pPacketInfo != NULL ); assert( pDeserializedInfo != NULL ); ( void ) pMqttContext; packetId = pDeserializedInfo->packetIdentifier; /* Handle incoming publish. The lower 4 bits of the publish packet * type is used for the dup, QoS, and retain flags. Hence masking * out the lower bits to check if the packet is publish. */ if( ( pPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH ) { assert( pDeserializedInfo->pPublishInfo != NULL ); handleIncomingPublish( pDeserializedInfo->pPublishInfo, packetId ); } else { /* Handle other packets. */ switch( pPacketInfo->type ) { case MQTT_PACKET_TYPE_SUBACK: /* A SUBACK from the broker, containing the server response to our subscription request, has been received. * It contains the status code indicating server approval/rejection for the subscription to the single topic * requested. The SUBACK will be parsed to obtain the status code, and this status code will be stored in global * variable globalSubAckStatus. */ updateSubAckStatus( pPacketInfo ); /* Check status of the subscription request. If globalSubAckStatus does not indicate * server refusal of the request (MQTTSubAckFailure), it contains the QoS level granted * by the server, indicating a successful subscription attempt. */ if( globalSubAckStatus != MQTTSubAckFailure ) { LogInfo( ( "Subscribed to the topic %.*s. with maximum QoS %u.", MQTT_EXAMPLE_TOPIC_LENGTH, MQTT_EXAMPLE_TOPIC, globalSubAckStatus ) ); } break; case MQTT_PACKET_TYPE_UNSUBACK: LogInfo( ( "Unsubscribed from the topic %.*s.", MQTT_EXAMPLE_TOPIC_LENGTH, MQTT_EXAMPLE_TOPIC ) ); break; default: LogError( ( "Unhandled packet type received: %02x", pPacketInfo->type ) ); } } } /*-----------------------------------------------------------*/ static int establishMqttSession( MQTTContext_t * pMqttContext ) { int returnStatus = EXIT_SUCCESS; MQTTStatus_t mqttStatus; MQTTConnectInfo_t connectInfo = { 0 }; bool sessionPresent = false; assert( pMqttContext != NULL ); /* Establish MQTT session by sending a CONNECT packet. */ /* For connecting to a Greengrass core, the MQTT client identifier must * match the device's Thing name. */ connectInfo.pClientIdentifier = THING_NAME; connectInfo.clientIdentifierLength = THING_NAME_LENGTH; connectInfo.keepAliveSeconds = MQTT_KEEP_ALIVE_INTERVAL_SECONDS; connectInfo.cleanSession = true; mqttStatus = MQTT_Connect( pMqttContext, &connectInfo, NULL, CONNACK_RECV_TIMEOUT_MS, &sessionPresent ); if( mqttStatus != MQTTSuccess ) { returnStatus = EXIT_FAILURE; LogError( ( "Connection with MQTT broker failed with status %s.", MQTT_Status_strerror( mqttStatus ) ) ); } else { LogInfo( ( "MQTT connection successfully established with broker.\n\n" ) ); } return returnStatus; } /*-----------------------------------------------------------*/ static int disconnectMqttSession( MQTTContext_t * pMqttContext ) { MQTTStatus_t mqttStatus = MQTTSuccess; int returnStatus = EXIT_SUCCESS; assert( pMqttContext != NULL ); /* Send DISCONNECT. */ mqttStatus = MQTT_Disconnect( pMqttContext ); if( mqttStatus != MQTTSuccess ) { LogError( ( "Sending MQTT DISCONNECT failed with status=%s.", MQTT_Status_strerror( mqttStatus ) ) ); returnStatus = EXIT_FAILURE; } return returnStatus; } /*-----------------------------------------------------------*/ static int subscribeToTopic( MQTTContext_t * pMqttContext ) { int returnStatus = EXIT_SUCCESS; MQTTStatus_t mqttStatus; MQTTSubscribeInfo_t subInfo = { 0 }; assert( pMqttContext != NULL ); /* This example subscribes to only one topic with max QoS 0. */ subInfo.qos = MQTTQoS0; subInfo.pTopicFilter = MQTT_EXAMPLE_TOPIC; subInfo.topicFilterLength = MQTT_EXAMPLE_TOPIC_LENGTH; /* Send SUBSCRIBE packet. */ mqttStatus = MQTT_Subscribe( pMqttContext, &subInfo, 1U, MQTT_GetPacketId( pMqttContext ) ); if( mqttStatus != MQTTSuccess ) { LogError( ( "Failed to send SUBSCRIBE packet to broker with error = %s.", MQTT_Status_strerror( mqttStatus ) ) ); returnStatus = EXIT_FAILURE; } else { LogInfo( ( "SUBSCRIBE sent for topic %.*s to broker.\n\n", MQTT_EXAMPLE_TOPIC_LENGTH, MQTT_EXAMPLE_TOPIC ) ); } return returnStatus; } /*-----------------------------------------------------------*/ static int unsubscribeFromTopic( MQTTContext_t * pMqttContext ) { int returnStatus = EXIT_SUCCESS; MQTTStatus_t mqttStatus; MQTTSubscribeInfo_t unsubInfo = { 0 }; assert( pMqttContext != NULL ); /* This example subscribes to and unsubscribes from only one topic. */ unsubInfo.pTopicFilter = MQTT_EXAMPLE_TOPIC; unsubInfo.topicFilterLength = MQTT_EXAMPLE_TOPIC_LENGTH; /* Send UNSUBSCRIBE packet. */ mqttStatus = MQTT_Unsubscribe( pMqttContext, &unsubInfo, 1U, MQTT_GetPacketId( pMqttContext ) ); if( mqttStatus != MQTTSuccess ) { LogError( ( "Failed to send UNSUBSCRIBE packet to broker with error = %s.", MQTT_Status_strerror( mqttStatus ) ) ); returnStatus = EXIT_FAILURE; } else { LogInfo( ( "UNSUBSCRIBE sent for topic %.*s to broker.\n\n", MQTT_EXAMPLE_TOPIC_LENGTH, MQTT_EXAMPLE_TOPIC ) ); } return returnStatus; } /*-----------------------------------------------------------*/ static int publishToTopic( MQTTContext_t * pMqttContext ) { int returnStatus = EXIT_SUCCESS; MQTTStatus_t mqttStatus = MQTTSuccess; MQTTPublishInfo_t publish = { 0 }; uint16_t packetId; assert( pMqttContext != NULL ); /* This example publishes to only one topic and uses QoS0. */ publish.qos = MQTTQoS0; publish.pTopicName = MQTT_EXAMPLE_TOPIC; publish.topicNameLength = MQTT_EXAMPLE_TOPIC_LENGTH; publish.pPayload = MQTT_EXAMPLE_MESSAGE; publish.payloadLength = MQTT_EXAMPLE_MESSAGE_LENGTH; packetId = MQTT_GetPacketId( pMqttContext ); mqttStatus = MQTT_Publish( pMqttContext, &publish, packetId ); if( mqttStatus != MQTTSuccess ) { LogError( ( "Failed to send PUBLISH packet to broker with error = %s.", MQTT_Status_strerror( mqttStatus ) ) ); returnStatus = EXIT_FAILURE; } else { LogInfo( ( "PUBLISH sent for topic %.*s to broker with packet ID %u.\n\n", MQTT_EXAMPLE_TOPIC_LENGTH, MQTT_EXAMPLE_TOPIC, packetId ) ); } return returnStatus; } /*-----------------------------------------------------------*/ static int initializeMqtt( MQTTContext_t * pMqttContext, NetworkContext_t * pNetworkContext ) { int returnStatus = EXIT_SUCCESS; MQTTStatus_t mqttStatus; MQTTFixedBuffer_t networkBuffer; TransportInterface_t transport = { NULL }; /* The network buffer must remain valid for the lifetime of the MQTT context. */ static uint8_t buffer[ NETWORK_BUFFER_SIZE ]; assert( pMqttContext != NULL ); assert( pNetworkContext != NULL ); /* Fill in TransportInterface send and receive function pointers. * For this demo, TCP sockets are used to send and receive data * from network. Network context is SSL context for OpenSSL.*/ transport.pNetworkContext = pNetworkContext; transport.send = Openssl_Send; transport.recv = Openssl_Recv; /* Fill the values for network buffer. */ networkBuffer.pBuffer = buffer; networkBuffer.size = NETWORK_BUFFER_SIZE; /* Initialize MQTT library. */ mqttStatus = MQTT_Init( pMqttContext, &transport, Clock_GetTimeMs, eventCallback, &networkBuffer ); if( mqttStatus != MQTTSuccess ) { returnStatus = EXIT_FAILURE; LogError( ( "MQTT init failed: Status = %s.", MQTT_Status_strerror( mqttStatus ) ) ); } return returnStatus; } /*-----------------------------------------------------------*/ static int subscribePublishLoop( MQTTContext_t * pMqttContext ) { int returnStatus = EXIT_SUCCESS; MQTTStatus_t mqttStatus = MQTTSuccess; uint32_t publishCount = 0; const uint32_t maxPublishCount = MQTT_PUBLISH_COUNT_PER_LOOP; assert( pMqttContext != NULL ); if( returnStatus == EXIT_SUCCESS ) { /* The client is now connected to the broker. Subscribe to the topic * as specified in MQTT_EXAMPLE_TOPIC at the top of this file by sending a * subscribe packet. This client will then publish to the same topic it * subscribed to, so it will expect all the messages it sends to the broker * to be sent back to it from the broker. */ LogInfo( ( "Subscribing to the MQTT topic %.*s.", MQTT_EXAMPLE_TOPIC_LENGTH, MQTT_EXAMPLE_TOPIC ) ); returnStatus = subscribeToTopic( pMqttContext ); } if( returnStatus == EXIT_SUCCESS ) { /* Process incoming packet from the broker. Acknowledgment for subscription * ( SUBACK ) will be received here. However after sending the subscribe, the * client may receive a publish before it receives a subscribe ack. Since this * demo is subscribing to the topic to which no one is publishing, probability * of receiving publish message before subscribe ack is zero; but application * must be ready to receive any packet. This demo uses MQTT_ProcessLoop to * receive packet from network. */ mqttStatus = MQTT_ProcessLoop( pMqttContext ); if( mqttStatus != MQTTSuccess ) { returnStatus = EXIT_FAILURE; LogError( ( "MQTT_ProcessLoop returned with status = %s.", MQTT_Status_strerror( mqttStatus ) ) ); } } /* Check if recent subscription request has been rejected. globalSubAckStatus is updated * in eventCallback to reflect the status of the SUBACK sent by the broker. */ if( ( returnStatus == EXIT_SUCCESS ) && ( globalSubAckStatus == MQTTSubAckFailure ) ) { LogWarn( ( "Server rejected subscription request." ) ); returnStatus = EXIT_FAILURE; } if( returnStatus == EXIT_SUCCESS ) { /* Publish messages with QoS0, receive incoming messages and * send keep alive messages. */ for( publishCount = 0; publishCount < maxPublishCount; publishCount++ ) { LogInfo( ( "Sending Publish to the MQTT topic %.*s.", MQTT_EXAMPLE_TOPIC_LENGTH, MQTT_EXAMPLE_TOPIC ) ); returnStatus = publishToTopic( pMqttContext ); /* Calling MQTT_ProcessLoop to process incoming publish echo, since * application subscribed to the same topic the broker will send * publish message back to the application. This function also * sends ping request to broker if MQTT_KEEP_ALIVE_INTERVAL_SECONDS * has expired since the last MQTT packet sent and receive * ping responses. */ mqttStatus = MQTT_ProcessLoop( pMqttContext ); /* For any error in #MQTT_ProcessLoop, exit the loop and disconnect * from the broker. */ if( mqttStatus != MQTTSuccess ) { LogError( ( "MQTT_ProcessLoop returned with status = %s.", MQTT_Status_strerror( mqttStatus ) ) ); returnStatus = EXIT_FAILURE; break; } LogInfo( ( "Delay before continuing to next iteration.\n\n" ) ); /* Leave connection idle for some time. */ sleep( DELAY_BETWEEN_PUBLISHES_SECONDS ); } } if( returnStatus == EXIT_SUCCESS ) { /* Unsubscribe from the topic. */ LogInfo( ( "Unsubscribing from the MQTT topic %.*s.", MQTT_EXAMPLE_TOPIC_LENGTH, MQTT_EXAMPLE_TOPIC ) ); returnStatus = unsubscribeFromTopic( pMqttContext ); } if( returnStatus == EXIT_SUCCESS ) { /* Process Incoming UNSUBACK packet from the broker. */ mqttStatus = MQTT_ProcessLoop( pMqttContext ); if( mqttStatus != MQTTSuccess ) { returnStatus = EXIT_FAILURE; LogError( ( "MQTT_ProcessLoop returned with status = %s.", MQTT_Status_strerror( mqttStatus ) ) ); } } /* Send an MQTT Disconnect packet over the already connected TCP socket. * There is no corresponding response for the disconnect packet. After sending * disconnect, client must close the network connection. */ LogInfo( ( "Disconnecting the MQTT connection with %.*s.", GREENGRASS_ADDRESS_LENGTH, GREENGRASS_ADDRESS ) ); if( returnStatus == EXIT_FAILURE ) { /* Returned status is not used to update the local status as there * were failures in demo execution. */ ( void ) disconnectMqttSession( pMqttContext ); } else { returnStatus = disconnectMqttSession( pMqttContext ); } /* Reset global SUBACK status variable after completion of subscription request cycle. */ globalSubAckStatus = MQTTSubAckFailure; return returnStatus; } /*-----------------------------------------------------------*/ /** * @brief Entry point of demo. * * The example shown below uses MQTT APIs to send and receive MQTT packets * over the TLS connection established using OpenSSL. */ int main( int argc, char ** argv ) { int returnStatus = EXIT_SUCCESS; MQTTContext_t mqttContext = { 0 }; NetworkContext_t networkContext = { 0 }; OpensslParams_t opensslParams = { 0 }; struct timespec tp; ( void ) argc; ( void ) argv; /* Set the pParams member of the network context with desired transport. */ networkContext.pParams = &opensslParams; /* Seed pseudo random number generator (provided by ISO C standard library) for * use by retry utils library when retrying failed network operations. */ /* Get current time to seed pseudo random number generator. */ ( void ) clock_gettime( CLOCK_REALTIME, &tp ); /* Seed pseudo random number generator with nanoseconds. */ srand( tp.tv_nsec ); /* Initialize MQTT library. Initialization of the MQTT library needs to be * done only once in this demo. */ returnStatus = initializeMqtt( &mqttContext, &networkContext ); if( returnStatus == EXIT_SUCCESS ) { for( ; ; ) { /* Attempt to connect to the MQTT broker. If connection fails, retry after * a timeout. Timeout value will be exponentially increased till the maximum * attempts are reached or maximum timeout value is reached. The function * returns EXIT_FAILURE if the TCP connection cannot be established to * broker after configured number of attempts. */ returnStatus = connectToServerWithBackoffRetries( &networkContext, &mqttContext ); if( returnStatus == EXIT_FAILURE ) { /* Log error to indicate connection failure after all * reconnect attempts are over. */ LogError( ( "Failed to connect to MQTT broker %.*s.", GREENGRASS_ADDRESS_LENGTH, GREENGRASS_ADDRESS ) ); } else { /* If TLS session is established, execute Subscribe/Publish loop. */ returnStatus = subscribePublishLoop( &mqttContext ); /* End TLS session, then close TCP connection. */ ( void ) Openssl_Disconnect( &networkContext ); } if( returnStatus == EXIT_SUCCESS ) { /* Log message indicating an iteration completed successfully. */ LogInfo( ( "Demo completed successfully." ) ); } LogInfo( ( "Short delay before starting the next iteration....\n" ) ); sleep( MQTT_SUBPUB_LOOP_DELAY_SECONDS ); } } return returnStatus; } /*-----------------------------------------------------------*/