/* * FreeRTOS V202011.00 * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy of * this software and associated documentation files (the "Software"), to deal in * the Software without restriction, including without limitation the rights to * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of * the Software, and to permit persons to whom the Software is furnished to do so, * subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * * http://aws.amazon.com/freertos * http://www.FreeRTOS.org */ /** * @file ota_over_mqtt_demo.c * @brief Over The Air Update demo using coreMQTT Agent. * * The file demonstrates how to perform Over The Air update using OTA agent and coreMQTT * library. It creates an OTA agent task which manages the OTA firmware update * for the device. The example also provides implementations to subscribe, publish, * and receive data from an MQTT broker. The implementation uses coreMQTT agent which manages * thread safety of the MQTT operations and allows OTA agent to share the same MQTT * broker connection with other tasks. OTA agent invokes the callback implementations to * publish job related control information, as well as receive chunks * of presigned firmware image from the MQTT broker. * * See https://freertos.org/mqtt/mqtt-agent-demo.html * See https://freertos.org/ota/ota-mqtt-agent-demo.html */ /* Standard includes. */ #include #include #include #include /* Kernel includes. */ #include "FreeRTOS.h" #include "task.h" #include "semphr.h" #include "ota_config.h" #include "demo_config.h" /* MQTT library includes. */ #include "core_mqtt_agent.h" /* Subscription manager header include. */ #include "subscription_manager.h" /* OTA Library include. */ #include "ota.h" /* OTA Library Interface include. */ #include "ota_os_freertos.h" #include "ota_mqtt_interface.h" #include "ota_platform_interface.h" /* Include firmware version struct definition. */ #include "ota_appversion32.h" /* Include platform abstraction header. */ #include "ota_pal.h" /*------------- Demo configurations -------------------------*/ #ifndef democonfigCLIENT_IDENTIFIER #error "Please define the democonfigCLIENT_IDENTIFIER with the thing name for which OTA is performed" #endif #if ( democonfigCREATE_DEFENDER_DEMO != 0 ) #if !defined( MQTT_AGENT_NETWORK_BUFFER_SIZE ) || ( MQTT_AGENT_NETWORK_BUFFER_SIZE < OTA_DATA_BLOCK_SIZE ) #error "MQTT agent buffer is too small. Please increase the buffer size to atleast the size required for OTA data block." #endif #endif /** * @brief The maximum size of the file paths used in the demo. */ #define otaexampleMAX_FILE_PATH_SIZE ( 260 ) /** * @brief The maximum size of the stream name required for downloading update file * from streaming service. */ #define otaexampleMAX_STREAM_NAME_SIZE ( 128 ) /** * @brief The delay used in the OTA demo task to periodically output the OTA * statistics like number of packets received, dropped, processed and queued per connection. */ #define otaexampleTASK_DELAY_MS ( 1000U ) /** * @brief The maximum time for which OTA demo waits for an MQTT operation to be complete. * This involves receiving an acknowledgment for broker for SUBSCRIBE, UNSUBSCRIBE and non * QOS0 publishes. */ #define otaexampleMQTT_TIMEOUT_MS ( 5000U ) /** * @brief The common prefix for all OTA topics. * * Thing name is substituted with a wildcard symbol `+`. OTA agent * registers with MQTT broker with the thing name in the topic. This topic * filter is used to match incoming packet received and route them to OTA. * Thing name is not needed for this matching. */ #define OTA_TOPIC_PREFIX "$aws/things/+/" /** * @brief Wildcard topic filter for job notification. * The filter is used to match the constructed job notify topic filter from OTA agent and register * appropriate callback for it. */ #define OTA_JOB_NOTIFY_TOPIC_FILTER OTA_TOPIC_PREFIX "jobs/notify-next" /** * @brief Length of job notification topic filter. */ #define OTA_JOB_NOTIFY_TOPIC_FILTER_LENGTH ( ( uint16_t ) ( sizeof( OTA_JOB_NOTIFY_TOPIC_FILTER ) - 1 ) ) /** * @brief Wildcard topic filter for matching job response messages. * This topic filter is used to match the responses from OTA service for OTA agent job requests. THe * topic filter is a reserved topic which is not subscribed with MQTT broker. * */ #define OTA_JOB_ACCEPTED_RESPONSE_TOPIC_FILTER OTA_TOPIC_PREFIX "jobs/$next/get/accepted" /** * @brief Length of job accepted response topic filter. */ #define OTA_JOB_ACCEPTED_RESPONSE_TOPIC_FILTER_LENGTH ( ( uint16_t ) ( sizeof( OTA_JOB_ACCEPTED_RESPONSE_TOPIC_FILTER ) - 1 ) ) /** * @brief Wildcard topic filter for matching OTA data packets. * The filter is used to match the constructed data stream topic filter from OTA agent and register * appropriate callback for it. */ #define OTA_DATA_STREAM_TOPIC_FILTER OTA_TOPIC_PREFIX "streams/#" /** * @brief Length of data stream topic filter. */ #define OTA_DATA_STREAM_TOPIC_FILTER_LENGTH ( ( uint16_t ) ( sizeof( OTA_DATA_STREAM_TOPIC_FILTER ) - 1 ) ) /** * @brief Starting index of client identifier within OTA topic. */ #define OTA_TOPIC_CLIENT_IDENTIFIER_START_IDX ( 12U ) /** * @brief Used to clear bits in a task's notification value. */ #define otaexampleMAX_UINT32 ( 0xffffffff ) /** * @brief Task priority of OTA agent. */ #define otaexampleAGENT_TASK_PRIORITY ( tskIDLE_PRIORITY + 1 ) /** * @brief Maximum stack size of OTA agent task. */ #define otaexampleAGENT_TASK_STACK_SIZE ( 4096 ) /** * @brief The version for the firmware which is running. OTA agent uses this * version number to perform anti-rollback validation. The firmware version for the * download image should be higher than the current version, otherwise the new image is * rejected in self test phase. */ #define APP_VERSION_MAJOR 0 #define APP_VERSION_MINOR 9 #define APP_VERSION_BUILD 2 /** * @brief Defines the structure to use as the command callback context in this * demo. */ struct MQTTAgentCommandContext { MQTTStatus_t xReturnStatus; TaskHandle_t xTaskToNotify; void * pArgs; }; /** * @brief Function used by OTA agent to publish control messages to the MQTT broker. * * The implementation uses MQTT agent to queue a publish request. It then waits * for the request complete notification from the agent. The notification along with result of the * operation is sent back to the caller task using xTaskNotify API. For publishes involving QOS 1 and * QOS2 the operation is complete once an acknowledgment (PUBACK) is received. OTA agent uses this function * to fetch new job, provide status update and send other control related messages to the MQTT broker. * * @param[in] pacTopic Topic to publish the control packet to. * @param[in] topicLen Length of the topic string. * @param[in] pMsg Message to publish. * @param[in] msgSize Size of the message to publish. * @param[in] qos Qos for the publish. * @return OtaMqttSuccess if successful. Appropriate error code otherwise. */ static OtaMqttStatus_t prvMQTTPublish( const char * const pacTopic, uint16_t topicLen, const char * pMsg, uint32_t msgSize, uint8_t qos ); /** * @brief Function used by OTA agent to subscribe for a control or data packet from the MQTT broker. * * The implementation queues a SUBSCRIBE request for the topic filter with the MQTT agent. It then waits for * a notification of the request completion. Notification will be sent back to caller task, * using xTaskNotify APIs. MQTT agent also stores a callback provided by this function with * the associated topic filter. The callback will be used to * route any data received on the matching topic to the OTA agent. OTA agent uses this function * to subscribe to all topic filters necessary for receiving job related control messages as * well as firmware image chunks from MQTT broker. * * @param[in] pTopicFilter The topic filter used to subscribe for packets. * @param[in] topicFilterLength Length of the topic filter string. * @param[in] ucQoS Intended qos value for the messages received on this topic. * @return OtaMqttSuccess if successful. Appropriate error code otherwise. */ static OtaMqttStatus_t prvMQTTSubscribe( const char * pTopicFilter, uint16_t topicFilterLength, uint8_t ucQoS ); /** * @brief Function is used by OTA agent to unsubscribe a topicfilter from MQTT broker. * * The implementation queues an UNSUBSCRIBE request for the topic filter with the MQTT agent. It then waits * for a successful completion of the request from the agent. Notification along with results of * operation is sent using xTaskNotify API to the caller task. MQTT agent also removes the topic filter * subscription from its memory so any future * packets on this topic will not be routed to the OTA agent. * * @param[in] pTopicFilter Topic filter to be unsubscribed. * @param[in] topicFilterLength Length of the topic filter. * @param[in] ucQos Qos value for the topic. * @return OtaMqttSuccess if successful. Appropriate error code otherwise. * */ static OtaMqttStatus_t prvMQTTUnsubscribe( const char * pTopicFilter, uint16_t topicFilterLength, uint8_t ucQoS ); /** * @brief Fetch an unused OTA event buffer from the pool. * * Demo uses a simple statically allocated array of fixed size event buffers. The * number of event buffers is configured by the param otaconfigMAX_NUM_OTA_DATA_BUFFERS * within ota_config.h. This function is used to fetch a free buffer from the pool for processing * by the OTA agent task. It uses a mutex for thread safe access to the pool. * * @return A pointer to an unused buffer. NULL if there are no buffers available. */ static OtaEventData_t * prvOTAEventBufferGet( void ); /** * @brief Free an event buffer back to pool * * OTA demo uses a statically allocated array of fixed size event buffers . The * number of event buffers is configured by the param otaconfigMAX_NUM_OTA_DATA_BUFFERS * within ota_config.h. The function is used by the OTA application callback to free a buffer, * after OTA agent has completed processing with the event. The access to the pool is made thread safe * using a mutex. * * @param[in] pxBuffer Pointer to the buffer to be freed. */ static void prvOTAEventBufferFree( OtaEventData_t * const pxBuffer ); /** * @brief The function which runs the OTA agent task. * * The function runs the OTA Agent Event processing loop, which waits for * any events for OTA agent and process them. The loop never returns until the OTA agent * is shutdown. The tasks exits gracefully by freeing up all resources in the event of an * OTA agent shutdown. * * @param[in] pvParam Any parameters to be passed to OTA agent task. */ static void prvOTAAgentTask( void * pvParam ); /** * @brief The function which runs the OTA demo task. * * The demo task initializes the OTA agent an loops until OTA agent is shutdown. * It reports OTA update statistics (which includes number of blocks received, processed and dropped), * at regular intervals. * * @param[in] pvParam Any parameters to be passed to OTA demo task. */ static void prvOTADemoTask( void * pvParam ); /** * @brief Callback invoked for firmware image chunks received from MQTT broker. * * Function gets invoked for the firmware image blocks received on OTA data stream topic. * The function is registered with MQTT agent's subscription manger along with the * topic filter for data stream. For each packet received, the * function fetches a free event buffer from the pool and queues the firmware image chunk for * OTA agent task processing. * * @param[in] pxSubscriptionContext Context which is passed unmodified from the MQTT agent. * @param[in] pPublishInfo Pointer to the structure containing the details of the MQTT packet. */ static void prvProcessIncomingData( void * pxSubscriptionContext, MQTTPublishInfo_t * pPublishInfo ); /** * @brief Callback invoked for job control messages from MQTT broker. * * Callback gets invoked for any OTA job related control messages from the MQTT broker. * The function is registered with MQTT agent's subscription manger along with the topic filter for * job stream. The function fetches a free event buffer from the pool and queues the appropriate event type * based on the control message received. * * @param[in] pxSubscriptionContext Context which is passed unmodified from the MQTT agent. * @param[in] pPublishInfo Pointer to the structure containing the details of MQTT packet. */ static void prvProcessIncomingJobMessage( void * pxSubscriptionContext, MQTTPublishInfo_t * pPublishInfo ); /** * @brief Matches a client identifier within an OTA topic. * This function is used to validate that topic is valid and intended for this device thing name. * * @param[in] pTopic Pointer to the topic * @param[in] topicNameLength length of the topic * @param[in] pClientIdentifier Client identifier, should be null terminated. * @param[in] clientIdentifierLength Length of the client identifier. * @return true if client identifier is found within the topic at the right index. */ static bool prvMatchClientIdentifierInTopic( const char * pTopic, size_t topicNameLength, const char * pClientIdentifier, size_t clientIdentifierLength ); /** * @brief Default callback used to receive default messages for OTA. * * The callback is not subscribed with MQTT broker, but only with local subscription manager. * A wildcard OTA job topic is used for subscription so that all unsolicited messages related to OTA is * forwarded to this callback for filtration. Right now the callback is used to filter responses to job requests * from the OTA service. * * @param[in] pvIncomingPublishCallbackContext MQTT context which stores the connection. * @param[in] pPublishInfo MQTT packet that stores the information of the file block. * * @return true if the message is processed by OTA. */ bool vOTAProcessMessage( void * pvIncomingPublishCallbackContext, MQTTPublishInfo_t * pxPublishInfo ); /** * @brief Buffer used to store the firmware image file path. * Buffer is passed to the OTA agent during initialization. */ static uint8_t updateFilePath[ otaexampleMAX_FILE_PATH_SIZE ]; /** * @brief Buffer used to store the code signing certificate file path. * Buffer is passed to the OTA agent during initialization. */ static uint8_t certFilePath[ otaexampleMAX_FILE_PATH_SIZE ]; /** * @brief Buffer used to store the name of the data stream. * Buffer is passed to the OTA agent during initialization. */ static uint8_t streamName[ otaexampleMAX_STREAM_NAME_SIZE ]; /** * @brief Buffer used decode the CBOR message from the MQTT payload. * Buffer is passed to the OTA agent during initialization. */ static uint8_t decodeMem[ ( 1U << otaconfigLOG2_FILE_BLOCK_SIZE ) ]; /** * @brief Application buffer used to store the bitmap for requesting firmware image * chunks from MQTT broker. Buffer is passed to the OTA agent during initialization. */ static uint8_t bitmap[ OTA_MAX_BLOCK_BITMAP_SIZE ]; /** * @brief A statically allocated array of event buffers used by the OTA agent. * Maximum number of buffers are determined by how many chunks are requested * by OTA agent at a time along with an extra buffer to handle control message. * The size of each buffer is determined by the maximum size of firmware image * chunk, and other metadata send along with the chunk. */ static OtaEventData_t eventBuffer[ otaconfigMAX_NUM_OTA_DATA_BUFFERS ] = { 0 }; /* * @brief Mutex used to manage thread safe access of OTA event buffers. */ static SemaphoreHandle_t xBufferSemaphore; /** * @brief Static handle used for MQTT agent context. */ extern MQTTAgentContext_t xGlobalMqttAgentContext; /** * @brief Structure containing all application allocated buffers used by the OTA agent. * Structure is passed to the OTA agent during initialization. */ static OtaAppBuffer_t otaBuffer = { .pUpdateFilePath = updateFilePath, .updateFilePathsize = otaexampleMAX_FILE_PATH_SIZE, .pCertFilePath = certFilePath, .certFilePathSize = otaexampleMAX_FILE_PATH_SIZE, .pStreamName = streamName, .streamNameSize = otaexampleMAX_STREAM_NAME_SIZE, .pDecodeMemory = decodeMem, .decodeMemorySize = ( 1U << otaconfigLOG2_FILE_BLOCK_SIZE ), .pFileBitmap = bitmap, .fileBitmapSize = OTA_MAX_BLOCK_BITMAP_SIZE }; /** * @brief Structure used for encoding firmware version. */ const AppVersion32_t appFirmwareVersion = { .u.x.major = APP_VERSION_MAJOR, .u.x.minor = APP_VERSION_MINOR, .u.x.build = APP_VERSION_BUILD, }; /*-----------------------------------------------------------*/ static void prvOTAEventBufferFree( OtaEventData_t * const pxBuffer ) { if( xSemaphoreTake( xBufferSemaphore, portMAX_DELAY ) == pdTRUE ) { pxBuffer->bufferUsed = false; ( void ) xSemaphoreGive( xBufferSemaphore ); } else { LogError( ( "Failed to get buffer semaphore." ) ); } } /*-----------------------------------------------------------*/ static OtaEventData_t * prvOTAEventBufferGet( void ) { uint32_t ulIndex = 0; OtaEventData_t * pFreeBuffer = NULL; if( xSemaphoreTake( xBufferSemaphore, portMAX_DELAY ) == pdTRUE ) { for( ulIndex = 0; ulIndex < otaconfigMAX_NUM_OTA_DATA_BUFFERS; ulIndex++ ) { if( eventBuffer[ ulIndex ].bufferUsed == false ) { eventBuffer[ ulIndex ].bufferUsed = true; pFreeBuffer = &eventBuffer[ ulIndex ]; break; } } ( void ) xSemaphoreGive( xBufferSemaphore ); } else { LogError( ( "Failed to get buffer semaphore." ) ); } return pFreeBuffer; } /*-----------------------------------------------------------*/ static void prvOTAAgentTask( void * pvParam ) { OTA_EventProcessingTask( pvParam ); vTaskDelete( NULL ); } /*-----------------------------------------------------------*/ /** * @brief The OTA agent has completed the update job or it is in * self test mode. If it was accepted, we want to activate the new image. * This typically means we should reset the device to run the new firmware. * If now is not a good time to reset the device, it may be activated later * by your user code. If the update was rejected, just return without doing * anything and we will wait for another job. If it reported that we should * start test mode, normally we would perform some kind of system checks to * make sure our new firmware does the basic things we think it should do * but we will just go ahead and set the image as accepted for demo purposes. * The accept function varies depending on your platform. Refer to the OTA * PAL implementation for your platform in aws_ota_pal.c to see what it * does for you. * * @param[in] event Specify if this demo is running with the AWS IoT * MQTT server. Set this to `false` if using another MQTT server. * @param[in] pData Data associated with the event. * @return None. */ static void otaAppCallback( OtaJobEvent_t event, const void * pData ) { OtaErr_t err = OtaErrUninitialized; switch( event ) { case OtaJobEventActivate: LogInfo( ( "Received OtaJobEventActivate callback from OTA Agent." ) ); /** * Activate the new firmware image immediately. Applications can choose to postpone * the activation to a later stage if needed. */ err = OTA_ActivateNewImage(); /** * Activation of the new image failed. This indicates an error that requires a follow * up through manual activation by resetting the device. The demo reports the error * and shuts down the OTA agent. */ LogError( ( "New image activation failed." ) ); /* Shutdown OTA Agent, if it is required that the unsubscribe operations are not * performed while shutting down please set the second parameter to 0 instead of 1. */ OTA_Shutdown( 0, 1 ); break; case OtaJobEventFail: /** * No user action is needed here. OTA agent handles the job failure event. */ LogInfo( ( "Received an OtaJobEventFail notification from OTA Agent." ) ); break; case OtaJobEventStartTest: /* This demo just accepts the image since it was a good OTA update and networking * and services are all working (or we would not have made it this far). If this * were some custom device that wants to test other things before validating new * image, this would be the place to kick off those tests before calling * OTA_SetImageState() with the final result of either accepted or rejected. */ LogInfo( ( "Received OtaJobEventStartTest callback from OTA Agent." ) ); err = OTA_SetImageState( OtaImageStateAccepted ); if( err == OtaErrNone ) { LogInfo( ( "New image validation succeeded in self test mode." ) ); } else { LogError( ( "Failed to set image state as accepted with error %d.", err ) ); } break; case OtaJobEventProcessed: LogDebug( ( "OTA Event processing completed. Freeing the event buffer to pool." ) ); configASSERT( pData != NULL ); prvOTAEventBufferFree( ( OtaEventData_t * ) pData ); break; case OtaJobEventSelfTestFailed: LogDebug( ( "Received OtaJobEventSelfTestFailed callback from OTA Agent." ) ); /* Requires manual activation of previous image as self-test for * new image downloaded failed.*/ LogError( ( "OTA Self-test failed for new image. shutting down OTA Agent." ) ); /* Shutdown OTA Agent, if it is required that the unsubscribe operations are not * performed while shutting down please set the second parameter to 0 instead of 1. */ OTA_Shutdown( 0, 1 ); break; default: LogWarn( ( "Received an unhandled callback event from OTA Agent, event = %d", event ) ); break; } } /*-----------------------------------------------------------*/ static void prvProcessIncomingData( void * pxSubscriptionContext, MQTTPublishInfo_t * pPublishInfo ) { configASSERT( pPublishInfo != NULL ); ( void ) pxSubscriptionContext; OtaEventData_t * pData; OtaEventMsg_t eventMsg = { 0 }; LogDebug( ( "Received OTA image block, size %d.\n\n", pPublishInfo->payloadLength ) ); configASSERT( pPublishInfo->payloadLength <= OTA_DATA_BLOCK_SIZE ); pData = prvOTAEventBufferGet(); if( pData != NULL ) { memcpy( pData->data, pPublishInfo->pPayload, pPublishInfo->payloadLength ); pData->dataLength = pPublishInfo->payloadLength; eventMsg.eventId = OtaAgentEventReceivedFileBlock; eventMsg.pEventData = pData; /* Send job document received event. */ OTA_SignalEvent( &eventMsg ); } else { LogError( ( "Error: No OTA data buffers available.\r\n" ) ); } } /*-----------------------------------------------------------*/ static void prvProcessIncomingJobMessage( void * pxSubscriptionContext, MQTTPublishInfo_t * pPublishInfo ) { OtaEventData_t * pData; OtaEventMsg_t eventMsg = { 0 }; ( void ) pxSubscriptionContext; configASSERT( pPublishInfo != NULL ); LogInfo( ( "Received job message callback, size %d.\n\n", pPublishInfo->payloadLength ) ); configASSERT( pPublishInfo->payloadLength <= OTA_DATA_BLOCK_SIZE ); pData = prvOTAEventBufferGet(); if( pData != NULL ) { memcpy( pData->data, pPublishInfo->pPayload, pPublishInfo->payloadLength ); pData->dataLength = pPublishInfo->payloadLength; eventMsg.eventId = OtaAgentEventReceivedJobDocument; eventMsg.pEventData = pData; /* Send job document received event. */ OTA_SignalEvent( &eventMsg ); } else { LogError( ( "Error: No OTA data buffers available.\r\n" ) ); } } /*-----------------------------------------------------------*/ static bool prvMatchClientIdentifierInTopic( const char * pTopic, size_t topicNameLength, const char * pClientIdentifier, size_t clientIdentifierLength ) { bool isMatch = false; size_t idx, matchIdx = 0; for( idx = OTA_TOPIC_CLIENT_IDENTIFIER_START_IDX; idx < topicNameLength; idx++ ) { if( matchIdx == clientIdentifierLength ) { if( pTopic[ idx ] == '/' ) { isMatch = true; } break; } else { if( pClientIdentifier[ matchIdx ] != pTopic[ idx ] ) { break; } } matchIdx++; } return isMatch; } /*-----------------------------------------------------------*/ bool vOTAProcessMessage( void * pvIncomingPublishCallbackContext, MQTTPublishInfo_t * pxPublishInfo ) { bool isMatch = false; ( void ) MQTT_MatchTopic( pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength, OTA_JOB_ACCEPTED_RESPONSE_TOPIC_FILTER, OTA_JOB_ACCEPTED_RESPONSE_TOPIC_FILTER_LENGTH, &isMatch ); if( isMatch == true ) { /* validate thing name */ isMatch = prvMatchClientIdentifierInTopic( pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength, democonfigCLIENT_IDENTIFIER, strlen( democonfigCLIENT_IDENTIFIER ) ); if( isMatch == true ) { prvProcessIncomingJobMessage( pvIncomingPublishCallbackContext, pxPublishInfo ); } } if( isMatch == false ) { ( void ) MQTT_MatchTopic( pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength, OTA_JOB_NOTIFY_TOPIC_FILTER, OTA_JOB_NOTIFY_TOPIC_FILTER_LENGTH, &isMatch ); if( isMatch == true ) { prvProcessIncomingJobMessage( pvIncomingPublishCallbackContext, pxPublishInfo ); } } if( isMatch == false ) { ( void ) MQTT_MatchTopic( pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength, OTA_DATA_STREAM_TOPIC_FILTER, OTA_DATA_STREAM_TOPIC_FILTER_LENGTH, &isMatch ); if( isMatch == true ) { prvProcessIncomingData( pvIncomingPublishCallbackContext, pxPublishInfo ); } } return isMatch; } /*-----------------------------------------------------------*/ static void prvCommandCallback( MQTTAgentCommandContext_t * pCommandContext, MQTTAgentReturnInfo_t * pxReturnInfo ) { pCommandContext->xReturnStatus = pxReturnInfo->returnCode; if( pCommandContext->xTaskToNotify != NULL ) { xTaskNotify( pCommandContext->xTaskToNotify, ( uint32_t ) ( pxReturnInfo->returnCode ), eSetValueWithOverwrite ); } } /*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/ static OtaMqttStatus_t prvMQTTSubscribe( const char * pTopicFilter, uint16_t topicFilterLength, uint8_t ucQoS ) { MQTTStatus_t mqttStatus; uint32_t ulNotifiedValue; MQTTAgentSubscribeArgs_t xSubscribeArgs = { 0 }; MQTTSubscribeInfo_t xSubscribeInfo = { 0 }; BaseType_t result; MQTTAgentCommandInfo_t xCommandParams = { 0 }; MQTTAgentCommandContext_t xApplicationDefinedContext = { 0 }; OtaMqttStatus_t otaRet = OtaMqttSuccess; configASSERT( pTopicFilter != NULL ); configASSERT( topicFilterLength > 0 ); xSubscribeInfo.pTopicFilter = pTopicFilter; xSubscribeInfo.topicFilterLength = topicFilterLength; xSubscribeInfo.qos = ucQoS; xSubscribeArgs.pSubscribeInfo = &xSubscribeInfo; xSubscribeArgs.numSubscriptions = 1; xApplicationDefinedContext.xTaskToNotify = xTaskGetCurrentTaskHandle(); xCommandParams.blockTimeMs = otaexampleMQTT_TIMEOUT_MS; xCommandParams.cmdCompleteCallback = prvCommandCallback; xCommandParams.pCmdCompleteCallbackContext = ( void * ) &xApplicationDefinedContext; xTaskNotifyStateClear( NULL ); mqttStatus = MQTTAgent_Subscribe( &xGlobalMqttAgentContext, &xSubscribeArgs, &xCommandParams ); /* Wait for command to complete so MQTTSubscribeInfo_t remains in scope for the * duration of the command. */ if( mqttStatus == MQTTSuccess ) { result = xTaskNotifyWait( 0, otaexampleMAX_UINT32, &ulNotifiedValue, pdMS_TO_TICKS( otaexampleMQTT_TIMEOUT_MS ) ); if( result == pdTRUE ) { mqttStatus = xApplicationDefinedContext.xReturnStatus; } else { mqttStatus = MQTTRecvFailed; } } if( mqttStatus != MQTTSuccess ) { LogError( ( "Failed to SUBSCRIBE to topic with error = %u.", mqttStatus ) ); otaRet = OtaMqttSubscribeFailed; } else { LogInfo( ( "Subscribed to topic %.*s.\n\n", topicFilterLength, pTopicFilter ) ); otaRet = OtaMqttSuccess; } return otaRet; } static OtaMqttStatus_t prvMQTTPublish( const char * const pacTopic, uint16_t topicLen, const char * pMsg, uint32_t msgSize, uint8_t qos ) { OtaMqttStatus_t otaRet = OtaMqttSuccess; BaseType_t result; MQTTStatus_t mqttStatus = MQTTBadParameter; MQTTPublishInfo_t publishInfo = { 0 }; MQTTAgentCommandInfo_t xCommandParams = { 0 }; MQTTAgentCommandContext_t xCommandContext = { 0 }; publishInfo.pTopicName = pacTopic; publishInfo.topicNameLength = topicLen; publishInfo.qos = qos; publishInfo.pPayload = pMsg; publishInfo.payloadLength = msgSize; xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle(); xTaskNotifyStateClear( NULL ); xCommandParams.blockTimeMs = otaexampleMQTT_TIMEOUT_MS; xCommandParams.cmdCompleteCallback = prvCommandCallback; xCommandParams.pCmdCompleteCallbackContext = ( void * ) &xCommandContext; mqttStatus = MQTTAgent_Publish( &xGlobalMqttAgentContext, &publishInfo, &xCommandParams ); /* Wait for command to complete so MQTTSubscribeInfo_t remains in scope for the * duration of the command. */ if( mqttStatus == MQTTSuccess ) { result = xTaskNotifyWait( 0, otaexampleMAX_UINT32, NULL, pdMS_TO_TICKS( otaexampleMQTT_TIMEOUT_MS ) ); if( result != pdTRUE ) { mqttStatus = MQTTSendFailed; } else { mqttStatus = xCommandContext.xReturnStatus; } } if( mqttStatus != MQTTSuccess ) { LogError( ( "Failed to send PUBLISH packet to broker with error = %u.", mqttStatus ) ); otaRet = OtaMqttPublishFailed; } else { LogInfo( ( "Sent PUBLISH packet to broker %.*s to broker.\n\n", topicLen, pacTopic ) ); otaRet = OtaMqttSuccess; } return otaRet; } static OtaMqttStatus_t prvMQTTUnsubscribe( const char * pTopicFilter, uint16_t topicFilterLength, uint8_t ucQoS ) { MQTTStatus_t mqttStatus; uint32_t ulNotifiedValue; MQTTAgentSubscribeArgs_t xSubscribeArgs = { 0 }; MQTTSubscribeInfo_t xSubscribeInfo = { 0 }; BaseType_t result; MQTTAgentCommandInfo_t xCommandParams = { 0 }; MQTTAgentCommandContext_t xApplicationDefinedContext = { 0 }; OtaMqttStatus_t otaRet = OtaMqttSuccess; configASSERT( pTopicFilter != NULL ); configASSERT( topicFilterLength > 0 ); xSubscribeInfo.pTopicFilter = pTopicFilter; xSubscribeInfo.topicFilterLength = topicFilterLength; xSubscribeInfo.qos = ucQoS; xSubscribeArgs.pSubscribeInfo = &xSubscribeInfo; xSubscribeArgs.numSubscriptions = 1; xApplicationDefinedContext.xTaskToNotify = xTaskGetCurrentTaskHandle(); xCommandParams.blockTimeMs = otaexampleMQTT_TIMEOUT_MS; xCommandParams.cmdCompleteCallback = prvCommandCallback; xCommandParams.pCmdCompleteCallbackContext = ( void * ) &xApplicationDefinedContext; LogInfo( ( " Unsubscribing to topic filter: %s", pTopicFilter ) ); xTaskNotifyStateClear( NULL ); mqttStatus = MQTTAgent_Unsubscribe( &xGlobalMqttAgentContext, &xSubscribeArgs, &xCommandParams ); /* Wait for command to complete so MQTTSubscribeInfo_t remains in scope for the * duration of the command. */ if( mqttStatus == MQTTSuccess ) { result = xTaskNotifyWait( 0, otaexampleMAX_UINT32, &ulNotifiedValue, pdMS_TO_TICKS( otaexampleMQTT_TIMEOUT_MS ) ); if( result == pdTRUE ) { mqttStatus = xApplicationDefinedContext.xReturnStatus; } else { mqttStatus = MQTTRecvFailed; } } if( mqttStatus != MQTTSuccess ) { LogError( ( "Failed to UNSUBSCRIBE from topic %.*s with error = %u.", topicFilterLength, pTopicFilter, mqttStatus ) ); otaRet = OtaMqttUnsubscribeFailed; } else { LogInfo( ( "UNSUBSCRIBED from topic %.*s.\n\n", topicFilterLength, pTopicFilter ) ); otaRet = OtaMqttSuccess; } return otaRet; } /*-----------------------------------------------------------*/ static void setOtaInterfaces( OtaInterfaces_t * pOtaInterfaces ) { configASSERT( pOtaInterfaces != NULL ); /* Initialize OTA library OS Interface. */ pOtaInterfaces->os.event.init = OtaInitEvent_FreeRTOS; pOtaInterfaces->os.event.send = OtaSendEvent_FreeRTOS; pOtaInterfaces->os.event.recv = OtaReceiveEvent_FreeRTOS; pOtaInterfaces->os.event.deinit = OtaDeinitEvent_FreeRTOS; pOtaInterfaces->os.timer.start = OtaStartTimer_FreeRTOS; pOtaInterfaces->os.timer.stop = OtaStopTimer_FreeRTOS; pOtaInterfaces->os.timer.delete = OtaDeleteTimer_FreeRTOS; pOtaInterfaces->os.mem.malloc = Malloc_FreeRTOS; pOtaInterfaces->os.mem.free = Free_FreeRTOS; /* Initialize the OTA library MQTT Interface.*/ pOtaInterfaces->mqtt.subscribe = prvMQTTSubscribe; pOtaInterfaces->mqtt.publish = prvMQTTPublish; pOtaInterfaces->mqtt.unsubscribe = prvMQTTUnsubscribe; /* Initialize the OTA library PAL Interface.*/ pOtaInterfaces->pal.getPlatformImageState = otaPal_GetPlatformImageState; pOtaInterfaces->pal.setPlatformImageState = otaPal_SetPlatformImageState; pOtaInterfaces->pal.writeBlock = otaPal_WriteBlock; pOtaInterfaces->pal.activate = otaPal_ActivateNewImage; pOtaInterfaces->pal.closeFile = otaPal_CloseFile; pOtaInterfaces->pal.reset = otaPal_ResetDevice; pOtaInterfaces->pal.abort = otaPal_Abort; pOtaInterfaces->pal.createFile = otaPal_CreateFileForRx; } static void prvOTADemoTask( void * pvParam ) { ( void ) pvParam; /* FreeRTOS APIs return status. */ BaseType_t xResult = pdPASS; /* OTA library return status. */ OtaErr_t otaRet = OtaErrNone; /* OTA event message used for sending event to OTA Agent.*/ OtaEventMsg_t eventMsg = { 0 }; /* OTA interface context required for library interface functions.*/ OtaInterfaces_t otaInterfaces; /* OTA library packet statistics per job.*/ OtaAgentStatistics_t otaStatistics = { 0 }; /* OTA Agent state returned from calling OTA_GetAgentState.*/ OtaState_t state = OtaAgentStateStopped; /* Set OTA Library interfaces.*/ setOtaInterfaces( &otaInterfaces ); LogInfo( ( "OTA over MQTT demo, Application version %u.%u.%u", appFirmwareVersion.u.x.major, appFirmwareVersion.u.x.minor, appFirmwareVersion.u.x.build ) ); /****************************** Init OTA Library. ******************************/ xBufferSemaphore = xSemaphoreCreateMutex(); if( xBufferSemaphore == NULL ) { xResult = pdFAIL; } if( xResult == pdPASS ) { memset( eventBuffer, 0x00, sizeof( eventBuffer ) ); if( ( otaRet = OTA_Init( &otaBuffer, &otaInterfaces, ( const uint8_t * ) ( democonfigCLIENT_IDENTIFIER ), otaAppCallback ) ) != OtaErrNone ) { LogError( ( "Failed to initialize OTA Agent, exiting = %u.", otaRet ) ); xResult = pdFAIL; } } if( xResult == pdPASS ) { if( ( xResult = xTaskCreate( prvOTAAgentTask, "OTAAgentTask", otaexampleAGENT_TASK_STACK_SIZE, NULL, otaexampleAGENT_TASK_PRIORITY, NULL ) ) != pdPASS ) { LogError( ( "Failed to start OTA Agent task: " ",errno=%d", xResult ) ); } } /***************************Start OTA demo loop. ******************************/ if( xResult == pdPASS ) { /* Start the OTA Agent.*/ eventMsg.eventId = OtaAgentEventStart; OTA_SignalEvent( &eventMsg ); while( ( ( state = OTA_GetState() ) != OtaAgentStateStopped ) ) { /* Get OTA statistics for currently executing job. */ OTA_GetStatistics( &otaStatistics ); LogInfo( ( " Received: %u Queued: %u Processed: %u Dropped: %u", otaStatistics.otaPacketsReceived, otaStatistics.otaPacketsQueued, otaStatistics.otaPacketsProcessed, otaStatistics.otaPacketsDropped ) ); vTaskDelay( pdMS_TO_TICKS( otaexampleTASK_DELAY_MS ) ); } } LogInfo( ( "OTA agent task stopped. Exiting OTA demo." ) ); vTaskDelete( NULL ); } void vStartOTACodeSigningDemo( configSTACK_DEPTH_TYPE uxStackSize, UBaseType_t uxPriority ) { BaseType_t xResult; if( ( xResult = xTaskCreate( prvOTADemoTask, "OTADemoTask", uxStackSize, NULL, uxPriority, NULL ) ) != pdPASS ) { LogError( ( "Failed to start OTA task: " ",errno=%d", xResult ) ); } configASSERT( xResult == pdPASS ); } void vSuspendOTACodeSigningDemo( void ) { if( ( OTA_GetState() != OtaAgentStateSuspended ) && ( OTA_GetState() != OtaAgentStateStopped ) ) { OTA_Suspend(); while( ( OTA_GetState() != OtaAgentStateSuspended ) && ( OTA_GetState() != OtaAgentStateStopped ) ) { vTaskDelay( pdMS_TO_TICKS( otaexampleTASK_DELAY_MS ) ); } } } void vResumeOTACodeSigningDemo( void ) { if( OTA_GetState() == OtaAgentStateSuspended ) { OTA_Resume(); while( OTA_GetState() == OtaAgentStateSuspended ) { vTaskDelay( pdMS_TO_TICKS( otaexampleTASK_DELAY_MS ) ); } } }