//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Foundation
#if canImport(Combine)
import Combine
#endif

/// An AmplifyOperation that emits InProcess values intermittently during the operation.
///
/// Unlike a regular `AmplifyOperation`, which emits a single Result at the completion of the operation's work, an
/// `AmplifyInProcessReportingOperation` may emit intermediate values while its work is ongoing. These values could be
/// incidental to the operation (such as a `Storage.downloadFile` operation reporting Progress values periodically as
/// the download proceeds), or they could be the primary delivery mechanism for an operation (such as a
/// `GraphQLSubscriptionOperation` emitting new subscription values).
open class AmplifyInProcessReportingOperation<
    Request: AmplifyOperationRequest,
    InProcess,
    Success,
    Failure: AmplifyError
>: AmplifyOperation<Request, Success, Failure> {
    /// The type of the intermediate values this operation reports while it is running.
    public typealias InProcess = InProcess

    /// Token for the Hub listener registered by `init` when an `inProcessListener` is supplied.
    /// Stays `nil` when the operation is created without an in-process listener.
    var inProcessListenerUnsubscribeToken: UnsubscribeToken?

    /// Backing subject for the in-process values reported by this operation.
    /// The `inProcessPublisher` computed property is derived from this value.
    /// Amplify V2 can expect Combine to be available.
#if canImport(Combine)
    var inProcessSubject: PassthroughSubject<InProcess, Never>!
#endif

    /// Creates the operation and, if requested, wires up an in-process listener.
    ///
    /// - Parameters:
    ///   - categoryType: Forwarded to the superclass initializer
    ///   - eventName: Forwarded to the superclass initializer
    ///   - request: Forwarded to the superclass initializer
    ///   - inProcessListener: Optional callback for in-process values. When present, it is registered
    ///     through `subscribe(inProcessListener:)` and the resulting token is stored in
    ///     `inProcessListenerUnsubscribeToken`
    ///   - resultListener: Forwarded to the superclass initializer
    public init(categoryType: CategoryType,
                eventName: HubPayloadEventName,
                request: Request,
                inProcessListener: InProcessListener? = nil,
                resultListener: ResultListener? = nil) {

        super.init(categoryType: categoryType,
                   eventName: eventName,
                   request: request,
                   resultListener: resultListener)

#if canImport(Combine)
        inProcessSubject = PassthroughSubject<InProcess, Never>()
#endif

        // Nothing more to do unless the caller asked to be notified of in-process values.
        guard let listener = inProcessListener else {
            return
        }

        // Register a hub listener for the callback; it unsubscribes itself once a result
        // event for this operation is observed.
        inProcessListenerUnsubscribeToken = subscribe(inProcessListener: listener)
    }

    /// Registers an in-process listener for this operation. Once an `OperationResult`
    /// payload is observed for the operation, the listener removes itself from Hub.
    ///
    /// - Parameter inProcessListener: The listener for in-process events
    /// - Returns: an UnsubscribeToken that can be used to remove the listener from Hub
    func subscribe(inProcessListener: @escaping InProcessListener) -> UnsubscribeToken {
        // Declared before the closure so the closure can capture it; it is assigned
        // only after `listen` returns.
        var token: UnsubscribeToken!

        let hubListener: HubListener = { payload in
            guard let value = payload.data as? InProcess else {
                // Not an in-process value: if it is the operation's result, stop listening.
                if payload.data is OperationResult {
                    Amplify.Hub.removeListener(token)
                }
                return
            }
            inProcessListener(value)
        }

        token = Amplify.Hub.listen(to: HubChannel(from: categoryType),
                                   isIncluded: HubFilters.forOperation(self),
                                   listener: hubListener)

        return token
    }

    /// Cancels the operation via `super.cancel()` and then, where Combine is available,
    /// publishes a `.finished` completion.
    ///
    /// Classes that override this method must emit a completion to the `inProcessPublisher` upon cancellation
    open override func cancel() {
        super.cancel()
#if canImport(Combine)
        publish(completion: .finished)
#endif
    }

    /// Dispatches the operation's result by invoking `super.dispatch(result:)`. Where Combine
    /// is available, a `.finished` completion is published before the result is dispatched.
    ///
    /// - Parameter result: The OperationResult to dispatch to the hub as part of the
    ///   HubPayload
    public override func dispatch(result: OperationResult) {
#if canImport(Combine)
        publish(completion: .finished)
#endif
        super.dispatch(result: result)
    }

}

public extension AmplifyInProcessReportingOperation {
    /// Signature of the `inProcessListener` callback that can be supplied when the operation is created
    typealias InProcessListener = (InProcess) -> Void

    /// Reports an in-process value for this operation.
    ///
    /// Where Combine is available, the value is first handed to `publish(inProcessValue:)`.
    /// It is then dispatched to the Hub channel for the operation's `categoryType`, wrapped in a
    /// `HubPayload` whose context is an `AmplifyOperationContext` built from the operation's
    /// `id` and `request`.
    ///
    /// - Parameter data: The in-process value to dispatch to the hub as part of the HubPayload
    func dispatchInProcess(data: InProcess) {
#if canImport(Combine)
        publish(inProcessValue: data)
#endif

        let payload = HubPayload(eventName: eventName,
                                 context: AmplifyOperationContext(operationId: id, request: request),
                                 data: data)
        Amplify.Hub.dispatch(to: HubChannel(from: categoryType), payload: payload)
    }

    /// Removes the Hub listener identified by `inProcessListenerUnsubscribeToken`, i.e. the one
    /// registered during operation instantiation. Does nothing when no token is stored.
    func removeInProcessResultListener() {
        guard let token = inProcessListenerUnsubscribeToken else {
            return
        }
        Amplify.Hub.removeListener(token)
    }

}