package software.amazon.awssdk.awscore.eventstream;

import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.FunctionalUtils;
import software.amazon.eventstream.Message;
import software.amazon.eventstream.MessageDecoder;

@SdkProtectedApi
/* loaded from: input_file:BOOT-INF/lib/aws-core-2.13.35.jar:software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformer.class */
public final class EventStreamAsyncResponseTransformer<ResponseT, EventT> implements AsyncResponseTransformer<SdkResponse, Void> {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger((Class<?>) EventStreamAsyncResponseTransformer.class);
    // Sentinel placed on eventsToDeliver by ByteSubscriber#onComplete; when it reaches the head of the queue
    // the drain logic completes the event subscriber instead of delivering an event.
    private static final Object ON_COMPLETE_EVENT = new Object();
    // Shared, attribute-free instance passed to the initial-response and event handlers in handleMessage.
    private static final ExecutionAttributes EMPTY_EXECUTION_ATTRIBUTES = new ExecutionAttributes();
    // Receives responseReceived / onEventStream / exceptionOccurred / complete callbacks from this transformer.
    private final EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler;
    // Unmarshalls the "initial-response" event message.
    private final HttpResponseHandler<? extends ResponseT> initialResponseHandler;
    // Unmarshalls every other event message into an EventT.
    private final HttpResponseHandler<? extends EventT> eventResponseHandler;
    // Unmarshalls "error" / "exception" messages into a Throwable.
    private final HttpResponseHandler<? extends Throwable> exceptionResponseHandler;
    // Outstanding demand from the event subscriber: increased by Subscription#request, decreased per delivered event.
    private final AtomicLong remainingDemand;
    // The single event subscriber; set via compareAndSet in EventPublisher#subscribe and cleared in prepare().
    private final AtomicReference<Subscriber<? super EventT>> subscriberRef;
    // Subscription to the raw byte publisher; set in ByteSubscriber#onSubscribe.
    private final AtomicReference<Subscription> dataSubscription;
    // Decodes fed bytes into messages and hands each one to handleMessage.
    private final MessageDecoder decoder;
    // Set to true once the stream has completed or failed; reset to false by prepare().
    private volatile boolean isDone;
    // Executor on which events are delivered to the subscriber and on which draining continues.
    private final Executor executor;
    // Decoded events awaiting delivery; also used as the monitor in the synchronized blocks around queue/demand checks.
    private final Queue<Object> eventsToDeliver;
    // Flag flipped by drainEventsIfNotAlready so that only one drain is started at a time.
    private final AtomicBoolean isDelivering;
    // Flag flipped by requestDataIfNotAlready so that only one upstream request(1) is issued at a time.
    private final AtomicBoolean isRequesting;
    // Caller-supplied future; completed when the event stream completes or the event subscriber cancels.
    private final CompletableFuture<Void> future;
    // Service name placed into the execution attributes handed to the exception handler.
    private final String serviceName;
    // Value of the request-id response header captured in onResponse (null if absent).
    private String requestId;
    // Future returned from prepare(); completed on upstream completion, cancellation, or failure.
    private volatile CompletableFuture<Void> transformFuture;
    // Value of the extended request-id response header captured in onResponse (null if absent).
    private String extendedRequestId;

    /**
     * Fluent builder for {@link EventStreamAsyncResponseTransformer}. Each setter stores the supplied value
     * and returns this builder; {@link #build()} passes the stored values to the transformer's constructor
     * without any validation.
     *
     * @param <ResponseT> type produced by the initial-response handler
     * @param <EventT> type produced by the event handler
     */
    public static final class Builder<ResponseT, EventT> {
        private EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler;
        private HttpResponseHandler<? extends ResponseT> initialResponseHandler;
        private HttpResponseHandler<? extends EventT> eventResponseHandler;
        private HttpResponseHandler<? extends Throwable> exceptionResponseHandler;
        private Executor executor;
        private CompletableFuture<Void> future;
        private String serviceName;

        /** Instances are obtained through {@link EventStreamAsyncResponseTransformer#builder()}. */
        private Builder() {
        }

        /**
         * @param eventStreamResponseHandler handler notified about the response, the event publisher,
         *                                   failures and completion
         * @return this builder
         */
        public Builder<ResponseT, EventT> eventStreamResponseHandler(
                EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler) {
            this.eventStreamResponseHandler = eventStreamResponseHandler;
            return this;
        }

        /**
         * @param httpResponseHandler handler used for the "initial-response" event message
         * @return this builder
         */
        public Builder<ResponseT, EventT> initialResponseHandler(
                HttpResponseHandler<? extends ResponseT> httpResponseHandler) {
            this.initialResponseHandler = httpResponseHandler;
            return this;
        }

        /**
         * @param httpResponseHandler handler used for every other event message
         * @return this builder
         */
        public Builder<ResponseT, EventT> eventResponseHandler(
                HttpResponseHandler<? extends EventT> httpResponseHandler) {
            this.eventResponseHandler = httpResponseHandler;
            return this;
        }

        /**
         * @param httpResponseHandler handler used for "error" and "exception" messages
         * @return this builder
         */
        public Builder<ResponseT, EventT> exceptionResponseHandler(
                HttpResponseHandler<? extends Throwable> httpResponseHandler) {
            this.exceptionResponseHandler = httpResponseHandler;
            return this;
        }

        /**
         * @param executor executor on which events are delivered to the subscriber
         * @return this builder
         */
        public Builder<ResponseT, EventT> executor(Executor executor) {
            this.executor = executor;
            return this;
        }

        /**
         * @param completableFuture future the transformer completes when the event stream finishes
         *                          or the subscriber cancels
         * @return this builder
         */
        public Builder<ResponseT, EventT> future(CompletableFuture<Void> completableFuture) {
            this.future = completableFuture;
            return this;
        }

        /**
         * @param str service name made available to the exception handler
         * @return this builder
         */
        public Builder<ResponseT, EventT> serviceName(String str) {
            this.serviceName = str;
            return this;
        }

        /**
         * @return a new transformer configured with the values stored in this builder
         */
        public EventStreamAsyncResponseTransformer<ResponseT, EventT> build() {
            return new EventStreamAsyncResponseTransformer<>(
                    eventStreamResponseHandler,
                    initialResponseHandler,
                    eventResponseHandler,
                    exceptionResponseHandler,
                    executor,
                    future,
                    serviceName);
        }
    }

    /**
     * Subscriber for the raw response bytes. Incoming buffers are fed to the message decoder, which
     * (through {@code handleMessage}) may append decoded events to {@code eventsToDeliver}.
     */
    private class ByteSubscriber implements Subscriber<ByteBuffer> {
        private final CompletableFuture<Subscription> dataSubscriptionFuture;

        /**
         * @param completableFuture future completed with the upstream subscription once it is received
         */
        private ByteSubscriber(CompletableFuture<Subscription> completableFuture) {
            this.dataSubscriptionFuture = completableFuture;
        }

        /**
         * Records the upstream subscription and completes the future handed to the constructor with it.
         */
        @Override
        public void onSubscribe(Subscription subscription) {
            dataSubscription.set(subscription);
            dataSubscriptionFuture.complete(subscription);
        }

        /**
         * Feeds a copy of the received bytes to the decoder, then either starts draining decoded events
         * or, if nothing was decoded and demand is still outstanding, asks upstream for one more buffer.
         * Buffers received after the transformer is done are ignored.
         */
        @Override
        public void onNext(ByteBuffer byteBuffer) {
            if (isDone) {
                return;
            }
            synchronized (eventsToDeliver) {
                decoder.feed(BinaryUtils.copyBytesFrom(byteBuffer));
                if (!eventsToDeliver.isEmpty()) {
                    // The outstanding upstream request produced events; allow a new request later.
                    isRequesting.compareAndSet(true, false);
                    drainEventsIfNotAlready();
                } else if (remainingDemand.get() > 0) {
                    // The buffer did not complete a message; keep pulling bytes while demand exists.
                    dataSubscription.get().request(1L);
                }
            }
        }

        /**
         * Deliberately does nothing.
         * NOTE(review): presumably upstream failures reach the transformer through
         * {@code exceptionOccurred} instead - confirm against the caller.
         */
        @Override
        public void onError(Throwable th) {
        }

        /**
         * Queues the completion sentinel, kicks off draining and completes the transform future.
         */
        @Override
        public void onComplete() {
            // Every other access to the (non-thread-safe) queue happens while holding its monitor,
            // so the sentinel is appended under the same lock to avoid racing with a concurrent drain.
            synchronized (eventsToDeliver) {
                eventsToDeliver.add(ON_COMPLETE_EVENT);
            }
            drainEventsIfNotAlready();
            transformFuture.complete(null);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/aws-core-2.13.35.jar:software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformer$EventPublisher.class */
    private class EventPublisher implements SdkPublisher<EventT> {
        private final Subscription dataSubscription;

        private EventPublisher(Subscription subscription) {
            this.dataSubscription = subscription;
        }

        @Override // org.reactivestreams.Publisher
        public void subscribe(Subscriber<? super EventT> subscriber) {
            if (EventStreamAsyncResponseTransformer.this.subscriberRef.compareAndSet(null, subscriber)) {
                subscriber.onSubscribe(new Subscription() { // from class: software.amazon.awssdk.awscore.eventstream.EventStreamAsyncResponseTransformer.EventPublisher.1
                    @Override // org.reactivestreams.Subscription
                    public void request(long j) {
                        if (EventStreamAsyncResponseTransformer.this.isDone) {
                            return;
                        }
                        synchronized (EventStreamAsyncResponseTransformer.this.eventsToDeliver) {
                            EventStreamAsyncResponseTransformer.this.remainingDemand.addAndGet(j);
                            if (EventStreamAsyncResponseTransformer.this.eventsToDeliver.isEmpty()) {
                                EventStreamAsyncResponseTransformer.this.requestDataIfNotAlready();
                            } else {
                                EventStreamAsyncResponseTransformer.this.drainEventsIfNotAlready();
                            }
                        }
                    }

                    @Override // org.reactivestreams.Subscription
                    public void cancel() {
                        EventPublisher.this.dataSubscription.cancel();
                        EventStreamAsyncResponseTransformer.this.future.complete(null);
                        EventStreamAsyncResponseTransformer.this.transformFuture.complete(null);
                    }
                });
            } else {
                EventStreamAsyncResponseTransformer.log.error("Event stream publishers can only be subscribed to once.");
                throw new IllegalStateException("This publisher may only be subscribed to once");
            }
        }
    }

    /**
     * Creates a transformer from the values collected by {@link Builder}; no argument is validated.
     *
     * @param eventStreamResponseHandler handler notified about the response, events, failures and completion
     * @param initialResponseHandler handler for the "initial-response" event message
     * @param eventResponseHandler handler for every other event message
     * @param exceptionResponseHandler handler for "error" / "exception" messages
     * @param executor executor used to deliver events to the subscriber
     * @param future future completed when the stream finishes or the subscriber cancels
     * @param serviceName service name passed to the exception handler
     */
    private EventStreamAsyncResponseTransformer(EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler,
                                                HttpResponseHandler<? extends ResponseT> initialResponseHandler,
                                                HttpResponseHandler<? extends EventT> eventResponseHandler,
                                                HttpResponseHandler<? extends Throwable> exceptionResponseHandler,
                                                Executor executor,
                                                CompletableFuture<Void> future,
                                                String serviceName) {
        // Internal state.
        this.remainingDemand = new AtomicLong(0L);
        this.subscriberRef = new AtomicReference<>();
        this.dataSubscription = new AtomicReference<>();
        this.decoder = new MessageDecoder(this::handleMessage);
        this.isDone = false;
        // Diamond instead of the raw type so the queue is a properly typed Queue<Object>.
        this.eventsToDeliver = new LinkedList<>();
        this.isDelivering = new AtomicBoolean(false);
        this.isRequesting = new AtomicBoolean(false);
        this.requestId = null;
        this.extendedRequestId = null;

        // Collaborators supplied by the builder.
        this.eventStreamResponseHandler = eventStreamResponseHandler;
        this.initialResponseHandler = initialResponseHandler;
        this.eventResponseHandler = eventResponseHandler;
        this.exceptionResponseHandler = exceptionResponseHandler;
        this.executor = executor;
        this.future = future;
        this.serviceName = serviceName;
    }

    /**
     * Starts a fresh attempt: installs a new transform future, forgets any previous event subscriber
     * and clears the done flag.
     *
     * @return the newly created transform future
     */
    @Override
    public CompletableFuture<Void> prepare() {
        transformFuture = new CompletableFuture<>();
        subscriberRef.set(null);
        isDone = false;
        return transformFuture;
    }

    /**
     * Captures the request id and extended request id headers from the HTTP response so they can be
     * attached to the responses synthesized for individual event messages. Does nothing when the
     * response or its HTTP response is {@code null}.
     *
     * @param sdkResponse the initial SDK response, may be {@code null}
     */
    @Override
    public void onResponse(SdkResponse sdkResponse) {
        if (sdkResponse != null && sdkResponse.sdkHttpResponse() != null) {
            requestId = sdkResponse.sdkHttpResponse()
                                   .firstMatchingHeader(HttpResponseHandler.X_AMZN_REQUEST_ID_HEADER)
                                   .orElse(null);
            extendedRequestId = sdkResponse.sdkHttpResponse()
                                           .firstMatchingHeader(HttpResponseHandler.X_AMZ_ID_2_HEADER)
                                           .orElse(null);
        }
    }

    /**
     * Subscribes to the raw byte publisher and, once the upstream subscription is available, hands an
     * event publisher to the response handler. If the handler throws, the failure is reported through
     * {@link #exceptionOccurred(Throwable)} and the upstream subscription is cancelled.
     *
     * @param sdkPublisher publisher of the raw response bytes
     */
    @Override
    public void onStream(SdkPublisher<ByteBuffer> sdkPublisher) {
        // Typed future (not the raw type) so the callback below receives a Subscription rather than an Object.
        CompletableFuture<Subscription> dataSubscriptionFuture = new CompletableFuture<>();
        sdkPublisher.subscribe(new ByteSubscriber(dataSubscriptionFuture));
        dataSubscriptionFuture.thenAccept(subscription -> {
            try {
                this.eventStreamResponseHandler.onEventStream(new EventPublisher(subscription));
            } catch (Throwable th) {
                exceptionOccurred(th);
                subscription.cancel();
            }
        });
    }

    /**
     * Marks the transformer as done and propagates the failure: to the event subscriber (if one is
     * subscribed and the error is not a cancellation), to the response handler, and to the transform
     * future. Calls made after the transformer is already done are ignored.
     *
     * @param th the failure to propagate
     */
    @Override
    public void exceptionOccurred(Throwable th) {
        synchronized (this) {
            if (isDone) {
                return;
            }
            isDone = true;

            boolean notifySubscriber = subscriberRef.get() != null && shouldSurfaceErrorToEventSubscriber(th);
            if (notifySubscriber) {
                // Errors thrown by the subscriber's own onError are logged and otherwise ignored.
                FunctionalUtils.runAndLogError(log, "Error thrown from Subscriber#onError, ignoring.", () -> {
                    subscriberRef.get().onError(th);
                });
            }

            eventStreamResponseHandler.exceptionOccurred(th);
            transformFuture.completeExceptionally(th);
        }
    }

    /**
     * Completes the event stream exactly once: signals {@code onComplete} to the subscriber (errors
     * thrown by it are logged and ignored), notifies the response handler, and completes the
     * caller-supplied future. Does nothing if the transformer is already done.
     */
    private void onEventComplete() {
        synchronized (this) {
            if (!isDone) {
                isDone = true;
                FunctionalUtils.runAndLogError(log, "Error thrown from Subscriber#onComplete, ignoring.", () -> {
                    subscriberRef.get().onComplete();
                });
                eventStreamResponseHandler.complete();
                future.complete(null);
            }
        }
    }

    /**
     * Dispatches one decoded message.
     * <ul>
     *   <li>An "event" message of type "initial-response" is unmarshalled and passed to the response
     *       handler's {@code responseReceived}.</li>
     *   <li>Any other "event" message is unmarshalled and queued for delivery.</li>
     *   <li>An "error" or "exception" message is unmarshalled into a throwable and reported through
     *       {@link #exceptionOccurred(Throwable)}; anything thrown while reporting is logged and ignored.</li>
     * </ul>
     * Messages of any other type are ignored.
     *
     * @param message the decoded message
     * @throws SdkClientException wrapping any exception raised while handling the message
     */
    private void handleMessage(Message message) {
        try {
            if (isEvent(message)) {
                if (message.getHeaders().get(":event-type").getString().equals("initial-response")) {
                    this.eventStreamResponseHandler.responseReceived(
                            this.initialResponseHandler.handle(adaptMessageToResponse(message, false), EMPTY_EXECUTION_ATTRIBUTES));
                } else {
                    this.eventsToDeliver.add(
                            this.eventResponseHandler.handle(adaptMessageToResponse(message, false), EMPTY_EXECUTION_ATTRIBUTES));
                }
            } else if (isError(message) || isException(message)) {
                Throwable serviceException = this.exceptionResponseHandler.handle(
                        adaptMessageToResponse(message, true),
                        new ExecutionAttributes().putAttribute(SdkExecutionAttribute.SERVICE_NAME, this.serviceName));
                FunctionalUtils.runAndLogError(log, "Error thrown from exceptionOccurred, ignoring.", () -> {
                    exceptionOccurred(serviceException);
                });
            }
        } catch (Exception e) {
            // build() is the real builder method; the decompiler-generated alias does not exist in the SDK.
            throw SdkClientException.builder().cause(e).build();
        }
    }

    /**
     * @param message the decoded message
     * @return {@code true} if the message's ":message-type" header equals "event"
     */
    private boolean isEvent(Message message) {
        String messageType = message.getHeaders().get(":message-type").getString();
        return "event".equals(messageType);
    }

    /**
     * @param message the decoded message
     * @return {@code true} if the message's ":message-type" header equals "error"
     */
    private boolean isError(Message message) {
        String messageType = message.getHeaders().get(":message-type").getString();
        return "error".equals(messageType);
    }

    /**
     * @param message the decoded message
     * @return {@code true} if the message's ":message-type" header equals "exception"
     */
    private boolean isException(Message message) {
        String messageType = message.getHeaders().get(":message-type").getString();
        return "exception".equals(messageType);
    }

    /**
     * Wraps a decoded message in an {@link SdkHttpFullResponse} so it can be passed to the regular
     * HTTP response handlers. The message headers become response headers (one single-element list
     * per header), the request id / extended request id captured in {@code onResponse} are added when
     * present, and the message payload becomes the response content.
     *
     * @param message the decoded message
     * @param errorResponse {@code true} for error/exception messages, in which case no status code is
     *                      set; otherwise the status code is set to 200
     * @return the synthesized HTTP response
     */
    private SdkHttpFullResponse adaptMessageToResponse(Message message, boolean errorResponse) {
        // Copy every message header into the response headers. Header values are read with getString(),
        // the same accessor used for ":message-type" / ":event-type" elsewhere in this class.
        // NOTE(review): presumably getString() rejects non-string header values - confirm that only
        // string-typed headers are expected here.
        Map<String, List<String>> headers = message.getHeaders()
                .entrySet()
                .stream()
                .collect(HashMap::new,
                         (collected, entry) -> collected.put(entry.getKey(),
                                                             Collections.singletonList(entry.getValue().getString())),
                         Map::putAll);
        if (this.requestId != null) {
            headers.put(HttpResponseHandler.X_AMZN_REQUEST_ID_HEADER, Collections.singletonList(this.requestId));
        }
        if (this.extendedRequestId != null) {
            headers.put(HttpResponseHandler.X_AMZ_ID_2_HEADER, Collections.singletonList(this.extendedRequestId));
        }
        SdkHttpFullResponse.Builder response = SdkHttpFullResponse.builder()
                .content(AbortableInputStream.create(new ByteArrayInputStream(message.getPayload())))
                .headers(headers);
        if (!errorResponse) {
            response.statusCode(200);
        }
        // build() is the real builder method; the decompiler-generated alias does not exist in the SDK.
        return response.build();
    }

    /**
     * Decides whether a failure should be forwarded to the event subscriber's {@code onError}.
     *
     * @param th the failure
     * @return {@code false} for {@link SdkCancellationException}, {@code true} for everything else
     */
    private static boolean shouldSurfaceErrorToEventSubscriber(Throwable th) {
        if (th instanceof SdkCancellationException) {
            return false;
        }
        return true;
    }

    /**
     * Requests one more item from the upstream byte subscription unless a request is already
     * outstanding (tracked by the {@code isRequesting} flag).
     */
    public void requestDataIfNotAlready() {
        boolean claimed = isRequesting.compareAndSet(false, true);
        if (claimed) {
            dataSubscription.get().request(1L);
        }
    }

    /**
     * Starts draining queued events unless a drain is already in progress (tracked by the
     * {@code isDelivering} flag).
     */
    public void drainEventsIfNotAlready() {
        boolean claimed = isDelivering.compareAndSet(false, true);
        if (claimed) {
            drainEvents();
        }
    }

    /**
     * Performs one drain step. Nothing happens once the transformer is done; otherwise the next
     * queued item is processed and, if it turns out to be the completion sentinel, the stream is
     * completed.
     */
    private void drainEvents() {
        if (isDone) {
            return;
        }
        if (isCompletedOrDeliverEvent()) {
            onEventComplete();
        }
    }

    /**
     * Performs one step of the drain loop while holding the {@code eventsToDeliver} monitor.
     *
     * @return {@code true} if the head of the queue is the completion sentinel (the caller then
     *         completes the stream); {@code false} otherwise
     */
    private boolean isCompletedOrDeliverEvent() {
        synchronized (this.eventsToDeliver) {
            // The sentinel is only peeked, never removed; the caller handles completion.
            if (this.eventsToDeliver.peek() == ON_COMPLETE_EVENT) {
                return true;
            }
            if (this.eventsToDeliver.isEmpty() || this.remainingDemand.get() == 0) {
                // Nothing can be delivered right now: release the delivering flag so a later
                // request()/onNext can start a new drain.
                this.isDelivering.compareAndSet(true, false);
                // Queue is empty but the subscriber still wants events: pull more bytes from upstream.
                if (this.remainingDemand.get() > 0) {
                    requestDataIfNotAlready();
                }
            } else {
                // Take the next event, consume one unit of demand, and deliver it on the executor.
                // The delivering flag stays set; the follow-up drainEvents call continues the loop.
                Object remove = this.eventsToDeliver.remove();
                this.remainingDemand.decrementAndGet();
                CompletableFuture.runAsync(() -> {
                    deliverEvent(remove);
                }, this.executor).thenRunAsync(this::drainEvents, this.executor).whenComplete((r4, th) -> {
                    if (th != null) {
                        // The future returned by whenComplete is discarded, so the exception thrown
                        // here is not propagated to any caller visible in this method; the failure
                        // is surfaced through the log statement.
                        log.error("Error occurred when delivering an event", th);
                        throw SdkClientException.create("fail to deliver events", th);
                    }
                });
            }
            return false;
        }
    }

    /**
     * Hands one queued event to the current subscriber's {@code onNext}.
     *
     * @param obj the queued event; the queue is untyped, so it is cast to {@code EventT} here
     */
    @SuppressWarnings("unchecked")
    private void deliverEvent(Object obj) {
        // Unchecked cast: the subscriber is a Subscriber<? super EventT>, which cannot accept a plain
        // Object. Non-sentinel queue entries are produced by eventResponseHandler in handleMessage.
        this.subscriberRef.get().onNext((EventT) obj);
    }

    /**
     * Creates an empty builder for an {@link EventStreamAsyncResponseTransformer}.
     *
     * @param <ResponseT> type produced by the initial-response handler
     * @param <EventT> type produced by the event handler
     * @return a new builder with no values set
     */
    public static <ResponseT, EventT> Builder<ResponseT, EventT> builder() {
        Builder<ResponseT, EventT> freshBuilder = new Builder<>();
        return freshBuilder;
    }
}
