"""SyncFlowExecutor that will run continuously until stop is called.""" import logging import time from concurrent.futures.thread import ThreadPoolExecutor from dataclasses import dataclass from typing import Callable, Optional from samcli.lib.sync.exceptions import SyncFlowException from samcli.lib.sync.sync_flow import SyncFlow from samcli.lib.sync.sync_flow_executor import SyncFlowExecutor, SyncFlowFuture, SyncFlowTask, default_exception_handler LOG = logging.getLogger(__name__) @dataclass(frozen=True, eq=True) class DelayedSyncFlowTask(SyncFlowTask): """Data struct for individual SyncFlow execution tasks""" # Time in seconds of when the task was initially queued queue_time: float # Number of seconds this task should stay in queue before being executed wait_time: float class ContinuousSyncFlowExecutor(SyncFlowExecutor): """SyncFlowExecutor that continuously runs and executes SyncFlows. Call stop() to stop the executor""" # Flag for whether the executor should be stopped at the next available time _stop_flag: bool def __init__(self) -> None: super().__init__() self._stop_flag = False def stop(self, should_stop=True) -> None: """Stop executor after all current SyncFlows are finished.""" with self._flow_queue_lock: self._stop_flag = should_stop if should_stop: self._flow_queue.queue.clear() def should_stop(self) -> bool: """ Returns ------- bool Should executor stop execution on the next available time. """ return self._stop_flag def _can_exit(self): return self.should_stop() and super()._can_exit() def _submit_sync_flow_task( self, executor: ThreadPoolExecutor, sync_flow_task: SyncFlowTask ) -> Optional[SyncFlowFuture]: """Submit SyncFlowTask to be executed by ThreadPoolExecutor and return its future Adds additional time checks for DelayedSyncFlowTask Parameters ---------- executor : ThreadPoolExecutor THreadPoolExecutor to be used for execution sync_flow_task : SyncFlowTask SyncFlowTask to be executed. Returns ------- Optional[SyncFlowFuture] Returns SyncFlowFuture generated by the SyncFlowTask. Can be None if the task cannot be executed yet. """ if ( isinstance(sync_flow_task, DelayedSyncFlowTask) and sync_flow_task.wait_time + sync_flow_task.queue_time > time.time() ): return None return super()._submit_sync_flow_task(executor, sync_flow_task) def _add_sync_flow_task(self, task: SyncFlowTask) -> None: """Add SyncFlowTask to the queue Skips if the executor is in the state of being shut down. Parameters ---------- task : SyncFlowTask SyncFlowTask to be added. """ if self.should_stop(): LOG.debug( "%s is skipped from queueing as executor is in the process of stopping.", task.sync_flow.log_prefix ) return super()._add_sync_flow_task(task) def add_delayed_sync_flow(self, sync_flow: SyncFlow, dedup: bool = True, wait_time: float = 0) -> None: """Add a SyncFlow to queue to be executed Locks will be set with LockDistributor Parameters ---------- sync_flow : SyncFlow SyncFlow to be executed dedup : bool SyncFlow will not be added if this flag is True and has a duplicate in the queue wait_time : float Minimum number of seconds before SyncFlow executes """ self._add_sync_flow_task(DelayedSyncFlowTask(sync_flow, dedup, time.time(), wait_time)) def execute( self, exception_handler: Optional[Callable[[SyncFlowException], None]] = default_exception_handler ) -> None: """Blocking continuous execution of the SyncFlows Parameters ---------- exception_handler : Optional[Callable[[Exception], None]], optional Function to be called if an exception is raised during the execution of a SyncFlow, by default default_exception_handler.__func__ """ super().execute(exception_handler=exception_handler) self.stop(should_stop=False)