/*
 * ESP32-C3 Featured FreeRTOS IoT Integration V202204.00
 * Copyright (C) 2022 Amazon.com, Inc. or its affiliates.  All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
 * the Software, and to permit persons to whom the Software is furnished to do so,
 * subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * https://www.FreeRTOS.org
 * https://github.com/FreeRTOS
 *
 */

/*
 * This file demonstrates numerous tasks all of which use the core-MQTT Agent API
 * to send unique MQTT payloads to unique topics over the same MQTT connection
 * to the same coreMQTT-Agent.
 *
 * Each created task is a unique instance of the task implemented by
 * prvSubscribePublishUnsubscribeTask().  prvSubscribePublishUnsubscribeTask()
 * subscribes to a topic, publishes a message to the same
 * topic, receives the message, then unsubscribes from the topic in a loop.
 * The command context sent to MQTTAgent_Publish(), MQTTAgent_Subscribe(), and
 * MQTTAgent_Unsubscribe contains a unique number that is sent back to the task
 * as a task notification from the callback function that executes when the
 * operations are acknowledged (or just sent in the case of QoS 0).  The
 * task checks the number it receives from the callback equals the number it
 * previously set in the command context before printing out either a success
 * or failure message.
 */

/* Includes *******************************************************************/

/* Standard includes. */
#include <string.h>
#include <stdio.h>
#include <assert.h>

/* FreeRTOS includes. */
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/event_groups.h"

/* ESP-IDF includes. */
#include "esp_log.h"
#include "esp_event.h"
#include "sdkconfig.h"

/* coreMQTT library include. */
#include "core_mqtt.h"

/* coreMQTT-Agent include. */
#include "core_mqtt_agent.h"

/* coreMQTT-Agent network manager include. */
#include "core_mqtt_agent_manager.h"
#include "core_mqtt_agent_manager_events.h"

/* Subscription manager include. */
#include "subscription_manager.h"

/* Public functions include. */
#include "sub_pub_unsub_demo.h"

/* Demo task configurations include. */
#include "sub_pub_unsub_demo_config.h"

/* Preprocessor definitions ***************************************************/

/* coreMQTT-Agent event group bit definitions */
#define CORE_MQTT_AGENT_CONNECTED_BIT              ( 1 << 0 )
#define CORE_MQTT_AGENT_OTA_NOT_IN_PROGRESS_BIT    ( 1 << 1 )

/* Struct definitions *********************************************************/

/**
 * @brief Defines the structure to use as the incoming publish callback context
 * when data from a subscribed topic is received.
 */
typedef struct IncomingPublishCallbackContext
{
    TaskHandle_t xTaskToNotify;
    uint32_t ulNotificationValue;
    char pcIncomingPublish[ subpubunsubconfigSTRING_BUFFER_LENGTH ];
} IncomingPublishCallbackContext_t;

/**
 * @brief Defines the structure to use as the command callback context in this
 * demo.
 */
struct MQTTAgentCommandContext
{
    MQTTStatus_t xReturnStatus;
    TaskHandle_t xTaskToNotify;
    uint32_t ulNotificationValue;
    IncomingPublishCallbackContext_t * pxIncomingPublishCallbackContext;
    void * pArgs;
};

/**
 * @brief Parameters for this task.
 */
struct DemoParams
{
    uint32_t ulTaskNumber;
};

/* Global variables ***********************************************************/

/**
 * @brief Logging tag for ESP-IDF logging functions.
 */
static const char * TAG = "sub_pub_unsub_demo";

/**
 * @brief Static handle used for MQTT agent context.
 */
extern MQTTAgentContext_t xGlobalMqttAgentContext;

/**
 * @brief The buffer to hold the topic filter. The topic is generated at runtime
 * by adding the task names.
 *
 * @note The topic strings must persist until unsubscribed.
 */
static char topicBuf[ subpubunsubconfigNUM_TASKS_TO_CREATE ][ subpubunsubconfigSTRING_BUFFER_LENGTH ];

/**
 * @brief The event group used to manage coreMQTT-Agent events.
 */
static EventGroupHandle_t xNetworkEventGroup;

/**
 * @brief The semaphore used to lock access to ulMessageID to eliminate a race
 * condition in which multiple tasks try to increment/get ulMessageID.
 */
static SemaphoreHandle_t xMessageIdSemaphore;

/**
 * @brief The message ID for the next message sent by this demo.
 */
static uint32_t ulMessageId = 0;

/* Static function declarations ***********************************************/

/**
 * @brief ESP Event Loop library handler for coreMQTT-Agent events.
 *
 * This handles events defined in core_mqtt_agent_events.h.
 */
static void prvCoreMqttAgentEventHandler( void * pvHandlerArg,
                                          esp_event_base_t xEventBase,
                                          int32_t lEventId,
                                          void * pvEventData );

/**
 * @brief Passed into MQTTAgent_Subscribe() as the callback to execute when the
 * broker ACKs the SUBSCRIBE message.  Its implementation sends a notification
 * to the task that called MQTTAgent_Subscribe() to let the task know the
 * SUBSCRIBE operation completed.  It also sets the xReturnStatus of the
 * structure passed in as the command's context to the value of the
 * xReturnStatus parameter - which enables the task to check the status of the
 * operation.
 *
 * See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call
 *
 * @param[in] pxCommandContext Context of the initial command.
 * @param[in].xReturnStatus The result of the command.
 */
static void prvSubscribeCommandCallback( MQTTAgentCommandContext_t * pxCommandContext,
                                         MQTTAgentReturnInfo_t * pxReturnInfo );

static void prvUnsubscribeCommandCallback( MQTTAgentCommandContext_t * pxCommandContext,
                                           MQTTAgentReturnInfo_t * pxReturnInfo );

/**
 * @brief Passed into MQTTAgent_Publish() as the callback to execute when the
 * broker ACKs the PUBLISH message.  Its implementation sends a notification
 * to the task that called MQTTAgent_Publish() to let the task know the
 * PUBLISH operation completed.  It also sets the xReturnStatus of the
 * structure passed in as the command's context to the value of the
 * xReturnStatus parameter - which enables the task to check the status of the
 * operation.
 *
 * See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call
 *
 * @param[in] pxCommandContext Context of the initial command.
 * @param[in].xReturnStatus The result of the command.
 */
static void prvPublishCommandCallback( MQTTAgentCommandContext_t * pxCommandContext,
                                       MQTTAgentReturnInfo_t * pxReturnInfo );

/**
 * @brief Called by the task to wait for a notification from a callback function
 * after the task first executes either MQTTAgent_Publish()* or
 * MQTTAgent_Subscribe().
 *
 * See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call
 *
 * @param[in] pxCommandContext Context of the initial command.
 * @param[out] pulNotifiedValue The task's notification value after it receives
 * a notification from the callback.
 *
 * @return pdTRUE if the task received a notification, otherwise pdFALSE.
 */
static BaseType_t prvWaitForNotification( uint32_t * pulNotifiedValue );

/**
 * @brief Passed into MQTTAgent_Subscribe() as the callback to execute when
 * there is an incoming publish on the topic being subscribed to.  Its
 * implementation just logs information about the incoming publish including
 * the publish messages source topic and payload.
 *
 * See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call
 *
 * @param[in] pvIncomingPublishCallbackContext Context of the initial command.
 * @param[in] pxPublishInfo Deserialized publish.
 */
static void prvIncomingPublishCallback( void * pvIncomingPublishCallbackContext,
                                        MQTTPublishInfo_t * pxPublishInfo );

/**
 * @brief Subscribe to the topic the demo task will also publish to - that
 * results in all outgoing publishes being published back to the task
 * (effectively echoed back).
 *
 * @param[in] pxIncomingPublishCallbackContext The callback context used when
 * data is received from pcTopicFilter.
 * @param[in] xQoS The quality of service (QoS) to use.  Can be zero or one
 * for all MQTT brokers.  Can also be QoS2 if supported by the broker.  AWS IoT
 * does not support QoS2.
 * @param[in] pcTopicFilter Topic filter to subscribe to.
 */
static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPublishCallbackContext,
                                 MQTTQoS_t xQoS,
                                 char * pcTopicFilter );

/**
 * @brief Unsubscribe to the topic the demo task will also publish to.
 *
 * @param[in] xQoS The quality of service (QoS) to use.  Can be zero or one
 * for all MQTT brokers.  Can also be QoS2 if supported by the broker.  AWS IoT
 * does not support QoS2.
 * @param[in] pcTopicFilter Topic filter to unsubscribe from.
 */
static void prvUnsubscribeToTopic( MQTTQoS_t xQoS,
                                   char * pcTopicFilter );

/**
 * @brief The function that implements the task demonstrated by this file.
 */
static void prvSubscribePublishUnsubscribeTask( void * pvParameters );

/* Static function definitions ************************************************/

static void prvCoreMqttAgentEventHandler( void * pvHandlerArg,
                                          esp_event_base_t xEventBase,
                                          int32_t lEventId,
                                          void * pvEventData )
{
    ( void ) pvHandlerArg;
    ( void ) xEventBase;
    ( void ) pvEventData;

    switch( lEventId )
    {
        case CORE_MQTT_AGENT_CONNECTED_EVENT:
            ESP_LOGI( TAG,
                      "coreMQTT-Agent connected." );
            xEventGroupSetBits( xNetworkEventGroup,
                                CORE_MQTT_AGENT_CONNECTED_BIT );
            break;

        case CORE_MQTT_AGENT_DISCONNECTED_EVENT:
            ESP_LOGI( TAG,
                      "coreMQTT-Agent disconnected. Preventing coreMQTT-Agent "
                      "commands from being enqueued." );
            xEventGroupClearBits( xNetworkEventGroup,
                                  CORE_MQTT_AGENT_CONNECTED_BIT );
            break;

        case CORE_MQTT_AGENT_OTA_STARTED_EVENT:
            ESP_LOGI( TAG,
                      "OTA started. Preventing coreMQTT-Agent commands from "
                      "being enqueued." );
            xEventGroupClearBits( xNetworkEventGroup,
                                  CORE_MQTT_AGENT_OTA_NOT_IN_PROGRESS_BIT );
            break;

        case CORE_MQTT_AGENT_OTA_STOPPED_EVENT:
            ESP_LOGI( TAG,
                      "OTA stopped. No longer preventing coreMQTT-Agent "
                      "commands from being enqueued." );
            xEventGroupSetBits( xNetworkEventGroup,
                                CORE_MQTT_AGENT_OTA_NOT_IN_PROGRESS_BIT );
            break;

        default:
            ESP_LOGE( TAG,
                      "coreMQTT-Agent event handler received unexpected event: %"PRIu32"",
                      lEventId );
            break;
    }
}

static void prvPublishCommandCallback( MQTTAgentCommandContext_t * pxCommandContext,
                                       MQTTAgentReturnInfo_t * pxReturnInfo )
{
    /* Store the result in the application defined context so the task that
     * initiated the publish can check the operation's status. */
    pxCommandContext->xReturnStatus = pxReturnInfo->returnCode;

    if( pxCommandContext->xTaskToNotify != NULL )
    {
        /* Send the context's ulNotificationValue as the notification value so
         * the receiving task can check the value it set in the context matches
         * the value it receives in the notification. */
        xTaskNotify( pxCommandContext->xTaskToNotify,
                     pxCommandContext->ulNotificationValue,
                     eSetValueWithOverwrite );
    }
}

static void prvSubscribeCommandCallback( MQTTAgentCommandContext_t * pxCommandContext,
                                         MQTTAgentReturnInfo_t * pxReturnInfo )
{
    bool xSubscriptionAdded = false;
    MQTTAgentSubscribeArgs_t * pxSubscribeArgs = ( MQTTAgentSubscribeArgs_t * ) pxCommandContext->pArgs;

    /* Store the result in the application defined context so the task that
     * initiated the subscribe can check the operation's status. */
    pxCommandContext->xReturnStatus = pxReturnInfo->returnCode;

    /* Check if the subscribe operation is a success. */
    if( pxReturnInfo->returnCode == MQTTSuccess )
    {
        /* Add subscription so that incoming publishes are routed to the application
         * callback. */
        xSubscriptionAdded = addSubscription( ( SubscriptionElement_t * ) xGlobalMqttAgentContext.pIncomingCallbackContext,
                                              pxSubscribeArgs->pSubscribeInfo->pTopicFilter,
                                              pxSubscribeArgs->pSubscribeInfo->topicFilterLength,
                                              prvIncomingPublishCallback,
                                              ( void * ) ( pxCommandContext->pxIncomingPublishCallbackContext ) );

        if( xSubscriptionAdded == false )
        {
            ESP_LOGE( TAG,
                      "Failed to register an incoming publish callback for topic %.*s.",
                      pxSubscribeArgs->pSubscribeInfo->topicFilterLength,
                      pxSubscribeArgs->pSubscribeInfo->pTopicFilter );
        }
    }

    if( pxCommandContext->xTaskToNotify != NULL )
    {
        xTaskNotify( pxCommandContext->xTaskToNotify,
                     pxCommandContext->ulNotificationValue,
                     eSetValueWithOverwrite );
    }
}

static void prvUnsubscribeCommandCallback( MQTTAgentCommandContext_t * pxCommandContext,
                                           MQTTAgentReturnInfo_t * pxReturnInfo )
{
    MQTTAgentSubscribeArgs_t * pxUnsubscribeArgs = ( MQTTAgentSubscribeArgs_t * ) pxCommandContext->pArgs;

    /* Store the result in the application defined context so the task that
     * initiated the subscribe can check the operation's status. */
    pxCommandContext->xReturnStatus = pxReturnInfo->returnCode;

    /* Check if the unsubscribe operation is a success. */
    if( pxReturnInfo->returnCode == MQTTSuccess )
    {
        /* Remove subscription from subscription manager. */
        removeSubscription( ( SubscriptionElement_t * ) xGlobalMqttAgentContext.pIncomingCallbackContext,
                            pxUnsubscribeArgs->pSubscribeInfo->pTopicFilter,
                            pxUnsubscribeArgs->pSubscribeInfo->topicFilterLength );
    }

    if( pxCommandContext->xTaskToNotify != NULL )
    {
        xTaskNotify( pxCommandContext->xTaskToNotify,
                     pxCommandContext->ulNotificationValue,
                     eSetValueWithOverwrite );
    }
}

static BaseType_t prvWaitForNotification( uint32_t * pulNotifiedValue )
{
    BaseType_t xReturn;

    /* Wait for this task to get notified, passing out the value it gets
     * notified with. */
    xReturn = xTaskNotifyWait( 0,
                               0,
                               pulNotifiedValue,
                               portMAX_DELAY );
    return xReturn;
}

static void prvIncomingPublishCallback( void * pvIncomingPublishCallbackContext,
                                        MQTTPublishInfo_t * pxPublishInfo )
{
    IncomingPublishCallbackContext_t * pxIncomingPublishCallbackContext = ( IncomingPublishCallbackContext_t * ) pvIncomingPublishCallbackContext;

    /* Create a message that contains the incoming MQTT payload to the logger,
     * terminating the string first. */
    if( pxPublishInfo->payloadLength < subpubunsubconfigSTRING_BUFFER_LENGTH )
    {
        memcpy( ( void * ) ( pxIncomingPublishCallbackContext->pcIncomingPublish ),
                pxPublishInfo->pPayload,
                pxPublishInfo->payloadLength );

        ( pxIncomingPublishCallbackContext->pcIncomingPublish )[ pxPublishInfo->payloadLength ] = 0x00;
    }
    else
    {
        memcpy( ( void * ) ( pxIncomingPublishCallbackContext->pcIncomingPublish ),
                pxPublishInfo->pPayload,
                subpubunsubconfigSTRING_BUFFER_LENGTH );

        ( pxIncomingPublishCallbackContext->pcIncomingPublish )[ subpubunsubconfigSTRING_BUFFER_LENGTH - 1 ] = 0x00;
    }

    xTaskNotify( pxIncomingPublishCallbackContext->xTaskToNotify,
                 pxIncomingPublishCallbackContext->ulNotificationValue,
                 eSetValueWithOverwrite );
}

static void prvPublishToTopic( MQTTQoS_t xQoS,
                               char * pcTopicName,
                               char * pcPayload )
{
    uint32_t ulPublishMessageId, ulNotifiedValue = 0;

    MQTTStatus_t xCommandAdded;
    BaseType_t xCommandAcknowledged = pdFALSE;

    MQTTPublishInfo_t xPublishInfo = { 0 };

    MQTTAgentCommandContext_t xCommandContext = { 0 };
    MQTTAgentCommandInfo_t xCommandParams = { 0 };

    xTaskNotifyStateClear( NULL );

    /* Create a unique number for the publish that is about to be sent.
     * This number is used in the command context and is sent back to this task
     * as a notification in the callback that's executed upon receipt of the
     * publish from coreMQTT-Agent.
     * That way this task can match an acknowledgment to the message being sent.
     */
    xSemaphoreTake( xMessageIdSemaphore, portMAX_DELAY );
    {
        ++ulMessageId;
        ulPublishMessageId = ulMessageId;
    }
    xSemaphoreGive( xMessageIdSemaphore );

    /* Configure the publish operation. The topic name string must persist for
     * duration of publish! */
    xPublishInfo.qos = xQoS;
    xPublishInfo.pTopicName = pcTopicName;
    xPublishInfo.topicNameLength = ( uint16_t ) strlen( pcTopicName );
    xPublishInfo.pPayload = pcPayload;
    xPublishInfo.payloadLength = ( uint16_t ) strlen( pcPayload );

    /* Complete an application defined context associated with this publish
     * message.
     * This gets updated in the callback function so the variable must persist
     * until the callback executes. */
    xCommandContext.ulNotificationValue = ulPublishMessageId;
    xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle();

    xCommandParams.blockTimeMs = subpubunsubconfigMAX_COMMAND_SEND_BLOCK_TIME_MS;
    xCommandParams.cmdCompleteCallback = prvPublishCommandCallback;
    xCommandParams.pCmdCompleteCallbackContext = &xCommandContext;

    do
    {
        /* Wait for coreMQTT-Agent task to have working network connection and
         * not be performing an OTA update. */
        xEventGroupWaitBits( xNetworkEventGroup,
                             CORE_MQTT_AGENT_CONNECTED_BIT | CORE_MQTT_AGENT_OTA_NOT_IN_PROGRESS_BIT,
                             pdFALSE,
                             pdTRUE,
                             portMAX_DELAY );

        ESP_LOGI( TAG,
                  "Task \"%s\" sending publish request to coreMQTT-Agent with message \"%s\" on topic \"%s\" with ID %"PRIu32".",
                  pcTaskGetName( xCommandContext.xTaskToNotify ),
                  pcPayload,
                  pcTopicName,
                  ulPublishMessageId );

        /* To ensure ulNotification doesn't accidentally hold the expected value
         * as it is to be checked against the value sent from the callback.. */
        ulNotifiedValue = ~ulPublishMessageId;

        xCommandAcknowledged = pdFALSE;

        xCommandAdded = MQTTAgent_Publish( &xGlobalMqttAgentContext,
                                           &xPublishInfo,
                                           &xCommandParams );

        if( xCommandAdded == MQTTSuccess )
        {
            /* For QoS 1 and 2, wait for the publish acknowledgment.  For QoS0,
             * wait for the publish to be sent. */
            ESP_LOGI( TAG,
                      "Task \"%s\" waiting for publish %"PRIu32" to complete.",
                      pcTaskGetName( xCommandContext.xTaskToNotify ),
                      ulPublishMessageId );

            xCommandAcknowledged = prvWaitForNotification( &ulNotifiedValue );
        }
        else
        {
            ESP_LOGE( TAG,
                      "Failed to enqueue publish command. Error code=%s",
                      MQTT_Status_strerror( xCommandAdded ) );
        }

        /* Check all ways the status was passed back just for demonstration
         * purposes. */
        if( ( xCommandAcknowledged != pdTRUE ) ||
            ( xCommandContext.xReturnStatus != MQTTSuccess ) ||
            ( ulNotifiedValue != ulPublishMessageId ) )
        {
            ESP_LOGW( TAG,
                      "Error or timed out waiting for ack for publish message %"PRIu32". Re-attempting publish.",
                      ulPublishMessageId );
        }
        else
        {
            ESP_LOGI( TAG,
                      "Publish %"PRIu32" succeeded for task \"%s\".",
                      ulPublishMessageId,
                      pcTaskGetName( xCommandContext.xTaskToNotify ) );
        }
    } while( ( xCommandAcknowledged != pdTRUE ) ||
             ( xCommandContext.xReturnStatus != MQTTSuccess ) ||
             ( ulNotifiedValue != ulPublishMessageId ) );
}

static void prvSubscribeToTopic( IncomingPublishCallbackContext_t * pxIncomingPublishCallbackContext,
                                 MQTTQoS_t xQoS,
                                 char * pcTopicFilter )
{
    uint32_t ulSubscribeMessageId, ulNotifiedValue = 0;

    MQTTStatus_t xCommandAdded;
    BaseType_t xCommandAcknowledged = pdFALSE;

    MQTTAgentSubscribeArgs_t xSubscribeArgs = { 0 };
    MQTTSubscribeInfo_t xSubscribeInfo = { 0 };

    MQTTAgentCommandContext_t xCommandContext = { 0 };
    MQTTAgentCommandInfo_t xCommandParams = { 0 };

    xTaskNotifyStateClear( NULL );

    /* Create a unique number for the subscribe that is about to be sent.
     * This number is used in the command context and is sent back to this task
     * as a notification in the callback that's executed upon receipt of the
     * publish from coreMQTT-Agent.
     * That way this task can match an acknowledgment to the message being sent.
     */
    xSemaphoreTake( xMessageIdSemaphore, portMAX_DELAY );
    {
        ++ulMessageId;
        ulSubscribeMessageId = ulMessageId;
    }
    xSemaphoreGive( xMessageIdSemaphore );

    /* Configure the subscribe operation.  The topic string must persist for
     * duration of subscription! */
    xSubscribeInfo.qos = xQoS;
    xSubscribeInfo.pTopicFilter = pcTopicFilter;
    xSubscribeInfo.topicFilterLength = ( uint16_t ) strlen( pcTopicFilter );

    xSubscribeArgs.pSubscribeInfo = &xSubscribeInfo;
    xSubscribeArgs.numSubscriptions = 1;

    /* Complete an application defined context associated with this subscribe
     * message.
     * This gets updated in the callback function so the variable must persist
     * until the callback executes. */
    xCommandContext.ulNotificationValue = ulSubscribeMessageId;
    xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle();
    xCommandContext.pxIncomingPublishCallbackContext = pxIncomingPublishCallbackContext;
    xCommandContext.pArgs = ( void * ) &xSubscribeArgs;

    xCommandParams.blockTimeMs = subpubunsubconfigMAX_COMMAND_SEND_BLOCK_TIME_MS;
    xCommandParams.cmdCompleteCallback = prvSubscribeCommandCallback;
    xCommandParams.pCmdCompleteCallbackContext = ( void * ) &xCommandContext;

    do
    {
        /* Wait for coreMQTT-Agent task to have working network connection and
         * not be performing an OTA update. */
        xEventGroupWaitBits( xNetworkEventGroup,
                             CORE_MQTT_AGENT_CONNECTED_BIT | CORE_MQTT_AGENT_OTA_NOT_IN_PROGRESS_BIT,
                             pdFALSE,
                             pdTRUE,
                             portMAX_DELAY );

        ESP_LOGI( TAG,
                  "Task \"%s\" sending subscribe request to coreMQTT-Agent for topic filter: %s with id %"PRIu32"",
                  pcTaskGetName( xCommandContext.xTaskToNotify ),
                  pcTopicFilter,
                  ulSubscribeMessageId );

        xCommandAcknowledged = pdFALSE;

        xCommandAdded = MQTTAgent_Subscribe( &xGlobalMqttAgentContext,
                                             &xSubscribeArgs,
                                             &xCommandParams );

        if( xCommandAdded == MQTTSuccess )
        {
            /* For QoS 1 and 2, wait for the subscription acknowledgment. For QoS0,
             * wait for the subscribe to be sent. */
            xCommandAcknowledged = prvWaitForNotification( &ulNotifiedValue );
        }
        else
        {
            ESP_LOGE( TAG,
                      "Failed to enqueue subscribe command. Error code=%s",
                      MQTT_Status_strerror( xCommandAdded ) );
        }

        /* Check all ways the status was passed back just for demonstration
         * purposes. */
        if( ( xCommandAcknowledged != pdTRUE ) ||
            ( xCommandContext.xReturnStatus != MQTTSuccess ) ||
            ( ulNotifiedValue != ulSubscribeMessageId ) )
        {
            ESP_LOGW( TAG,
                      "Error or timed out waiting for ack to subscribe message %"PRIu32". Re-attempting subscribe.",
                      ulSubscribeMessageId );
        }
        else
        {
            ESP_LOGI( TAG,
                      "Subscribe %"PRIu32" for topic filter %s succeeded for task \"%s\".",
                      ulSubscribeMessageId,
                      pcTopicFilter,
                      pcTaskGetName( xCommandContext.xTaskToNotify ) );
        }
    } while( ( xCommandAcknowledged != pdTRUE ) ||
             ( xCommandContext.xReturnStatus != MQTTSuccess ) ||
             ( ulNotifiedValue != ulSubscribeMessageId ) );
}

static void prvUnsubscribeToTopic( MQTTQoS_t xQoS,
                                   char * pcTopicFilter )
{
    uint32_t ulUnsubscribeMessageId, ulNotifiedValue = 0;

    MQTTStatus_t xCommandAdded;
    BaseType_t xCommandAcknowledged = pdFALSE;

    MQTTAgentSubscribeArgs_t xUnsubscribeArgs = { 0 };
    MQTTSubscribeInfo_t xUnsubscribeInfo = { 0 };

    MQTTAgentCommandContext_t xCommandContext = { 0 };
    MQTTAgentCommandInfo_t xCommandParams = { 0 };

    xTaskNotifyStateClear( NULL );

    /* Create a unique number for the subscribe that is about to be sent.
     * This number is used in the command context and is sent back to this task
     * as a notification in the callback that's executed upon receipt of the
     * publish from coreMQTT-Agent.
     * That way this task can match an acknowledgment to the message being sent.
     */
    xSemaphoreTake( xMessageIdSemaphore, portMAX_DELAY );
    {
        ++ulMessageId;
        ulUnsubscribeMessageId = ulMessageId;
    }
    xSemaphoreGive( xMessageIdSemaphore );

    /* Configure the subscribe operation.  The topic string must persist for
     * duration of subscription! */
    xUnsubscribeInfo.qos = xQoS;
    xUnsubscribeInfo.pTopicFilter = pcTopicFilter;
    xUnsubscribeInfo.topicFilterLength = ( uint16_t ) strlen( pcTopicFilter );

    xUnsubscribeArgs.pSubscribeInfo = &xUnsubscribeInfo;
    xUnsubscribeArgs.numSubscriptions = 1;

    /* Complete an application defined context associated with this subscribe
     * message.
     * This gets updated in the callback function so the variable must persist
     * until the callback executes. */
    xCommandContext.ulNotificationValue = ulUnsubscribeMessageId;
    xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle();
    xCommandContext.pArgs = ( void * ) &xUnsubscribeArgs;

    xCommandParams.blockTimeMs = subpubunsubconfigMAX_COMMAND_SEND_BLOCK_TIME_MS;
    xCommandParams.cmdCompleteCallback = prvUnsubscribeCommandCallback;
    xCommandParams.pCmdCompleteCallbackContext = ( void * ) &xCommandContext;

    do
    {
        /* Wait for coreMQTT-Agent task to have working network connection and
         * not be performing an OTA update. */
        xEventGroupWaitBits( xNetworkEventGroup,
                             CORE_MQTT_AGENT_CONNECTED_BIT | CORE_MQTT_AGENT_OTA_NOT_IN_PROGRESS_BIT,
                             pdFALSE,
                             pdTRUE,
                             portMAX_DELAY );
        ESP_LOGI( TAG,
                  "Task \"%s\" sending unsubscribe request to coreMQTT-Agent for topic filter: %s with id %"PRIu32"",
                  pcTaskGetName( xCommandContext.xTaskToNotify ),
                  pcTopicFilter,
                  ulUnsubscribeMessageId );

        xCommandAcknowledged = pdFALSE;

        xCommandAdded = MQTTAgent_Unsubscribe( &xGlobalMqttAgentContext,
                                               &xUnsubscribeArgs,
                                               &xCommandParams );

        if( xCommandAdded == MQTTSuccess )
        {
            /* For QoS 1 and 2, wait for the subscription acknowledgment.  For QoS0,
             * wait for the subscribe to be sent. */
            xCommandAcknowledged = prvWaitForNotification( &ulNotifiedValue );
        }
        else
        {
            ESP_LOGE( TAG,
                      "Failed to enqueue unsubscribe command. Error code=%s",
                      MQTT_Status_strerror( xCommandAdded ) );
        }

        /* Check all ways the status was passed back just for demonstration
         * purposes. */
        if( ( xCommandAcknowledged != pdTRUE ) ||
            ( xCommandContext.xReturnStatus != MQTTSuccess ) ||
            ( ulNotifiedValue != ulUnsubscribeMessageId ) )
        {
            ESP_LOGW( TAG,
                      "Error or timed out waiting for ack to unsubscribe message %"PRIu32". Re-attempting subscribe.",
                      ulUnsubscribeMessageId );
        }
        else
        {
            ESP_LOGI( TAG,
                      "Unsubscribe %"PRIu32" for topic filter %s succeeded for task \"%s\".",
                      ulUnsubscribeMessageId,
                      pcTopicFilter,
                      pcTaskGetName( xCommandContext.xTaskToNotify ) );
        }
    } while( ( xCommandAcknowledged != pdTRUE ) ||
             ( xCommandContext.xReturnStatus != MQTTSuccess ) ||
             ( ulNotifiedValue != ulUnsubscribeMessageId ) );
}

static void prvSubscribePublishUnsubscribeTask( void * pvParameters )
{
    struct DemoParams * pxParams = ( struct DemoParams * ) pvParameters;
    uint32_t ulNotifiedValue;
    uint32_t ulTaskNumber = pxParams->ulTaskNumber;

    IncomingPublishCallbackContext_t xIncomingPublishCallbackContext;

    MQTTQoS_t xQoS;
    char * pcTopicBuffer = topicBuf[ ulTaskNumber ];
    char pcPayload[ subpubunsubconfigSTRING_BUFFER_LENGTH ];

    xIncomingPublishCallbackContext.ulNotificationValue = ulTaskNumber;
    xIncomingPublishCallbackContext.xTaskToNotify = xTaskGetCurrentTaskHandle();

    xQoS = ( MQTTQoS_t ) subpubunsubconfigQOS_LEVEL;

    /* Create a topic name for this task to publish to. */
    snprintf( pcTopicBuffer,
              subpubunsubconfigSTRING_BUFFER_LENGTH,
              "/filter/%s",
              pcTaskGetName( xIncomingPublishCallbackContext.xTaskToNotify ) );

    while( 1 )
    {
        /* Subscribe to the same topic to which this task will publish.  That will
         * result in each published message being published from the server back to
         * the target. */
        prvSubscribeToTopic( &xIncomingPublishCallbackContext,
                             xQoS,
                             pcTopicBuffer );

        snprintf( pcPayload,
                  subpubunsubconfigSTRING_BUFFER_LENGTH,
                  "%s",
                  pcTaskGetName( xIncomingPublishCallbackContext.xTaskToNotify ) );

        prvPublishToTopic( xQoS,
                           pcTopicBuffer,
                           pcPayload );

        prvWaitForNotification( &ulNotifiedValue );

        ESP_LOGI( TAG,
                  "Task \"%s\" received: %s",
                  pcTaskGetName( xIncomingPublishCallbackContext.xTaskToNotify ),
                  xIncomingPublishCallbackContext.pcIncomingPublish );

        prvUnsubscribeToTopic( xQoS, pcTopicBuffer );

        ESP_LOGI( TAG,
                  "Task \"%s\" completed a loop. Delaying before next loop.",
                  pcTaskGetName( xIncomingPublishCallbackContext.xTaskToNotify ) );

        vTaskDelay( pdMS_TO_TICKS( subpubunsubconfigDELAY_BETWEEN_SUB_PUB_UNSUB_LOOPS_MS ) );
    }

    vTaskDelete( NULL );
}

/* Public function definitions ************************************************/

void vStartSubscribePublishUnsubscribeDemo( void )
{
    static struct DemoParams pxParams[ subpubunsubconfigNUM_TASKS_TO_CREATE ];
    char pcTaskNameBuf[ 15 ];
    uint32_t ulTaskNumber;

    xMessageIdSemaphore = xSemaphoreCreateMutex();
    xNetworkEventGroup = xEventGroupCreate();
    xCoreMqttAgentManagerRegisterHandler( prvCoreMqttAgentEventHandler );

    /* Initialize the coreMQTT-Agent event group. */
    xEventGroupSetBits( xNetworkEventGroup,
                        CORE_MQTT_AGENT_OTA_NOT_IN_PROGRESS_BIT );

    /* Each instance of prvSubscribePublishUnsubscribeTask() generates a unique
     * name and topic filter for itself from the number passed in as the task
     * parameter. */
    /* Create a few instances of prvSubscribePublishUnsubscribeTask(). */
    for( ulTaskNumber = 0; ulTaskNumber < subpubunsubconfigNUM_TASKS_TO_CREATE; ulTaskNumber++ )
    {
        memset( pcTaskNameBuf,
                0x00,
                sizeof( pcTaskNameBuf ) );

        snprintf( pcTaskNameBuf,
                  10,
                  "SubPub%d",
                  ( int ) ulTaskNumber );

        pxParams[ ulTaskNumber ].ulTaskNumber = ulTaskNumber;

        xTaskCreate( prvSubscribePublishUnsubscribeTask,
                     pcTaskNameBuf,
                     subpubunsubconfigTASK_STACK_SIZE,
                     ( void * ) &pxParams[ ulTaskNumber ],
                     subpubunsubconfigTASK_PRIORITY,
                     NULL );
    }
}