package org.apache.flink.streaming.connectors.kinesis.internals;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.HashKeyRange;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.SequenceNumberRange;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.metrics.KinesisConsumerMetricConstants;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinates shard consumption for a single parallel consumer subtask.
 *
 * <p>The fetcher discovers shards of the subscribed streams through a {@link KinesisProxyInterface},
 * keeps only the shards that the configured {@link KinesisShardAssigner} maps to this subtask,
 * submits one {@code ShardConsumer} per such shard to an executor, and records the last processed
 * sequence number of every subscribed shard so that it can be snapshotted.
 *
 * <p>Mutations of the subscribed shard state and record emission are performed while holding the
 * checkpoint lock handed to the constructor.
 *
 * @param <T> type of the records emitted to the source context
 */
@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.class */
public class KinesisDataFetcher<T> {

    /**
     * Default assigner; it returns the shard handle's hash code and ignores the subtask count.
     *
     * <p>The synthetic {@code $deserializeLambda$} method and {@code $assertionsDisabled} field that
     * the decompiler surfaced are intentionally not declared in this source: the compiler emits
     * them itself for this lambda and for the {@code assert} statement in {@link #snapshotState()}.
     */
    public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER =
            (streamShardHandle, i) -> streamShardHandle.hashCode();

    private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);

    /** Discovery interval used when {@code SHARD_DISCOVERY_INTERVAL_MILLIS} is not configured. */
    private static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L;

    /** Pause between two polls of the executor in {@link #awaitTermination()}. */
    private static final long TERMINATION_POLL_INTERVAL_MILLIS = 50L;

    private final Properties configProps;
    private final List<String> streams;
    private final KinesisDeserializationSchema<T> deserializationSchema;
    private final KinesisShardAssigner shardAssigner;
    private final MetricGroup consumerMetricGroup;
    private final RuntimeContext runtimeContext;
    private final int totalNumberOfConsumerSubtasks;
    private final int indexOfThisConsumerSubtask;
    private final ExecutorService shardConsumersExecutor;

    /** Stream name to the last discovered shard id; {@code null} value means none seen yet. */
    private final Map<String, String> subscribedStreamsToLastDiscoveredShardIds;

    /** State of every subscribed shard; the list index is the shard state index. */
    private final List<KinesisStreamShardState> subscribedShardsState;

    private final SourceFunction.SourceContext<T> sourceContext;
    private final Object checkpointLock;

    /** First error reported through {@link #stopWithError(Throwable)}, if any. */
    private final AtomicReference<Throwable> error;

    private final KinesisProxyInterface kinesis;

    /** Thread executing {@link #runFetcher()}; interrupted on shutdown to cut its sleep short. */
    private volatile Thread mainThread;

    /** Number of registered shards whose state is not the shard-ending sentinel. */
    private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);

    private volatile boolean running = true;

    /**
     * Creates a fetcher with empty shard state, a fresh error holder and a proxy built from the
     * given properties.
     *
     * @param list names of the streams to subscribe to
     * @param sourceContext context used to emit records; also supplies the checkpoint lock
     * @param runtimeContext runtime context of the consumer subtask
     * @param properties consumer configuration
     * @param kinesisDeserializationSchema schema handed out (as clones) to shard consumers
     * @param kinesisShardAssigner assigner deciding which subtask reads which shard
     */
    public KinesisDataFetcher(List<String> list, SourceFunction.SourceContext<T> sourceContext, RuntimeContext runtimeContext, Properties properties, KinesisDeserializationSchema<T> kinesisDeserializationSchema, KinesisShardAssigner kinesisShardAssigner) {
        this(
                list,
                sourceContext,
                sourceContext.getCheckpointLock(),
                runtimeContext,
                properties,
                kinesisDeserializationSchema,
                kinesisShardAssigner,
                new AtomicReference<>(),
                new ArrayList<>(),
                createInitialSubscribedStreamsToLastDiscoveredShardsState(list),
                KinesisProxy.create(properties));
    }

    /**
     * Creates a fetcher with every collaborator supplied explicitly. All reference arguments must
     * be non-null.
     *
     * @param list names of the streams to subscribe to
     * @param sourceContext context used to emit records
     * @param obj checkpoint lock guarding state updates and record emission
     * @param runtimeContext runtime context of the consumer subtask
     * @param properties consumer configuration
     * @param kinesisDeserializationSchema schema handed out (as clones) to shard consumers
     * @param kinesisShardAssigner assigner deciding which subtask reads which shard
     * @param atomicReference holder for the first error reported by a consumer
     * @param list2 initial subscribed shard states
     * @param hashMap initial stream name to last discovered shard id mapping
     * @param kinesisProxyInterface proxy used to list shards
     */
    @VisibleForTesting
    protected KinesisDataFetcher(List<String> list, SourceFunction.SourceContext<T> sourceContext, Object obj, RuntimeContext runtimeContext, Properties properties, KinesisDeserializationSchema<T> kinesisDeserializationSchema, KinesisShardAssigner kinesisShardAssigner, AtomicReference<Throwable> atomicReference, List<KinesisStreamShardState> list2, HashMap<String, String> hashMap, KinesisProxyInterface kinesisProxyInterface) {
        this.streams = Preconditions.checkNotNull(list);
        this.configProps = Preconditions.checkNotNull(properties);
        this.sourceContext = Preconditions.checkNotNull(sourceContext);
        this.checkpointLock = Preconditions.checkNotNull(obj);
        this.runtimeContext = Preconditions.checkNotNull(runtimeContext);
        this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
        this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
        this.deserializationSchema = Preconditions.checkNotNull(kinesisDeserializationSchema);
        this.shardAssigner = Preconditions.checkNotNull(kinesisShardAssigner);
        this.kinesis = Preconditions.checkNotNull(kinesisProxyInterface);
        this.consumerMetricGroup = runtimeContext.getMetricGroup().addGroup(KinesisConsumerMetricConstants.KINESIS_CONSUMER_METRICS_GROUP);
        this.error = Preconditions.checkNotNull(atomicReference);
        this.subscribedShardsState = Preconditions.checkNotNull(list2);
        this.subscribedStreamsToLastDiscoveredShardIds = Preconditions.checkNotNull(hashMap);
        // Kept last, as in the original: the factory method is overridable and runs on a fully
        // assigned instance.
        this.shardConsumersExecutor = createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
    }

    /**
     * Runs the fetcher on the calling thread until {@link #shutdownFetcher()} is invoked.
     *
     * <p>Steps: verify that at least one subscribed stream has a known shard, start a consumer
     * for every already registered shard that is not finished, then periodically discover and
     * start consuming new shards. After the loop ends the method waits for all consumer threads
     * and rethrows the first recorded error, if any. Returns immediately when the fetcher has
     * already been shut down.
     *
     * @throws RuntimeException if none of the subscribed streams has a discovered shard
     * @throws Exception the error recorded via {@link #stopWithError(Throwable)}, or an
     *     {@link InterruptedException} raised while discovering shards or awaiting termination
     */
    public void runFetcher() throws Exception {
        if (!running) {
            return;
        }
        mainThread = Thread.currentThread();

        boolean anyStreamHasShards = false;
        StringBuilder streamsWithoutShards = new StringBuilder();
        for (Map.Entry<String, String> entry : subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
            if (entry.getValue() != null) {
                anyStreamHasShards = true;
            } else {
                streamsWithoutShards.append(entry.getKey()).append(", ");
            }
        }
        if (streamsWithoutShards.length() != 0 && LOG.isWarnEnabled()) {
            LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}", indexOfThisConsumerSubtask, streamsWithoutShards.toString());
        }
        if (!anyStreamHasShards) {
            throw new RuntimeException("No shards can be found for all subscribed streams: " + streams);
        }

        // Start a consumer for every seeded shard that has not been read to its end yet.
        for (int seededIndex = 0; seededIndex < subscribedShardsState.size(); seededIndex++) {
            KinesisStreamShardState seededState = subscribedShardsState.get(seededIndex);
            if (seededState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
                continue;
            }
            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}", indexOfThisConsumerSubtask, seededState.getStreamShardHandle().toString(), seededState.getLastProcessedSequenceNum(), seededIndex);
            }
            submitShardConsumer(seededIndex, seededState);
        }

        final long discoveryIntervalMillis = Long.parseLong(configProps.getProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, Long.toString(DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));

        if (numberOfActiveShards.get() == 0) {
            LOG.info("Subtask {} has no active shards to read on startup; marking the subtask as temporarily idle ...", indexOfThisConsumerSubtask);
            sourceContext.markAsTemporarilyIdle();
        }

        while (running) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...", indexOfThisConsumerSubtask);
            }
            for (StreamShardHandle discoveredShard : discoverNewShardsToSubscribe()) {
                KinesisStreamShardState newState = new KinesisStreamShardState(convertToStreamShardMetadata(discoveredShard), discoveredShard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
                int newStateIndex = registerNewSubscribedShardState(newState);
                if (LOG.isInfoEnabled()) {
                    LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming the shard from sequence number {} with ShardConsumer {}", indexOfThisConsumerSubtask, newState.getStreamShardHandle().toString(), newState.getLastProcessedSequenceNum(), newStateIndex);
                }
                submitShardConsumer(newStateIndex, newState);
            }
            if (running && discoveryIntervalMillis != 0) {
                try {
                    Thread.sleep(discoveryIntervalMillis);
                } catch (InterruptedException ignored) {
                    // Deliberately swallowed: shutdownFetcher() interrupts this thread to end the
                    // sleep early, and the loop condition re-checks 'running'. Restoring the
                    // interrupt flag here would make awaitTermination() below fail immediately.
                }
            }
        }

        awaitTermination();
        rethrowRecordedError();
    }

    /**
     * Submits a new {@code ShardConsumer} for the given shard state to the consumer executor and
     * registers its metrics.
     */
    private void submitShardConsumer(int shardStateIndex, KinesisStreamShardState shardState) {
        // NOTE(review): ShardConsumer is instantiated as a raw type, exactly as in the decompiled
        // original; parameterize it if the class turns out to be generic.
        shardConsumersExecutor.submit(new ShardConsumer(this, Integer.valueOf(shardStateIndex), shardState.getStreamShardHandle(), shardState.getLastProcessedSequenceNum(), registerShardMetrics(consumerMetricGroup, shardState)));
    }

    /** Rethrows the recorded error, wrapping it only if it is neither Exception nor Error. */
    private void rethrowRecordedError() throws Exception {
        Throwable failure = error.get();
        if (failure == null) {
            return;
        }
        if (failure instanceof Exception) {
            throw (Exception) failure;
        }
        if (failure instanceof Error) {
            throw (Error) failure;
        }
        throw new Exception(failure);
    }

    /**
     * Copies the last processed sequence number of every subscribed shard into a new map.
     * The caller is expected to hold the checkpoint lock (checked by an assertion).
     *
     * @return shard metadata mapped to the last processed sequence number
     */
    public HashMap<StreamShardMetadata, SequenceNumber> snapshotState() {
        assert Thread.holdsLock(checkpointLock);

        HashMap<StreamShardMetadata, SequenceNumber> snapshot = new HashMap<>();
        for (KinesisStreamShardState shardState : subscribedShardsState) {
            snapshot.put(shardState.getStreamShardMetadata(), shardState.getLastProcessedSequenceNum());
        }
        return snapshot;
    }

    /**
     * Stops the discovery loop, interrupts the thread running {@link #runFetcher()} (if it has
     * started) and shuts down the shard consumer executor with {@code shutdownNow()}.
     */
    public void shutdownFetcher() {
        running = false;
        Thread fetcherThread = mainThread;
        if (fetcherThread != null) {
            fetcherThread.interrupt();
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
        }
        shardConsumersExecutor.shutdownNow();
    }

    /**
     * Blocks until the shard consumer executor reports termination, polling every 50 ms.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void awaitTermination() throws InterruptedException {
        while (!shardConsumersExecutor.isTerminated()) {
            Thread.sleep(TERMINATION_POLL_INTERVAL_MILLIS);
        }
    }

    /**
     * Records the given error and shuts the fetcher down. Only the first reported error is kept;
     * later calls are no-ops.
     *
     * <p>Declared public in this decompiled form; JADX reports the method was originally protected.
     *
     * @param th the error that should terminate the fetcher
     */
    public void stopWithError(Throwable th) {
        if (error.compareAndSet(null, th)) {
            shutdownFetcher();
        }
    }

    /**
     * Stores {@code str2} as the last discovered shard id of stream {@code str} when no id is
     * stored yet or when {@code str2} compares greater than the stored one.
     *
     * @param str stream name
     * @param str2 candidate shard id
     */
    public void advanceLastDiscoveredShardOfStream(String str, String str2) {
        String lastSeenShardId = subscribedStreamsToLastDiscoveredShardIds.get(str);
        if (lastSeenShardId == null || StreamShardHandle.compareShardIds(str2, lastSeenShardId) > 0) {
            subscribedStreamsToLastDiscoveredShardIds.put(str, str2);
        }
    }

    /**
     * Queries the proxy for shards beyond the last discovered ones, advances the per-stream
     * discovery position, and returns the shards assigned to this subtask.
     *
     * @return newly discovered shards this subtask should consume; empty if there are none
     * @throws InterruptedException declared for the shard listing call on the proxy
     */
    public List<StreamShardHandle> discoverNewShardsToSubscribe() throws InterruptedException {
        List<StreamShardHandle> shardsForThisSubtask = new LinkedList<>();

        GetShardListResult shardListResult = kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds);
        if (!shardListResult.hasRetrievedShards()) {
            return shardsForThisSubtask;
        }

        for (String stream : shardListResult.getStreamsWithRetrievedShards()) {
            for (StreamShardHandle candidate : shardListResult.getRetrievedShardListOfStream(stream)) {
                int assignment = shardAssigner.assign(candidate, totalNumberOfConsumerSubtasks);
                if (isThisSubtaskShouldSubscribeTo(assignment, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) {
                    shardsForThisSubtask.add(candidate);
                }
            }
            advanceLastDiscoveredShardOfStream(stream, shardListResult.getLastSeenShardOfStream(stream).getShard().getShardId());
        }
        return shardsForThisSubtask;
    }

    /**
     * Returns the consumer configuration passed to the constructor (not a copy).
     *
     * <p>Declared public in this decompiled form; JADX reports the method was originally protected.
     */
    public Properties getConsumerConfiguration() {
        return configProps;
    }

    /**
     * Returns a clone of the deserialization schema, created with the user code class loader.
     *
     * <p>Declared public in this decompiled form; JADX reports the method was originally protected.
     *
     * @throws RuntimeException wrapping the {@link IOException} or {@link ClassNotFoundException}
     *     raised while cloning
     */
    public KinesisDeserializationSchema<T> getClonedDeserializationSchema() {
        try {
            return InstantiationUtil.clone(deserializationSchema, runtimeContext.getUserCodeClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Emits a record (if non-null) and updates the shard's last processed sequence number as one
     * step under the checkpoint lock. A {@code null} record is logged and skipped, but the
     * sequence number is still advanced.
     *
     * <p>Declared public in this decompiled form; JADX reports the method was originally protected.
     *
     * @param t the deserialized record, or {@code null} if it could not be deserialized
     * @param j timestamp attached to the emitted record
     * @param i index of the shard state in the subscribed shard state list
     * @param sequenceNumber sequence number of the record
     */
    public void emitRecordAndUpdateState(T t, long j, int i, SequenceNumber sequenceNumber) {
        synchronized (checkpointLock) {
            if (t == null) {
                LOG.warn("Skipping non-deserializable record at sequence number {} of shard {}.", sequenceNumber, subscribedShardsState.get(i).getStreamShardHandle());
            } else {
                sourceContext.collectWithTimestamp(t, j);
            }
            updateState(i, sequenceNumber);
        }
    }

    /**
     * Sets the last processed sequence number of a shard under the checkpoint lock. When the new
     * value is the shard-ending sentinel, the active shard count is decremented and the source is
     * marked temporarily idle once the count reaches zero.
     *
     * <p>Declared public in this decompiled form; JADX reports the method was originally protected.
     *
     * @param i index of the shard state in the subscribed shard state list
     * @param sequenceNumber the new last processed sequence number
     */
    public final void updateState(int i, SequenceNumber sequenceNumber) {
        synchronized (checkpointLock) {
            KinesisStreamShardState shardState = subscribedShardsState.get(i);
            shardState.setLastProcessedSequenceNum(sequenceNumber);

            if (!sequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
                return;
            }
            LOG.info("Subtask {} has reached the end of subscribed shard: {}", indexOfThisConsumerSubtask, shardState.getStreamShardHandle());
            if (numberOfActiveShards.decrementAndGet() == 0) {
                LOG.info("Subtask {} has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...", indexOfThisConsumerSubtask);
                sourceContext.markAsTemporarilyIdle();
            }
        }
    }

    /**
     * Appends a shard state to the subscribed shard list under the checkpoint lock. The shard is
     * counted as active unless its state already equals the shard-ending sentinel.
     *
     * @param kinesisStreamShardState the state to register
     * @return index of the registered state in the subscribed shard state list
     */
    public int registerNewSubscribedShardState(KinesisStreamShardState kinesisStreamShardState) {
        synchronized (checkpointLock) {
            subscribedShardsState.add(kinesisStreamShardState);
            boolean alreadyFinished = kinesisStreamShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
            if (!alreadyFinished) {
                numberOfActiveShards.incrementAndGet();
            }
            return subscribedShardsState.size() - 1;
        }
    }

    /**
     * Creates a metrics reporter for the shard and registers its millis-behind-latest gauge in a
     * stream/shard sub-group of the given metric group.
     */
    private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup, KinesisStreamShardState kinesisStreamShardState) {
        ShardMetricsReporter reporter = new ShardMetricsReporter();
        StreamShardHandle shardHandle = kinesisStreamShardState.getStreamShardHandle();
        MetricGroup shardMetricGroup = metricGroup
                .addGroup(KinesisConsumerMetricConstants.STREAM_METRICS_GROUP, shardHandle.getStreamName())
                .addGroup(KinesisConsumerMetricConstants.SHARD_METRICS_GROUP, shardHandle.getShard().getShardId());
        shardMetricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE, reporter::getMillisBehindLatest);
        return reporter;
    }

    /**
     * Decides whether a shard belongs to a subtask: true when {@code |i % i2| == i3}.
     *
     * @param i value produced by the shard assigner for the shard
     * @param i2 total number of consumer subtasks; zero causes an {@link ArithmeticException}
     * @param i3 index of the subtask being tested
     * @return whether the subtask with index {@code i3} should subscribe to the shard
     */
    public static boolean isThisSubtaskShouldSubscribeTo(int i, int i2, int i3) {
        return Math.abs(i % i2) == i3;
    }

    /**
     * Creates the cached thread pool that runs the shard consumers. Threads are daemons named
     * {@code shardConsumers-<str>-thread-<n>}, with {@code n} counting up from 0 per pool.
     *
     * @param str task name (with subtask info) embedded in the thread names
     */
    @VisibleForTesting
    protected ExecutorService createShardConsumersThreadPool(final String str) {
        return Executors.newCachedThreadPool(new ThreadFactory() { // from class: org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.1
            // Must live on the factory, not inside newThread(): a counter created per call
            // would always hand out 0 and give every thread the same name.
            private final AtomicLong threadCount = new AtomicLong(0L);

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setName("shardConsumers-" + str + "-thread-" + threadCount.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            }
        });
    }

    /** Returns the live list of subscribed shard states (not a copy). */
    @VisibleForTesting
    public List<KinesisStreamShardState> getSubscribedShardsState() {
        return subscribedShardsState;
    }

    /**
     * Builds the initial discovery map: every stream name mapped to {@code null}, meaning no
     * shard has been discovered for it yet.
     *
     * @param list names of the subscribed streams
     */
    protected static HashMap<String, String> createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> list) {
        HashMap<String, String> initialState = new HashMap<>();
        for (String stream : list) {
            initialState.put(stream, null);
        }
        return initialState;
    }

    /**
     * Copies stream name, shard ids, and (when present) the hash key range and sequence number
     * range of a shard handle into a new {@link StreamShardMetadata}.
     *
     * @param streamShardHandle the handle to convert
     */
    public static StreamShardMetadata convertToStreamShardMetadata(StreamShardHandle streamShardHandle) {
        Shard shard = streamShardHandle.getShard();

        StreamShardMetadata metadata = new StreamShardMetadata();
        metadata.setStreamName(streamShardHandle.getStreamName());
        metadata.setShardId(shard.getShardId());
        metadata.setParentShardId(shard.getParentShardId());
        metadata.setAdjacentParentShardId(shard.getAdjacentParentShardId());

        HashKeyRange hashKeyRange = shard.getHashKeyRange();
        if (hashKeyRange != null) {
            metadata.setStartingHashKey(hashKeyRange.getStartingHashKey());
            metadata.setEndingHashKey(hashKeyRange.getEndingHashKey());
        }

        SequenceNumberRange sequenceNumberRange = shard.getSequenceNumberRange();
        if (sequenceNumberRange != null) {
            metadata.setStartingSequenceNumber(sequenceNumberRange.getStartingSequenceNumber());
            metadata.setEndingSequenceNumber(sequenceNumberRange.getEndingSequenceNumber());
        }
        return metadata;
    }

    /**
     * Rebuilds a {@link StreamShardHandle} from shard metadata. The hash key range and sequence
     * number range objects are always created, even when the metadata fields are {@code null}.
     *
     * @param streamShardMetadata the metadata to convert
     */
    public static StreamShardHandle convertToStreamShardHandle(StreamShardMetadata streamShardMetadata) {
        HashKeyRange hashKeyRange = new HashKeyRange();
        hashKeyRange.withStartingHashKey(streamShardMetadata.getStartingHashKey());
        hashKeyRange.withEndingHashKey(streamShardMetadata.getEndingHashKey());

        SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
        sequenceNumberRange.withStartingSequenceNumber(streamShardMetadata.getStartingSequenceNumber());
        sequenceNumberRange.withEndingSequenceNumber(streamShardMetadata.getEndingSequenceNumber());

        Shard shard = new Shard();
        shard.withShardId(streamShardMetadata.getShardId());
        shard.withParentShardId(streamShardMetadata.getParentShardId());
        shard.withAdjacentParentShardId(streamShardMetadata.getAdjacentParentShardId());
        shard.withHashKeyRange(hashKeyRange);
        shard.withSequenceNumberRange(sequenceNumberRange);

        return new StreamShardHandle(streamShardMetadata.getStreamName(), shard);
    }
}
