/*
 * FreeRTOS Common V1.2.0
 * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
 * the Software, and to permit persons to whom the Software is furnished to do so,
 * subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * http://aws.amazon.com/freertos
 * http://www.FreeRTOS.org
 */

/**
 * @file iot_taskpool.c
 * @brief Implements the task pool functions in iot_taskpool.h
 */

/* The config header is always included first. */
#include "iot_config.h"

/* Standard includes. */
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Platform layer includes. */
#include "platform/iot_threads.h"
#include "platform/iot_clock.h"

/* Task pool internal include. */
#include "private/iot_taskpool_internal.h"

/**
 * @brief Enter a critical section by locking a mutex.
 *
 * The macro expands to a use of a variable named `pTaskPool`, so it can only
 * be used where such a variable is in scope.
 */
#define TASKPOOL_ENTER_CRITICAL()    IotMutex_Lock( &( pTaskPool->lock ) )

/**
 * @brief Exit a critical section by unlocking a mutex.
 *
 * The macro expands to a use of a variable named `pTaskPool`, so it can only
 * be used where such a variable is in scope.
 */
#define TASKPOOL_EXIT_CRITICAL()     IotMutex_Unlock( &( pTaskPool->lock ) )

/**
 * @brief Maximum semaphore value for wait operations.
 */
#define TASKPOOL_MAX_SEM_VALUE              0xFFFF

/**
 * @brief Reschedule delay in milliseconds for deferred jobs.
 */
#define TASKPOOL_JOB_RESCHEDULE_DELAY_MS    ( 10ULL )

/* ---------------------------------------------------------------------------------- */

/**
 * Doxygen should ignore this section.
 *
 * @brief The system task pool handle for all libraries to use.
 * User applications can use the system task pool as well, knowing that the usage will be shared with
 * the system libraries. The system task pool needs to be initialized before any library is used or
 * before any code that posts jobs to the task pool runs.
 */
_taskPool_t _IotSystemTaskPool = { .dispatchQueue = IOT_DEQUEUE_INITIALIZER };

/* -------------- Convenience functions to create/recycle/destroy jobs -------------- */

/**
 * @brief Initializes one instance of a Task pool cache.
 *
 * @param[in] pCache The pre-allocated instance of the cache to initialize.
 */
static void _initJobsCache( _taskPoolCache_t * const pCache );

/**
 * @brief Initialize a job.
 *
 * @param[in] pJob The job to initialize.
 * @param[in] userCallback The user callback for the job.
 * @param[in] pUserContext The context to be passed to the callback.
 * @param[in] isStatic A flag to indicate whether the job is statically or dynamically allocated.
 */
static void _initializeJob( _taskPoolJob_t * const pJob,
                            IotTaskPoolRoutine_t userCallback,
                            void * pUserContext,
                            bool isStatic );

/**
 * @brief Extracts and initializes one instance of a job from the cache or, if there is none available, it allocates and initializes a new one.
 *
 * @param[in] pCache The instance of the cache to extract the job from.
 */
static _taskPoolJob_t * _fetchOrAllocateJob( _taskPoolCache_t * const pCache );

/**
 * @brief Recycles one instance of a job into the cache or, if the cache is full, it destroys it.
 *
 * @param[in] pCache The instance of the cache to recycle the job into.
 * @param[in] pJob The job to recycle.
 *
 */
static void _recycleJob( _taskPoolCache_t * const pCache,
                         _taskPoolJob_t * const pJob );

/**
 * @brief Destroys one instance of a job.
 *
 * @param[in] pJob The job to destroy.
 *
 */
static void _destroyJob( _taskPoolJob_t * const pJob );

/* -------------- The worker thread procedure for a task pool thread -------------- */

/**
 * @brief The procedure for a task pool worker thread.
 *
 * @param[in] pUserContext The user context.
 *
 */
static void _taskPoolWorker( void * pUserContext );

/* -------------- Convenience functions to handle timer events  -------------- */

/**
 * @brief Comparer for the time list.
 *
 * @param[in] pTimerEventLink1 The link to the first timer event.
 * @param[in] pTimerEventLink2 The link to the second timer event.
 */
static int32_t _timerEventCompare( const IotLink_t * const pTimerEventLink1,
                                   const IotLink_t * const pTimerEventLink2 );

/**
 * @brief Reschedules the timer for handling deferred jobs to the next timeout.
 *
 * @param[in] pTimer The timer to reschedule.
 * @param[in] pFirstTimerEvent The timer event that carries the timeout and job information.
 */
static void _rescheduleDeferredJobsTimer( IotTimer_t * const pTimer,
                                          _taskPoolTimerEvent_t * const pFirstTimerEvent );

/**
 * @brief The task pool timer procedure for scheduling deferred jobs.
 *
 * @param[in] pArgument An opaque pointer for timer procedure context.
 */
static void _timerThread( void * pArgument );

/* -------------- Convenience functions to create/initialize/destroy the task pool -------------- */

/**
 * @brief Parameter validation for a task pool initialization.
 *
 * @param[in] pInfo The initialization information for the task pool.
 *
 */
IotTaskPoolError_t _performTaskPoolParameterValidation( const IotTaskPoolInfo_t * const pInfo );

/**
 * @brief Initializes the control structures (queues, counters, semaphores, mutex
 * and timer) of a pre-allocated instance of a task pool. No thread is created.
 *
 * @param[in] pInfo The initialization information for the task pool.
 * @param[in] pTaskPool The pre-allocated instance of the task pool to initialize.
 *
 */
static IotTaskPoolError_t _initTaskPoolControlStructures( const IotTaskPoolInfo_t * const pInfo,
                                                          _taskPool_t * const pTaskPool );

/**
 * @brief Initializes a pre-allocated instance of a task pool and starts its
 * minimum number of worker threads.
 *
 * @param[in] pInfo The initialization information for the task pool.
 * @param[out] pTaskPool A pointer to the task pool data structure to initialize.
 *
 */
static IotTaskPoolError_t _createTaskPool( const IotTaskPoolInfo_t * const pInfo,
                                           _taskPool_t * const pTaskPool );

/**
 * @brief Destroys one instance of a task pool.
 *
 * @param[in] pTaskPool The task pool to destroy.
 *
 */
static void _destroyTaskPool( _taskPool_t * const pTaskPool );

/**
 * @brief Check for the exit condition.
 *
 * @param[in] pTaskPool The task pool to check.
 *
 */
static bool _IsShutdownStarted( const _taskPool_t * const pTaskPool );

/**
 * @brief Set the exit condition.
 *
 * @param[in] pTaskPool The task pool to shut down.
 * @param[in] threads The number of threads active in the task pool at shutdown time.
 *
 */
static void _signalShutdown( _taskPool_t * const pTaskPool,
                             uint32_t threads );

/**
 * @brief Places a job in the dispatch queue.
 *
 * @param[in] pTaskPool The task pool to schedule the job with.
 * @param[in] pJob The job to schedule.
 * @param[in] flags The job flags.
 *
 */
static IotTaskPoolError_t _scheduleInternal( _taskPool_t * const pTaskPool,
                                             _taskPoolJob_t * const pJob,
                                             uint32_t flags );

/**
 * @brief Matches a deferred job in the timer queue with its timer event wrapper.
 *
 * @param[in] pLink A pointer to the timer event link in the timer queue.
 * @param[in] pMatch A pointer to the job to match.
 *
 */
static bool _matchJobByPointer( const IotLink_t * const pLink,
                                void * pMatch );

/**
 * @brief Tries to cancel a job.
 *
 * @param[in] pTaskPool The task pool to cancel an operation against.
 * @param[in] pJob The job to cancel.
 * @param[out] pStatus The status of the job at the time of cancellation.
 *
 */
static IotTaskPoolError_t _tryCancelInternal( _taskPool_t * const pTaskPool,
                                              _taskPoolJob_t * const pJob,
                                              IotTaskPoolJobStatus_t * const pStatus );

/**
 * @brief Try to safely cancel and/or remove a job from the cache when the user calls API out of order.
 *
 * @param[in] pTaskPool The task pool to safely extract a job from.
 * @param[in] pJob The job to extract.
 * @param[in] atCompletion A flag to indicate whether the job is being scheduled or
 * was completed already.
 *
 */
static IotTaskPoolError_t _trySafeExtraction( _taskPool_t * const pTaskPool,
                                              _taskPoolJob_t * const pJob,
                                              bool atCompletion );

/* ---------------------------------------------------------------------------------------------- */

/* Returns the address of the statically allocated system task pool. */
IotTaskPool_t IotTaskPool_GetSystemTaskPool( void )
{
    return &_IotSystemTaskPool;
}

/*-----------------------------------------------------------*/

/* Validates pInfo and then creates the system task pool in its static storage. */
IotTaskPoolError_t IotTaskPool_CreateSystemTaskPool( const IotTaskPoolInfo_t * const pInfo )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    /* Parameter checking. */
    TASKPOOL_ON_ERROR_GOTO_CLEANUP( _performTaskPoolParameterValidation( pInfo ) );

    /* Create the system task pool. */
    TASKPOOL_SET_AND_GOTO_CLEANUP( _createTaskPool( pInfo, &_IotSystemTaskPool ) );

    TASKPOOL_NO_FUNCTION_CLEANUP();
}

/*-----------------------------------------------------------*/

/* Allocates a task pool, creates it from pInfo and, on success only, stores
 * the new handle in *pTaskPool. On failure the allocation is released. */
IotTaskPoolError_t IotTaskPool_Create( const IotTaskPoolInfo_t * const pInfo,
                                       IotTaskPool_t * const pTaskPool )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTempTaskPool = NULL;

    /* Verify that the task pool storage is valid. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );

    /* Parameter checking. */
    TASKPOOL_ON_ERROR_GOTO_CLEANUP( _performTaskPoolParameterValidation( pInfo ) );

    /* Allocate the memory for the task pool */
    pTempTaskPool = ( _taskPool_t * ) IotTaskPool_MallocTaskPool( sizeof( _taskPool_t ) );

    if( pTempTaskPool == NULL )
    {
        TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
    }

    memset( pTempTaskPool, 0x00, sizeof( _taskPool_t ) );

    TASKPOOL_SET_AND_GOTO_CLEANUP( _createTaskPool( pInfo, pTempTaskPool ) );

    TASKPOOL_FUNCTION_CLEANUP();

    if( TASKPOOL_FAILED( status ) )
    {
        if( pTempTaskPool != NULL )
        {
            IotTaskPool_FreeTaskPool( pTempTaskPool );
        }
    }
    else
    {
        *pTaskPool = pTempTaskPool;
    }

    TASKPOOL_FUNCTION_CLEANUP_END();
}

/*-----------------------------------------------------------*/

/* Drains all queues, signals shutdown, waits for every worker to exit and
 * then releases the task pool resources. */
IotTaskPoolError_t IotTaskPool_Destroy( IotTaskPool_t taskPoolHandle )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    uint32_t count;
    bool completeShutdown = true;

    _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;

    /* Track how many threads the task pool owns. */
    uint32_t activeThreads;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );

    /* Destroying the task pool should be safe, and therefore we will grab the task pool lock.
     * No worker thread or application thread should access any data structure
     * in the task pool while the task pool is being destroyed. */
    TASKPOOL_ENTER_CRITICAL();
    {
        IotLink_t * pItemLink;

        /* Record how many active threads in the task pool. */
        activeThreads = pTaskPool->activeThreads;

        /* Destroying a task pool happens in six (6) stages: First, (1) we clear the job queue and
         * (2) the timer queue. Then (3) we clear the jobs cache. We then (4) set the exit condition
         * for the worker threads, before (5) waiting for all worker threads to signal exit.
         * Finally (6) we destroy all task pool data structures and release the associated memory.
         */

        /* (1) Clear the job queue. */
        do
        {
            pItemLink = IotDeQueue_DequeueHead( &pTaskPool->dispatchQueue );

            if( pItemLink != NULL )
            {
                _taskPoolJob_t * pJob = IotLink_Container( _taskPoolJob_t, pItemLink, link );

                _destroyJob( pJob );
            }
        } while( pItemLink );

        /* (2) Clear the timer queue. */
        {
            _taskPoolTimerEvent_t * pTimerEvent;

            /* A deferred job may have fired already. Since deferred jobs will go through the same mutex
             * the shutdown sequence is holding at this stage, there is no risk for race conditions. Yet, we
             * need to let the deferred job destroy the task pool.
             */

            pItemLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );

            if( pItemLink != NULL )
            {
                uint64_t now = IotClock_GetTimeMs();

                pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pItemLink, link );

                if( pTimerEvent->expirationTime <= now )
                {
                    IotLogDebug( "Shutdown will be deferred to the timer thread" );

                    /* Timer may have fired already! Let the timer thread
                     * complete the taskpool destruction sequence. */
                    completeShutdown = false;
                }

                /* Remove all timers from the timeout list. */
                for( ; ; )
                {
                    pItemLink = IotListDouble_RemoveHead( &pTaskPool->timerEventsList );

                    if( pItemLink == NULL )
                    {
                        break;
                    }

                    pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pItemLink, link );

                    _destroyJob( pTimerEvent->pJob );

                    IotTaskPool_FreeTimerEvent( pTimerEvent );
                }
            }
        }

        /* (3) Clear the job cache. */
        do
        {
            pItemLink = IotListDouble_RemoveHead( &pTaskPool->jobsCache.freeList );

            if( pItemLink != NULL )
            {
                _taskPoolJob_t * pJob = IotLink_Container( _taskPoolJob_t, pItemLink, link );

                _destroyJob( pJob );
            }
        } while( pItemLink );

        /* (4) Set the exit condition. */
        _signalShutdown( pTaskPool, activeThreads );
    }
    TASKPOOL_EXIT_CRITICAL();

    /* (5) Wait for all active threads to reach the end of their life-span.
     * Each exiting worker posts startStopSignal once. */
    for( count = 0; count < activeThreads; ++count )
    {
        IotSemaphore_Wait( &pTaskPool->startStopSignal );
    }

    IotTaskPool_Assert( IotSemaphore_GetCount( &pTaskPool->startStopSignal ) == 0 );
    IotTaskPool_Assert( pTaskPool->activeThreads == 0 );

    /* (6) Destroy all signaling objects. */
    if( completeShutdown == true )
    {
        _destroyTaskPool( pTaskPool );

        /* Do not free the system task pool which is statically allocated. */
        if( pTaskPool != &_IotSystemTaskPool )
        {
            IotTaskPool_FreeTaskPool( pTaskPool );
        }
    }

    TASKPOOL_NO_FUNCTION_CLEANUP();
}

/*-----------------------------------------------------------*/

/* Updates the maximum thread count. When the maximum shrinks, the dispatch
 * semaphore is posted once per surplus thread so that workers wake up and
 * re-evaluate the quota. */
IotTaskPoolError_t IotTaskPool_SetMaxThreads( IotTaskPool_t taskPoolHandle,
                                              uint32_t maxThreads )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    uint32_t count, i;

    _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );
    TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pTaskPool->minThreads > maxThreads );
    TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( maxThreads < 1UL );

    TASKPOOL_ENTER_CRITICAL();
    {
        /* Bail out early if this task pool is shutting down. */
        if( _IsShutdownStarted( pTaskPool ) )
        {
            TASKPOOL_EXIT_CRITICAL();

            TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS );
        }

        uint32_t previousMaxThreads = pTaskPool->maxThreads;

        /* Reset the max threads counter. */
        pTaskPool->maxThreads = maxThreads;

        /* If the number of maximum threads in the pool is set to be smaller than the current value,
         * then we need to signal all redundant threads to exit.
         */
        if( maxThreads < previousMaxThreads )
        {
            /* Only compute the difference when shrinking, so the unsigned
             * subtraction cannot wrap around. */
            count = previousMaxThreads - maxThreads;

            IotLogDebug( "Setting max threads caused %lu thread to exit.", ( unsigned long ) count );

            for( i = 0; i < count; ++i )
            {
                IotSemaphore_Post( &pTaskPool->dispatchSignal );
            }
        }
    }
    TASKPOOL_EXIT_CRITICAL();

    TASKPOOL_NO_FUNCTION_CLEANUP();
}

/*-----------------------------------------------------------*/

/* Builds a statically allocated job in the caller-provided storage and
 * returns that storage as the job handle. */
IotTaskPoolError_t IotTaskPool_CreateJob( IotTaskPoolRoutine_t userCallback,
                                          void * pUserContext,
                                          IotTaskPoolJobStorage_t * const pJobStorage,
                                          IotTaskPoolJob_t * const ppJob )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( userCallback );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJobStorage );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( ppJob );

    /* Build a job around the user-provided storage. */
    _initializeJob( ( _taskPoolJob_t * ) pJobStorage, userCallback, pUserContext, true );

    *ppJob = ( IotTaskPoolJob_t ) pJobStorage;

    TASKPOOL_NO_FUNCTION_CLEANUP();
}

/*-----------------------------------------------------------*/

/* Obtains a job from the pool's cache (or a new allocation) and initializes
 * it as a dynamically allocated job. */
IotTaskPoolError_t IotTaskPool_CreateRecyclableJob( IotTaskPool_t taskPoolHandle,
                                                    IotTaskPoolRoutine_t userCallback,
                                                    void * pUserContext,
                                                    IotTaskPoolJob_t * const ppJob )
{
    _taskPool_t * pTaskPool = NULL;

    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( userCallback );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( ppJob );

    pTaskPool = ( _taskPool_t * ) taskPoolHandle;

    {
        _taskPoolJob_t * pTempJob = NULL;

        TASKPOOL_ENTER_CRITICAL();
        {
            /* Bail out early if this task pool is shutting down. */
            if( _IsShutdownStarted( pTaskPool ) )
            {
                TASKPOOL_EXIT_CRITICAL();

                TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS );
            }

            pTempJob = _fetchOrAllocateJob( &pTaskPool->jobsCache );
        }
        TASKPOOL_EXIT_CRITICAL();

        if( pTempJob == NULL )
        {
            IotLogInfo( "Failed to allocate a job." );

            TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
        }

        _initializeJob( pTempJob, userCallback, pUserContext, false );

        *ppJob = pTempJob;
    }

    TASKPOOL_NO_FUNCTION_CLEANUP();
}

/*-----------------------------------------------------------*/

/* Extracts a dynamically allocated job from the pool and destroys it.
 * Statically allocated jobs are rejected with IOT_TASKPOOL_ILLEGAL_OPERATION. */
IotTaskPoolError_t IotTaskPool_DestroyRecyclableJob( IotTaskPool_t taskPoolHandle,
                                                     IotTaskPoolJob_t pJobHandle )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTaskPool = NULL;
    _taskPoolJob_t * pJob1 = NULL;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJobHandle );

    pTaskPool = ( _taskPool_t * ) taskPoolHandle;
    pJob1 = ( _taskPoolJob_t * ) pJobHandle;

    TASKPOOL_ENTER_CRITICAL();
    {
        /* Bail out early if this task pool is shutting down. */
        if( _IsShutdownStarted( pTaskPool ) )
        {
            status = IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS;
        }
        /* Do not destroy statically allocated jobs. */
        else if( ( pJob1->flags & IOT_TASK_POOL_INTERNAL_STATIC ) == IOT_TASK_POOL_INTERNAL_STATIC )
        {
            IotLogWarn( "Attempt to destroy a statically allocated job." );

            status = IOT_TASKPOOL_ILLEGAL_OPERATION;
        }
        else
        {
            status = _trySafeExtraction( pTaskPool, pJob1, true );
        }
    }
    TASKPOOL_EXIT_CRITICAL();

    if( TASKPOOL_SUCCEEDED( status ) )
    {
        /* At this point, the job must not be in any queue or list. */
        IotTaskPool_Assert( IotLink_IsLinked( &pJob1->link ) == false );

        _destroyJob( pJob1 );
    }

    TASKPOOL_NO_FUNCTION_CLEANUP();
}

/*-----------------------------------------------------------*/

/* Extracts a dynamically allocated job from the pool and hands it to the jobs
 * cache. Statically allocated jobs are rejected with IOT_TASKPOOL_ILLEGAL_OPERATION. */
IotTaskPoolError_t IotTaskPool_RecycleJob( IotTaskPool_t taskPoolHandle,
                                           IotTaskPoolJob_t pJob )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTaskPool = NULL;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );

    pTaskPool = ( _taskPool_t * ) taskPoolHandle;

    TASKPOOL_ENTER_CRITICAL();
    {
        /* Bail out early if this task pool is shutting down. */
        if( _IsShutdownStarted( pTaskPool ) )
        {
            status = IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS;
        }
        /* Do not recycle statically allocated jobs. */
        else if( ( pJob->flags & IOT_TASK_POOL_INTERNAL_STATIC ) == 0UL )
        {
            status = _trySafeExtraction( pTaskPool, pJob, true );
        }
        else
        {
            IotLogWarn( "Attempt to recycle a statically allocated job." );

            status = IOT_TASKPOOL_ILLEGAL_OPERATION;
        }

        /* If all safety checks completed, proceed. */
        if( TASKPOOL_SUCCEEDED( status ) )
        {
            /* At this point, the job must not be in any queue or list. */
            IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false );

            _recycleJob( &pTaskPool->jobsCache, pJob );
        }
    }
    TASKPOOL_EXIT_CRITICAL();

    TASKPOOL_NO_FUNCTION_CLEANUP();
}

/*-----------------------------------------------------------*/

/* Schedules a job for immediate execution. The only accepted flag values are
 * 0 and IOT_TASKPOOL_JOB_HIGH_PRIORITY. */
IotTaskPoolError_t IotTaskPool_Schedule( IotTaskPool_t taskPoolHandle,
                                         IotTaskPoolJob_t pJob,
                                         uint32_t flags )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTaskPool = NULL;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );
    TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( ( flags != 0UL ) && ( flags != IOT_TASKPOOL_JOB_HIGH_PRIORITY ) );

    pTaskPool = ( _taskPool_t * ) taskPoolHandle;

    TASKPOOL_ENTER_CRITICAL();
    {
        /* Bail out early if this task pool is shutting down. */
        if( _IsShutdownStarted( pTaskPool ) )
        {
            status = IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS;
        }
        else
        {
            status = _trySafeExtraction( pTaskPool, pJob, false );
        }

        /* If all safety checks completed, proceed. */
        if( TASKPOOL_SUCCEEDED( status ) )
        {
            status = _scheduleInternal( pTaskPool, pJob, flags );
        }
    }
    TASKPOOL_EXIT_CRITICAL();

    TASKPOOL_NO_FUNCTION_CLEANUP();
}

/*-----------------------------------------------------------*/

/* Schedules a job to run after timeMs milliseconds. A delay of 0 is forwarded
 * to IotTaskPool_Schedule. Otherwise a timer event is allocated and inserted,
 * in sorted order, into the pool's timer event list. */
IotTaskPoolError_t IotTaskPool_ScheduleDeferred( IotTaskPool_t taskPoolHandle,
                                                 IotTaskPoolJob_t pJob,
                                                 uint32_t timeMs )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTaskPool = NULL;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );

    pTaskPool = ( _taskPool_t * ) taskPoolHandle;

    if( timeMs == 0UL )
    {
        TASKPOOL_SET_AND_GOTO_CLEANUP( IotTaskPool_Schedule( pTaskPool, pJob, 0 ) );
    }

    TASKPOOL_ENTER_CRITICAL();
    {
        /* Bail out early if this task pool is shutting down. */
        if( _IsShutdownStarted( pTaskPool ) )
        {
            TASKPOOL_EXIT_CRITICAL();

            TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS );
        }

        /* If all safety checks completed, proceed. */
        if( TASKPOOL_SUCCEEDED( _trySafeExtraction( pTaskPool, pJob, false ) ) )
        {
            IotLink_t * pTimerEventLink;
            uint64_t now;

            _taskPoolTimerEvent_t * pTimerEvent = ( _taskPoolTimerEvent_t * ) IotTaskPool_MallocTimerEvent( sizeof( _taskPoolTimerEvent_t ) );

            if( pTimerEvent == NULL )
            {
                TASKPOOL_EXIT_CRITICAL();

                TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
            }

            memset( pTimerEvent, 0x00, sizeof( _taskPoolTimerEvent_t ) );

            now = IotClock_GetTimeMs();

            pTimerEvent->link.pNext = NULL;
            pTimerEvent->link.pPrevious = NULL;
            pTimerEvent->expirationTime = now + timeMs;
            pTimerEvent->pJob = ( _taskPoolJob_t * ) pJob;

            /* Insert the timer event into the timer list, keeping it sorted. */
            IotListDouble_InsertSorted( &pTaskPool->timerEventsList, &pTimerEvent->link, _timerEventCompare );

            /* Update the job status to 'deferred'. */
            pJob->status = IOT_TASKPOOL_STATUS_DEFERRED;

            /* Peek the first event in the timer event list. There must be at least one,
             * since we just inserted it. */
            pTimerEventLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
            IotTaskPool_Assert( pTimerEventLink != NULL );

            /* If the event we inserted is at the front of the queue, then
             * we need to reschedule the underlying timer. */
            if( pTimerEventLink == &pTimerEvent->link )
            {
                pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pTimerEventLink, link );

                _rescheduleDeferredJobsTimer( &pTaskPool->timer, pTimerEvent );
            }
        }
        else
        {
            TASKPOOL_EXIT_CRITICAL();

            TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_ILLEGAL_OPERATION );
        }
    }
    TASKPOOL_EXIT_CRITICAL();

    TASKPOOL_NO_FUNCTION_CLEANUP();
}

/*-----------------------------------------------------------*/

/* Reads the job status under the pool lock. *pStatus is set to
 * IOT_TASKPOOL_STATUS_UNDEFINED if the pool is shutting down. */
IotTaskPoolError_t IotTaskPool_GetStatus( IotTaskPool_t taskPoolHandle,
                                          IotTaskPoolJob_t pJob,
                                          IotTaskPoolJobStatus_t * const pStatus )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTaskPool = NULL;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pStatus );

    pTaskPool = ( _taskPool_t * ) taskPoolHandle;
    *pStatus = IOT_TASKPOOL_STATUS_UNDEFINED;

    TASKPOOL_ENTER_CRITICAL();
    {
        /* Bail out early if this task pool is shutting down. */
        if( _IsShutdownStarted( pTaskPool ) )
        {
            TASKPOOL_EXIT_CRITICAL();

            TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS );
        }

        *pStatus = pJob->status;
    }
    TASKPOOL_EXIT_CRITICAL();

    TASKPOOL_NO_FUNCTION_CLEANUP();
}

/*-----------------------------------------------------------*/

/* Attempts to cancel a job under the pool lock. pStatus is optional; when
 * provided it is first reset to IOT_TASKPOOL_STATUS_UNDEFINED. */
IotTaskPoolError_t IotTaskPool_TryCancel( IotTaskPool_t taskPoolHandle,
                                          IotTaskPoolJob_t pJob,
                                          IotTaskPoolJobStatus_t * const pStatus )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTaskPool = NULL;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );

    pTaskPool = ( _taskPool_t * ) taskPoolHandle;

    if( pStatus != NULL )
    {
        *pStatus = IOT_TASKPOOL_STATUS_UNDEFINED;
    }

    TASKPOOL_ENTER_CRITICAL();
    {
        /* Bail out early if this task pool is shutting down. */
        if( _IsShutdownStarted( pTaskPool ) )
        {
            TASKPOOL_EXIT_CRITICAL();

            TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS );
        }

        status = _tryCancelInternal( pTaskPool, pJob, pStatus );
    }
    TASKPOOL_EXIT_CRITICAL();

    TASKPOOL_NO_FUNCTION_CLEANUP();
}

/*-----------------------------------------------------------*/

/* Returns the job handle reinterpreted as a pointer to its storage. */
IotTaskPoolJobStorage_t * IotTaskPool_GetJobStorageFromHandle( IotTaskPoolJob_t pJob )
{
    return ( IotTaskPoolJobStorage_t * ) pJob;
}

/*-----------------------------------------------------------*/

/* Maps a task pool error code to a constant, human-readable string.
 * Unknown codes map to "INVALID STATUS". */
const char * IotTaskPool_strerror( IotTaskPoolError_t status )
{
    const char * pMessage = NULL;

    switch( status )
    {
        case IOT_TASKPOOL_SUCCESS:
            pMessage = "SUCCESS";
            break;

        case IOT_TASKPOOL_BAD_PARAMETER:
            pMessage = "BAD PARAMETER";
            break;

        case IOT_TASKPOOL_ILLEGAL_OPERATION:
            pMessage = "ILLEGAL OPERATION";
            break;

        case IOT_TASKPOOL_NO_MEMORY:
            pMessage = "NO MEMORY";
            break;

        case IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS:
            pMessage = "SHUTDOWN IN PROGRESS";
            break;

        case IOT_TASKPOOL_CANCEL_FAILED:
            pMessage = "CANCEL FAILED";
            break;

        default:
            pMessage = "INVALID STATUS";
            break;
    }

    return pMessage;
}

/* ---------------------------------------------------------------------------------------------- */
/* ---------------------------------------------------------------------------------------------- */
/* ---------------------------------------------------------------------------------------------- */

/* Rejects a NULL pInfo, minThreads > maxThreads, and thread counts below 1. */
IotTaskPoolError_t _performTaskPoolParameterValidation( const IotTaskPoolInfo_t * const pInfo )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    /* Check input values for consistency. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pInfo );
    TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pInfo->minThreads > pInfo->maxThreads );
    TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pInfo->minThreads < 1UL );
    TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pInfo->maxThreads < 1UL );

    TASKPOOL_NO_FUNCTION_CLEANUP();
}

/*-----------------------------------------------------------*/

/* Zeroes the pool, copies the settings from pInfo, and creates the start/stop
 * semaphore, the lock, the dispatch semaphore and the timer, in that order.
 * If any creation fails, the objects created so far are destroyed and
 * IOT_TASKPOOL_NO_MEMORY is returned. */
static IotTaskPoolError_t _initTaskPoolControlStructures( const IotTaskPoolInfo_t * const pInfo,
                                                          _taskPool_t * const pTaskPool )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    bool semStartStopInit = false;
    bool lockInit = false;
    bool semDispatchInit = false;
    bool timerInit = false;

    /* Zero out all data structures. */
    memset( ( void * ) pTaskPool, 0x00, sizeof( _taskPool_t ) );

    /* Initialize the job data structures that require no de-initialization.
     * All other data structures carry a value of 'NULL' before initialization.
     */
    IotDeQueue_Create( &pTaskPool->dispatchQueue );
    IotListDouble_Create( &pTaskPool->timerEventsList );

    pTaskPool->minThreads = pInfo->minThreads;
    pTaskPool->maxThreads = pInfo->maxThreads;
    pTaskPool->stackSize = pInfo->stackSize;
    pTaskPool->priority = pInfo->priority;

    _initJobsCache( &pTaskPool->jobsCache );

    /* Initialize the semaphore to ensure all threads have started. */
    if( IotSemaphore_Create( &pTaskPool->startStopSignal, 0, TASKPOOL_MAX_SEM_VALUE ) == true )
    {
        semStartStopInit = true;

        /* NOTE(review): the second argument presumably requests a recursive
         * mutex - confirm against the platform layer. */
        if( IotMutex_Create( &pTaskPool->lock, true ) == true )
        {
            lockInit = true;

            /* Initialize the semaphore for waiting for incoming work. */
            if( IotSemaphore_Create( &pTaskPool->dispatchSignal, 0, TASKPOOL_MAX_SEM_VALUE ) == true )
            {
                semDispatchInit = true;

                /* Create the pool timer; its procedure is _timerThread and it
                 * receives this task pool as its argument. */
                if( IotClock_TimerCreate( &( pTaskPool->timer ), _timerThread, pTaskPool ) == true )
                {
                    timerInit = true;
                }
                else
                {
                    TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
                }
            }
            else
            {
                TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
            }
        }
        else
        {
            TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
        }
    }
    else
    {
        TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
    }

    TASKPOOL_FUNCTION_CLEANUP();

    if( TASKPOOL_FAILED( status ) )
    {
        if( semStartStopInit == true )
        {
            IotSemaphore_Destroy( &pTaskPool->startStopSignal );
        }

        if( lockInit == true )
        {
            IotMutex_Destroy( &pTaskPool->lock );
        }

        if( semDispatchInit == true )
        {
            IotSemaphore_Destroy( &pTaskPool->dispatchSignal );
        }

        if( timerInit == true )
        {
            IotClock_TimerDestroy( &pTaskPool->timer );
        }
    }

    TASKPOOL_FUNCTION_CLEANUP_END();
}

/*-----------------------------------------------------------*/

/* Initializes the control structures and then creates minThreads detached
 * worker threads. If a thread cannot be created, the threads created so far
 * are shut down and the pool is destroyed. */
static IotTaskPoolError_t _createTaskPool( const IotTaskPoolInfo_t * const pInfo,
                                           _taskPool_t * const pTaskPool )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    uint32_t count;
    uint32_t threadsCreated = 0;
    bool controlInit = false;

    /* Initialize all internal data structure prior to creating all threads. */
    TASKPOOL_ON_ERROR_GOTO_CLEANUP( _initTaskPoolControlStructures( pInfo, pTaskPool ) );

    /* Remember that the control structures exist, so that the cleanup path
     * only destroys them when they were actually created. */
    controlInit = true;

    IotTaskPool_Assert( pInfo->minThreads == pTaskPool->minThreads );
    IotTaskPool_Assert( pInfo->maxThreads == pTaskPool->maxThreads );

    /* The task pool will initialize the minimum number of threads requested by the user upon start. */
    /* When a thread is created, it will signal a semaphore to signify that it is about to wait on incoming */
    /* jobs. A thread can be woken up for exit or for new jobs only at that point in time.  */
    /* The exit condition is setting the maximum number of threads to 0. */

    /* Create the minimum number of threads specified by the user, and if one fails shutdown and return error. */
    for( ; threadsCreated < pTaskPool->minThreads; )
    {
        /* Create one thread. */
        if( Iot_CreateDetachedThread( _taskPoolWorker,
                                      pTaskPool,
                                      pTaskPool->priority,
                                      pTaskPool->stackSize ) == false )
        {
            IotLogError( "Could not create worker thread! Exiting..." );

            /* If creating one thread fails, set error condition and exit the loop. */
            TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
        }

        /* Upon successful thread creation, increase the number of active threads. */
        pTaskPool->activeThreads++;

        ++threadsCreated;
    }

    TASKPOOL_FUNCTION_CLEANUP();

    /* Wait for threads to be ready to wait on the condition, so that threads are actually able to receive messages. */
    for( count = 0; count < threadsCreated; ++count )
    {
        IotSemaphore_Wait( &pTaskPool->startStopSignal );
    }

    /* In case of failure, wait on the created threads to exit. */
    if( TASKPOOL_FAILED( status ) )
    {
        /* Set the exit condition for the newly created threads. */
        _signalShutdown( pTaskPool, threadsCreated );

        /* Wait for all created threads to signal their exit. */
        for( count = 0; count < threadsCreated; ++count )
        {
            IotSemaphore_Wait( &pTaskPool->startStopSignal );
        }

        if( controlInit == true )
        {
            _destroyTaskPool( pTaskPool );
        }
    }

    TASKPOOL_FUNCTION_CLEANUP_END();
}

/*-----------------------------------------------------------*/

/* Destroys the timer, both semaphores and the lock of a task pool. The pool
 * memory itself is not released here. */
static void _destroyTaskPool( _taskPool_t * const pTaskPool )
{
    IotClock_TimerDestroy( &pTaskPool->timer );
    IotSemaphore_Destroy( &pTaskPool->dispatchSignal );
    IotSemaphore_Destroy( &pTaskPool->startStopSignal );
    IotMutex_Destroy( &pTaskPool->lock );
}

/* ---------------------------------------------------------------------------------------------- */

/* Worker thread body: posts startStopSignal once when ready, then waits on
 * dispatchSignal and runs queued jobs until shutdown is signaled or the
 * thread retires because of the thread quotas. */
static void _taskPoolWorker( void * pUserContext )
{
    IotTaskPool_Assert( pUserContext != NULL );

    IotTaskPoolRoutine_t userCallback = NULL;
    bool running = true;

    /* Extract pTaskPool pointer from context. */
    _taskPool_t * pTaskPool = ( _taskPool_t * ) pUserContext;

    /* Signal that this worker completed initialization and it is ready to receive notifications. */
    IotSemaphore_Post( &pTaskPool->startStopSignal );

    /* OUTER LOOP: it controls the lifetime of the worker thread: exit condition for a worker thread
     * is setting maxThreads to zero. A worker thread is running until the maximum number of allowed
     * threads is not zero and the active threads are less than the maximum number of allowed threads.
     */
    do
    {
        bool jobAvailable;
        IotLink_t * pFirst;
        _taskPoolJob_t * pJob = NULL;

        /* Wait on incoming notifications. If waiting on the semaphore returns with a timeout, then
         * it means that this thread should consider shutting down for the task pool to fold back
         * to its minimum number of threads. */
        jobAvailable = IotSemaphore_TimedWait( &pTaskPool->dispatchSignal, IOT_TASKPOOL_JOB_WAIT_TIMEOUT_MS );

        /* Acquire the lock to check the exit condition, and release the lock if the exit condition is verified,
         * or before waiting for incoming notifications.
         */
        TASKPOOL_ENTER_CRITICAL();
        {
            /* If the exit condition is verified, update the number of active threads and exit the loop. */
            if( _IsShutdownStarted( pTaskPool ) )
            {
                IotLogDebug( "Worker thread exiting because shutdown condition was set." );

                /* Decrease the number of active threads. */
                pTaskPool->activeThreads--;

                TASKPOOL_EXIT_CRITICAL();

                /* Signal that this worker is exiting. */
                IotSemaphore_Post( &pTaskPool->startStopSignal );

                /* On shutdown, abandon the OUTER LOOP immediately. */
                break;
            }

            /* Check if this thread needs to exit because 'max threads' quota was exceeded.
             * In that case, let it run once, so we can support the case for scheduling 'high priority'
             * jobs that causes exceeding the max threads quota for the purpose of executing
             * the high-priority task. */
            if( pTaskPool->activeThreads > pTaskPool->maxThreads )
            {
                IotLogDebug( "Worker thread will exit because maximum quota was exceeded." );

                /* Decrease the number of active threads pro-actively. */
                pTaskPool->activeThreads--;

                /* Mark this thread as dead. */
                running = false;
            }
            /* Check if this thread needs to exit because the worker woke up after a timeout. */
            else if( jobAvailable == false )
            {
                /* If there was a timeout, shrink back the task pool to the minimum number of threads. */
                if( pTaskPool->activeThreads > pTaskPool->minThreads )
                {
                    /* After waking up from a timeout, the thread will try and pick up a new job.
                     * But if there is no job available, the thread will exit to ensure that
                     * the taskpool does not have more than minimum number of active threads.
                     */
                    IotLogDebug( "Worker will exit because task pool is shrinking." );

                    /* Decrease the number of active threads pro-actively. */
                    pTaskPool->activeThreads--;

                    /* Mark this thread as dead. */
                    running = false;
                }
            }

            /* Only look for a job if waiting did not time out. */
            if( jobAvailable == true )
            {
                /* Dequeue the first job in FIFO order. */
                pFirst = IotDeQueue_DequeueHead( &pTaskPool->dispatchQueue );

                /* If there is indeed a job, then update status under lock, and release the lock before processing the job. */
                if( pFirst != NULL )
                {
                    /* Extract the job from its link. */
                    pJob = IotLink_Container( _taskPoolJob_t, pFirst, link );

                    /* The job is marked 'completed' before its callback is invoked. */
                    pJob->status = IOT_TASKPOOL_STATUS_COMPLETED;
                    userCallback = pJob->userCallback;
                }
            }
        }
        TASKPOOL_EXIT_CRITICAL();

        /* INNER LOOP: it controls the execution of jobs: the exit condition is the lack of a job to execute. */
        while( pJob != NULL )
        {
            /* Process the job by invoking the associated callback with the user context.
             * This task pool thread will not be available until the user callback returns.
             */
            {
                IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false );
                IotTaskPool_Assert( userCallback != NULL );

                userCallback( pTaskPool, pJob, pJob->pUserContext );

                /* This job is finished, clear its pointer. */
                pJob = NULL;
                userCallback = NULL;

                /* If this thread exceeded the quota, then let it terminate.
                 * NOTE(review): on this path activeJobs is not decremented for
                 * the job that just ran - confirm this is intended. */
                if( running == false )
                {
                    /* Abandon the INNER LOOP. Execution will transfer back to the OUTER LOOP condition. */
                    break;
                }
            }

            /* Acquire the lock before updating the job status. */
            TASKPOOL_ENTER_CRITICAL();
            {
                /* Update the number of busy threads, so new requests can be served by creating new threads, up to maxThreads. */
                pTaskPool->activeJobs--;

                /* Try and dequeue the next job in the dispatch queue. */
                IotLink_t * pItem = NULL;

                /* Dequeue the next job from the dispatch queue. */
                pItem = IotDeQueue_DequeueHead( &pTaskPool->dispatchQueue );

                /* If there is no job left in the dispatch queue, update the worker status and leave. */
                if( pItem == NULL )
                {
                    TASKPOOL_EXIT_CRITICAL();

                    /* Abandon the INNER LOOP. Execution will transfer back to the OUTER LOOP condition. */
                    break;
                }
                else
                {
                    pJob = IotLink_Container( _taskPoolJob_t, pItem, link );

                    userCallback = pJob->userCallback;
                }

                pJob->status = IOT_TASKPOOL_STATUS_COMPLETED;
            }
            TASKPOOL_EXIT_CRITICAL();
        }
    } while( running == true );
}

/* ---------------------------------------------------------------------------------------------- */

/* Creates the cache's free list and resets its free count to zero. */
static void _initJobsCache( _taskPoolCache_t * const pCache )
{
    IotDeQueue_Create( &pCache->freeList );

    pCache->freeCount = 0;
}

/*-----------------------------------------------------------*/

/* Resets the job's link, stores the callback and its context, and marks the
 * job as ready. The flags field is only written for statically allocated
 * jobs; for dynamically allocated jobs it is left as the caller provided it. */
static void _initializeJob( _taskPoolJob_t * const pJob,
                            IotTaskPoolRoutine_t userCallback,
                            void * pUserContext,
                            bool isStatic )
{
    pJob->link.pNext = NULL;
    pJob->link.pPrevious = NULL;
    pJob->userCallback = userCallback;
    pJob->pUserContext = pUserContext;

    if( isStatic )
    {
        pJob->flags = IOT_TASK_POOL_INTERNAL_STATIC;
    }

    pJob->status = IOT_TASKPOOL_STATUS_READY;
}

/*-----------------------------------------------------------*/

static _taskPoolJob_t * _fetchOrAllocateJob( _taskPoolCache_t * const pCache )
{
    _taskPoolJob_t * pJob = NULL;
    IotLink_t * pLink = IotListDouble_RemoveHead( &( pCache->freeList ) );

    if( pLink != NULL )
    {
        pJob = IotLink_Container( _taskPoolJob_t, pLink, link );
    }

    /* If there is no available job in the cache, then allocate one.
*/ if( pJob == NULL ) { pJob = ( _taskPoolJob_t * ) IotTaskPool_MallocJob( sizeof( _taskPoolJob_t ) ); if( pJob != NULL ) { memset( pJob, 0x00, sizeof( _taskPoolJob_t ) ); } else { /* Log allocation failure for troubleshooting purposes. */ IotLogInfo( "Failed to allocate job." ); } } /* If there was a job in the cache, then make sure we keep the counters up-to-date. */ else { IotTaskPool_Assert( pCache->freeCount > 0 ); pCache->freeCount--; } return pJob; } /*-----------------------------------------------------------*/ static void _recycleJob( _taskPoolCache_t * const pCache, _taskPoolJob_t * const pJob ) { /* We should never try and recycling a job that is linked into some queue. */ IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false ); /* We will recycle the job if there is space in the cache. */ if( pCache->freeCount < IOT_TASKPOOL_JOBS_RECYCLE_LIMIT ) { /* Destroy user data, for added safety & security. */ pJob->userCallback = NULL; pJob->pUserContext = NULL; /* Reset the status for added debugability. */ pJob->status = IOT_TASKPOOL_STATUS_UNDEFINED; IotListDouble_InsertTail( &pCache->freeList, &pJob->link ); pCache->freeCount++; IotTaskPool_Assert( pCache->freeCount >= 1 ); } else { _destroyJob( pJob ); } } /*-----------------------------------------------------------*/ static void _destroyJob( _taskPoolJob_t * const pJob ) { /* Destroy user data, for added safety & security. */ pJob->userCallback = NULL; pJob->pUserContext = NULL; /* Reset the status for added debugability. */ pJob->status = IOT_TASKPOOL_STATUS_UNDEFINED; /* Only dispose of dynamically allocated jobs. 
*/ if( ( pJob->flags & IOT_TASK_POOL_INTERNAL_STATIC ) == 0UL ) { IotTaskPool_FreeJob( pJob ); } } /* ---------------------------------------------------------------------------------------------- */ static bool _IsShutdownStarted( const _taskPool_t * const pTaskPool ) { return( pTaskPool->maxThreads == 0UL ); } /*-----------------------------------------------------------*/ static void _signalShutdown( _taskPool_t * const pTaskPool, uint32_t threads ) { uint32_t count; /* Set the exit condition. */ pTaskPool->maxThreads = 0; /* Broadcast to all active threads to wake-up. Active threads do check the exit condition right after waking up. */ for( count = 0; count < threads; ++count ) { IotSemaphore_Post( &pTaskPool->dispatchSignal ); } } /* ---------------------------------------------------------------------------------------------- */ static IotTaskPoolError_t _scheduleInternal( _taskPool_t * const pTaskPool, _taskPoolJob_t * const pJob, uint32_t flags ) { TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS ); bool mustGrow = false; bool shouldGrow = false; /* Update the job status to 'scheduled'. */ pJob->status = IOT_TASKPOOL_STATUS_SCHEDULED; /* Update the number of active jobs optimistically, so new requests can be served by creating new threads. */ pTaskPool->activeJobs++; /* If all threads are busy, try and create a new one. Failing to create a new thread * only has performance implications on correctly executing the scheduled job. */ uint32_t activeThreads = pTaskPool->activeThreads; if( activeThreads <= pTaskPool->activeJobs ) { /* If the job scheduling is tagged as high priority, then we must grow the task pool, * no matter how many threads are active already. */ if( ( flags & IOT_TASKPOOL_JOB_HIGH_PRIORITY ) == IOT_TASKPOOL_JOB_HIGH_PRIORITY ) { mustGrow = true; } /* Grow the task pool up to the maximum number of threads indicated by the user. * Growing the taskpool can safely fail, the existing threads will eventually pick up * the job sometimes later. 
*/ else if( activeThreads < pTaskPool->maxThreads ) { shouldGrow = true; } else { /* Nothing to do. */ } if( ( mustGrow == true ) || ( shouldGrow == true ) ) { IotLogInfo( "Growing a Task pool with a new worker thread..." ); if( Iot_CreateDetachedThread( _taskPoolWorker, pTaskPool, pTaskPool->priority, pTaskPool->stackSize ) ) { IotSemaphore_Wait( &pTaskPool->startStopSignal ); pTaskPool->activeThreads++; } else { /* Failure to create a worker thread may not hinder functional correctness, but rather just responsiveness. */ IotLogWarn( "Task pool failed to create a worker thread." ); /* Failure to create a worker thread for a high priority job is considered a failure. */ if( mustGrow ) { TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY ); } } } } TASKPOOL_FUNCTION_CLEANUP(); if( TASKPOOL_SUCCEEDED( status ) ) { /* Append the job to the dispatch queue. * Put the job at the front, if it is a high priority job. */ if( mustGrow == true ) { IotLogDebug( "High priority job: placing job at the head of the queue." ); IotDeQueue_EnqueueHead( &pTaskPool->dispatchQueue, &pJob->link ); } else { IotDeQueue_EnqueueTail( &pTaskPool->dispatchQueue, &pJob->link ); } /* Signal a worker to pick up the job. */ IotSemaphore_Post( &pTaskPool->dispatchSignal ); } else { /* Scheduling can only fail to allocate a new worker, which is an error * only for high priority tasks. */ IotTaskPool_Assert( mustGrow == true ); /* Revert updating the number of active jobs. 
*/ pTaskPool->activeJobs--; } TASKPOOL_FUNCTION_CLEANUP_END(); } /*-----------------------------------------------------------*/ static bool _matchJobByPointer( const IotLink_t * const pLink, void * pMatch ) { const _taskPoolJob_t * const pJob = ( _taskPoolJob_t * ) pMatch; const _taskPoolTimerEvent_t * const pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pLink, link ); if( pJob == pTimerEvent->pJob ) { return true; } return false; } /*-----------------------------------------------------------*/ static IotTaskPoolError_t _tryCancelInternal( _taskPool_t * const pTaskPool, _taskPoolJob_t * const pJob, IotTaskPoolJobStatus_t * const pStatus ) { TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS ); bool cancelable = false; /* We can only cancel jobs that are either 'ready' (waiting to be scheduled). 'deferred', or 'scheduled'. */ IotTaskPoolJobStatus_t currentStatus = pJob->status; switch( currentStatus ) { case IOT_TASKPOOL_STATUS_READY: case IOT_TASKPOOL_STATUS_DEFERRED: case IOT_TASKPOOL_STATUS_SCHEDULED: case IOT_TASKPOOL_STATUS_CANCELED: cancelable = true; break; case IOT_TASKPOOL_STATUS_COMPLETED: /* Log message for debugging purposes. */ IotLogWarn( "Attempt to cancel a job that is already executing, or canceled." ); break; default: /* Log message for debugging purposes. */ IotLogError( "Attempt to cancel a job with an undefined state." ); break; } /* Update the returned status to the current status of the job. */ if( pStatus != NULL ) { *pStatus = currentStatus; } if( cancelable == false ) { TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_CANCEL_FAILED ); } else { /* Update the status of the job. */ pJob->status = IOT_TASKPOOL_STATUS_CANCELED; /* If the job is cancelable and its current status is 'scheduled' then unlink it from the dispatch * queue and signal any waiting threads. */ if( currentStatus == IOT_TASKPOOL_STATUS_SCHEDULED ) { /* A scheduled work items must be in the dispatch queue. 
*/ IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) ); IotDeQueue_Remove( &pJob->link ); } /* If the job current status is 'deferred' then the job has to be pending * in the timeouts queue. */ else if( currentStatus == IOT_TASKPOOL_STATUS_DEFERRED ) { /* Find the timer event associated with the current job. There MUST be one, hence assert if not. */ IotLink_t * pTimerEventLink = IotListDouble_FindFirstMatch( &pTaskPool->timerEventsList, NULL, _matchJobByPointer, pJob ); IotTaskPool_Assert( pTimerEventLink != NULL ); if( pTimerEventLink != NULL ) { bool shouldReschedule = false; /* If the job being cancelled was at the head of the timeouts queue, then we need to reschedule the timer * with the next job timeout */ IotLink_t * pHeadLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList ); if( pHeadLink == pTimerEventLink ) { shouldReschedule = true; } /* Remove the timer event associated with the canceled job and free the associated memory. */ IotListDouble_Remove( pTimerEventLink ); IotTaskPool_FreeTimerEvent( IotLink_Container( _taskPoolTimerEvent_t, pTimerEventLink, link ) ); if( shouldReschedule ) { IotLink_t * pNextTimerEventLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList ); if( pNextTimerEventLink != NULL ) { _rescheduleDeferredJobsTimer( &pTaskPool->timer, IotLink_Container( _taskPoolTimerEvent_t, pNextTimerEventLink, link ) ); } } } } else { /* A cancelable job status should be either 'scheduled' or 'deferrred'. */ IotTaskPool_Assert( ( currentStatus == IOT_TASKPOOL_STATUS_READY ) || ( currentStatus == IOT_TASKPOOL_STATUS_CANCELED ) ); } } TASKPOOL_NO_FUNCTION_CLEANUP(); } /*-----------------------------------------------------------*/ static IotTaskPoolError_t _trySafeExtraction( _taskPool_t * const pTaskPool, _taskPoolJob_t * const pJob, bool atCompletion ) { TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS ); IotTaskPoolJobStatus_t currentStatus = pJob->status; /* if the job is executing, we cannot touch it. 
*/ if( ( atCompletion == false ) && ( currentStatus == IOT_TASKPOOL_STATUS_COMPLETED ) ) { TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_ILLEGAL_OPERATION ); } /* Do not destroy a job in the dispatch queue or the timer queue without cancelling first. */ else if( ( currentStatus == IOT_TASKPOOL_STATUS_SCHEDULED ) || ( currentStatus == IOT_TASKPOOL_STATUS_DEFERRED ) ) { IotTaskPoolJobStatus_t statusAtCancellation; /* Cancellation can fail, e.g. if a job is being executed when we are trying to cancel it. */ status = _tryCancelInternal( pTaskPool, pJob, &statusAtCancellation ); switch( status ) { case IOT_TASKPOOL_SUCCESS: /* Nothing to do. */ break; case IOT_TASKPOOL_CANCEL_FAILED: IotLogWarn( "Removing a scheduled job failed because the job could not be canceled, error %s.", IotTaskPool_strerror( status ) ); status = IOT_TASKPOOL_ILLEGAL_OPERATION; break; default: /* Nothing to do. */ break; } } else if( IotLink_IsLinked( &pJob->link ) ) { /* If the job is not in the dispatch or timer queue, it must be in the cache. 
*/ IotTaskPool_Assert( ( pJob->flags & IOT_TASK_POOL_INTERNAL_STATIC ) == 0 ); IotListDouble_Remove( &pJob->link ); } else { /* Nothing to do */ } TASKPOOL_NO_FUNCTION_CLEANUP(); } /*-----------------------------------------------------------*/ static int32_t _timerEventCompare( const IotLink_t * const pTimerEventLink1, const IotLink_t * const pTimerEventLink2 ) { const _taskPoolTimerEvent_t * const pTimerEvent1 = IotLink_Container( _taskPoolTimerEvent_t, pTimerEventLink1, link ); const _taskPoolTimerEvent_t * const pTimerEvent2 = IotLink_Container( _taskPoolTimerEvent_t, pTimerEventLink2, link ); if( pTimerEvent1->expirationTime < pTimerEvent2->expirationTime ) { return -1; } if( pTimerEvent1->expirationTime > pTimerEvent2->expirationTime ) { return 1; } return 0; } /*-----------------------------------------------------------*/ static void _rescheduleDeferredJobsTimer( IotTimer_t * const pTimer, _taskPoolTimerEvent_t * const pFirstTimerEvent ) { uint64_t delta = 0; uint64_t now = IotClock_GetTimeMs(); if( pFirstTimerEvent->expirationTime > now ) { delta = pFirstTimerEvent->expirationTime - now; } if( delta < TASKPOOL_JOB_RESCHEDULE_DELAY_MS ) { delta = TASKPOOL_JOB_RESCHEDULE_DELAY_MS; /* The job will be late... */ } IotTaskPool_Assert( delta > 0 ); if( IotClock_TimerArm( pTimer, ( uint32_t ) delta, 0 ) == false ) { IotLogWarn( "Failed to re-arm timer for task pool" ); } } /*-----------------------------------------------------------*/ static void _timerThread( void * pArgument ) { _taskPool_t * pTaskPool = ( _taskPool_t * ) pArgument; _taskPoolTimerEvent_t * pTimerEvent = NULL; IotLogDebug( "Timer thread started for task pool %p.", pTaskPool ); /* Attempt to lock the timer mutex. Return immediately if the mutex cannot be locked. * If this mutex cannot be locked it means that another thread is manipulating the * timeouts list, and will reset the timer to fire again, although it will be late. 
*/ TASKPOOL_ENTER_CRITICAL(); { /* Check again for shutdown and bail out early in case. */ if( _IsShutdownStarted( pTaskPool ) ) { TASKPOOL_EXIT_CRITICAL(); /* Complete the shutdown sequence. */ _destroyTaskPool( pTaskPool ); IotTaskPool_FreeTaskPool( pTaskPool ); return; } /* Dispatch all deferred job whose timer expired, then reset the timer for the next * job down the line. */ for( ; ; ) { /* Peek the first event in the timer event list. */ IotLink_t * pLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList ); /* Check if the timer misfired for any reason. */ if( pLink != NULL ) { /* Record the current time. */ uint64_t now = IotClock_GetTimeMs(); /* Extract the job from its envelope. */ pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pLink, link ); /* Check if the first event should be processed now. */ if( pTimerEvent->expirationTime <= now ) { /* Remove the timer event for immediate processing. */ IotListDouble_Remove( &( pTimerEvent->link ) ); } else { /* The first element in the timer queue shouldn't be processed yet. * Arm the timer for when it should be processed and leave altogether. */ _rescheduleDeferredJobsTimer( &pTaskPool->timer, pTimerEvent ); break; } } /* If there are no timer events to process, terminate this thread. */ else { IotLogDebug( "No further timer events to process. Exiting timer thread." ); break; } IotLogDebug( "Scheduling job from timer event." ); /* Queue the job associated with the received timer event. */ ( void ) _scheduleInternal( pTaskPool, pTimerEvent->pJob, 0 ); /* Free the timer event. */ IotTaskPool_FreeTimerEvent( pTimerEvent ); } } TASKPOOL_EXIT_CRITICAL(); }