"""
Wraps watchdog and the docker image events stream to observe file system paths and container images for any change.
"""
import functools
import logging
import platform
import threading
import uuid
from abc import ABC, abstractmethod
from pathlib import Path
from threading import Lock, Thread
from typing import Callable, Dict, List, Optional

import docker
from docker import DockerClient
from docker.errors import ImageNotFound
from docker.types import CancellableStream
from watchdog.events import FileSystemEvent, FileSystemEventHandler, PatternMatchingEventHandler
from watchdog.observers import Observer
from watchdog.observers.api import BaseObserver, ObservedWatch

from samcli.cli.global_config import Singleton
from samcli.lib.constants import DOCKER_MIN_API_VERSION
from samcli.lib.utils.hash import dir_checksum, file_checksum
from samcli.lib.utils.packagetype import IMAGE, ZIP
from samcli.local.lambdafn.config import FunctionConfig

# Module-level logger shared by all the observers defined in this module
LOG = logging.getLogger(__name__)
# Windows API error code (ERROR_BROKEN_PIPE) returned when attempting to perform I/O on a closed pipe
BROKEN_PIPE_ERROR = 109


class ResourceObserver(ABC):
    """
    Abstract interface of an observer that watches resources, identified by a string, for modifications.
    """

    @abstractmethod
    def watch(self, resource: str) -> None:
        """
        Start watching the input resource.

        Parameters
        ----------
        resource: str
            The resource that should be observed for modifications

        Raises
        ------
        ObserverException:
            if the input resource does not exist
        """

    @abstractmethod
    def unwatch(self, resource: str) -> None:
        """
        Remove the input resource from the observed resources

        Parameters
        ----------
        resource: str
            The resource to be unobserved
        """

    @abstractmethod
    def start(self):
        """
        Start Observing.
        """

    @abstractmethod
    def stop(self):
        """
        Stop Observing.
        """


class ObserverException(Exception):
    """
    Exception raised when unable to observe the input Lambda Function.

    It is the base class of the image and file specific observer exceptions defined in this module.
    """


class LambdaFunctionObserver:
    """
    A class that will observe Lambda Function sources regardless if the source is code or image

    Every watched function is mapped to the resources it depends on: the source code path and the local layers
    paths for ZIP functions, the image name for IMAGE functions. When one of these resources changes, the
    ``on_change`` callback is invoked with the list of the functions that depend on the changed resources.
    """

    def __init__(self, on_change: Callable) -> None:
        """
        Initialize the lambda function observer

        Parameters
        ----------
        on_change:
            Reference to the function that will be called, with the list of the affected function configurations,
            if there is a change in any of the observed functions' resources
        """
        # package type -> the observer that watches the resources of that package type
        self._observers: Dict[str, ResourceObserver] = {
            ZIP: FileObserver(self._on_zip_change),
            IMAGE: ImageObserver(self._on_image_change),
        }

        # package type -> {resource -> the functions that depend on that resource}
        self._observed_functions: Dict[str, Dict[str, List[FunctionConfig]]] = {
            ZIP: {},
            IMAGE: {},
        }

        def _get_zip_lambda_function_paths(function_config: FunctionConfig) -> List[str]:
            """
            Returns a list of ZIP package type lambda function source code paths

            Parameters
            ----------
            function_config: FunctionConfig
                The lambda function configuration that will be observed

            Returns
            -------
            list[str]
                List of lambda functions' source code paths to be observed
            """
            code_paths = [function_config.code_abs_path]
            if function_config.layers:
                # Non-local layers will not have a codeuri property and don't need to be observed
                code_paths += [layer.codeuri for layer in function_config.layers if layer.codeuri]
            return code_paths

        def _get_image_lambda_function_image_names(function_config: FunctionConfig) -> List[str]:
            """
            Returns a list of Image package type lambda function image names

            Parameters
            ----------
            function_config: FunctionConfig
                The lambda function configuration that will be observed

            Returns
            -------
            list[str]
                List of lambda functions' image names to be observed
            """
            return [function_config.imageuri]

        # package type -> the function that extracts the resources to observe out of a function configuration
        self.get_resources: Dict[str, Callable] = {
            ZIP: _get_zip_lambda_function_paths,
            IMAGE: _get_image_lambda_function_image_names,
        }

        self._input_on_change: Callable = on_change
        self._watch_lock: Lock = threading.Lock()

    def _on_zip_change(self, paths: List[str]) -> None:
        """
        It got executed once there is a change in one of the watched lambda functions' source code.

        Parameters
        ----------
        paths: list[str]
            the changed lambda functions' source code paths
        """
        self._on_change(paths, ZIP)

    def _on_image_change(self, images: List[str]) -> None:
        """
        It got executed once there is a change in one of the watched lambda functions' images.

        Parameters
        ----------
        images: list[str]
            the changed lambda functions' images names
        """
        self._on_change(images, IMAGE)

    def _on_change(self, resources: List[str], package_type: str) -> None:
        """
        It got executed once there is a change in one of the watched lambda functions' resources.

        The input on_change callback is always invoked, while the watch lock is held, with the functions that
        depend on the changed resources; the list is empty if none of the resources is observed any more.

        Parameters
        ----------
        resources: list[str]
            the changed lambda functions' resources (either source code paths or image names)
        package_type: str
            determine if the changed resource is a source code path or an image name
        """
        with self._watch_lock:
            observed_functions = self._observed_functions[package_type]
            changed_functions: List[FunctionConfig] = []
            for resource in resources:
                functions = observed_functions.get(resource, None)
                if functions:
                    changed_functions.extend(functions)
            self._input_on_change(changed_functions)

    def watch(self, function_config: FunctionConfig) -> None:
        """
        Start watching the input lambda function.

        Functions whose package type has no registered resources getter are silently ignored.

        Parameters
        ----------
        function_config: FunctionConfig
            The lambda function configuration that will be observed

        Raises
        ------
        ObserverException:
            if not able to observe the input function source path/image
        """
        with self._watch_lock:
            package_type = function_config.packagetype
            get_function_resources = self.get_resources.get(package_type, None)
            if not get_function_resources:
                return

            for resource in get_function_resources(function_config):
                # Ask the underlying observer first: it raises if the resource can not be observed, and in that
                # case the function must not be left registered for a resource that nobody is watching.
                self._observers[package_type].watch(resource)
                self._observed_functions[package_type].setdefault(resource, []).append(function_config)

    def unwatch(self, function_config: FunctionConfig) -> None:
        """
        Remove the input lambda function from the observed functions

        A resource stops being observed once no watched function depends on it any more.

        Parameters
        ----------
        function_config: FunctionConfig
            The lambda function configuration that should not be observed any more
        """
        # NOTE(review): unlike watch() and _on_change(), this method does not acquire _watch_lock. _on_change()
        # invokes the on_change callback while holding that (non-reentrant) lock, so acquiring it here would
        # deadlock if the callback unwatches a function - confirm how callers use unwatch() before adding locking.
        if self.get_resources.get(function_config.packagetype, None):
            resources = self.get_resources[function_config.packagetype](function_config)
            for resource in resources:
                functions = self._observed_functions[function_config.packagetype].get(resource, [])
                if function_config in functions:
                    functions.remove(function_config)
                if not functions:
                    self._observed_functions[function_config.packagetype].pop(resource, None)
                    self._observers[function_config.packagetype].unwatch(resource)

    def start(self):
        """
        Start Observing.
        """
        for observer in self._observers.values():
            observer.start()

    def stop(self):
        """
        Stop Observing.
        """
        for observer in self._observers.values():
            observer.stop()


class ImageObserverException(ObserverException):
    """
    Exception raised when unable to observe the input image, e.g. because the image does not exist.
    """


def broken_pipe_handler(func: Callable) -> Callable:
    """
    Decorator to handle the Windows API BROKEN_PIPE_ERROR error.

    Every exception raised by the wrapped callable is re-raised unchanged, except an exception raised on Windows
    whose ``winerror`` attribute equals BROKEN_PIPE_ERROR: that one is logged at debug level and suppressed, in
    which case the wrapper returns None.

    Parameters
    ----------
    func: Callable
        The method to wrap around

    Returns
    -------
    Callable
        A wrapper that forwards all its arguments to ``func``, and keeps ``func``'s name and docstring
    """

    # NOTE: As of right now, this checks for the Windows API error 109
    # specifically. This could be abstracted to potentially utilize a
    # callback method to further customize this.

    # functools.wraps keeps the metadata (__name__, __doc__, ...) of the wrapped callable, so decorated methods
    # stay recognizable in logs, tracebacks and debuggers
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exception:
            # handle a pywintypes exception that gets thrown when trying to exit
            # from a command that utilizes ImageObserver(s) in
            # EAGER container mode (start-api, start-lambda)

            # all containers would have been stopped, and deleted, however
            # the pipes to those containers are still loaded somewhere

            if platform.system() != "Windows":
                raise

            # exceptions that do not carry a winerror attribute are never suppressed
            if getattr(exception, "winerror", None) != BROKEN_PIPE_ERROR:
                raise

            LOG.debug("Handling BROKEN_PIPE_ERROR pywintypes, exception ignored gracefully")
            return None

    return wrapper


class ImageObserver(ResourceObserver):
    """
    A class that will observe some docker images for any change.

    The docker image events are consumed on a background daemon thread; the ``on_change`` callback is invoked
    with the image name whenever an observed image name gets tagged onto a different image id.
    """

    def __init__(self, on_change: Callable) -> None:
        """
        Initialize the Image observer

        Parameters
        ----------
        on_change:
            Reference to the function that will be called if there is a change in any of the observed images
        """
        # image name -> the id of the image that name referred to the last time it was checked
        self._observed_images: Dict[str, str] = {}
        self._input_on_change: Callable = on_change
        self.docker_client: DockerClient = docker.from_env(version=DOCKER_MIN_API_VERSION)
        # the events stream is requested here, but it is only consumed once start() got called
        self.events: CancellableStream = self.docker_client.events(filters={"type": "image"}, decode=True)
        self._images_observer_thread: Optional[Thread] = None
        # serializes start() and stop()
        self._lock: Lock = threading.Lock()

    @broken_pipe_handler
    def _watch_images_events(self):
        """
        Consume the docker image events, and report the observed images that got tagged onto a new image id.

        It is the target of the images observer thread, and it returns once the events stream ends.
        """
        for event in self.events:
            if event.get("Action", None) != "tag":
                continue
            image_name = event["Actor"]["Attributes"]["name"]
            if self._observed_images.get(image_name, None):
                new_image_id = event["id"]
                # only report the image if its name moved to a different image id
                if new_image_id != self._observed_images[image_name]:
                    self._observed_images[image_name] = new_image_id
                    self._input_on_change([image_name])

    def watch(self, resource: str) -> None:
        """
        Start watching the input image.

        Parameters
        ----------
        resource: str
            The container image name that will be observed

        Raises
        ------
        ImageObserverException:
            if the input image_name does not exist
        """
        try:
            image = self.docker_client.images.get(resource)
            self._observed_images[resource] = image.id
        except ImageNotFound as exc:
            raise ImageObserverException("Can not observe non exist image") from exc

    def unwatch(self, resource: str) -> None:
        """
        Remove the input image from the observed images

        Parameters
        ----------
        resource: str
            The container image name to be unobserved
        """
        self._observed_images.pop(resource, None)

    def start(self):
        """
        Start Observing.

        The images observer thread is created only once; calling start again has no effect.
        """
        with self._lock:
            if not self._images_observer_thread:
                self._images_observer_thread = threading.Thread(target=self._watch_images_events, daemon=True)
                self._images_observer_thread.start()

    def stop(self):
        """
        Stop Observing.

        Closes the events stream, then blocks until the images observer thread (if any) has terminated.
        """
        with self._lock:
            self.events.close()
            observer_thread = self._images_observer_thread
            # A thread can not wait for its own termination, so there is nothing to wait for if stop() got
            # invoked from the images observer thread itself (e.g. from within the on_change callback).
            if observer_thread is None or observer_thread is threading.current_thread():
                return
            # Wait until the images observer thread got stopped. Block inside join() in short slices instead of
            # spinning on is_alive(), which would keep a CPU core busy for as long as the thread takes to exit.
            while observer_thread.is_alive():
                observer_thread.join(timeout=0.1)


class FileObserverException(ObserverException):
    """
    Exception raised when unable to observe the input path, e.g. because the path does not exist.
    """


class FileObserver(ResourceObserver):
    """
    A class that will Wrap the Singleton File Observer.

    Every instance registers its own, randomly named, group of paths in the shared singleton observer, and
    delegates all the observing operations to it on behalf of that group.
    """

    def __init__(self, on_change: Callable) -> None:
        """
        Initialize the file observer

        Parameters
        ----------
        on_change:
            Reference to the function that will be called if there is a change in any of the observed paths
        """
        group_name = str(uuid.uuid4())
        shared_observer = SingletonFileObserver()
        shared_observer.add_group(group_name, on_change)

        self._group = group_name
        self._single_file_observer = shared_observer

    def watch(self, resource: str) -> None:
        """
        Start watching the input path as part of this observer's group.

        Parameters
        ----------
        resource: str
            The file/dir path to be observed
        """
        self._single_file_observer.watch(resource, self._group)

    def unwatch(self, resource: str) -> None:
        """
        Remove the input path from the paths observed by this observer's group.

        Parameters
        ----------
        resource: str
            The file/dir path to be unobserved
        """
        self._single_file_observer.unwatch(resource, self._group)

    def start(self):
        """
        Start Observing, by starting the shared singleton observer.
        """
        self._single_file_observer.start()

    def stop(self):
        """
        Stop Observing, by stopping the shared singleton observer.
        """
        self._single_file_observer.stop()


class SingletonFileObserver(metaclass=Singleton):
    """
    A Singleton class that will observe some file system paths for any change for multiple purposes.

    Paths are observed on behalf of groups: every group has its own set of observed paths and its own handler.
    A single watchdog observer is shared by all the groups, and a change is only reported to a group if the
    checksum of one of its observed paths really changed, or if the path does not exist any more.
    """

    def __init__(self) -> None:
        """
        Initialize the file observer
        """
        # group -> {observed path -> the last known checksum of that path}
        self._observed_paths_per_group: Dict[str, Dict[str, str]] = {}
        # group -> the handler to be called with the list of the changed paths of that group
        self._observed_groups_handlers: Dict[str, Callable] = {}
        # "<watchdog path>_<recursive>" -> the watch returned by watchdog when that path got scheduled
        self._observed_watches: Dict[str, ObservedWatch] = {}
        # "<watchdog path>_<recursive>" -> the originally requested paths that rely on that watchdog watch
        self._watch_dog_observed_paths: Dict[str, List[str]] = {}
        self._observer: BaseObserver = Observer()
        self._code_modification_handler: PatternMatchingEventHandler = PatternMatchingEventHandler(
            patterns=["*"], ignore_patterns=[], ignore_directories=False
        )

        self._code_deletion_handler: PatternMatchingEventHandler = PatternMatchingEventHandler(
            patterns=["*"], ignore_patterns=[], ignore_directories=False
        )

        # the modification handler only reacts on modified events, and the deletion handler only on deleted
        # events; both of them end up in on_change
        self._code_modification_handler.on_modified = self.on_change
        self._code_deletion_handler.on_deleted = self.on_change
        # acquired by watch() and on_change()
        self._watch_lock = threading.Lock()
        # acquired by start() and stop()
        self._lock: Lock = threading.Lock()

    def on_change(self, event: FileSystemEvent) -> None:
        """
        It got executed once there is a change in one of the paths that watchdog is observing.
        This method will check if any of the observed paths is really changed, and based on that it will
        invoke the handler of every affected group with the changed paths of that group

        Parameters
        ----------
        event: watchdog.events.FileSystemEvent
            Determines that there is a change happened to some file/dir in the observed paths
        """
        with self._watch_lock:
            LOG.debug("a %s change got detected in path %s", event.event_type, event.src_path)
            for group, _observed_paths in self._observed_paths_per_group.items():
                if event.event_type == "deleted":
                    # a deletion affects the observed path itself, or the observed paths that registered the
                    # deleted path as their (non-recursively watched) parent directory
                    observed_paths = [
                        path
                        for path in _observed_paths
                        if path == event.src_path
                        or path in self._watch_dog_observed_paths.get(f"{event.src_path}_False", [])
                    ]
                else:
                    # NOTE(review): this is a plain string prefix test, so an observed path that is merely a
                    # string prefix of the event path (e.g. "/a/foo" for "/a/foobar/x") is selected as well;
                    # the checksum comparison below decides whether it is actually reported.
                    observed_paths = [path for path in _observed_paths if event.src_path.startswith(path)]

                if not observed_paths:
                    continue

                LOG.debug("affected paths of this change %s", observed_paths)
                changed_paths = []
                for path in observed_paths:
                    path_obj = Path(path)
                    # The path got deleted
                    if not path_obj.exists():
                        _observed_paths.pop(path, None)
                        changed_paths += [path]
                    else:
                        # report the path only if its content differs from the last known checksum; a failed
                        # checksum calculation (None) is treated as no change
                        new_checksum = calculate_checksum(path)
                        if new_checksum and new_checksum != _observed_paths.get(path, None):
                            changed_paths += [path]
                            _observed_paths[path] = new_checksum
                        else:
                            LOG.debug("the path %s content does not change", path)

                # the group handler is invoked while the watch lock is still held
                if changed_paths:
                    self._observed_groups_handlers[group](changed_paths)

    def add_group(self, group: str, on_change: Callable) -> None:
        """
        Add new group to file observer. This enables FileObserver to watch the same path for
        multiple purposes.

        Parameters
        ----------
        group: str
            unique string that defines a new group of paths to be watched.

        on_change: Callable
            The method to be called in case any path related to this group got changed.

        Raises
        ------
        Exception:
            if the input group is already registered
        """
        if group in self._observed_paths_per_group:
            raise Exception(f"The group {group} of paths is already watched")
        self._observed_paths_per_group[group] = {}
        self._observed_groups_handlers[group] = on_change

    def watch(self, resource: str, group: str) -> None:
        """
        Start watching the input path. File Observer will keep track of the input path with its hash, to check
        later if it really got changed or not.
        File Observer will also send the parent path to watchdog to be observed, to avoid missing the events if
        the input path got deleted.

        Parameters
        ----------
        resource: str
            The file/dir path to be observed

        group: str
            the group, previously registered using add_group, that the input path should be observed for.

        Raises
        ------
        FileObserverException:
            if the input path does not exist
        Exception:
            if the checksum of the input path can not be calculated
        """
        with self._watch_lock:
            path_obj = Path(resource)
            if not path_obj.exists():
                raise FileObserverException("Can not observe non exist path")

            _observed_paths = self._observed_paths_per_group[group]
            _check_sum = calculate_checksum(resource)
            if not _check_sum:
                raise Exception(f"Failed to calculate the hash of resource {resource}")
            _observed_paths[resource] = _check_sum

            LOG.debug("watch resource %s", resource)
            # recursively watch the input path, and all child path for any modification
            self._watch_path(resource, resource, self._code_modification_handler, True)

            LOG.debug("watch resource %s's parent %s", resource, str(path_obj.parent))
            # watch only the direct parent path child directories for any deletion
            # Parent directory watching is needed, as if the input path got deleted,
            # watchdog will not send an event for it
            self._watch_path(str(path_obj.parent), resource, self._code_deletion_handler, False)

    def _watch_path(
        self, watch_dog_path: str, original_path: str, watcher_handler: FileSystemEventHandler, recursive: bool
    ) -> None:
        """
        update the observed paths data structure, and call watch dog observer to observe the input watch dog path
        if it is not observed before

        Parameters
        ----------
        watch_dog_path: str
            The file/dir path to be observed by watch dog
        original_path: str
            The original input file/dir path to be observed
        watcher_handler: FileSystemEventHandler
            The watcher event handler
        recursive: bool
            determines if we need to watch the path, and all children paths recursively, or just the direct children
            paths
        """

        # Allow watching the same path in 2 modes, recursively and non-recursively.
        # here, we need to only watch the input path in a specific recursive mode
        original_watch_dog_path = watch_dog_path
        watch_dog_path = f"{watch_dog_path}_{recursive}"
        child_paths = self._watch_dog_observed_paths.get(watch_dog_path, [])
        # no path relies on this watchdog path in this mode yet, so it needs to be scheduled
        first_time = not bool(child_paths)
        if original_path not in child_paths:
            child_paths += [original_path]
        self._watch_dog_observed_paths[watch_dog_path] = child_paths
        if first_time:
            LOG.debug("Create Observer for resource %s with recursive %s", original_watch_dog_path, recursive)
            self._observed_watches[watch_dog_path] = self._observer.schedule(
                watcher_handler, original_watch_dog_path, recursive=recursive
            )

    def unwatch(self, resource: str, group: str) -> None:
        """
        Remove the input path from the observed paths, and stop watching this path.

        Parameters
        ----------
        resource: str
            The file/dir path to be unobserved
        group: str
            the group that the input path was observed for.
        """
        # NOTE(review): unlike watch() and on_change(), this method does not acquire _watch_lock. on_change()
        # invokes the group handlers while holding that (non-reentrant) lock, so acquiring it here would deadlock
        # if a handler unwatches a path - confirm how callers use unwatch() before adding locking.
        path_obj = Path(resource)

        LOG.debug("unwatch resource %s", resource)
        # unwatch input path
        self._unwatch_path(resource, resource, group, True)

        LOG.debug("unwatch resource %s's parent %s", resource, str(path_obj.parent))
        # unwatch parent path
        self._unwatch_path(str(path_obj.parent), resource, group, False)

    def _unwatch_path(self, watch_dog_path: str, original_path: str, group: str, recursive: bool) -> None:
        """
        update the observed paths data structure, and call watch dog observer to unobserve the input watch dog path
        once no observed path relies on it any more

        Parameters
        ----------
        watch_dog_path: str
            The file/dir path to be unobserved by watch dog
        original_path: str
            The original input file/dir path to be unobserved
        group: str
            the group that the original path was observed for.
        recursive: bool
            determines if the path was watched with all children paths recursively, or just with the direct
            children paths
        """

        # Allow watching the same path in 2 modes, recursively and non-recursively.
        # here, we need to only stop watching the input path in a specific recursive mode
        original_watch_dog_path = watch_dog_path
        watch_dog_path = f"{watch_dog_path}_{recursive}"
        _observed_paths = self._observed_paths_per_group[group]
        # NOTE(review): this list is keyed by the watchdog path only, not by group, so removing the original path
        # here drops it regardless of which group asked for it - verify the behavior when two groups observe the
        # same path.
        child_paths = self._watch_dog_observed_paths.get(watch_dog_path, [])
        if original_path in child_paths:
            child_paths.remove(original_path)
            _observed_paths.pop(original_path, None)
        if not child_paths:
            self._watch_dog_observed_paths.pop(watch_dog_path, None)
            if self._observed_watches.get(watch_dog_path, None):
                LOG.debug("Unschedule Observer for resource %s with recursive %s", original_watch_dog_path, recursive)
                self._observer.unschedule(self._observed_watches[watch_dog_path])
                self._observed_watches.pop(watch_dog_path, None)

    def start(self):
        """
        Start Observing. The watchdog observer is only started if it is not alive.
        """
        with self._lock:
            if not self._observer.is_alive():
                self._observer.start()

    def stop(self):
        """
        Stop Observing. The watchdog observer is only stopped if it is alive.
        """
        with self._lock:
            if self._observer.is_alive():
                self._observer.stop()


def calculate_checksum(path: str) -> Optional[str]:
    """
    Calculate the checksum of the input path on a best-effort basis.

    Parameters
    ----------
    path: str
        The file/dir path to calculate the checksum of

    Returns
    -------
    Optional[str]
        The file checksum if the path is a file, the directory checksum otherwise, or None if the checksum
        could not be calculated for any reason
    """
    try:
        # pick the hashing function based on what the path currently points to
        compute_checksum = file_checksum if Path(path).is_file() else dir_checksum
        return compute_checksum(path)
    except Exception:
        # best effort: callers treat None as "checksum not available"
        return None