package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.Shard;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer.class */
class ShardSyncer {
    private static final Log LOG = LogFactory.getLog(ShardSyncer.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncer$StartingSequenceNumberAndShardIdBasedComparator.class */
    /**
     * Orders leases by the starting sequence number of their shards, using the lease key
     * as a tie-breaker and as the fallback when either shard is not in the lookup map.
     */
    public static class StartingSequenceNumberAndShardIdBasedComparator implements Comparator<KinesisClientLease>, Serializable {
        private static final long serialVersionUID = 1L;
        private final Map<String, Shard> shardIdToShardMap;

        /**
         * @param map lookup from shard id (the lease key) to the shard it names
         */
        public StartingSequenceNumberAndShardIdBasedComparator(Map<String, Shard> map) {
            this.shardIdToShardMap = map;
        }

        /**
         * Compares two leases. When both lease keys resolve to a shard, the shards'
         * starting sequence numbers are compared numerically; if that comparison is
         * a tie, or either shard is unknown, the lease keys are compared as strings.
         */
        @Override
        public int compare(KinesisClientLease kinesisClientLease, KinesisClientLease kinesisClientLease2) {
            String firstKey = kinesisClientLease.getLeaseKey();
            String secondKey = kinesisClientLease2.getLeaseKey();
            Shard firstShard = this.shardIdToShardMap.get(firstKey);
            Shard secondShard = this.shardIdToShardMap.get(secondKey);
            if (firstShard != null && secondShard != null) {
                BigInteger firstStart = new BigInteger(firstShard.getSequenceNumberRange().getStartingSequenceNumber());
                BigInteger secondStart = new BigInteger(secondShard.getSequenceNumberRange().getStartingSequenceNumber());
                int bySequenceNumber = firstStart.compareTo(secondStart);
                if (bySequenceNumber != 0) {
                    return bySequenceNumber;
                }
            }
            return firstKey.compareTo(secondKey);
        }
    }

    /** Private constructor: this class only exposes static members and is not meant to be instantiated. */
    private ShardSyncer() {
    }

    /**
     * Runs a shard/lease sync by delegating to {@code syncShardLeases} with the same arguments.
     *
     * @param iKinesisProxy proxy used to list the shards of the stream
     * @param iLeaseManager lease manager used to list, create and delete leases
     * @param initialPositionInStreamExtended initial position used when choosing checkpoints for new leases
     * @param z forwarded unchanged; in {@code syncShardLeases} it enables cleanup of leases of finished shards
     * @param z2 forwarded unchanged; in {@code syncShardLeases} it disables the inconsistent-child-shard assertion
     */
    static synchronized void bootstrapShardLeases(IKinesisProxy iKinesisProxy, ILeaseManager<KinesisClientLease> iLeaseManager, InitialPositionInStreamExtended initialPositionInStreamExtended, boolean z, boolean z2) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
        syncShardLeases(iKinesisProxy, iLeaseManager, initialPositionInStreamExtended, z, z2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs a shard/lease sync by delegating to {@code syncShardLeases} with the same arguments.
     *
     * @param iKinesisProxy proxy used to list the shards of the stream
     * @param iLeaseManager lease manager used to list, create and delete leases
     * @param initialPositionInStreamExtended initial position used when choosing checkpoints for new leases
     * @param z forwarded unchanged; in {@code syncShardLeases} it enables cleanup of leases of finished shards
     * @param z2 forwarded unchanged; in {@code syncShardLeases} it disables the inconsistent-child-shard assertion
     */
    public static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy iKinesisProxy, ILeaseManager<KinesisClientLease> iLeaseManager, InitialPositionInStreamExtended initialPositionInStreamExtended, boolean z, boolean z2) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
        syncShardLeases(iKinesisProxy, iLeaseManager, initialPositionInStreamExtended, z, z2);
    }

    /**
     * Convenience overload of the five-argument {@code checkAndCreateLeasesForNewShards}
     * that passes {@code false} for the last flag.
     *
     * @param iKinesisProxy proxy used to list the shards of the stream
     * @param iLeaseManager lease manager used to list, create and delete leases
     * @param initialPositionInStreamExtended initial position used when choosing checkpoints for new leases
     * @param z forwarded unchanged to the five-argument overload
     */
    static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy iKinesisProxy, ILeaseManager<KinesisClientLease> iLeaseManager, InitialPositionInStreamExtended initialPositionInStreamExtended, boolean z) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
        checkAndCreateLeasesForNewShards(iKinesisProxy, iLeaseManager, initialPositionInStreamExtended, z, false);
    }

    /**
     * Synchronizes the lease table with the current shard list of the stream.
     *
     * <p>Steps, in order: fetch the shard list; build the shard and parent-to-children maps;
     * collect inconsistent child shard ids (and, unless {@code z2} is set, fail if there are any);
     * create a lease for every shard reported by {@code determineNewLeasesToCreate}, emitting a
     * "CreateLease" success/latency metric per attempt; remove garbage leases; and, when
     * {@code z} is set, remove leases of finished shards.
     *
     * <p>A {@code null} result from {@code listLeases()} is treated as "no leases".
     *
     * @param iKinesisProxy proxy used to list the shards of the stream
     * @param iLeaseManager lease manager used to list, create and delete leases
     * @param initialPositionInStreamExtended initial position used when choosing checkpoints for new leases
     * @param z when {@code true}, {@code cleanupLeasesOfFinishedShards} is invoked at the end
     * @param z2 when {@code true}, the check that no inconsistent child shards exist is skipped
     * @throws KinesisClientLibIOException if the shard list is unavailable or found inconsistent
     */
    private static synchronized void syncShardLeases(IKinesisProxy iKinesisProxy, ILeaseManager<KinesisClientLease> iLeaseManager, InitialPositionInStreamExtended initialPositionInStreamExtended, boolean z, boolean z2) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
        List<Shard> shardList = getShardList(iKinesisProxy);
        LOG.debug("Num shards: " + shardList.size());
        Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shardList);
        Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap);
        Set<String> inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap);
        if (!z2) {
            assertAllParentShardsAreClosed(inconsistentShardIds);
        }

        // Normalize once so that every consumer below sees a non-null list; previously only
        // the tracked-lease copy was null-safe while the other users would throw an NPE.
        List<KinesisClientLease> currentLeases = iLeaseManager.listLeases();
        if (currentLeases == null) {
            currentLeases = Collections.<KinesisClientLease>emptyList();
        }

        List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shardList, currentLeases, initialPositionInStreamExtended, inconsistentShardIds);
        LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
        for (KinesisClientLease newLease : newLeasesToCreate) {
            long startTimeMillis = System.currentTimeMillis();
            boolean success = false;
            try {
                iLeaseManager.createLeaseIfNotExists(newLease);
                success = true;
            } finally {
                // Emitted exactly once per attempt, on both the success and the failure path.
                MetricsHelper.addSuccessAndLatency("CreateLease", startTimeMillis, success, MetricsLevel.DETAILED);
            }
        }

        // Leases known after this sync: the pre-existing ones plus those just created.
        List<KinesisClientLease> trackedLeases = new ArrayList<KinesisClientLease>(currentLeases);
        trackedLeases.addAll(newLeasesToCreate);
        cleanupGarbageLeases(shardList, trackedLeases, iKinesisProxy, iLeaseManager);
        if (z) {
            cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, iLeaseManager);
        }
    }

    /**
     * Fails when the given set of inconsistent child shard ids is not empty.
     *
     * @param set ids of child shards that were found to be inconsistent
     * @throws KinesisClientLibIOException if {@code set} contains at least one id; the message
     *         lists the count and the space-separated ids
     */
    private static void assertAllParentShardsAreClosed(Set<String> set) throws KinesisClientLibIOException {
        if (set.isEmpty()) {
            return;
        }
        String joinedIds = StringUtils.join(set, ' ');
        String message = String.format("%d open child shards (%s) are inconsistent. This can happen due to a race condition between describeStream and a reshard operation.", set.size(), joinedIds);
        throw new KinesisClientLibIOException(message);
    }

    /**
     * Collects the ids of child shards whose parent cannot be confirmed as closed.
     *
     * <p>A parent is considered not closed when its id is {@code null}, when it is missing
     * from {@code map2}, or when its sequence number range has no ending sequence number.
     * All children of such a parent are added to the result.
     *
     * @param map parent shard id to the ids of its child shards
     * @param map2 shard id to shard
     * @return the ids of all children of parents that are not confirmed closed (possibly empty)
     */
    private static Set<String> findInconsistentShardIds(Map<String, Set<String>> map, Map<String, Shard> map2) {
        Set<String> inconsistentShardIds = new HashSet<String>();
        for (Map.Entry<String, Set<String>> parentToChildren : map.entrySet()) {
            String parentShardId = parentToChildren.getKey();
            Shard parentShard = map2.get(parentShardId);
            // The shard itself must be null-checked too: a parent id that is absent from the
            // shard map would otherwise be dereferenced and raise a NullPointerException.
            if (parentShardId == null || parentShard == null || parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null) {
                inconsistentShardIds.addAll(parentToChildren.getValue());
            }
        }
        return inconsistentShardIds;
    }

    /**
     * Indexes the given leases by their lease key.
     *
     * @param list leases to index
     * @return a new map from lease key to lease; later entries replace earlier ones with the same key
     */
    static Map<String, KinesisClientLease> constructShardIdToKCLLeaseMap(List<KinesisClientLease> list) {
        Map<String, KinesisClientLease> leasesByKey = new HashMap<String, KinesisClientLease>();
        Iterator<KinesisClientLease> leases = list.iterator();
        while (leases.hasNext()) {
            KinesisClientLease lease = leases.next();
            leasesByKey.put(lease.getLeaseKey(), lease);
        }
        return leasesByKey;
    }

    /**
     * Verifies every shard id in {@code set}: the shard is either absent from {@code map}
     * (which is only logged), or it is closed, has at least a child entry in {@code map2},
     * and its hash key range is covered by those children.
     *
     * @param map shard id to shard
     * @param map2 parent shard id to the ids of its child shards
     * @param set ids of the shards to verify
     * @throws KinesisClientLibIOException if a present shard has no ending sequence number,
     *         has no entry in {@code map2}, or is not covered by its children
     */
    static synchronized void assertClosedShardsAreCoveredOrAbsent(Map<String, Shard> map, Map<String, Set<String>> map2, Set<String> set) throws KinesisClientLibIOException {
        for (String shardId : set) {
            Shard shard = map.get(shardId);
            if (shard == null) {
                LOG.info("Shard " + shardId + " is not present in Kinesis anymore.");
                continue;
            }
            if (shard.getSequenceNumberRange().getEndingSequenceNumber() == null) {
                // Report the offending shard id, not the whole set of ids being checked.
                throw new KinesisClientLibIOException("Shard " + shardId + " is not closed. This can happen if we constructed the list of shards  while a reshard operation was in progress.");
            }
            Set<String> childShardIds = map2.get(shardId);
            if (childShardIds == null) {
                throw new KinesisClientLibIOException("Incomplete shard list: Closed shard " + shardId + " has no children.This can happen if we constructed the list of shards  while a reshard operation was in progress.");
            }
            assertHashRangeOfClosedShardIsCovered(shard, map, childShardIds);
        }
    }

    /**
     * Checks that the child shards span the hash key range of the given shard: the lowest
     * child starting hash key must not exceed the shard's starting hash key, and the highest
     * child ending hash key must not be below the shard's ending hash key.
     *
     * @param shard the shard whose hash key range must be covered
     * @param map shard id to shard, used to resolve the child ids
     * @param set ids of the child shards
     * @throws KinesisClientLibIOException if there are no children or the range is not covered
     */
    private static synchronized void assertHashRangeOfClosedShardIsCovered(Shard shard, Map<String, Shard> map, Set<String> set) throws KinesisClientLibIOException {
        BigInteger parentStart = new BigInteger(shard.getHashKeyRange().getStartingHashKey());
        BigInteger parentEnd = new BigInteger(shard.getHashKeyRange().getEndingHashKey());
        BigInteger lowestChildStart = null;
        BigInteger highestChildEnd = null;
        for (String childShardId : set) {
            Shard child = map.get(childShardId);
            BigInteger childStart = new BigInteger(child.getHashKeyRange().getStartingHashKey());
            lowestChildStart = (lowestChildStart == null) ? childStart : lowestChildStart.min(childStart);
            BigInteger childEnd = new BigInteger(child.getHashKeyRange().getEndingHashKey());
            highestChildEnd = (highestChildEnd == null) ? childEnd : highestChildEnd.max(childEnd);
        }
        boolean startCovered = lowestChildStart != null && lowestChildStart.compareTo(parentStart) <= 0;
        boolean endCovered = highestChildEnd != null && highestChildEnd.compareTo(parentEnd) >= 0;
        if (!(startCovered && endCovered)) {
            throw new KinesisClientLibIOException("Incomplete shard list: hash key range of shard " + shard.getShardId() + " is not covered by its child shards.");
        }
    }

    /**
     * Builds the parent-to-children index for the given shards. A shard is registered as a
     * child of its parent shard id and of its adjacent parent shard id, but only for those
     * parent ids that are themselves keys of {@code map}.
     *
     * @param map shard id to shard
     * @return a new map from parent shard id to the ids of its child shards
     */
    static Map<String, Set<String>> constructShardIdToChildShardIdsMap(Map<String, Shard> map) {
        Map<String, Set<String>> childIdsByParentId = new HashMap<String, Set<String>>();
        for (Map.Entry<String, Shard> entry : map.entrySet()) {
            String childShardId = entry.getKey();
            Shard childShard = entry.getValue();
            String[] parentCandidates = {childShard.getParentShardId(), childShard.getAdjacentParentShardId()};
            for (String parentId : parentCandidates) {
                if (parentId == null || !map.containsKey(parentId)) {
                    continue;
                }
                Set<String> childIds = childIdsByParentId.get(parentId);
                if (childIds == null) {
                    childIds = new HashSet<String>();
                    childIdsByParentId.put(parentId, childIds);
                }
                childIds.add(childShardId);
            }
        }
        return childIdsByParentId;
    }

    /**
     * Fetches the shard list from the proxy, rejecting a {@code null} result.
     *
     * @param iKinesisProxy proxy to query
     * @return the non-null shard list returned by the proxy
     * @throws KinesisClientLibIOException if the proxy returns {@code null}
     */
    private static List<Shard> getShardList(IKinesisProxy iKinesisProxy) throws KinesisClientLibIOException {
        List<Shard> shards = iKinesisProxy.getShardList();
        if (shards != null) {
            return shards;
        }
        throw new KinesisClientLibIOException("Stream is not in ACTIVE OR UPDATING state - will retry getting the shard list.");
    }

    /**
     * Works out which leases have to be created for the given shards.
     *
     * <p>For every open shard that has no lease yet and is not listed in {@code set}, a new
     * lease is prepared. Ancestors are examined through
     * {@code checkIfDescendantAndAddNewLeasesForAncestors}, which may add further leases.
     * The open shard's checkpoint is TRIM_HORIZON when it is reported as a descendant and the
     * initial position is not AT_TIMESTAMP; otherwise it is derived from the initial position.
     *
     * @param list all shards of the stream
     * @param list2 leases that already exist
     * @param initialPositionInStreamExtended initial position used to derive checkpoints
     * @param set ids of inconsistent child shards for which no lease must be created
     * @return the new leases, sorted with {@code StartingSequenceNumberAndShardIdBasedComparator}
     */
    static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> list, List<KinesisClientLease> list2, InitialPositionInStreamExtended initialPositionInStreamExtended, Set<String> set) {
        Map<String, KinesisClientLease> newLeasesByShardId = new HashMap<String, KinesisClientLease>();
        Map<String, Shard> shardsById = constructShardIdToShardMap(list);
        Set<String> existingLeaseKeys = new HashSet<String>();
        for (KinesisClientLease existingLease : list2) {
            existingLeaseKeys.add(existingLease.getLeaseKey());
            LOG.debug("Existing lease: " + existingLease);
        }
        List<Shard> openShards = getOpenShards(list);
        // Shared across all open shards so each ancestor is evaluated only once.
        Map<String, Boolean> descendantMemo = new HashMap<String, Boolean>();
        for (Shard openShard : openShards) {
            String shardId = openShard.getShardId();
            LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors.");
            if (existingLeaseKeys.contains(shardId)) {
                LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease");
                continue;
            }
            if (set.contains(shardId)) {
                LOG.info("shardId " + shardId + " is an inconsistent child.  Not creating a lease");
                continue;
            }
            LOG.debug("Need to create a lease for shardId " + shardId);
            KinesisClientLease newLease = newKCLLease(openShard);
            boolean isDescendant = checkIfDescendantAndAddNewLeasesForAncestors(shardId, initialPositionInStreamExtended, existingLeaseKeys, shardsById, newLeasesByShardId, descendantMemo);
            if (isDescendant && !initialPositionInStreamExtended.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
                newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
            } else {
                newLease.setCheckpoint(convertToCheckpoint(initialPositionInStreamExtended));
            }
            LOG.debug("Set checkpoint of " + newLease.getLeaseKey() + " to " + newLease.getCheckpoint());
            newLeasesByShardId.put(shardId, newLease);
        }
        List<KinesisClientLease> newLeases = new ArrayList<KinesisClientLease>(newLeasesByShardId.values());
        Collections.sort(newLeases, new StartingSequenceNumberAndShardIdBasedComparator(shardsById));
        return newLeases;
    }

    /**
     * Convenience overload of the four-argument {@code determineNewLeasesToCreate} that
     * passes an empty set of inconsistent shard ids.
     *
     * @param list all shards of the stream
     * @param list2 leases that already exist
     * @param initialPositionInStreamExtended initial position used to derive checkpoints
     * @return the new leases as returned by the four-argument overload
     */
    static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> list, List<KinesisClientLease> list2, InitialPositionInStreamExtended initialPositionInStreamExtended) {
        return determineNewLeasesToCreate(list, list2, initialPositionInStreamExtended, new HashSet());
    }

    /**
     * Recursively decides whether shard {@code str} counts as a "descendant" and, as a side
     * effect, adds or updates leases for its parent shards in {@code map2}.
     *
     * <p>The result for each shard id is memoized in {@code map3}. A shard id that is
     * {@code null} or not a key of {@code map} yields {@code false}. A shard id contained in
     * {@code set} yields {@code true}. Otherwise the parents are evaluated recursively:
     * if any parent is a descendant, this shard is one too and every parent not in
     * {@code set} gets a lease in {@code map2}; if no parent is a descendant, the result is
     * {@code true} only when the initial position is TRIM_HORIZON or AT_TIMESTAMP.
     *
     * @param str id of the shard to evaluate
     * @param initialPositionInStreamExtended initial position used to derive checkpoints
     * @param set shard ids that are treated as already covered (the caller in this file passes
     *        the keys of the existing leases)
     * @param map shard id to shard
     * @param map2 shard id to new lease; mutated by this method
     * @param map3 memoization table from shard id to the computed result; mutated by this method
     * @return whether the shard is considered a descendant according to the rules above
     */
    static boolean checkIfDescendantAndAddNewLeasesForAncestors(String str, InitialPositionInStreamExtended initialPositionInStreamExtended, Set<String> set, Map<String, Shard> map, Map<String, KinesisClientLease> map2, Map<String, Boolean> map3) {
        // Reuse a previously computed answer for this shard id, if any.
        Boolean bool = map3.get(str);
        if (bool != null) {
            return bool.booleanValue();
        }
        boolean z = false;
        // Parents that were themselves found to be descendants.
        HashSet hashSet = new HashSet();
        if (str != null && map.containsKey(str)) {
            if (set.contains(str)) {
                z = true;
            } else {
                Set<String> parentShardIds = getParentShardIds(map.get(str), map);
                for (String str2 : parentShardIds) {
                    if (checkIfDescendantAndAddNewLeasesForAncestors(str2, initialPositionInStreamExtended, set, map, map2, map3)) {
                        z = true;
                        hashSet.add(str2);
                        LOG.debug("Parent shard " + str2 + " is a descendant.");
                    } else {
                        LOG.debug("Parent shard " + str2 + " is NOT a descendant.");
                    }
                }
                if (z) {
                    // At least one parent is a descendant: make sure every parent that is not
                    // in `set` has a lease in map2.
                    for (String str3 : parentShardIds) {
                        if (!set.contains(str3)) {
                            LOG.debug("Need to create a lease for shardId " + str3);
                            KinesisClientLease kinesisClientLease = map2.get(str3);
                            if (kinesisClientLease == null) {
                                kinesisClientLease = newKCLLease(map.get(str3));
                                map2.put(str3, kinesisClientLease);
                            }
                            // The checkpoint is (re)assigned even when the lease was already in map2.
                            // Descendant parents start at TRIM_HORIZON unless the initial position is
                            // AT_TIMESTAMP; all other parents use the configured initial position.
                            if (!hashSet.contains(str3) || initialPositionInStreamExtended.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
                                kinesisClientLease.setCheckpoint(convertToCheckpoint(initialPositionInStreamExtended));
                            } else {
                                kinesisClientLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
                            }
                        }
                    }
                } else if (initialPositionInStreamExtended.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON) || initialPositionInStreamExtended.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
                    // No parent is a descendant: the shard still counts as one for these two
                    // initial positions (but not for any other).
                    z = true;
                }
            }
        }
        // Memoize the result, including for ids that are null or unknown to `map`.
        map3.put(str, Boolean.valueOf(z));
        return z;
    }

    /**
     * Returns the parent ids of a shard that are known to the given shard map.
     *
     * @param shard the shard whose parent and adjacent parent ids are inspected
     * @param map shard id to shard; only ids that are keys of this map are returned
     * @return a new set holding zero, one or two parent shard ids
     */
    static Set<String> getParentShardIds(Shard shard, Map<String, Shard> map) {
        Set<String> knownParentIds = new HashSet<String>(2);
        String[] candidates = {shard.getParentShardId(), shard.getAdjacentParentShardId()};
        for (String candidate : candidates) {
            if (candidate != null && map.containsKey(candidate)) {
                knownParentIds.add(candidate);
            }
        }
        return knownParentIds;
    }

    /**
     * Deletes leases whose shard no longer appears in the stream.
     *
     * <p>Leases are first screened against the supplied shard list. Only if candidates are
     * found is the shard list fetched again from the proxy; each candidate is then re-checked
     * against the fresh list and deleted if it is still a candidate.
     *
     * @param list shards as known at the start of the sync
     * @param list2 leases to screen
     * @param iKinesisProxy proxy used to re-fetch the shard list
     * @param iLeaseManager lease manager used to delete leases
     * @throws KinesisClientLibIOException if the shard list cannot be fetched, or if
     *         {@code isCandidateForCleanup} detects a parent shard without its child
     */
    private static void cleanupGarbageLeases(List<Shard> list, List<KinesisClientLease> list2, IKinesisProxy iKinesisProxy, ILeaseManager<KinesisClientLease> iLeaseManager) throws KinesisClientLibIOException, DependencyException, InvalidStateException, ProvisionedThroughputException {
        Set<String> knownShardIds = new HashSet<String>();
        for (Shard shard : list) {
            knownShardIds.add(shard.getShardId());
        }
        List<KinesisClientLease> candidates = new ArrayList<KinesisClientLease>();
        for (KinesisClientLease lease : list2) {
            if (isCandidateForCleanup(lease, knownShardIds)) {
                candidates.add(lease);
            }
        }
        if (!candidates.isEmpty()) {
            LOG.info("Found " + candidates.size() + " candidate leases for cleanup. Refreshing list of Kinesis shards to pick up recent/latest shards");
            Set<String> refreshedShardIds = new HashSet<String>();
            for (Shard refreshedShard : getShardList(iKinesisProxy)) {
                refreshedShardIds.add(refreshedShard.getShardId());
            }
            for (KinesisClientLease candidate : candidates) {
                if (isCandidateForCleanup(candidate, refreshedShardIds)) {
                    LOG.info("Deleting lease for shard " + candidate.getLeaseKey() + " as it is not present in Kinesis stream.");
                    iLeaseManager.deleteLease(candidate);
                }
            }
        }
    }

    /**
     * Tells whether a lease refers to a shard that is not in the given set of shard ids.
     *
     * @param kinesisClientLease the lease to check
     * @param set ids of the shards currently known
     * @return {@code false} if the lease key is in {@code set}; {@code true} if neither the
     *         lease key nor any of the lease's parent shard ids is in {@code set}
     * @throws KinesisClientLibIOException if the lease key is missing from {@code set} while
     *         one of its parent shard ids is present
     */
    static boolean isCandidateForCleanup(KinesisClientLease kinesisClientLease, Set<String> set) throws KinesisClientLibIOException {
        if (set.contains(kinesisClientLease.getLeaseKey())) {
            return false;
        }
        LOG.info("Found lease for non-existent shard: " + kinesisClientLease.getLeaseKey() + ". Checking its parent shards");
        for (String parentShardId : kinesisClientLease.getParentShardIds()) {
            if (!set.contains(parentShardId)) {
                continue;
            }
            String message = "Parent shard " + parentShardId + " exists but not the child shard " + kinesisClientLease.getLeaseKey();
            LOG.info(message);
            throw new KinesisClientLibIOException(message);
        }
        return true;
    }

    /**
     * Attempts to remove leases whose checkpoint is SHARD_END.
     *
     * <p>The finished shards are first validated with
     * {@code assertClosedShardsAreCoveredOrAbsent}, then processed in the order defined by
     * {@code StartingSequenceNumberAndShardIdBasedComparator}. For each one that has a
     * non-empty child set, {@code cleanupLeaseForClosedShard} decides whether to delete it.
     *
     * @param collection leases to scan for SHARD_END checkpoints
     * @param map shard id to shard
     * @param map2 parent shard id to the ids of its child shards
     * @param list leases used to look up parent and child leases by shard id
     * @param iLeaseManager lease manager used to delete leases
     */
    private static synchronized void cleanupLeasesOfFinishedShards(Collection<KinesisClientLease> collection, Map<String, Shard> map, Map<String, Set<String>> map2, List<KinesisClientLease> list, ILeaseManager<KinesisClientLease> iLeaseManager) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
        Set<String> finishedShardIds = new HashSet<String>();
        List<KinesisClientLease> finishedLeases = new ArrayList<KinesisClientLease>();
        for (KinesisClientLease lease : collection) {
            if (!lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
                continue;
            }
            finishedShardIds.add(lease.getLeaseKey());
            finishedLeases.add(lease);
        }
        if (finishedLeases.isEmpty()) {
            return;
        }
        assertClosedShardsAreCoveredOrAbsent(map, map2, finishedShardIds);
        Collections.sort(finishedLeases, new StartingSequenceNumberAndShardIdBasedComparator(map));
        Map<String, KinesisClientLease> leasesByShardId = constructShardIdToKCLLeaseMap(list);
        for (KinesisClientLease finishedLease : finishedLeases) {
            String shardId = finishedLease.getLeaseKey();
            Set<String> childShardIds = map2.get(shardId);
            boolean hasChildren = shardId != null && childShardIds != null && !childShardIds.isEmpty();
            if (hasChildren) {
                cleanupLeaseForClosedShard(shardId, childShardIds, leasesByShardId, iLeaseManager);
            }
        }
    }

    /**
     * Deletes the lease of a closed shard when it is safe to do so: the lease exists with a
     * SHARD_END checkpoint, every child shard id has a lease in {@code map}, and none of those
     * child leases is still checkpointed at TRIM_HORIZON.
     *
     * @param str id of the closed shard
     * @param set ids of the closed shard's child shards
     * @param map shard id to lease
     * @param iLeaseManager lease manager used to delete the lease
     */
    static synchronized void cleanupLeaseForClosedShard(String str, Set<String> set, Map<String, KinesisClientLease> map, ILeaseManager<KinesisClientLease> iLeaseManager) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
        KinesisClientLease closedShardLease = map.get(str);
        List<KinesisClientLease> childLeases = new ArrayList<KinesisClientLease>();
        for (String childShardId : set) {
            KinesisClientLease childLease = map.get(childShardId);
            if (childLease != null) {
                childLeases.add(childLease);
            }
        }
        if (closedShardLease == null
                || !closedShardLease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)
                || childLeases.size() != set.size()) {
            return;
        }
        for (KinesisClientLease childLease : childLeases) {
            if (childLease.getCheckpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) {
                // A child still sits at TRIM_HORIZON, so the closed shard's lease is kept.
                return;
            }
        }
        LOG.info("Deleting lease for shard " + closedShardLease.getLeaseKey() + " as it has been completely processed and processing of child shards has begun.");
        iLeaseManager.deleteLease(closedShardLease);
    }

    /**
     * Creates a lease for the given shard. The lease key is the shard id, the parent shard
     * ids are the shard's non-null parent and adjacent parent ids (in that order), and the
     * owner-switch counter is set to zero. No checkpoint is assigned here.
     *
     * @param shard the shard to create a lease for
     * @return the newly created lease
     */
    static KinesisClientLease newKCLLease(Shard shard) {
        List<String> parentShardIds = new ArrayList<String>(2);
        String parentShardId = shard.getParentShardId();
        if (parentShardId != null) {
            parentShardIds.add(parentShardId);
        }
        String adjacentParentShardId = shard.getAdjacentParentShardId();
        if (adjacentParentShardId != null) {
            parentShardIds.add(adjacentParentShardId);
        }

        KinesisClientLease lease = new KinesisClientLease();
        lease.setLeaseKey(shard.getShardId());
        lease.setParentShardIds(parentShardIds);
        lease.setOwnerSwitchesSinceCheckpoint(0L);
        return lease;
    }

    /**
     * Indexes the given shards by shard id.
     *
     * @param list shards to index
     * @return a new map from shard id to shard; later entries replace earlier ones with the same id
     */
    static Map<String, Shard> constructShardIdToShardMap(List<Shard> list) {
        Map<String, Shard> shardsById = new HashMap<String, Shard>();
        Iterator<Shard> shards = list.iterator();
        while (shards.hasNext()) {
            Shard shard = shards.next();
            shardsById.put(shard.getShardId(), shard);
        }
        return shardsById;
    }

    /**
     * Selects the shards whose sequence number range has no ending sequence number.
     *
     * @param list shards to filter
     * @return a new list with the open shards, in their original order
     */
    static List<Shard> getOpenShards(List<Shard> list) {
        List<Shard> openShards = new ArrayList<Shard>();
        for (Shard candidate : list) {
            if (candidate.getSequenceNumberRange().getEndingSequenceNumber() != null) {
                continue;
            }
            openShards.add(candidate);
            LOG.debug("Found open shard: " + candidate.getShardId());
        }
        return openShards;
    }

    /**
     * Maps the configured initial position to the matching sentinel checkpoint.
     *
     * @param initialPositionInStreamExtended the configured initial position
     * @return TRIM_HORIZON, LATEST or AT_TIMESTAMP as an {@code ExtendedSequenceNumber},
     *         or {@code null} if the position matches none of these
     */
    private static ExtendedSequenceNumber convertToCheckpoint(InitialPositionInStreamExtended initialPositionInStreamExtended) {
        if (initialPositionInStreamExtended.getInitialPositionInStream().equals(InitialPositionInStream.TRIM_HORIZON)) {
            return ExtendedSequenceNumber.TRIM_HORIZON;
        }
        if (initialPositionInStreamExtended.getInitialPositionInStream().equals(InitialPositionInStream.LATEST)) {
            return ExtendedSequenceNumber.LATEST;
        }
        if (initialPositionInStreamExtended.getInitialPositionInStream().equals(InitialPositionInStream.AT_TIMESTAMP)) {
            return ExtendedSequenceNumber.AT_TIMESTAMP;
        }
        return null;
    }
}
