// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// Standard Library
using System;
using System.Collections.Generic;
using System.Threading;

#if UNITY_EDITOR
// Unity
using UnityEditor;
#endif

namespace AWS.GameKit.Runtime.Utils
{
    /// <summary>
    /// Utility class that runs functions with a predefined signature on a thread-pool thread and then manages the callbacks resulting from those calls.<br/><br/>
    ///
    /// Callbacks are executed by the Threader's <see cref="Update"/> method, on whichever thread calls it. Call <see cref="Update"/> from one of Unity's
    /// Update() lifecycle methods so that all callbacks run on Unity's main thread, which makes Unity calls from the callbacks safe.
    /// </summary>
    public class Threader
    {
        /// <summary>
        /// The number of callbacks that are waiting to be executed but have not yet been moved into the execution queue.
        /// </summary>
        public int WaitingQueueCount => _waitingQueue.Count;

        /// <summary>
        /// The number of callbacks that have been moved into the execution queue and are waiting to be executed by <see cref="Update"/>.
        /// </summary>
        public int ExecutionQueueCount => _executionQueue.Count;

        // Queue of callbacks waiting to be called - touched by both the worker threads and the main thread.
        // Also used as the lock object guarding itself and _generationCounter.
        private readonly List<Action> _waitingQueue = new List<Action>();

        // Queue of callbacks in the process of being called - touched only by the main thread
        private readonly List<Action> _executionQueue = new List<Action>();

        // Internal generational counter which increments when Awake is called - callbacks queued up from previous generations are discarded
        private int _generationCounter = 0;

        // Internal count of threaded work items outstanding - incremented when work is scheduled, decremented when the work item finishes
        private int _atomicOutstandingWorkCounter = 0;

        // Internal identifier for this threader (used in log messages only)
        private int _threaderId => GetHashCode();

        /// <summary>
        /// Runs, on a thread-pool thread, a simple function that takes no arguments and returns nothing.
        /// </summary>
        /// <param name="function">The function to run on the worker thread, signature must be void function(void)</param>
        /// <param name="callback">Action that is queued for execution by <see cref="Update"/> once the function has completed</param>
        public virtual void Call(Action function, Action callback)
        {
            // lambda captures object references, including "this"; explicitly capture counter value
            int initialCounter = _generationCounter;

            IncrementWorkCounterAndRun(() =>
            {
                function();
                AddToWaitingQueue(initialCounter, callback);
            });
        }

        /// <summary>
        /// Runs, on a thread-pool thread, a simple function that returns a RESULT object.
        /// </summary>
        /// <typeparam name="RESULT">Type of the output object returned by the function</typeparam>
        /// <param name="function">The function to run on the worker thread, signature must be RESULT function(void)</param>
        /// <param name="callback">Action that takes the RESULT object and is queued for execution by <see cref="Update"/> once the function has completed</param>
        public virtual void Call<RESULT>(Func<RESULT> function, Action<RESULT> callback)
        {
            // lambda captures object references, including "this"; explicitly capture counter value
            int initialCounter = _generationCounter;

            IncrementWorkCounterAndRun(() =>
            {
                RESULT result = function();
                AddToWaitingQueue(initialCounter, callback, result);
            });
        }

        /// <summary>
        /// Runs, on a thread-pool thread, a simple function that takes a DESCRIPTION object and returns a RESULT object.
        /// </summary>
        /// <typeparam name="DESCRIPTION">Type of the input object passed to the function</typeparam>
        /// <typeparam name="RESULT">Type of the output object returned by the function</typeparam>
        /// <param name="function">The function to run on the worker thread, signature must be RESULT function(DESCRIPTION)</param>
        /// <param name="description">Object provided to the wrapped function when it is called</param>
        /// <param name="callback">Action that takes the RESULT object and is queued for execution by <see cref="Update"/> once the function has completed</param>
        public virtual void Call<DESCRIPTION, RESULT>(Func<DESCRIPTION, RESULT> function, DESCRIPTION description, Action<RESULT> callback)
        {
            // lambda captures object references, including "this"; explicitly capture counter by-value
            int initialCounter = _generationCounter;

            IncrementWorkCounterAndRun(() =>
            {
                RESULT result = function(description);
                AddToWaitingQueue(initialCounter, callback, result);
            });
        }

        /// <summary>
        /// Runs, on a thread-pool thread, a function that may invoke its callback multiple times, such as a paginated function.
        /// The function must take a DESCRIPTION and an Action&lt;RESULT&gt;, and return a RETURN_RESULT.
        /// </summary>
        /// <typeparam name="DESCRIPTION">Type of the input object passed to the function</typeparam>
        /// <typeparam name="RESULT">Type of the object the function passes to its callback each time it has a result</typeparam>
        /// <typeparam name="RETURN_RESULT">Type of the value returned by the function when it finishes</typeparam>
        /// <param name="function">The function to run on the worker thread, signature must be RETURN_RESULT function(DESCRIPTION, Action&lt;RESULT&gt;)</param>
        /// <param name="description">Object provided to the wrapped function when it is called</param>
        /// <param name="callback">Action that takes a RESULT object; queued for execution by <see cref="Update"/> each time the wrapped function reports a result</param>
        /// <param name="onCompleteCallback">Action that takes the function's RETURN_RESULT; queued for execution by <see cref="Update"/> once the function has returned</param>
        public virtual void Call<DESCRIPTION, RESULT, RETURN_RESULT>(
            Func<DESCRIPTION, Action<RESULT>, RETURN_RESULT> function,
            DESCRIPTION description,
            Action<RESULT> callback,
            Action<RETURN_RESULT> onCompleteCallback)
        {
            // lambda captures object references, including "this"; explicitly capture counter by-value
            int initialCounter = _generationCounter;

            IncrementWorkCounterAndRun(() =>
            {
                // The wrapped function runs on the worker thread, so each intermediate result is only queued here, not executed.
                Action<RESULT> wrappedCallback = (RESULT partialResult) =>
                {
                    AddToWaitingQueue(initialCounter, callback, partialResult);
                };

                RETURN_RESULT finalResult = function(description, wrappedCallback);
                AddToWaitingQueue(initialCounter, onCompleteCallback, finalResult);
            });
        }

        /// <summary>
        /// Runs, on a thread-pool thread, a simple function that takes a DESCRIPTION object and returns nothing.
        /// </summary>
        /// <typeparam name="DESCRIPTION">Type of the input object passed to the function</typeparam>
        /// <param name="function">The function to run on the worker thread, signature must be void function(DESCRIPTION)</param>
        /// <param name="description">Object provided to the wrapped function when it is called</param>
        /// <param name="callback">Action that is queued for execution by <see cref="Update"/> once the function has completed</param>
        public virtual void Call<DESCRIPTION>(Action<DESCRIPTION> function, DESCRIPTION description, Action callback)
        {
            // lambda captures object references, including "this"; explicitly capture counter by-value
            int initialCounter = _generationCounter;

            IncrementWorkCounterAndRun(() =>
            {
                function(description);
                AddToWaitingQueue(initialCounter, callback);
            });
        }

        /// <summary>
        /// Called to (re)initialize the threader when changing play modes or scenes.
        /// Clears both queues and starts a new generation, so callbacks from work scheduled before this call are discarded.
        /// </summary>
        public virtual void Awake()
        {
            Logging.LogInfo($"GameKit Threader {_threaderId} Awake");

            // clear the queues each time this class is used to prevent any spill over when changing from playmode to editmode (and vice versa)
            lock (_waitingQueue)
            {
                _waitingQueue.Clear();
                _executionQueue.Clear();
                _generationCounter = unchecked(_generationCounter + 1); // overflow is possible, ignored.
            }
        }

        /// <summary>
        /// Called to handle any callbacks generated by the called functions.
        /// If a callback throws, the exception propagates to the caller and the remaining callbacks of this batch are dropped.
        /// </summary>
        public virtual void Update()
        {
            // move any actions waiting to execute into the execution queue
            lock (_waitingQueue)
            {
                _executionQueue.AddRange(_waitingQueue);
                _waitingQueue.Clear();
            }

            // execute the current actions
            try
            {
                _executionQueue.ForEach((action) =>
                {
                    action();
#if UNITY_EDITOR
                    // if there has been a callback, request that our settings window updates so that any results from that callback can display if needed
                    SettingsWindowUpdateController.RequestUpdate();
#endif
                });
            }
            finally
            {
                _executionQueue.Clear();
            }
        }

        /// <summary>
        /// Waits for the completion of pending tasks in a loop (blocking). No-op in Editor.
        /// </summary>
        /// <param name="loopDelay">Time to wait, in milliseconds, before checking for task completion in each loop iteration.</param>
        public void WaitForThreadedWork(int loopDelay = 50)
        {
#if !UNITY_EDITOR
            // In game mode wait for pending tasks
            WaitForThreadedWork_Internal(loopDelay);
#endif
        }

        /// <summary>
        /// Waits for the completion of pending tasks in a loop (blocking).
        /// </summary>
        /// <param name="loopDelay">Time to wait, in milliseconds, before checking for task completion in each loop iteration.</param>
        private void WaitForThreadedWork_Internal(int loopDelay = 50)
        {
            while (true)
            {
                // Note: slightly abusing Interlocked.Add(x, 0) as an atomic read w/ full memory barrier.
                if (Interlocked.Add(ref _atomicOutstandingWorkCounter, 0) == 0)
                {
                    Logging.LogInfo($"GameKit Threader {_threaderId} WaitForThreadedWork: No outstanding work remaining.");
                    return;
                }

                Logging.LogInfo($"GameKit Threader {_threaderId} WaitForThreadedWork: Waiting for outstanding work...");
                Thread.Sleep(loopDelay);
            }
        }

        /// <summary>
        /// Called to block until threaded work is complete. Callbacks remain unprocessed in the wait queue. Intended for tests only.
        /// </summary>
        public virtual void WaitForThreadedWork_TestOnly()
        {
            WaitForThreadedWork_Internal(1);
        }

        /// <summary>
        /// Helper method for adding a new callback action to the waiting queue in preparation for it to be called by <see cref="Update"/>.
        /// </summary>
        /// <typeparam name="RESULT">Type of the object passed to the callback</typeparam>
        /// <param name="checkCounter">Generational counter value when call was initiated - callback will be ignored if counter has changed</param>
        /// <param name="callback">Callback to execute during <see cref="Update"/>. Must have signature void func(RESULT)</param>
        /// <param name="result">Object that is passed into the callback function when it is executed</param>
        private void AddToWaitingQueue<RESULT>(int checkCounter, Action<RESULT> callback, RESULT result)
        {
            lock (_waitingQueue)
            {
                if (checkCounter == _generationCounter)
                {
                    _waitingQueue.Add(() => callback(result));
                }
            }
        }

        /// <summary>
        /// Helper method for adding a new callback action to the waiting queue in preparation for it to be called by <see cref="Update"/>.
        /// </summary>
        /// <param name="checkCounter">Generational counter value when call was initiated - callback will be ignored if counter has changed</param>
        /// <param name="callback">Callback to execute during <see cref="Update"/>. Must have signature void func(void)</param>
        private void AddToWaitingQueue(int checkCounter, Action callback)
        {
            lock (_waitingQueue)
            {
                if (checkCounter == _generationCounter)
                {
                    _waitingQueue.Add(callback);
                }
            }
        }

        /// <summary>
        /// Helper method for tracking when threaded work item has completed (adjusting _atomicOutstandingWorkCounter)
        /// </summary>
        private void DecrementWorkCounter()
        {
            Interlocked.Decrement(ref _atomicOutstandingWorkCounter);
        }

        /// <summary>
        /// Helper method for scheduling an action on the thread pool while tracking it in _atomicOutstandingWorkCounter.
        /// The counter is incremented before the action is queued and decremented exactly once: when the action finishes
        /// (whether it returns or throws), or immediately if the action could not be queued.
        /// </summary>
        /// <param name="action">Action to be executed asynchronously</param>
        private void IncrementWorkCounterAndRun(Action action)
        {
            Interlocked.Increment(ref _atomicOutstandingWorkCounter);

            bool queued = false;
            try
            {
                queued = ThreadPool.QueueUserWorkItem(delegate (object param)
                {
                    try
                    {
                        ((Action)param)();
                    }
                    finally
                    {
                        // Always release the counter, even when the work throws; otherwise
                        // WaitForThreadedWork_Internal would never observe zero and would block forever.
                        DecrementWorkCounter();
                    }
                }, action);
            }
            finally
            {
                // If QueueUserWorkItem throws for any reason, do not leave the counter permanently incremented
                if (!queued)
                {
                    DecrementWorkCounter();
                }
            }
        }
    }
}