/* * Lab-Project-coreMQTT-Agent 201215 * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy of * this software and associated documentation files (the "Software"), to deal in * the Software without restriction, including without limitation the rights to * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of * the Software, and to permit persons to whom the Software is furnished to do so, * subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * * http://www.FreeRTOS.org * http://aws.amazon.com/freertos */ /* * This file demonstrates using the MQTT agent API to send MQTT packets that * contain a payload nearly equal to the buffer size used to serialize and * deserialize MQTT packets. It can be used to test behavior when the MQTT * packet is larger than the TCP/IP buffers. The task can run simultaneously * to other demo tasks that also use the MQTT agent API to interact over the * same MQTT connection to the same MQTT broker. * * prvLargeMessageSubscribePublishTask() implements the demo task, which * subscribes to a topic then periodically publishes large payloads to the * same topic to which it has subscribed. Each time it publishes to the topic * it waits for the published data to be published back to it from the MQTT * broker - checking that the received data matches the transmitted data * exactly. */ /* Standard includes. */ #include <string.h> #include <stdio.h> #include <assert.h> /* Kernel includes. */ #include "FreeRTOS.h" #include "task.h" #include "queue.h" /* Demo Specific configs. */ #include "demo_config.h" /* MQTT library includes. */ #include "core_mqtt.h" #include "core_mqtt_state.h" /* MQTT agent include. */ #include "core_mqtt_agent.h" /* Subscription manager header include. */ #include "subscription_manager.h" /** * @brief This demo uses task notifications to signal tasks from MQTT callback * functions. mqttexampleMS_TO_WAIT_FOR_NOTIFICATION defines the time, in ticks, * to wait for such a callback. */ #define mqttexampleMS_TO_WAIT_FOR_NOTIFICATION ( 5000 ) /** * @brief Time, in milliseconds, to wait between cycles of the demo task. */ #define mqttexampleDELAY_BETWEEN_PUBLISH_OPERATIONS_MS ( 1000UL ) /** * @brief The maximum amount of time in milliseconds to wait for the commands * to be posted to the MQTT agent should the MQTT agent's command queue be full. * Tasks wait in the Blocked state, so don't use any CPU time. */ #define mqttexampleMAX_COMMAND_SEND_BLOCK_TIME_MS ( 200 ) /** * @brief Create an MQTT payload that almost fills the buffer allocated for * MQTT message serialization, leaving a little room for the MQTT protocol * headers themselves. */ #define mqttexamplePROTOCOL_OVERHEAD ( 50 ) #define mqttexampleMAX_PAYLOAD_LENGTH ( MQTT_AGENT_NETWORK_BUFFER_SIZE - mqttexamplePROTOCOL_OVERHEAD ) /*-----------------------------------------------------------*/ /** * @brief Defines the structure to use as the command callback context in this * demo. */ struct MQTTAgentCommandContext { MQTTStatus_t xReturnStatus; /* Pass out the result of the operation. */ TaskHandle_t xTaskToNotify; /* Handle of the task to send a notification to. */ void * pvTag; /* Use for callback specific data. */ }; /** * @brief Passed into MQTTAgent_Subscribe() as the callback to execute when the * broker ACKs the SUBSCRIBE message. Its implementation sends a notification * to the task that called MQTTAgent_Subscribe() to let the task know the * SUBSCRIBE operation completed. It also sets the xReturnStatus of the * structure passed in as the command's context to the value of the * xReturnStatus parameter - which enables the task to check the status of the * operation. * * See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call * * @param[in] pxCommandContext Context of the initial command. * @param[in].xReturnStatus The result of the command. */ static void prvSubscribeCommandCallback( MQTTAgentCommandContext_t * pxCommandContext, MQTTAgentReturnInfo_t * pxReturnInfo ); /** * @brief Passed into MQTTAgent_Subscribe() as the callback to execute when * there is an incoming publish on the topic being subscribed to. Its * implementation copies the incoming MQTT message payload into a buffer so the * task can validate the received data matches the outgoing data (the task * subscribes to the same topic that it publishes to, so any outgoing data on * that topic is also received back from the MQTT broker). * * See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call * * @param[in] pxSubscriptionContext Context of the initial command. * @param[in] pxCommandContext Context of the initial command. */ static void prvIncomingPublishCallback( void * pxSubscriptionContext, MQTTPublishInfo_t * pxPublishInfo ); /** * @brief Called by the task to wait for a notification from a callback function * after the task first executes either MQTTAgent_Publish()* or * MQTTAgent_Subscribe(). * * See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call * * @param[in] pxCommandContext Context of the initial command. * * @return The value returned from the containing ulTaskNotifyTake() call. This * is the task's notification value before it was decremented by * ulTaskNotificationTake(). */ static uint32_t prvWaitForCommandAcknowledgment( void ); /** * @brief Create a buffer of data that is easily recognizable in WireShark. * Completely fill the MQTT network buffer, other than leaving a space for the * MQTT protocol headers themselves. * * @param[in] pcBuffer The buffer into which the MQTT payload is written. * @param[in] xBufferSize The length of buffer pointed to by pcBuffer. */ static void prvCreateMQTTPayload( char * pcBuffer, size_t xBufferSize ); /** * @brief Subscribe to the topic the demo task will also publish to - that * results in all outgoing publishes being published back to the task * (effectively echoed back). * * @param[in] pcReceivedPublishPayload The buffer into which the callback that * executes on reception of incoming publish messages will write the payload * from the incoming message. This is stored in the callback context so it can * be accessed from within the callback function. */ static void prvSubscribeToTopic( char * pcReceivedPublishPayload ); /** * @brief The function that implements the task demonstrated by this file. */ static void prvLargeMessageSubscribePublishTask( void * pvParameters ); /*-----------------------------------------------------------*/ /* The MQTT topic used by this demo. */ static const char * pcTopicFilter = "/max/payload/message"; extern MQTTAgentContext_t xGlobalMqttAgentContext; /*-----------------------------------------------------------*/ void vStartLargeMessageSubscribePublishTask( configSTACK_DEPTH_TYPE uxStackSize, UBaseType_t uxPriority ) { xTaskCreate( prvLargeMessageSubscribePublishTask, "LargeSubPub", uxStackSize, NULL, uxPriority, NULL ); } /*-----------------------------------------------------------*/ static void prvSubscribeCommandCallback( MQTTAgentCommandContext_t * pxCommandContext, MQTTAgentReturnInfo_t * pxReturnInfo ) { bool xSubscriptionAdded = false; /* Store the result in the application defined context so the calling task * can check it. */ pxCommandContext->xReturnStatus = pxReturnInfo->returnCode; /* Check if the subscribe operation is a success. Only one topic is * subscribed by this demo. */ if( pxReturnInfo->returnCode == MQTTSuccess ) { /* Add subscription so that incoming publishes are routed to the application * callback. */ xSubscriptionAdded = addSubscription( ( SubscriptionElement_t * ) xGlobalMqttAgentContext.pIncomingCallbackContext, pcTopicFilter, ( uint16_t ) strlen( pcTopicFilter ), prvIncomingPublishCallback, pxCommandContext ); if( xSubscriptionAdded == false ) { LogError( ( "Failed to register an incoming publish callback for topic %.*s.", ( unsigned int ) strlen( pcTopicFilter ), pcTopicFilter ) ); } } xTaskNotifyGive( pxCommandContext->xTaskToNotify ); } /*-----------------------------------------------------------*/ static void prvIncomingPublishCallback( void * pxSubscriptionContext, MQTTPublishInfo_t * pxPublishInfo ) { MQTTAgentCommandContext_t * pxApplicationDefinedContext = ( MQTTAgentCommandContext_t * ) pxSubscriptionContext; /* Check the incoming message will fit in the buffer before writing it to * buffer so the demo task can check the data received from the topic * matches the data previously written to the same topic. */ configASSERT( pxPublishInfo->payloadLength <= mqttexampleMAX_PAYLOAD_LENGTH ); memcpy( pxApplicationDefinedContext->pvTag, ( void * ) pxPublishInfo->pPayload, pxPublishInfo->payloadLength ); /* Send a notification to the task in case it is waiting for this incoming * message. */ xTaskNotifyGive( pxApplicationDefinedContext->xTaskToNotify ); } /*-----------------------------------------------------------*/ static uint32_t prvWaitForCommandAcknowledgment( void ) { uint32_t ulReturn; /* Wait for this task to get notified. */ ulReturn = ulTaskNotifyTake( pdFALSE, mqttexampleMS_TO_WAIT_FOR_NOTIFICATION ); return ulReturn; } /*-----------------------------------------------------------*/ static void prvCreateMQTTPayload( char * pcBuffer, size_t xBufferSize ) { size_t x; /* Create a large buffer of data that is easy to see in Wireshark - the * first half of the buffer is different to the second half to make it * obvious that all the data is on the wire when viewed. */ for( x = 0; x < xBufferSize; x++ ) { if( x < xBufferSize / 2U ) { pcBuffer[ x ] = 'a'; } else { pcBuffer[ x ] = 'b'; } } } /*-----------------------------------------------------------*/ static void prvSubscribeToTopic( char * pcReceivedPublishPayload ) { MQTTAgentSubscribeArgs_t xSubscribeArgs; MQTTSubscribeInfo_t xSubscribeInfo; MQTTStatus_t xStatus; uint32_t ulNotificationValue; MQTTAgentCommandInfo_t xCommandParams = { 0 }; /* Context must persist as long as subscription persists. */ static MQTTAgentCommandContext_t xApplicationDefinedContext = { 0 }; /* Record the handle of this task in the context that will be used within * the callbacks so the callbacks can send a notification to this task. */ xApplicationDefinedContext.xTaskToNotify = xTaskGetCurrentTaskHandle(); xApplicationDefinedContext.pvTag = ( void * ) pcReceivedPublishPayload; /* Ensure the return status is not accidentally MQTTSuccess already. */ xApplicationDefinedContext.xReturnStatus = MQTTBadParameter; /* Complete the subscribe information. The topic string must persist for * duration of subscription - although in this case is it a static const so * will persist for the lifetime of the application. */ xSubscribeInfo.pTopicFilter = pcTopicFilter; xSubscribeInfo.topicFilterLength = ( uint16_t ) strlen( pcTopicFilter ); xSubscribeInfo.qos = MQTTQoS1; xSubscribeArgs.pSubscribeInfo = &xSubscribeInfo; xSubscribeArgs.numSubscriptions = 1; /* Loop in case the queue used to communicate with the MQTT agent is full and * attempts to post to it time out. The queue will not become full if the * priority of the MQTT agent task is higher than the priority of the task * calling this function. */ xTaskNotifyStateClear( NULL ); xCommandParams.blockTimeMs = mqttexampleMAX_COMMAND_SEND_BLOCK_TIME_MS; xCommandParams.cmdCompleteCallback = prvSubscribeCommandCallback; xCommandParams.pCmdCompleteCallbackContext = &xApplicationDefinedContext; LogInfo( ( "Sending subscribe request to agent for topic filter: %s", pcTopicFilter ) ); do { xStatus = MQTTAgent_Subscribe( &xGlobalMqttAgentContext, &( xSubscribeArgs ), &xCommandParams ); } while( xStatus != MQTTSuccess ); /* Wait for acks from subscribe messages - this is optional. If the * returned value is zero then the wait timed out. */ ulNotificationValue = prvWaitForCommandAcknowledgment(); configASSERT( ulNotificationValue != 0UL ); /* The callback sets the xReturnStatus member of the context. */ if( xApplicationDefinedContext.xReturnStatus == MQTTSuccess ) { LogInfo( ( "Received subscribe ack for topic %s", pcTopicFilter ) ); } else { LogError( ( "Failed to subscribe to topic %s", pcTopicFilter ) ); } } /*-----------------------------------------------------------*/ static void prvLargeMessageSubscribePublishTask( void * pvParameters ) { static char pcMaxPayloadMessage[ mqttexampleMAX_PAYLOAD_LENGTH ]; static char pcReceivedPublishPayload[ mqttexampleMAX_PAYLOAD_LENGTH ]; uint32_t ulLargeMessageFailures = 0, ulLargeMessagePasses = 0; MQTTPublishInfo_t xPublishInfo = { 0 }; BaseType_t x; MQTTStatus_t xCommandAdded; uint32_t ulNotificationValue; MQTTAgentCommandInfo_t xCommandParams = { 0 }; ( void ) pvParameters; prvCreateMQTTPayload( pcMaxPayloadMessage, mqttexampleMAX_PAYLOAD_LENGTH ); /* Subscribe to the topic that this task will also publish to so all * outgoing publishes to that topic are published back to this task * effectively echoed back to this task). */ prvSubscribeToTopic( pcReceivedPublishPayload ); /* Prepare the publish message. */ memset( ( void * ) &xPublishInfo, 0x00, sizeof( xPublishInfo ) ); xPublishInfo.qos = MQTTQoS1; xPublishInfo.pTopicName = pcTopicFilter; xPublishInfo.topicNameLength = ( uint16_t ) strlen( pcTopicFilter ); xPublishInfo.pPayload = pcMaxPayloadMessage; xPublishInfo.payloadLength = mqttexampleMAX_PAYLOAD_LENGTH; for( ; ; ) { /* Clear out the buffer used to receive incoming publishes. The * prvIncomingPublishCallback() callback writes to this buffer. */ memset( pcReceivedPublishPayload, 0x00, sizeof( pcReceivedPublishPayload ) ); /* Publish to the topic to which this task is also subscribed to * receive an echo back. Note the command callback is left NULL so this * task will not be notified of when the PUBLISH ack is received - instead * it just waits for a notification from prvSubscribeCommandCallback() that * the incoming publish (the message being echoed back) has been received. */ LogInfo( ( "Sending large publish request to agent with message on topic \"%s\"", pcTopicFilter ) ); xCommandParams.blockTimeMs = mqttexampleMAX_COMMAND_SEND_BLOCK_TIME_MS; xCommandParams.cmdCompleteCallback = NULL; /* Note not used as going to wait for the echo anyway. */ xCommandAdded = MQTTAgent_Publish( &xGlobalMqttAgentContext, &xPublishInfo, &xCommandParams ); /* Ensure the messages was sent to the MQTT agent task. */ configASSERT( xCommandAdded == MQTTSuccess ); /* Wait for the publish back to this task. prvSubscribeCommandCallback() * will notify this task. */ ulNotificationValue = ulTaskNotifyTake( pdFALSE, mqttexampleMS_TO_WAIT_FOR_NOTIFICATION ); /* Only expect a single notification from the callback that executes * when the incoming publish (the message being echoed back) is * received. */ if( ulNotificationValue != 1 ) { ulLargeMessageFailures++; LogError( ( "Error - unexpected number of notifications (P%d:F%d).", ( int ) ulLargeMessagePasses, ( int ) ulLargeMessageFailures ) ); } else { /* prvSubscribeCommandCallback() should have copied the payload form * the incoming MQTT publish to receivedEchoPayload. Check this matches * the data that was published by this task. */ x = memcmp( pcMaxPayloadMessage, pcReceivedPublishPayload, mqttexampleMAX_PAYLOAD_LENGTH ); if( x == 0 ) { ulLargeMessagePasses++; LogInfo( ( "Rx'ed ack from Tx to %s (P%d:F%d).", pcTopicFilter, ( int ) ulLargeMessagePasses, ( int ) ulLargeMessageFailures ) ); } else { LogError( ( "Timed out Rx'ing ack from Tx to %s (P%d:F%d)", pcTopicFilter, ( int ) ulLargeMessagePasses, ( int ) ulLargeMessageFailures ) ); } } vTaskDelay( pdMS_TO_TICKS( mqttexampleDELAY_BETWEEN_PUBLISH_OPERATIONS_MS ) ); } }