/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

package software.amazon.awssdk.eventstreamrpc;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.crt.eventstream.Header;
import software.amazon.awssdk.crt.eventstream.MessageFlags;
import software.amazon.awssdk.crt.eventstream.MessageType;
import software.amazon.awssdk.crt.eventstream.ServerConnectionContinuationHandler;
import software.amazon.awssdk.eventstreamrpc.model.EventStreamJsonMessage;
import software.amazon.awssdk.eventstreamrpc.model.EventStreamOperationError;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
 * Class to process continuations
 */
public abstract class OperationContinuationHandler<RequestType extends EventStreamJsonMessage,
        ResponseType extends EventStreamJsonMessage,
        StreamingRequestType extends EventStreamJsonMessage,
        StreamingResponseType extends EventStreamJsonMessage>
        extends ServerConnectionContinuationHandler implements StreamEventPublisher<StreamingResponseType> {
    private static final Logger LOGGER = LoggerFactory.getLogger(OperationContinuationHandler.class);

    private final OperationContinuationHandlerContext context;
    private List<Header> initialRequestHeaders;
    private RequestType initialRequest;

    /**
     * Returns the operation model context
     * @return the operation model context
     */
    abstract public OperationModelContext<RequestType, ResponseType,
            StreamingRequestType, StreamingResponseType> getOperationModelContext();

    /**
     * Constructs a new OperationContinuationHandler from the given context
     * @param context The operation OperationContinuationHandlerContext to use
     */
    public OperationContinuationHandler(final OperationContinuationHandlerContext context) {
        super(context.getContinuation());
        this.context = context;
    }

    @Override
    final protected void onContinuationClosed() {
        LOGGER.debug("{} stream continuation closed.", getOperationName());
        continuation.close();
        try {
            onStreamClosed();
        } catch (Exception e) {
            LOGGER.error("{} threw {}: {}", getOperationName(), e.getClass().getCanonicalName(), e.getMessage());
        }
    }

    /**
     * Returns the operation model context request type class
     * @return The operation model context request type class
     */
    final protected Class<RequestType> getRequestClass() {
        return getOperationModelContext().getRequestTypeClass();
    }

    /**
     * Returns the operation model context response type class
     * @return The operation model context response type class
     */
    final protected Class<ResponseType> getResponseClass() {
        return getOperationModelContext().getResponseTypeClass();
    }

    /**
     * Returns the operation model context streaming request type class
     * @return the operation model context streaming request type class
     */
    final protected Class<StreamingRequestType> getStreamingRequestClass() {
        return getOperationModelContext().getStreamingRequestTypeClass().get();
    }

    /**
     * Returns the operation model context streamining response type class
     * @return the operation model context streamining response type class
     */
    final protected Class<StreamingResponseType> getStreamingResponseClass() {
        return getOperationModelContext().getStreamingResponseTypeClass().get();
    }

    /**
     * Returns the operation name implemented by the handler. Generated code should populate this
     *
     * @return the operation name implemented by the handler.
     */
    private String getOperationName() {
        return getOperationModelContext().getOperationName();
    }

    /**
     * Called when the underlying continuation is closed. Gives operations a chance to cleanup whatever resources may be
     * on the other end of an open stream. Also invoked when an underlying ServerConnection is closed associated with
     * the stream/continuation
     */
    protected abstract void onStreamClosed();

    /**
     * Should return true if operation has either streaming input or output. If neither, return false and only allows
     * an initial-request -> initial->response before closing the continuation.
     *
     * @return true if operation has either streaming input or output
     */
    final protected boolean isStreamingOperation() {
        return getOperationModelContext().isStreamingOperation();
    }

    /**
     * Main request handler for any operation to do work on an initial request. Streaming response operations still must
     * send an initial-response which is empty.
     * <p>
     * Implementers should not call sendStreamEvent() during handleRequest() to send a streaming response after an
     * initial-response. This would violate the sequence of messages expected to occur for the specific operation.
     * Override "afterHandleRequest()" as a way of being informed of the quickest possible time to sent a stream
     * response after handleRequest returns.
     *
     * @param request The request to handle
     * @return The ResponseType after handling the request
     */
    public abstract ResponseType handleRequest(final RequestType request);

    /**
     * Same as handleRequest, but returns a future rather than running immediately on the SDK's thread.
     * If this method returns null, then handleRequest will be called.
     *
     * @param request The request to handle
     * @return A future containing the ResponseType after handling the request
     */
    public CompletableFuture<ResponseType> handleRequestAsync(final RequestType request) {
        return null;
    }

    /**
     * Override to appropriately enforce stream responses are sent after the initial response. This only gets called if
     * handleRequest returns normally and starts to send a response.
     */
    public void afterHandleRequest() {
    }

    /**
     * Handle an incoming stream event from the connected client on the operation.
     * <p>
     * If the implementation throws an exception, the framework will respond with the modeled exception to the client,
     * if it is modeled. If it is not modeled, it will respond with an internal error and log appropriately. Either
     * case, throwing an exception will result in closing the stream. To keep the stream open, do not throw
     *
     * @param streamRequestEvent The stream request event to handle
     */
    public abstract void handleStreamEvent(final StreamingRequestType streamRequestEvent);

    /**
     * Retrieves the underlying EventStream request headers for inspection. Pulling these headers out shouldn't be
     * necessary as it means operations are aware of the underlying protocol. Any headers needed to be pulled are
     * candidates for what should be in the service model directly
     *
     * @return The underlying EventStream request headers
     */
    final protected List<Header> getInitialRequestHeaders() {
        return initialRequestHeaders;   //not a defensive copy
    }

    /**
     * Retrieves the initial request object that initiated the stream
     * <p>
     * For use in handler implementations if initial request is wanted to handle further in-out events May be unecessary
     * memory, but also initial request may be used by framework to log errors with 'request-id' like semantics
     *
     * @return The initial request object that initiated the stream
     */
    final protected RequestType getInitialRequest() {
        return initialRequest;
    }

    /**
     * Retrieves the operation handler context. Use for inspecting state outside of the limited scope of this operation
     * handler.
     *
     * @return The operation handler context
     */
    final protected OperationContinuationHandlerContext getContext() {
        return context;
    }

    /**
     * TODO: close stream should be sent with the final message, or separately? Either should be fine
     *
     * @return A future that completes when the stream is closed
     */
    @Override
    final public CompletableFuture<Void> closeStream() {
        LOGGER.debug("[{}] closing stream", getOperationName());
        return continuation.sendMessage(null, null, MessageType.ApplicationMessage,
                MessageFlags.TerminateStream.getByteValue()).whenComplete((res, ex) -> {
            continuation.close();
            if (ex == null) {
                LOGGER.debug("[{}] closed stream", getOperationName());
            } else {
                LOGGER.error("[{}] {} error closing stream: {}", getOperationName(), ex.getClass().getName(),
                        ex.getMessage());
            }
        });
    }

    /**
     * Used so other processes/events going on in the server can push events back into this operation's opened
     * continuation
     *
     * @param streamingResponse A future that completes when the stream event message is sent
     */
    final public CompletableFuture<Void> sendStreamEvent(final StreamingResponseType streamingResponse) {
        return sendMessage(streamingResponse, false);
    }

    /**
     * Sends a message through the given continuation. If close is true, then the continuation is closed once finished
     * @param message The message to send
     * @param close If true, the continuation is closed after the message is sent
     * @return A future that completes when the message is sent
     */
    final protected CompletableFuture<Void> sendMessage(final EventStreamJsonMessage message, final boolean close) {
        if (continuation.isClosed()) { //is this check necessary?
            return CompletableFuture.supplyAsync(() -> {
                throw new EventStreamClosedException(continuation.getNativeHandle());
            });
        }
        final List<Header> responseHeaders = new ArrayList<>();
        byte[] outputPayload = getOperationModelContext().getServiceModel().toJson(message);
        responseHeaders.add(Header.createHeader(EventStreamRPCServiceModel.CONTENT_TYPE_HEADER,
                EventStreamRPCServiceModel.CONTENT_TYPE_APPLICATION_JSON));
        responseHeaders.add(Header.createHeader(EventStreamRPCServiceModel.SERVICE_MODEL_TYPE_HEADER,
                message.getApplicationModelType()));

        return continuation.sendMessage(responseHeaders, outputPayload, MessageType.ApplicationMessage,
                close ? MessageFlags.TerminateStream.getByteValue() : 0).whenComplete((res, ex) -> {
            if (close) {
                continuation.close();
            }
        });
    }

    /**
     * Sends an error over the stream. Same method is used for errors from the initial response or any errors that occur
     * while the stream is open. It will always close the stream/continuation on the same message using the terminate
     * flag on the same message
     *
     * @param message The message to send
     * @return A future that completes when the error is sent
     */
    final protected CompletableFuture<Void> sendModeledError(final EventStreamJsonMessage message) {
        if (continuation.isClosed()) {  //is this check necessary?
            return CompletableFuture.supplyAsync(() -> {
                throw new EventStreamClosedException(continuation.getNativeHandle());
            });
        }
        final List<Header> responseHeaders = new ArrayList<>();
        byte[] outputPayload = getOperationModelContext().getServiceModel().toJson(message);
        responseHeaders.add(Header.createHeader(EventStreamRPCServiceModel.CONTENT_TYPE_HEADER,
                EventStreamRPCServiceModel.CONTENT_TYPE_APPLICATION_JSON));
        responseHeaders.add(Header.createHeader(EventStreamRPCServiceModel.SERVICE_MODEL_TYPE_HEADER,
                message.getApplicationModelType()));

        return continuation.sendMessage(responseHeaders, outputPayload, MessageType.ApplicationError,
                MessageFlags.TerminateStream.getByteValue()).whenComplete((res, ex) -> {
            //complete silence on any error closing here
            continuation.close();
        });
    }

    private void invokeAfterHandleRequest() {
        try {
            afterHandleRequest();
        } catch (Exception e) {
            LOGGER.warn("{}.{} afterHandleRequest() threw {}: {}",
                    getOperationModelContext().getServiceModel().getServiceName(), getOperationName(),
                    e.getClass().getCanonicalName(), e.getMessage());
        }
    }

    @Override
    final protected void onContinuationMessage(List<Header> list, byte[] bytes, MessageType messageType,
                                               int messageFlags) {
        LOGGER.debug("Continuation native id: " + continuation.getNativeHandle());

        //We can prevent a client from sending a request, and hanging up before receiving a response
        //but doing so will prevent any work from being done
        if (initialRequest == null && (messageFlags & MessageFlags.TerminateStream.getByteValue()) != 0) {
            LOGGER.debug("Not invoking " + getOperationName() + " operation for client request received with a "
                    + "terminate flag set to 1");
            return;
        }
        final EventStreamRPCServiceModel serviceModel = getOperationModelContext().getServiceModel();
        try {
            if (initialRequest != null) {
                // Empty close stream messages from the client are valid. Do not need any processing here.
                if ((messageFlags & MessageFlags.TerminateStream.getByteValue()) != 0 && (bytes == null
                        || bytes.length == 0)) {
                    return;
                } else {
                    final StreamingRequestType streamEvent = serviceModel.fromJson(getStreamingRequestClass(), bytes);
                    //exceptions occurring during this processing will result in closure of stream
                    handleStreamEvent(streamEvent);
                }
            } else {
                //this is the initial request
                initialRequestHeaders = new ArrayList<>(list);
                initialRequest = serviceModel.fromJson(getRequestClass(), bytes);
                //call into business logic
                CompletableFuture<ResponseType> resultFuture = handleRequestAsync(initialRequest);
                if (resultFuture == null) {
                    resultFuture = CompletableFuture.completedFuture(handleRequest(initialRequest));
                }
                resultFuture.handle((result, throwable) -> {
                    if (throwable != null) {
                        handleAndSendError(throwable);
                        return null;
                    }
                    if (result != null) {
                        if (!getResponseClass().isInstance(result)) {
                            throw new RuntimeException("Handler for operation [" + getOperationName()
                                    + "] did not return expected type. Found: " + result.getClass().getName());
                        }
                        sendMessage(result, !isStreamingOperation()).whenComplete((res, ex) -> {
                            if (ex != null) {
                                LOGGER.error(ex.getClass().getName() + " sending response message: " + ex.getMessage());
                            } else {
                                LOGGER.trace("Response successfully sent");
                            }
                        });
                        invokeAfterHandleRequest();
                    } else {
                        //not streaming, but null response? we have a problem
                        throw new RuntimeException("Operation handler returned null response!");
                    }
                    return null;
                }).exceptionally((throwable) -> {
                    if (throwable != null) {
                        handleAndSendError(throwable);
                    }
                    return null;
                });
            }
        } catch (Exception e) {
            handleAndSendError(e);
        }
    }

    private void handleAndSendError(Throwable throwable) {
        // Pull out the underlying error from the "handle" method of a CompletableFuture
        if (throwable instanceof CompletionException) {
            throwable = throwable.getCause();
        }
        if (throwable instanceof EventStreamOperationError) {
            //We do not check if the specific exception thrown is a part of the core service?
            sendModeledError((EventStreamOperationError) throwable);
            invokeAfterHandleRequest();
        } else {
            final List<Header> responseHeaders = new ArrayList<>(1);
            byte[] outputPayload = "InternalServerError".getBytes(StandardCharsets.UTF_8);
            responseHeaders.add(Header.createHeader(EventStreamRPCServiceModel.CONTENT_TYPE_HEADER,
                    EventStreamRPCServiceModel.CONTENT_TYPE_APPLICATION_TEXT));
            //are there any exceptions we wouldn't want to return a generic server fault?
            //this is the kind of exception that should be logged with a request ID especially in a server-client context
            LOGGER.error("[{}] operation threw unexpected {}: {}", getOperationName(),
                    throwable.getClass().getCanonicalName(), throwable.getMessage());

            continuation.sendMessage(responseHeaders, outputPayload, MessageType.ApplicationError,
                    MessageFlags.TerminateStream.getByteValue()).whenComplete((res, ex) -> {
                if (ex != null) {
                    LOGGER.error(ex.getClass().getName() + " sending error response message: " + ex.getMessage());
                } else {
                    LOGGER.trace("Error response successfully sent");
                }
                continuation.close();
            });
        }
    }
}