/*
 * Amazon FreeRTOS MQTT AFQP V1.1.4
 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
 * the Software, and to permit persons to whom the Software is furnished to do so,
 * subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * http://aws.amazon.com/freertos
 * http://www.FreeRTOS.org
 */

/**
 * @file aws_test_mqtt_agent.c
 * @brief Tests for MQTT Agent.
 */

/* Standard includes. */
#include <stdio.h>
#include <string.h>

/* Amazon FreeRTOS includes. */
#include "FreeRTOS.h"
#include "semphr.h"
#include "aws_mqtt_agent.h"
#include "task.h"
#include "queue.h"
#include "event_groups.h"
#include "aws_clientcredential.h"

/* Unity framework includes. */
#include "unity_fixture.h"

/* MQTT agent connection timeout. */
#define mqttagenttestTIMEOUT       pdMS_TO_TICKS( 10000UL )

/* Topic name for ping-pong testing. */
#define mqttagenttestTOPIC_NAME    ( ( const uint8_t * ) "freertos/tests/echo" )

#define mqttagenttestMESSAGE       "Hello from the test."
#define mqttagenttestFAILUREPRINTF( x )    vLoggingPrintf x

/* The parameters below are definable so the test can run on most target. */
/* Size in bytes of the messages published.  */
#ifndef  mqttagenttestMULTI_TASK_TEST_PUB_DATA_SIZE
    #define mqttagenttestMULTI_TASK_TEST_PUB_DATA_SIZE    ( 256 )
#endif

/* Number of Receive Tasks that are created. */
#ifndef  mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS
    #define mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS    ( 2 )
#endif

/* Number of Transmit Tasks that are created. */
#ifndef  mqttagenttestMULTI_TASK_TEST_NUM_TX_TASKS
    #define mqttagenttestMULTI_TASK_TEST_NUM_TX_TASKS    ( 2 )
#endif

/* Total number of tasks. */
#define mqttagenttestMULTI_TASK_TEST_NUM_TASKS    ( mqttagenttestMULTI_TASK_TEST_NUM_TX_TASKS + mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS )



/* TCP Echo Client tasks multitask example parameters. */
#define  mqttagenttestMULTI_TASK_TEST_TASKS_STACK_SIZE    ( configMINIMAL_STACK_SIZE * 4 )
#define  mqttagenttestMULTI_TASK_TEST_TASKS_PRIORITY      ( tskIDLE_PRIORITY )


/* Timeout for sync between Transmit and Receive Tasks. */
#ifndef  mqttagenttestMULTI_TASK_TEST_SYNC_TIMEOUT
    #define  mqttagenttestMULTI_TASK_TEST_SYNC_TIMEOUT    ( 40000 )
#endif

/* This represents the time to wait to complete the multitask test.
 * It will vary from one platform to another. */
#ifndef mqttagenttestMULTI_TASK_TEST_COMPLETE_TIMEOUT
    #define mqttagenttestMULTI_TASK_TEST_COMPLETE_TIMEOUT    ( 400000 )
#endif


/* Event mask used for the sync group. It represents all Tasks.*/
#define mqttagenttestMULTI_TASK_TEST_EVENT_MASK                ( ( 1 << mqttagenttestMULTI_TASK_TEST_NUM_TASKS ) - 1 )
/* Event mask used for the sync group. It represents all Transmit Tasks. */
#define mqttagenttestMULTI_TASK_TEST_TX_EVENT_MASK             ( ( ( 1 << mqttagenttestMULTI_TASK_TEST_NUM_TASKS ) - 1 ) - ( ( 1 << mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS ) - 1 ) )

#define  mqttagenttestMULTI_TASK_TEST_SYNC_TIMEOUT_TICKS       ( pdMS_TO_TICKS( mqttagenttestMULTI_TASK_TEST_SYNC_TIMEOUT ) )
#define mqttagenttestMULTI_TASK_TEST_COMPLETE_TIMEOUT_TICKS    ( pdMS_TO_TICKS( mqttagenttestMULTI_TASK_TEST_COMPLETE_TIMEOUT ) )

/* Receive task priority is changed on bit 0, while Transmit is changed on bit 1.
 * Loop needs to be at least 4 to cover every cases. */
#define mqttagenttestMULTI_TASK_TEST_MAX_LOOP                  ( 4 )

#define mqttagenttestMULTI_TASK_TEST_LOW_PRIORITY              tskIDLE_PRIORITY
#define mqttagenttestMULTI_TASK_TEST_HIGH_PRIORITY             ( configMAX_PRIORITIES - 1 )

/* Receive task toggle priority on bit 0 while Transmit does on bit 1. This allows covering all possible scenario of low/high priority. */
#define mqttagenttestMULTI_TASK_TEST_RX_LOWP_SWITCH_MASK       ( 0 )
#define mqttagenttestMULTI_TASK_TEST_TX_LOWP_SWITCH_MASK       ( 1 )

/* Every Receive task will create one client. */
#define mqttagenttestMULTI_TASK_TEST_CLIENT_ID                 "ClientId%d"
#define mqttagenttestMULTI_TASK_TEST_MAX_CLIENT_ID_SIZE        ( 14 )

/* Number of publishing made by the Transmit task every loop. */
#define mqttagenttestMULTI_TASK_TEST_PUBLISH_PER_LOOP          ( 10 )

/* Topic name for ping-pong testing. */
#define mqttagenttestMULTI_TASK_TEST_TOPIC_NAME                ( ( const uint8_t * ) "freertos/tests/multiTask/%d" )
#define mqttagenttestMULTI_TASK_TEST_MAX_TOPIC_NAME_SIZE       ( 30 )


/* Default connection parameters. */
static const MQTTAgentConnectParams_t xDefaultConnectParameters =
{
    clientcredentialMQTT_BROKER_ENDPOINT,
    mqttagentREQUIRE_TLS,
    pdFALSE,
    clientcredentialMQTT_BROKER_PORT,
    ( const uint8_t * ) clientcredentialIOT_THING_NAME,
    0,
    pdTRUE,
    NULL,
    NULL,
    NULL,
    0
};

/* Parameters used in Muti task call back function. Data is checked and status is passed down. */
typedef struct
{
    SemaphoreHandle_t xSemaphore;
    BaseType_t xStatus;
} MQTTtestAgentCbParam_t;


/* Task parameters for multitask test. */
typedef struct
{
    uint16_t usTaskTag;
    BaseType_t xStatus;
    TaskHandle_t xTaskHandle;
    uint8_t * pucTransmittedData;
    SemaphoreHandle_t xSemaphore;
    BaseType_t xTaskCreated;
} MQTTtestAgentTaskParams_t;


/* Parameters for the Receive tasks. They are bundles into one structure to improve readability. */
typedef struct
{
    MQTTAgentHandle_t xMQTTHandle;
    EventGroupHandle_t xSyncEventRx;
    uint8_t cTopic[ mqttagenttestMULTI_TASK_TEST_MAX_TOPIC_NAME_SIZE ];
    uint16_t usTopicLength;
    uint8_t cClientID[ mqttagenttestMULTI_TASK_TEST_MAX_CLIENT_ID_SIZE ];
    uint16_t usClientIDLength;
} MQTTtestAgentMultiTestRxParam_t;
/* Bundle of global static used by Receive tasks. Note that Transmit task will also use the mqtt handle and the sync semaphore. */
static MQTTtestAgentMultiTestRxParam_t MQTTtestAgentMultiTestRxParam[ mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS ];

static void prvMultiTaskTest_Rx_Task( void * pvParameters );
static void prvMultiTaskTest_Tx_Task( void * pvParameters );

/* The event group used to wait for the multitaks test completion.*/
static EventGroupHandle_t xSyncEventGroup = NULL;

/**
 * @brief Callback for MQTT subscription.
 */
static MQTTBool_t prvMultiTaskTestMQTTCallback( void * pvUserData,
                                                const MQTTPublishData_t * const pxPublishParameters )
{
    MQTTtestAgentCbParam_t * pxMQTTtestAgentCbParam;
    BaseType_t xDataIndex;

    pxMQTTtestAgentCbParam = ( MQTTtestAgentCbParam_t * ) pvUserData;

    pxMQTTtestAgentCbParam->xStatus = pdPASS;

    if( pxPublishParameters->ulDataLength != mqttagenttestMULTI_TASK_TEST_PUB_DATA_SIZE )
    {
        pxMQTTtestAgentCbParam->xStatus = pdFAIL;
    }
    else
    {
        for( xDataIndex = 0; xDataIndex < mqttagenttestMULTI_TASK_TEST_PUB_DATA_SIZE; xDataIndex++ )
        {
            if( ( ( uint8_t * ) ( pxPublishParameters->pvData ) )[ xDataIndex ] != xDataIndex )
            {
                pxMQTTtestAgentCbParam->xStatus = pdFAIL;
                break;
            }
        }
    }

    /* Give the semaphore to signal message receipt. */
    xSemaphoreGive( pxMQTTtestAgentCbParam->xSemaphore );

    return eMQTTFalse;
}

/**
 * @brief Callback for MQTT subscription.
 */
static MQTTBool_t prvMQTTCallback( void * pvUserData,
                                   const MQTTPublishData_t * const pxPublishParameters )
{
    /* Give the semaphore to signal message receipt. */
    xSemaphoreGive( ( SemaphoreHandle_t ) pvUserData );

    return eMQTTFalse;
}

/*-----------------------------------------------------------*/


/**
 * @brief Test helper routine for MQTT connect, subcribe, publish, and
 * tear-down.
 */
static BaseType_t prvCreateClientAndConnectToBroker( BaseType_t xUseAlpn )
{
    MQTTAgentReturnCode_t xReturned = eMQTTAgentFailure;
    StaticSemaphore_t xSemaphore = { 0 };
    MQTTAgentHandle_t xMQTTHandle = NULL;
    MQTTAgentSubscribeParams_t xSubscribeParams;
    MQTTAgentPublishParams_t xPublishParameters;
    BaseType_t xMQTTAgentCreated = pdFALSE;
    MQTTAgentConnectParams_t xConnectParameters;

    memcpy( &xConnectParameters, &xDefaultConnectParameters, sizeof( MQTTAgentConnectParams_t ) );

    /* Initialize the semaphore as unavailable. */
    TEST_ASSERT_NOT_NULL( xSemaphoreCreateCountingStatic( 1, 0, &xSemaphore ) );

    /* Fill in the MQTTAgentConnectParams_t member that is not const. */
    xConnectParameters.usClientIdLength = ( uint16_t ) strlen(
        ( char * ) xConnectParameters.pucClientId );

    /* Switch ports if requested. */
    if( pdTRUE == xUseAlpn )
    {
        xConnectParameters.xFlags |= mqttagentUSE_AWS_IOT_ALPN_443;
    }

    if( TEST_PROTECT() )
    {
        /* The MQTT client object must be created before it can be used. */
        xReturned = MQTT_AGENT_Create( &xMQTTHandle );
        TEST_ASSERT_EQUAL_INT( xReturned, eMQTTAgentSuccess );
        xMQTTAgentCreated = pdTRUE;

        /* Connect to the broker. */
        xReturned = MQTT_AGENT_Connect( xMQTTHandle,
                                        &xConnectParameters,
                                        mqttagenttestTIMEOUT );
        TEST_ASSERT_EQUAL_INT_MESSAGE( xReturned, eMQTTAgentSuccess, "Failed to connect to the MQTT broker with MQTT_AGENT_Connect()." );

        /* Setup subscribe parameters to subscribe to echo topic. */
        xSubscribeParams.pucTopic = mqttagenttestTOPIC_NAME;
        xSubscribeParams.pvPublishCallbackContext = &xSemaphore;
        xSubscribeParams.pxPublishCallback = prvMQTTCallback;
        xSubscribeParams.usTopicLength = ( uint16_t ) strlen( ( const char * ) mqttagenttestTOPIC_NAME );
        xSubscribeParams.xQoS = eMQTTQoS1;

        /* Subscribe to the topic. */
        xReturned = MQTT_AGENT_Subscribe( xMQTTHandle,
                                          &xSubscribeParams,
                                          mqttagenttestTIMEOUT );
        TEST_ASSERT_EQUAL_INT( xReturned, eMQTTAgentSuccess );

        /* Setup the publish parameters. */
        memset( &( xPublishParameters ), 0x00, sizeof( xPublishParameters ) );
        xPublishParameters.pucTopic = mqttagenttestTOPIC_NAME;
        xPublishParameters.pvData = mqttagenttestMESSAGE;
        xPublishParameters.usTopicLength = ( uint16_t ) strlen( ( const char * ) mqttagenttestTOPIC_NAME );
        xPublishParameters.ulDataLength = ( uint32_t ) strlen( mqttagenttestMESSAGE );
        xPublishParameters.xQoS = eMQTTQoS1;

        /* Publish the message. */
        xReturned = MQTT_AGENT_Publish( xMQTTHandle,
                                        &( xPublishParameters ),
                                        mqttagenttestTIMEOUT );
        TEST_ASSERT_EQUAL_INT( xReturned, eMQTTAgentSuccess );

        /* Take the semaphore to ensure the message is Received. */
        if( pdFALSE == xSemaphoreTake( ( QueueHandle_t ) &( xSemaphore ), mqttagenttestTIMEOUT ) )
        {
            TEST_FAIL();
        }

        /* Disconnect the client. */
        xReturned = MQTT_AGENT_Disconnect( xMQTTHandle, mqttagenttestTIMEOUT );
        TEST_ASSERT_EQUAL_INT( xReturned, eMQTTAgentSuccess );
    }
    else
    {
        TEST_FAIL();
    }

    /*Don't forget to reset the flag, since connect parameters are global, all test afterwards would use ALPN. */
    xConnectParameters.xFlags &= ~mqttagentUSE_AWS_IOT_ALPN_443;

    if( xMQTTAgentCreated == pdTRUE )
    {
        /* Delete the MQTT client. */
        xReturned = MQTT_AGENT_Delete( xMQTTHandle );
        TEST_ASSERT_EQUAL_INT( xReturned, eMQTTAgentSuccess );
    }

    return eMQTTAgentSuccess == xReturned;
}
/*-----------------------------------------------------------*/

/* Define Test Group. */
TEST_GROUP( Full_MQTT_Agent );
TEST_GROUP( Full_MQTT_Agent_ALPN );
TEST_GROUP( Full_MQTT_Agent_Stress_Tests );

/*-----------------------------------------------------------*/

/**
 * @brief Setup function called before each test in this group is executed.
 */
TEST_SETUP( Full_MQTT_Agent )
{
}
TEST_SETUP( Full_MQTT_Agent_Stress_Tests )
{
}
TEST_SETUP( Full_MQTT_Agent_ALPN )
{
}
/*-----------------------------------------------------------*/

/**
 * @brief Tear down function called after each test in this group is executed.
 */
TEST_TEAR_DOWN( Full_MQTT_Agent )
{
}
TEST_TEAR_DOWN( Full_MQTT_Agent_Stress_Tests )
{
}
TEST_TEAR_DOWN( Full_MQTT_Agent_ALPN )
{
}
/*-----------------------------------------------------------*/

/**
 * @brief Function to define which tests to execute as part of this group.
 */
TEST_GROUP_RUNNER( Full_MQTT_Agent )
{
    RUN_TEST_CASE( Full_MQTT_Agent, AFQP_MQTT_Agent_SubscribePublishDefaultPort );
    RUN_TEST_CASE( Full_MQTT_Agent, AFQP_MQTT_Agent_InvalidCredentials );
}
TEST_GROUP_RUNNER( Full_MQTT_Agent_Stress_Tests )
{
    RUN_TEST_CASE( Full_MQTT_Agent_Stress_Tests, MQTT_Agent_MultiTaskTest );
}
TEST_GROUP_RUNNER( Full_MQTT_Agent_ALPN )
{
    RUN_TEST_CASE( Full_MQTT_Agent_ALPN, MQTT_Agent_SubscribePublishAlpn );
}
/*-----------------------------------------------------------*/

/* Test for ping-ponging a message using the default AWS IoT port for MQTT. */
TEST( Full_MQTT_Agent, AFQP_MQTT_Agent_SubscribePublishDefaultPort )
{
    prvCreateClientAndConnectToBroker( pdFALSE );
}

/* Test for ping-ponging a message using the default AWS IoT port for MQTT. */
TEST( Full_MQTT_Agent, AFQP_MQTT_Agent_InvalidCredentials )
{
    MQTTAgentReturnCode_t xReturned;
    MQTTAgentHandle_t xMQTTHandle = NULL;
    BaseType_t xClientCreated, xClientConnected;
    MQTTAgentConnectParams_t xConnectParameters;

    memcpy( &xConnectParameters, &xDefaultConnectParameters, sizeof( MQTTAgentConnectParams_t ) );

    /* Initialize the semaphore. */
    if( TEST_PROTECT() )
    {
        xClientCreated = pdFALSE;
        /* Arbitrary invalid credentials. */
        xConnectParameters.pcCertificate = "abcd";
        xConnectParameters.ulCertificateSize = ( uint16_t ) strlen( xConnectParameters.pcCertificate );
        /* Fill in the MQTTAgentConnectParams_t member that is not const. */
        xConnectParameters.usClientIdLength = ( uint16_t ) strlen(
            ( char * ) xConnectParameters.pucClientId );


        /* The MQTT client object must be created before it can be used. */
        xReturned = MQTT_AGENT_Create( &xMQTTHandle );
        TEST_ASSERT_EQUAL_INT( xReturned, eMQTTAgentSuccess );
        xClientCreated = pdTRUE;

        /* Connect to the broker. */
        xClientConnected = pdTRUE;
        xReturned = MQTT_AGENT_Connect( xMQTTHandle,
                                        &xConnectParameters,
                                        mqttagenttestTIMEOUT );
        TEST_ASSERT_NOT_EQUAL( xReturned, eMQTTAgentSuccess );
        xClientConnected = pdFALSE;
    }

    /* Connect to the broker. */
    if( xClientConnected == pdTRUE )
    {
        /* If enter here, test has already failed. */
        xReturned = MQTT_AGENT_Disconnect( xMQTTHandle,
                                           mqttagenttestTIMEOUT );

        if( xReturned != eMQTTAgentSuccess )
        {
            mqttagenttestFAILUREPRINTF( ( "%s: Could not disconnect client.\r\n", __FUNCTION__ ) );
        }
    }

    if( xClientCreated == pdTRUE )
    {
        /* Delete the MQTT client. */
        xReturned = MQTT_AGENT_Delete( xMQTTHandle );
        TEST_ASSERT_EQUAL_INT( xReturned, eMQTTAgentSuccess );
    }
}
/*-----------------------------------------------------------*/

/* Test for ping-ponging a message using AWS IoT MQTT broker support for port 443. */
TEST( Full_MQTT_Agent_ALPN, MQTT_Agent_SubscribePublishAlpn )
{
    prvCreateClientAndConnectToBroker( pdTRUE );
}
/*-----------------------------------------------------------*/

/**
 * @brief multitask test for MQTT
 *
 * This test create x number of Receive tasks that will create x different clients and
 * subscribe to x different topic.
 * The Receive tasks will take care of create, connecting, subscribing, disconnecting and deleting clients.
 * Also, y number of Transmit task are going to be created, each tasks are going to publish n number of MQTT messages
 * to each one of the x tasks.
 * Once each y Transmit tasks have submitted n messages to each of the x Receive task, it starts all over again. What varies from cycle to
 * cycle are the priorities. Receive and Transmit tasks will swich priorities in such a way as to cover all possible combination of
 * high and low prioritie.
 *
 * At the end of the test, Tasks do not self delete. They get suspended. The main test function will first wait for every
 * task to complete through Amazon FreeRTOS synchronization mechanism then it will delete all the created tasks.
 *
 * For simplicity sake, memory is not allocated in the Receive or Transmit tasks. The allocation is centralized in one place, the main task.
 */
TEST( Full_MQTT_Agent_Stress_Tests, MQTT_Agent_MultiTaskTest )
{
    uint16_t usIndex;
    MQTTtestAgentTaskParams_t xTaskParams[ mqttagenttestMULTI_TASK_TEST_NUM_TASKS ];
    uint32_t usEventMask;
    BaseType_t xResult;
    uint8_t * pucTransmittedData;

    xSyncEventGroup = NULL;
    pucTransmittedData = NULL;

    /* Initialize to unallocated. */
    for( usIndex = 0; usIndex < mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS; usIndex++ )
    {
        xTaskParams[ usIndex ].xSemaphore = NULL;
        MQTTtestAgentMultiTestRxParam[ usIndex ].xSyncEventRx = NULL;
    }

    if( TEST_PROTECT() )
    {
        /* Create the event group used by the Transmit and Receive tasks to synchronize prior
         * to commencing a cycle using a new socket. */
        xSyncEventGroup = xEventGroupCreate();
        TEST_ASSERT_NOT_NULL_MESSAGE( xSyncEventGroup, "Couldn't create Sync event." );

        for( usIndex = 0; usIndex < mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS; usIndex++ )
        {
            MQTTtestAgentMultiTestRxParam[ usIndex ].xSyncEventRx = xEventGroupCreate();
            TEST_ASSERT_NOT_NULL_MESSAGE( MQTTtestAgentMultiTestRxParam[ usIndex ].xSyncEventRx, "Couldn't create Sync event." );
            /* Flag memory as allocated for later Free. */
        }

        pucTransmittedData = pvPortMalloc( mqttagenttestMULTI_TASK_TEST_PUB_DATA_SIZE * sizeof( uint8_t ) );
        TEST_ASSERT_NOT_NULL_MESSAGE( pucTransmittedData, "Couldn't allocate data." );

        /* Data sent by Transmit tasks. And a counter to check integrity of the data when Received by Receive task. */
        for( usIndex = 0; usIndex < mqttagenttestMULTI_TASK_TEST_PUB_DATA_SIZE; usIndex++ )
        {
            pucTransmittedData[ usIndex ] = ( uint8_t ) ( usIndex & 0xFF );
        }

        /* Create Receive tasks. */
        for( usIndex = 0; usIndex < mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS; usIndex++ )
        {
            xTaskParams[ usIndex ].usTaskTag = usIndex;
            xTaskParams[ usIndex ].xStatus = pdFAIL;
            xTaskParams[ usIndex ].pucTransmittedData = pucTransmittedData;

            /* Each Receive task will Receive mqttagenttestMULTI_TASK_TEST_PUBLISH_PER_LOOP * mqttagenttestMULTI_TASK_TEST_NUM_TX_TASKS messages.
             * It is possible that all of them may be Received before callback can trigger, so make semaphore count enough. */
            xTaskParams[ usIndex ].xSemaphore = xSemaphoreCreateCounting( mqttagenttestMULTI_TASK_TEST_PUBLISH_PER_LOOP * mqttagenttestMULTI_TASK_TEST_NUM_TX_TASKS, 0 );
            TEST_ASSERT_NOT_NULL( xTaskParams[ usIndex ].xSemaphore );

            /* Create topic for each Receive task. */
            MQTTtestAgentMultiTestRxParam[ usIndex ].usTopicLength =
                snprintf( ( char * ) &MQTTtestAgentMultiTestRxParam[ usIndex ].cTopic,
                          mqttagenttestMULTI_TASK_TEST_MAX_TOPIC_NAME_SIZE, /* Limit size, will vary depending of the number of tasks. */
                          ( char * ) mqttagenttestMULTI_TASK_TEST_TOPIC_NAME,
                          usIndex );
            /* Create client ID for each Receive task. */
            MQTTtestAgentMultiTestRxParam[ usIndex ].usClientIDLength =
                snprintf( ( char * ) &MQTTtestAgentMultiTestRxParam[ usIndex ].cClientID,
                          mqttagenttestMULTI_TASK_TEST_MAX_CLIENT_ID_SIZE, /* Limit size, will vary depending of the number of tasks. */
                          ( char * ) mqttagenttestMULTI_TASK_TEST_CLIENT_ID,
                          usIndex );

            /* Create Receive Task. When created, flag it for later deletion. */
            xTaskParams[ usIndex ].xTaskCreated = pdFALSE;
            xResult = xTaskCreate( prvMultiTaskTest_Rx_Task, /* The function that implements the task. */
                                   "ClientTask",             /* Just a text name for the task to aid debugging. */
                                   mqttagenttestMULTI_TASK_TEST_TASKS_STACK_SIZE,
                                   &( xTaskParams[ usIndex ] ),
                                   mqttagenttestMULTI_TASK_TEST_TASKS_PRIORITY,
                                   &( xTaskParams[ usIndex ].xTaskHandle ) );
            TEST_ASSERT_EQUAL_MESSAGE( pdPASS, xResult, "Task creation failed" );
            /* Flag Task as created for later delete. */
            xTaskParams[ usIndex ].xTaskCreated = pdTRUE;
        }

        /* Create Transmit tasks. */
        for( usIndex = mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS; usIndex < mqttagenttestMULTI_TASK_TEST_NUM_TASKS; usIndex++ )
        {
            xTaskParams[ usIndex ].usTaskTag = usIndex;
            xTaskParams[ usIndex ].xStatus = pdFAIL;
            xTaskParams[ usIndex ].pucTransmittedData = pucTransmittedData;

            xTaskParams[ usIndex ].xTaskCreated = pdFALSE;
            xResult = xTaskCreate( prvMultiTaskTest_Tx_Task, /* The function that implements the task. */
                                   "ClientTask",             /* Just a text name for the task to aid debugging. */
                                   mqttagenttestMULTI_TASK_TEST_TASKS_STACK_SIZE,
                                   &( xTaskParams[ usIndex ] ),
                                   mqttagenttestMULTI_TASK_TEST_TASKS_PRIORITY,
                                   &( xTaskParams[ usIndex ].xTaskHandle ) );
            TEST_ASSERT_EQUAL_MESSAGE( pdPASS, xResult, "Task creation failed" );
            /* Flag Task as created for later delete. */
            xTaskParams[ usIndex ].xTaskCreated = pdTRUE;
        }

        /* When all task reaches the sync group, then the test is complete. */
        usEventMask = xEventGroupSync( xSyncEventGroup, /* The event group used for the rendezvous. */
                                       0,
                                       mqttagenttestMULTI_TASK_TEST_EVENT_MASK,
                                       mqttagenttestMULTI_TASK_TEST_COMPLETE_TIMEOUT_TICKS );
        TEST_ASSERT_EQUAL_MESSAGE( mqttagenttestMULTI_TASK_TEST_EVENT_MASK, usEventMask, "Timeout waiting for tasks" );

        /* Check which tasks have failed. If one failed, the test is failed. */
        for( usIndex = 0; usIndex < mqttagenttestMULTI_TASK_TEST_NUM_TASKS; usIndex++ )
        {
            if( xTaskParams[ usIndex ].xStatus != pdPASS )
            {
                TEST_FAIL_MESSAGE( "Not all tasks completed successfully." );
            }
        }
    }

    /* The code below will delete all the memory that has been successfully allocated. */
    /* For each task not completed, delete the task.  */
    for( usIndex = 0; usIndex < mqttagenttestMULTI_TASK_TEST_NUM_TASKS; usIndex++ )
    {
        /* Check task has been created before deleting it. */
        if( xTaskParams[ usIndex ].xTaskCreated == pdTRUE )
        {
            vTaskDelete( xTaskParams[ usIndex ].xTaskHandle );
        }
    }

    /* Delete global group event. */
    if( xSyncEventGroup != NULL )
    {
        vEventGroupDelete( xSyncEventGroup );
    }

    for( usIndex = 0; usIndex < mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS; usIndex++ )
    {
        if( MQTTtestAgentMultiTestRxParam[ usIndex ].xSyncEventRx != NULL )
        {
            vEventGroupDelete( MQTTtestAgentMultiTestRxParam[ usIndex ].xSyncEventRx );
        }

        if( xTaskParams[ usIndex ].xSemaphore != NULL )
        {
            vSemaphoreDelete( xTaskParams[ usIndex ].xSemaphore );
        }
    }

    if( pucTransmittedData != NULL )
    {
        vPortFree( pucTransmittedData );
    }
}

/*-----------------------------------------------------------*/

/**
 * @brief Receive Task
 *
 * The Receive tasks will take care of create, connecting, subscribing, disconnecting and deleting clients.
 * Also, y number of Transmit task are going to be created, each tasks are going to publish n number of MQTT messages
 * to each one of the x tasks.
 *
 * Once the task has connected, it will synch with the Transmit tasks. Since the task will receice y messages (from y Transmit task),
 * it will pend on Receive semaphore y times. Data is a simple counter and is checked in the callback.
 */
static void prvMultiTaskTest_Rx_Task( void * pvParameters )
{
    MQTTAgentSubscribeParams_t xSubscribeParams;
    MQTTAgentUnsubscribeParams_t xUnSubscribeParams;
    BaseType_t xResult, xStatus, xAgentCreated = pdFALSE;
    size_t xRecvLoop, xPubSubIndex;
    MQTTtestAgentTaskParams_t * pxTcptestEchoClientsTaskParams;
    EventGroupHandle_t * pxSyncEventRx;
    MQTTAgentHandle_t * pxMQTTHandle;
    MQTTAgentConnectParams_t xConnectParametersMultiTaskTest;
    MQTTtestAgentCbParam_t MQTTtestAgentCbParam;
    uint16_t usTaskTag;

    /* Copy over default parameters, then change for specific clientID connection. */
    memcpy( &xConnectParametersMultiTaskTest, &xDefaultConnectParameters, sizeof( MQTTAgentConnectParams_t ) );


    pxTcptestEchoClientsTaskParams = ( ( MQTTtestAgentTaskParams_t * ) pvParameters );
    usTaskTag = pxTcptestEchoClientsTaskParams->usTaskTag;
    pxSyncEventRx = &MQTTtestAgentMultiTestRxParam[ usTaskTag ].xSyncEventRx;
    pxMQTTHandle = &MQTTtestAgentMultiTestRxParam[ usTaskTag ].xMQTTHandle;
    xConnectParametersMultiTaskTest.pucClientId = MQTTtestAgentMultiTestRxParam[ usTaskTag ].cClientID;
    xConnectParametersMultiTaskTest.usClientIdLength = MQTTtestAgentMultiTestRxParam[ usTaskTag ].usClientIDLength;

    /* Setup subscribe parameters to subscribe to echo topic. */
    xSubscribeParams.pvPublishCallbackContext = &MQTTtestAgentCbParam;
    xSubscribeParams.pxPublishCallback = prvMultiTaskTestMQTTCallback;
    xSubscribeParams.xQoS = eMQTTQoS1;
    xSubscribeParams.pucTopic = MQTTtestAgentMultiTestRxParam[ usTaskTag ].cTopic;
    xSubscribeParams.usTopicLength = MQTTtestAgentMultiTestRxParam[ usTaskTag ].usTopicLength;

    xUnSubscribeParams.pucTopic = xSubscribeParams.pucTopic;
    xUnSubscribeParams.usTopicLength = xSubscribeParams.usTopicLength;

    MQTTtestAgentCbParam.xSemaphore = pxTcptestEchoClientsTaskParams->xSemaphore;

    /* Default is pass to avoid nesting. */
    xStatus = pdPASS;

    for( xRecvLoop = 0; xRecvLoop < mqttagenttestMULTI_TASK_TEST_MAX_LOOP; xRecvLoop++ )
    {
        if( mqttagenttestMULTI_TASK_TEST_RX_LOWP_SWITCH_MASK & xRecvLoop )
        {
            vTaskPrioritySet( NULL, mqttagenttestMULTI_TASK_TEST_LOW_PRIORITY );
        }
        else
        {
            vTaskPrioritySet( NULL, mqttagenttestMULTI_TASK_TEST_HIGH_PRIORITY );
        }

        /* The MQTT client object must be created before it can be used. */
        xResult = MQTT_AGENT_Create( pxMQTTHandle );

        if( xResult != eMQTTAgentSuccess )
        {
            mqttagenttestFAILUREPRINTF( ( "%s: Could not create MQTT client.\r\n", __FUNCTION__ ) );
            xStatus = pdFAIL;
            break;
        }

        xAgentCreated = pdTRUE;

        /* Connect to the broker. */
        xResult = MQTT_AGENT_Connect( *pxMQTTHandle,
                                      &xConnectParametersMultiTaskTest,
                                      mqttagenttestTIMEOUT );

        if( xResult != eMQTTAgentSuccess )
        {
            mqttagenttestFAILUREPRINTF( ( "%s: Could not connect MQTT client.\r\n", __FUNCTION__ ) );
            xStatus = pdFAIL;
            break;
        }

        /* Subscribe to the topic. */
        xResult = MQTT_AGENT_Subscribe( *pxMQTTHandle,
                                        &xSubscribeParams,
                                        mqttagenttestTIMEOUT );

        if( xResult != eMQTTAgentSuccess )
        {
            mqttagenttestFAILUREPRINTF( ( "%s: Could not subscribe to topic.\r\n", __FUNCTION__ ) );
            xStatus = pdFAIL;
            break;
        }

        if( xEventGroupSync( *pxSyncEventRx, /* The event group used for the rendezvous. */
                             ( 1 << usTaskTag ),
                             ( 1 << usTaskTag ) | mqttagenttestMULTI_TASK_TEST_TX_EVENT_MASK,
                             mqttagenttestMULTI_TASK_TEST_SYNC_TIMEOUT_TICKS ) != ( ( 1 << usTaskTag ) | mqttagenttestMULTI_TASK_TEST_TX_EVENT_MASK ) )
        {
            mqttagenttestFAILUREPRINTF( ( "%s: Timed out waiting for Transmit tasks.\r\n", __FUNCTION__ ) );
            xStatus = pdFAIL;
            break;
        }

        /* Take the semaphore to ensure the message is Received. */
        for( xPubSubIndex = 0; xPubSubIndex < mqttagenttestMULTI_TASK_TEST_PUBLISH_PER_LOOP * mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS; xPubSubIndex++ )
        {
            if( pdFALSE == xSemaphoreTake( pxTcptestEchoClientsTaskParams->xSemaphore, mqttagenttestMULTI_TASK_TEST_SYNC_TIMEOUT_TICKS ) )
            {
                mqttagenttestFAILUREPRINTF( ( "%s: Timed out waiting for pub from Transmit tasks.\r\n", __FUNCTION__ ) );
                xStatus = pdFAIL;
                break;
            }
            else /* Get status from the callback*/
            {
                if( MQTTtestAgentCbParam.xStatus != pdPASS )
                {
                    mqttagenttestFAILUREPRINTF( ( "%s: Error, corrupted data Received.\r\n", __FUNCTION__ ) );
                    xStatus = pdFAIL;
                    break;
                }
            }
        }

        /*If fail happened in for loop above, break out. */
        if( xStatus == pdFAIL )
        {
            break;
        }

        if( xEventGroupSync( *pxSyncEventRx, /* The event group used for the rendezvous. */
                             ( 1 << usTaskTag ),
                             ( 1 << usTaskTag ) | mqttagenttestMULTI_TASK_TEST_TX_EVENT_MASK,
                             mqttagenttestMULTI_TASK_TEST_SYNC_TIMEOUT_TICKS ) != ( ( 1 << usTaskTag ) | mqttagenttestMULTI_TASK_TEST_TX_EVENT_MASK ) )
        {
            mqttagenttestFAILUREPRINTF( ( "%s: Timed out waiting for Transmit tasks.\r\n", __FUNCTION__ ) );
            xStatus = pdFAIL;
            break;
        }

        /* Unsubscribe to the topic. */
        xResult = MQTT_AGENT_Unsubscribe( *pxMQTTHandle,
                                          &xUnSubscribeParams,
                                          mqttagenttestTIMEOUT );

        if( xResult != eMQTTAgentSuccess )
        {
            mqttagenttestFAILUREPRINTF( ( "%s: Failed to unsubscribe.\r\n", __FUNCTION__ ) );
            xStatus = pdFAIL;
            break;
        }

        /* Disconnect. */
        xResult = MQTT_AGENT_Disconnect( *pxMQTTHandle, mqttagenttestTIMEOUT );

        if( xResult != eMQTTAgentSuccess )
        {
            mqttagenttestFAILUREPRINTF( ( "%s: Failed to disconnect.\r\n", __FUNCTION__ ) );
            xStatus = pdFAIL;
            break;
        }

        xResult = MQTT_AGENT_Delete( *pxMQTTHandle );

        if( xResult != eMQTTAgentSuccess )
        {
            mqttagenttestFAILUREPRINTF( ( "%s: Failed to delete MQTT client.\r\n", __FUNCTION__ ) );
            xStatus = pdFAIL;
            break;
        }

        xAgentCreated = pdFALSE;
    }

    if( xAgentCreated == pdTRUE )
    {
        ( void ) MQTT_AGENT_Delete( *pxMQTTHandle );
    }

    pxTcptestEchoClientsTaskParams->xStatus = xStatus;
    xEventGroupSync( xSyncEventGroup, /* The event group used for the rendezvous. */
                     ( 1 << usTaskTag ),
                     mqttagenttestMULTI_TASK_TEST_EVENT_MASK,
                     mqttagenttestMULTI_TASK_TEST_COMPLETE_TIMEOUT_TICKS );
    vTaskSuspend( NULL ); /* Suspend this task. Will be deleted by the mainTask */
}

/*-----------------------------------------------------------*/

/**
 * @brief Transmit Task
 *
 * y number of Transmit task are going to be created, each tasks are going to publish n number of MQTT messages
 * to each one of the x tasks.
 */
static void prvMultiTaskTest_Tx_Task( void * pvParameters )
{
    size_t xSendLoop;
    BaseType_t xStatus, xReturn;
    size_t xPubSubIndex, usIndex;
    MQTTAgentPublishParams_t xPublishParameters;
    MQTTtestAgentTaskParams_t * xTcptestEchoClientsTaskParams;

    xTcptestEchoClientsTaskParams = ( ( MQTTtestAgentTaskParams_t * ) pvParameters );

    xStatus = pdPASS;

    /* Transmit and Receive loop switch priorities on a different bit, so all combination of high and low priorities will be covered. */
    for( xSendLoop = 1; xSendLoop < mqttagenttestMULTI_TASK_TEST_MAX_LOOP + 1; xSendLoop++ )
    {
        if( mqttagenttestMULTI_TASK_TEST_TX_LOWP_SWITCH_MASK & xSendLoop )
        {
            vTaskPrioritySet( NULL, mqttagenttestMULTI_TASK_TEST_LOW_PRIORITY );
        }
        else
        {
            vTaskPrioritySet( NULL, mqttagenttestMULTI_TASK_TEST_HIGH_PRIORITY );
        }

        /* Wait for the Receive tasks to create and connect the agent. */

        for( usIndex = 0; usIndex < mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS; usIndex++ )
        {
            if( xEventGroupSync( MQTTtestAgentMultiTestRxParam[ usIndex ].xSyncEventRx,             /* The event group used for the rendezvous. */
                                 ( 1 << xTcptestEchoClientsTaskParams->usTaskTag ),                 /* The bit representing the Transmit task reaching the rendezvous. */
                                 ( mqttagenttestMULTI_TASK_TEST_TX_EVENT_MASK | ( 1 << usIndex ) ), /* Also wait for the Receive tasks. */
                                 mqttagenttestMULTI_TASK_TEST_SYNC_TIMEOUT_TICKS ) != ( mqttagenttestMULTI_TASK_TEST_TX_EVENT_MASK | ( 1 << usIndex ) ) )
            {
                mqttagenttestFAILUREPRINTF( ( "%s: Failed synching with Transmit and Receive tasks.\r\n", __FUNCTION__ ) );
                xStatus = pdFAIL;
                break;
            }
        }

        if( xStatus == pdFAIL )
        {
            break;
        }

        for( usIndex = 0; usIndex < mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS; usIndex++ )
        {
            /* Setup the publish parameters. */
            memset( &( xPublishParameters ), 0x00, sizeof( xPublishParameters ) );
            xPublishParameters.pucTopic = MQTTtestAgentMultiTestRxParam[ usIndex ].cTopic;
            xPublishParameters.pvData = xTcptestEchoClientsTaskParams->pucTransmittedData;
            xPublishParameters.usTopicLength = MQTTtestAgentMultiTestRxParam[ usIndex ].usTopicLength;
            xPublishParameters.ulDataLength = ( uint32_t ) mqttagenttestMULTI_TASK_TEST_PUB_DATA_SIZE;
            xPublishParameters.xQoS = eMQTTQoS1;

            /* Publish the message. */
            for( xPubSubIndex = 0; xPubSubIndex < mqttagenttestMULTI_TASK_TEST_PUBLISH_PER_LOOP; xPubSubIndex++ )
            {
                xReturn = MQTT_AGENT_Publish( MQTTtestAgentMultiTestRxParam[ usIndex ].xMQTTHandle,
                                              &( xPublishParameters ),
                                              mqttagenttestTIMEOUT );

                if( xReturn != eMQTTAgentSuccess )
                {
                    mqttagenttestFAILUREPRINTF( ( "%s: Failed publishing to topic.\r\n", __FUNCTION__ ) );
                    xStatus = pdFAIL;
                    break;
                }
            }

            if( xStatus == pdFAIL )
            {
                break;
            }
        }

        if( xStatus == pdFAIL )
        {
            break;
        }

        for( usIndex = 0; usIndex < mqttagenttestMULTI_TASK_TEST_NUM_RX_TASKS; usIndex++ )
        {
            if( xEventGroupSync( MQTTtestAgentMultiTestRxParam[ usIndex ].xSyncEventRx,             /* The event group used for the rendezvous. */
                                 ( 1 << xTcptestEchoClientsTaskParams->usTaskTag ),                 /* The bit representing the Transmit task reaching the rendezvous. */
                                 ( mqttagenttestMULTI_TASK_TEST_TX_EVENT_MASK | ( 1 << usIndex ) ), /* Also wait for the Receive tasks. */
                                 mqttagenttestMULTI_TASK_TEST_SYNC_TIMEOUT_TICKS ) != ( mqttagenttestMULTI_TASK_TEST_TX_EVENT_MASK | ( 1 << usIndex ) ) )
            {
                mqttagenttestFAILUREPRINTF( ( "%s: Failed synching with Transmit and Receive tasks.\r\n", __FUNCTION__ ) );
                xStatus = pdFAIL;
                break;
            }
        }
    }

    xTcptestEchoClientsTaskParams->xStatus = xStatus;
    xEventGroupSync( xSyncEventGroup, /* The event group used for the rendezvous. */
                     ( 1 << xTcptestEchoClientsTaskParams->usTaskTag ),
                     mqttagenttestMULTI_TASK_TEST_EVENT_MASK,
                     mqttagenttestMULTI_TASK_TEST_COMPLETE_TIMEOUT_TICKS );
    vTaskSuspend( NULL ); /* Suspend this task. Will be deleted by the mainTask */
}