/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. */ package org.opensearch.index.seqno; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.AllocationId; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.collect.Tuple; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.WriteStateException; import org.opensearch.index.IndexSettings; import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.engine.SafeCommitInfo; import org.opensearch.index.shard.AbstractIndexShardComponent; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ReplicationGroup; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationTimer; import java.io.IOException; import java.nio.file.Path; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.function.ToLongFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; /** * This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints). 
 *
 * The global checkpoint is the highest sequence number for which all lower (or equal) sequence numbers have been processed
 * on all shards that are currently active. Since shards count as "active" when the cluster-manager starts
 * them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery. These shards
 * have received all old operations via the recovery mechanism and are kept up to date by the various replication actions. The set of
 * shards that are taken into account for the global checkpoint calculation is called the "in-sync shards".
 * <p>
 * The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
 *
 * @opensearch.internal
 */
public class ReplicationTracker extends AbstractIndexShardComponent implements LongSupplier {

    /**
     * The allocation ID of the shard to which this tracker belongs.
     */
    final String shardAllocationId;

    /**
     * The global checkpoint tracker can operate in two modes:
     * - primary: this shard is in charge of collecting local checkpoint information from all shard copies and computing the global
     * checkpoint based on the local checkpoints of all in-sync shard copies.
     * - replica: this shard receives global checkpoint information from the primary (see
     * {@link #updateGlobalCheckpointOnReplica(long, String)}).
     *
     * When a shard is initialized (be it a primary or replica), it initially operates in replica mode. The global checkpoint tracker is
     * then switched to primary mode in the following three scenarios:
     *
     * - An initializing primary shard that is not a relocation target is moved to primary mode (using {@link #activatePrimaryMode}) once
     * the shard becomes active.
     * - An active replica shard is moved to primary mode (using {@link #activatePrimaryMode}) once it is promoted to primary.
     * - A primary relocation target is moved to primary mode (using {@link #activateWithPrimaryContext}) during the primary relocation
     * handoff. If the target shard is successfully initialized in primary mode, the source shard of a primary relocation is then moved
     * to replica mode (using {@link #completeRelocationHandoff}), as the relocation target will be in charge of the global checkpoint
     * computation from that point on.
     */
    volatile boolean primaryMode;

    /**
     * The current operation primary term. Management of this value is done through {@link IndexShard} and must only be done when safe. See
     * {@link #setOperationPrimaryTerm(long)}.
     */
    private volatile long operationPrimaryTerm;

    /**
     * Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling
     * {@link #startRelocationHandoff(String)} and is finished by either calling {@link #completeRelocationHandoff} or
     * {@link #abortRelocationHandoff}, depending on whether the handoff was successful or not. During the handoff, whose main
     * objective is to transfer the internal state of the global checkpoint tracker from the relocation source to the target, the list of
     * in-sync shard copies cannot grow, otherwise the relocation target might miss this information and increase the global checkpoint
     * too eagerly. As a consequence, some of the methods in this class are not allowed to be called while a handoff is in progress,
     * in particular {@link #markAllocationIdAsInSync}.
     *
     * A notable exception to this is the method {@link #updateFromClusterManager}, which is still allowed to be called during a relocation handoff.
     * The reason for this is that the handoff might fail and can be aborted (using {@link #abortRelocationHandoff}), in which case
     * it is important that the global checkpoint tracker does not miss any state updates that might have happened during the handoff attempt.
     * This means, however, that the global checkpoint can still advance after the primary relocation handoff has been initiated, but only
     * because the cluster-manager could have failed some of the in-sync shard copies and marked them as stale. That is ok though, as this
     * information is conveyed through cluster state updates, and the new primary relocation target will also eventually learn about those.
     */
    boolean handoffInProgress;

    /**
     * Boolean flag that indicates whether a relocation handoff completed (see {@link #completeRelocationHandoff}).
     */
    volatile boolean relocated;

    /**
     * The global checkpoint tracker relies on the property that cluster state updates are applied in-order. After transferring a primary
     * context from the primary relocation source to the target and initializing the target, it is possible for the target to apply a
     * cluster state that is older than the one upon which the primary context was based. If we allowed this old cluster state
     * to influence the list of in-sync shard copies here, this could possibly remove such an in-sync copy from the internal structures
     * until the newer cluster state were to be applied, which would unsafely advance the global checkpoint. This field thus captures
     * the version of the last applied cluster state to ensure in-order updates.
     */
    long appliedClusterStateVersion;

    IndexShardRoutingTable routingTable;

    /**
     * Local checkpoint information for all shard copies that are tracked. Has an entry for all shard copies that are either initializing
     * and / or in-sync, possibly also containing information about unassigned in-sync shard copies. The information that is tracked for
     * each shard copy is explained in the docs for the {@link CheckpointState} class.
     */
    final Map<String, CheckpointState> checkpoints;

    /**
     * The current in-memory global checkpoint. In primary mode, this is a cached version of the checkpoint computed from the local
     * checkpoints. In replica mode, this is the in-memory global checkpoint that's communicated by the primary.
     */
    volatile long globalCheckpoint;

    /**
     * A callback invoked when the in-memory global checkpoint is updated. For primary mode this occurs if the computed global checkpoint
     * advances on the basis of state changes tracked here. For non-primary mode this occurs if the local knowledge of the global checkpoint
     * advances due to an update from the primary.
     */
    private final LongConsumer onGlobalCheckpointUpdated;

    /**
     * A supplier of the current time. This supplier is used to add a timestamp to retention leases, and to determine retention lease
     * expiration.
     */
    private final LongSupplier currentTimeMillisSupplier;

    /**
     * A callback when a new retention lease is created or an existing retention lease is removed. In practice, this callback invokes the
     * retention lease sync action, to sync retention leases to replicas.
     */
    private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases;

    /**
     * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
     * current global checkpoint.
     */
    final Set<String> pendingInSync;

    /**
     * Cached value for the last replication group that was computed.
     */
    volatile ReplicationGroup replicationGroup;

    /**
     * The current retention leases.
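     * On the primary these are modified via {@link #addRetentionLease}, {@link #renewRetentionLease} and {@link #removeRetentionLease}
     * and then synced to the replicas, while replicas only replace them wholesale through {@link #updateRetentionLeasesOnReplica};
     * the latest state is persisted via {@link #persistRetentionLeases}.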
*/ private RetentionLeases retentionLeases = RetentionLeases.EMPTY; /** * The primary term of the most-recently persisted retention leases. This is used to check if we need to persist the current retention * leases. */ private long persistedRetentionLeasesPrimaryTerm; /** * The version of the most-recently persisted retention leases. This is used to check if we need to persist the current retention * leases. */ private long persistedRetentionLeasesVersion; /** * Whether there should be a peer recovery retention lease (PRRL) for every tracked shard copy. Always true on indices created from * {@code LegacyESVersion#V_7_4_0} onwards, because these versions create PRRLs properly. May be false on indices created in an * earlier version if we recently did a rolling upgrade and * {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)} has not yet completed. Is only permitted * to change from false to true; can be removed once support for pre-PRRL indices is no longer needed. */ private boolean hasAllPeerRecoveryRetentionLeases; /** * Supplies information about the current safe commit which may be used to expire peer-recovery retention leases. */ private final Supplier<SafeCommitInfo> safeCommitInfoSupplier; /** * Threshold for expiring peer-recovery retention leases and falling back to file-based recovery. See * {@link IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING}. */ private final double fileBasedRecoveryThreshold; private final Consumer<ReplicationGroup> onReplicationGroupUpdated; private volatile ReplicationCheckpoint latestReplicationCheckpoint; /** * Get all retention leases tracked on this shard. * * @return the retention leases */ public RetentionLeases getRetentionLeases() { return getRetentionLeases(false).v2(); } /** * If the expire leases parameter is false, gets all retention leases tracked on this shard and otherwise first calculates * expiration of existing retention leases, and then gets all non-expired retention leases tracked on this shard. Note that only the * primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas. If the * expire leases parameter is true, this replication tracker must be in primary mode. * * @return a tuple indicating whether or not any retention leases were expired, and the non-expired retention leases */ public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boolean expireLeases) { if (expireLeases == false) { return Tuple.tuple(false, retentionLeases); } assert primaryMode; // the primary calculates the non-expired retention leases and syncs them to replicas final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); final Set<String> leaseIdsForCurrentPeers = routingTable.assignedShards() .stream() .map(ReplicationTracker::getPeerRecoveryRetentionLeaseId) .collect(Collectors.toSet()); final boolean allShardsStarted = routingTable.allShardsStarted(); final long minimumReasonableRetainedSeqNo = allShardsStarted ? 
0L : getMinimumReasonableRetainedSeqNo(); final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases.leases() .stream() .collect(Collectors.groupingBy(lease -> { if (lease.source().equals(PEER_RECOVERY_RETENTION_LEASE_SOURCE)) { if (leaseIdsForCurrentPeers.contains(lease.id())) { return false; } if (allShardsStarted) { logger.trace("expiring unused [{}]", lease); return true; } if (lease.retainingSequenceNumber() < minimumReasonableRetainedSeqNo) { logger.trace("expiring unreasonable [{}] retaining history before [{}]", lease, minimumReasonableRetainedSeqNo); return true; } } return currentTimeMillis - lease.timestamp() > retentionLeaseMillis; })); final Collection<RetentionLease> expiredLeases = partitionByExpiration.get(true); if (expiredLeases == null) { // early out as no retention leases have expired logger.debug("no retention leases are expired from current retention leases [{}]", retentionLeases); return Tuple.tuple(false, retentionLeases); } final Collection<RetentionLease> nonExpiredLeases = partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList(); logger.debug("expiring retention leases [{}] from current retention leases [{}]", expiredLeases, retentionLeases); retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases); return Tuple.tuple(true, retentionLeases); } private long getMinimumReasonableRetainedSeqNo() { final SafeCommitInfo safeCommitInfo = safeCommitInfoSupplier.get(); return safeCommitInfo.localCheckpoint + 1 - Math.round(Math.ceil(safeCommitInfo.docCount * fileBasedRecoveryThreshold)); // NB safeCommitInfo.docCount is a very low-level count of the docs in the index, and in particular if this shard contains nested // docs then safeCommitInfo.docCount counts every child doc separately from the parent doc. However every part of a nested document // has the same seqno, so we may be overestimating the cost of a file-based recovery when compared to an ops-based recovery and // therefore preferring ops-based recoveries inappropriately in this case. Correctly accounting for nested docs seems difficult to // do cheaply, and the circumstances in which this matters should be relatively rare, so we use this naive calculation regardless. // TODO improve this measure for when nested docs are in use } /** * Adds a new retention lease. * * @param id the identifier of the retention lease * @param retainingSequenceNumber the retaining sequence number * @param source the source of the retention lease * @param listener the callback when the retention lease is successfully added and synced to replicas * @return the new retention lease * @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists */ public RetentionLease addRetentionLease( final String id, final long retainingSequenceNumber, final String source, final ActionListener<ReplicationResponse> listener ) { Objects.requireNonNull(listener); final RetentionLease retentionLease; final RetentionLeases currentRetentionLeases; synchronized (this) { retentionLease = innerAddRetentionLease(id, retainingSequenceNumber, source); currentRetentionLeases = retentionLeases; } onSyncRetentionLeases.accept(currentRetentionLeases, listener); return retentionLease; } /** * Atomically clones an existing retention lease to a new ID. 
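     * <p>
     * A minimal usage sketch (illustrative only; {@code targetNodeId} is a placeholder for the node that will host the new shard copy),
     * mirroring what {@link #cloneLocalPeerRecoveryRetentionLease} does during primary relocation:
     * <pre>{@code
     * tracker.cloneRetentionLease(
     *     ReplicationTracker.getPeerRecoveryRetentionLeaseId(routingTable.primaryShard()),
     *     ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetNodeId),
     *     ActionListener.wrap(response -> {}, exception -> {}));
     * }</pre>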
* * @param sourceLeaseId the identifier of the source retention lease * @param targetLeaseId the identifier of the retention lease to create * @param listener the callback when the retention lease is successfully added and synced to replicas * @return the new retention lease * @throws RetentionLeaseNotFoundException if the specified source retention lease does not exist * @throws RetentionLeaseAlreadyExistsException if the specified target retention lease already exists */ RetentionLease cloneRetentionLease(String sourceLeaseId, String targetLeaseId, ActionListener<ReplicationResponse> listener) { Objects.requireNonNull(listener); final RetentionLease retentionLease; final RetentionLeases currentRetentionLeases; synchronized (this) { assert primaryMode; if (getRetentionLeases().contains(sourceLeaseId) == false) { throw new RetentionLeaseNotFoundException(sourceLeaseId); } final RetentionLease sourceLease = getRetentionLeases().get(sourceLeaseId); retentionLease = innerAddRetentionLease(targetLeaseId, sourceLease.retainingSequenceNumber(), sourceLease.source()); currentRetentionLeases = retentionLeases; } // Syncing here may not be strictly necessary, because this new lease isn't retaining any extra history that wasn't previously // retained by the source lease; however we prefer to sync anyway since we expect to do so whenever creating a new lease. onSyncRetentionLeases.accept(currentRetentionLeases, listener); return retentionLease; } /** * Adds a new retention lease, but does not synchronise it with the rest of the replication group. * * @param id the identifier of the retention lease * @param retainingSequenceNumber the retaining sequence number * @param source the source of the retention lease * @return the new retention lease * @throws RetentionLeaseAlreadyExistsException if the specified retention lease already exists */ private RetentionLease innerAddRetentionLease(String id, long retainingSequenceNumber, String source) { assert Thread.holdsLock(this); assert primaryMode : id + "/" + retainingSequenceNumber + "/" + source; if (retentionLeases.contains(id)) { throw new RetentionLeaseAlreadyExistsException(id); } final RetentionLease retentionLease = new RetentionLease( id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source ); logger.debug("adding new retention lease [{}] to current retention leases [{}]", retentionLease, retentionLeases); retentionLeases = new RetentionLeases( operationPrimaryTerm, retentionLeases.version() + 1, Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()) ); return retentionLease; } /** * Renews an existing retention lease. * * @param id the identifier of the retention lease * @param retainingSequenceNumber the retaining sequence number * @param source the source of the retention lease * @return the renewed retention lease * @throws RetentionLeaseNotFoundException if the specified retention lease does not exist * @throws RetentionLeaseInvalidRetainingSeqNoException if the new retaining sequence number is lower than * the retaining sequence number of the current retention lease. 
*/ public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) { assert primaryMode; final RetentionLease existingRetentionLease = retentionLeases.get(id); if (existingRetentionLease == null) { throw new RetentionLeaseNotFoundException(id); } if (retainingSequenceNumber < existingRetentionLease.retainingSequenceNumber()) { assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(source) == false : "renewing peer recovery retention lease [" + existingRetentionLease + "]" + " with a lower retaining sequence number [" + retainingSequenceNumber + "]"; throw new RetentionLeaseInvalidRetainingSeqNoException(id, source, retainingSequenceNumber, existingRetentionLease); } final RetentionLease retentionLease = new RetentionLease( id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source ); retentionLeases = new RetentionLeases( operationPrimaryTerm, retentionLeases.version() + 1, Stream.concat(retentionLeases.leases().stream().filter(lease -> lease.id().equals(id) == false), Stream.of(retentionLease)) .collect(Collectors.toList()) ); return retentionLease; } /** * Removes an existing retention lease. * * @param id the identifier of the retention lease * @param listener the callback when the retention lease is successfully removed and synced to replicas */ public void removeRetentionLease(final String id, final ActionListener<ReplicationResponse> listener) { Objects.requireNonNull(listener); final RetentionLeases currentRetentionLeases; synchronized (this) { assert primaryMode; if (retentionLeases.contains(id) == false) { throw new RetentionLeaseNotFoundException(id); } logger.debug("removing retention lease [{}] from current retention leases [{}]", id, retentionLeases); retentionLeases = new RetentionLeases( operationPrimaryTerm, retentionLeases.version() + 1, retentionLeases.leases().stream().filter(lease -> lease.id().equals(id) == false).collect(Collectors.toList()) ); currentRetentionLeases = retentionLeases; } onSyncRetentionLeases.accept(currentRetentionLeases, listener); } /** * Updates retention leases on a replica. * * @param retentionLeases the retention leases */ public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases) { assert primaryMode == false; if (retentionLeases.supersedes(this.retentionLeases)) { this.retentionLeases = retentionLeases; } } /** * Loads the latest retention leases from their dedicated state file. * * @param path the path to the directory containing the state file * @return the retention leases * @throws IOException if an I/O exception occurs reading the retention leases */ public RetentionLeases loadRetentionLeases(final Path path) throws IOException { final RetentionLeases retentionLeases; synchronized (retentionLeasePersistenceLock) { retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path); } // TODO after backporting we expect this never to happen in 8.x, so adjust this to throw an exception instead. assert Version.CURRENT.major <= 8 : "throw an exception instead of returning EMPTY on null"; if (retentionLeases == null) { return RetentionLeases.EMPTY; } return retentionLeases; } private final Object retentionLeasePersistenceLock = new Object(); /** * Persists the current retention leases to their dedicated state file. If this version of the retention leases are already persisted * then persistence is skipped. 
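     * <p>
     * A minimal usage sketch (illustrative only; {@code shardStatePath} stands in for the shard's state directory, which is normally
     * supplied by the shard's infrastructure rather than constructed by callers):
     * <pre>{@code
     * RetentionLeases loaded = tracker.loadRetentionLeases(shardStatePath); // last persisted leases, or EMPTY if none
     * tracker.persistRetentionLeases(shardStatePath);                       // no-op if this version is already persisted
     * }</pre>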
* * @param path the path to the directory containing the state file * @throws WriteStateException if an exception occurs writing the state file */ public void persistRetentionLeases(final Path path) throws WriteStateException { synchronized (retentionLeasePersistenceLock) { final RetentionLeases currentRetentionLeases; synchronized (this) { if (retentionLeases.supersedes(persistedRetentionLeasesPrimaryTerm, persistedRetentionLeasesVersion) == false) { logger.trace("skipping persisting retention leases [{}], already persisted", retentionLeases); return; } currentRetentionLeases = retentionLeases; } logger.trace("persisting retention leases [{}]", currentRetentionLeases); RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path); persistedRetentionLeasesPrimaryTerm = currentRetentionLeases.primaryTerm(); persistedRetentionLeasesVersion = currentRetentionLeases.version(); } } public boolean assertRetentionLeasesPersisted(final Path path) throws IOException { assert RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path) != null; return true; } /** * Retention leases for peer recovery have source {@link ReplicationTracker#PEER_RECOVERY_RETENTION_LEASE_SOURCE}, a lease ID * containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations * with sequence numbers strictly greater than the given global checkpoint. */ public RetentionLease addPeerRecoveryRetentionLease( String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener ) { return addRetentionLease( getPeerRecoveryRetentionLeaseId(nodeId), globalCheckpoint + 1, PEER_RECOVERY_RETENTION_LEASE_SOURCE, listener ); } public RetentionLease cloneLocalPeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) { return cloneRetentionLease( getPeerRecoveryRetentionLeaseId(routingTable.primaryShard()), getPeerRecoveryRetentionLeaseId(nodeId), listener ); } public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<ReplicationResponse> listener) { removeRetentionLease(getPeerRecoveryRetentionLeaseId(nodeId), listener); } /** * Source for peer recovery retention leases; see {@link ReplicationTracker#addPeerRecoveryRetentionLease}. */ public static final String PEER_RECOVERY_RETENTION_LEASE_SOURCE = "peer recovery"; /** * Id for a peer recovery retention lease for the given node. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}. */ public static String getPeerRecoveryRetentionLeaseId(String nodeId) { return "peer_recovery/" + nodeId; } /** * Id for a peer recovery retention lease for the given {@link ShardRouting}. * See {@link ReplicationTracker#addPeerRecoveryRetentionLease}. */ public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting) { return getPeerRecoveryRetentionLeaseId(shardRouting.currentNodeId()); } /** * Returns a list of peer recovery retention leases installed in this replication group */ public List<RetentionLease> getPeerRecoveryRetentionLeases() { return getRetentionLeases().leases() .stream() .filter(lease -> PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(lease.source())) .collect(Collectors.toList()); } /** * Advance the peer-recovery retention leases for all assigned shard copies to discard history below the corresponding global * checkpoint, and renew any leases that are approaching expiry. 
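     * <p>
     * Renewal is deliberately infrequent because every renewal must be persisted: a lease is only considered for renewal once its
     * timestamp is older than half the configured lease expiry time, or once its retaining sequence number has fallen to or below the
     * tracked global checkpoint of the corresponding shard copy. When any lease needs renewal, every lease that can be advanced is
     * renewed to retain only the operations above the corresponding copy's global checkpoint.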
*/ public synchronized void renewPeerRecoveryRetentionLeases() { assert primaryMode; assert invariant(); /* * Peer-recovery retention leases never expire while the associated shard is assigned, but we must still renew them occasionally in * case the associated shard is temporarily unassigned. However we must not renew them too often, since each renewal must be * persisted and the resulting IO can be expensive on nodes with large numbers of shards (see #42299). We choose to renew them after * half the expiry time, so that by default the cluster has at least 6 hours to recover before these leases start to expire. */ final long renewalTimeMillis = currentTimeMillisSupplier.getAsLong() - indexSettings.getRetentionLeaseMillis() / 2; /* * If any of the peer-recovery retention leases need renewal, it's a good opportunity to renew them all. */ final boolean renewalNeeded = StreamSupport.stream(routingTable.spliterator(), false) .filter(ShardRouting::assignedToNode) .anyMatch(shardRouting -> { final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)); if (retentionLease == null) { /* * If this shard copy is tracked then we got here here via a rolling upgrade from an older version that doesn't * create peer recovery retention leases for every shard copy. */ assert (checkpoints.get(shardRouting.allocationId().getId()).tracked && checkpoints.get(shardRouting.allocationId().getId()).replicated == false) || checkpoints.get(shardRouting.allocationId().getId()).tracked == false || hasAllPeerRecoveryRetentionLeases == false; return false; } return retentionLease.timestamp() <= renewalTimeMillis || retentionLease.retainingSequenceNumber() <= checkpoints.get(shardRouting.allocationId().getId()).globalCheckpoint; }); if (renewalNeeded) { for (ShardRouting shardRouting : routingTable) { if (shardRouting.assignedToNode()) { final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)); if (retentionLease != null) { final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId()); final long newRetainedSequenceNumber = Math.max(0L, checkpointState.globalCheckpoint + 1L); if (retentionLease.retainingSequenceNumber() <= newRetainedSequenceNumber) { renewRetentionLease( getPeerRecoveryRetentionLeaseId(shardRouting), newRetainedSequenceNumber, PEER_RECOVERY_RETENTION_LEASE_SOURCE ); } else { // the retention lease is tied to the node, not the shard copy, so it's possible a copy was removed and now // we are in the process of recovering it again, or maybe we were just promoted and have not yet received the // global checkpoints from our peers. assert checkpointState.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO : "cannot renew " + retentionLease + " according to " + checkpointState + " for " + shardRouting; } } } } } assert invariant(); } /** * The state of the lucene checkpoint * * @opensearch.internal */ public static class CheckpointState implements Writeable { /** * the last local checkpoint information that we have for this shard. All operations up to this point are properly fsynced to disk. */ long localCheckpoint; /** * the last global checkpoint information that we have for this shard. This is the global checkpoint that's fsynced to disk on the * respective shard, and all operations up to this point are properly fsynced to disk as well. */ long globalCheckpoint; /** * When a shard is in-sync, it is capable of being promoted as the primary during a failover. 
An in-sync shard * contributes to global checkpoint calculation on the primary iff {@link CheckpointState#replicated} is true. */ boolean inSync; /** * whether this shard is tracked in the replication group and has localTranslog, i.e., should receive document updates * from the primary. Tracked shards with localTranslog would have corresponding retention leases on the primary shard's * {@link ReplicationTracker}. */ boolean tracked; /** * Whether the replication requests to the primary are replicated to the concerned shard or not. */ boolean replicated; /** * The currently searchable replication checkpoint. */ ReplicationCheckpoint visibleReplicationCheckpoint; /** * Map of ReplicationCheckpoints to ReplicationTimers. Timers are added as new checkpoints are published, and removed when * the replica is caught up. */ Map<ReplicationCheckpoint, ReplicationTimer> checkpointTimers; /** * The time it took to complete the most recent replication event. */ long lastCompletedReplicationLag; public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked, boolean replicated) { this.localCheckpoint = localCheckpoint; this.globalCheckpoint = globalCheckpoint; this.inSync = inSync; this.tracked = tracked; this.replicated = replicated; this.checkpointTimers = ConcurrentCollections.newConcurrentMap(); } public CheckpointState(StreamInput in) throws IOException { this.localCheckpoint = in.readZLong(); this.globalCheckpoint = in.readZLong(); this.inSync = in.readBoolean(); this.tracked = in.readBoolean(); if (in.getVersion().onOrAfter(Version.V_2_5_0)) { this.replicated = in.readBoolean(); } else { this.replicated = true; } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeZLong(localCheckpoint); out.writeZLong(globalCheckpoint); out.writeBoolean(inSync); out.writeBoolean(tracked); if (out.getVersion().onOrAfter(Version.V_2_5_0)) { out.writeBoolean(replicated); } } /** * Returns a full copy of this object */ public CheckpointState copy() { return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked, replicated); } public long getLocalCheckpoint() { return localCheckpoint; } public long getGlobalCheckpoint() { return globalCheckpoint; } @Override public String toString() { return "LocalCheckpointState{" + "localCheckpoint=" + localCheckpoint + ", globalCheckpoint=" + globalCheckpoint + ", inSync=" + inSync + ", tracked=" + tracked + ", replicated=" + replicated + '}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; CheckpointState that = (CheckpointState) o; if (localCheckpoint != that.localCheckpoint) return false; if (globalCheckpoint != that.globalCheckpoint) return false; if (inSync != that.inSync) return false; if (tracked != that.tracked) return false; return replicated == that.replicated; } @Override public int hashCode() { int result = Long.hashCode(localCheckpoint); result = 31 * result + Long.hashCode(globalCheckpoint); result = 31 * result + Boolean.hashCode(inSync); result = 31 * result + Boolean.hashCode(tracked); result = 31 * result + Boolean.hashCode(replicated); return result; } } /** * Get the local knowledge of the persisted global checkpoints for all in-sync allocation IDs. 
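     * <p>
     * Illustrative only: a caller could derive the lowest persisted global checkpoint across the in-sync copies with
     * <pre>{@code
     * long minPersisted = tracker.getInSyncGlobalCheckpoints().values().stream()
     *     .mapToLong(Long::longValue)
     *     .min()
     *     .orElse(SequenceNumbers.UNASSIGNED_SEQ_NO);
     * }</pre>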
* * @return a map from allocation ID to the local knowledge of the persisted global checkpoint for that allocation ID */ public synchronized Map<String, Long> getInSyncGlobalCheckpoints() { assert primaryMode; assert handoffInProgress == false; final Map<String, Long> globalCheckpoints = new HashMap<>(checkpoints.size()); // upper bound on the size checkpoints.entrySet() .stream() .filter(e -> e.getValue().inSync && e.getValue().replicated) .forEach(e -> globalCheckpoints.put(e.getKey(), e.getValue().globalCheckpoint)); return globalCheckpoints; } /** * Returns whether the replication tracker is in primary mode, i.e., whether the current shard is acting as primary from the point of * view of replication. */ public boolean isPrimaryMode() { return primaryMode; } /** * Returns the current operation primary term. * * @return the primary term */ public long getOperationPrimaryTerm() { return operationPrimaryTerm; } /** * Sets the current operation primary term. This method should be invoked only when no other operations are possible on the shard. That * is, either from the constructor of {@link IndexShard} or while holding all permits on the {@link IndexShard} instance. * * @param operationPrimaryTerm the new operation primary term */ public void setOperationPrimaryTerm(final long operationPrimaryTerm) { this.operationPrimaryTerm = operationPrimaryTerm; } /** * Returns whether the replication tracker has relocated away to another shard copy. */ public boolean isRelocated() { return relocated; } /** * Class invariant that should hold before and after every invocation of public methods on this class. As Java lacks implication * as a logical operator, many of the invariants are written under the form (!A || B), they should be read as (A implies B) however. 
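     * <p>
     * For example, "a relocation handoff implies primary mode" appears below in the form
     * <pre>{@code
     * assert !handoffInProgress || primaryMode;
     * }</pre>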
*/ private boolean invariant() { // local checkpoints only set during primary mode assert primaryMode || checkpoints.values().stream().allMatch(lcps -> lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO); // global checkpoints only set during primary mode assert primaryMode || checkpoints.values().stream().allMatch(cps -> cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO); // relocation handoff can only occur in primary mode assert !handoffInProgress || primaryMode; // a relocated copy is not in primary mode assert !relocated || !primaryMode; // the current shard is marked as in-sync when the global checkpoint tracker operates in primary mode assert !primaryMode || checkpoints.get(shardAllocationId).inSync; // the current shard is marked as tracked when the global checkpoint tracker operates in primary mode assert !primaryMode || checkpoints.get(shardAllocationId).tracked; // the routing table and replication group is set when the global checkpoint tracker operates in primary mode assert !primaryMode || (routingTable != null && replicationGroup != null) : "primary mode but routing table is " + routingTable + " and replication group is " + replicationGroup; // when in primary mode, the current allocation ID is the allocation ID of the primary or the relocation allocation ID assert !primaryMode || (routingTable.primaryShard().allocationId().getId().equals(shardAllocationId) || routingTable.primaryShard().allocationId().getRelocationId().equals(shardAllocationId)); // during relocation handoff there are no entries blocking global checkpoint advancement assert !handoffInProgress || pendingInSync.isEmpty() : "entries blocking global checkpoint advancement during relocation handoff: " + pendingInSync; // entries blocking global checkpoint advancement can only exist in primary mode and when not having a relocation handoff assert pendingInSync.isEmpty() || (primaryMode && !handoffInProgress); // the computed global checkpoint is always up-to-date assert !primaryMode || globalCheckpoint == computeGlobalCheckpoint(pendingInSync, checkpoints.values(), globalCheckpoint) : "global checkpoint is not up-to-date, expected: " + computeGlobalCheckpoint(pendingInSync, checkpoints.values(), globalCheckpoint) + " but was: " + globalCheckpoint; // when in primary mode, the global checkpoint is at most the minimum local checkpoint on all in-sync shard copies assert !primaryMode || globalCheckpoint <= inSyncCheckpointStates(checkpoints, CheckpointState::getLocalCheckpoint, LongStream::min) : "global checkpoint [" + globalCheckpoint + "] " + "for primary mode allocation ID [" + shardAllocationId + "] " + "more than in-sync local checkpoints [" + checkpoints + "]"; // we have a routing table iff we have a replication group assert (routingTable == null) == (replicationGroup == null) : "routing table is " + routingTable + " but replication group is " + replicationGroup; assert replicationGroup == null || replicationGroup.equals(calculateReplicationGroup()) : "cached replication group out of sync: expected: " + calculateReplicationGroup() + " but was: " + replicationGroup; // all assigned shards from the routing table are tracked assert routingTable == null || checkpoints.keySet().containsAll(routingTable.getAllAllocationIds()) : "local checkpoints " + checkpoints + " not in-sync with routing table " + routingTable; for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) { // blocking global checkpoint advancement only happens for shards that are not in-sync assert 
!pendingInSync.contains(entry.getKey()) || !entry.getValue().inSync : "shard copy " + entry.getKey() + " blocks global checkpoint advancement but is in-sync"; // in-sync shard copies are tracked assert !entry.getValue().inSync || entry.getValue().tracked : "shard copy " + entry.getKey() + " is in-sync but not tracked"; } // all pending in sync shards are tracked for (String aId : pendingInSync) { assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked"; } if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) { // all tracked shard copies have a corresponding peer-recovery retention lease for (final ShardRouting shardRouting : routingTable.assignedShards()) { final CheckpointState cps = checkpoints.get(shardRouting.allocationId().getId()); if (cps.tracked && cps.replicated) { assert retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) : "no retention lease for tracked shard [" + shardRouting + "] in " + retentionLeases; assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals( retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source() ) : "incorrect source [" + retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)).source() + "] for [" + shardRouting + "] in " + retentionLeases; } } } return true; } private static long inSyncCheckpointStates( final Map<String, CheckpointState> checkpoints, ToLongFunction<CheckpointState> function, Function<LongStream, OptionalLong> reducer ) { final OptionalLong value = reducer.apply( checkpoints.values() .stream() .filter(cps -> cps.inSync && cps.replicated) .mapToLong(function) .filter(v -> v != SequenceNumbers.UNASSIGNED_SEQ_NO) ); return value.isPresent() ? value.getAsLong() : SequenceNumbers.UNASSIGNED_SEQ_NO; } public ReplicationTracker( final ShardId shardId, final String allocationId, final IndexSettings indexSettings, final long operationPrimaryTerm, final long globalCheckpoint, final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases, final Supplier<SafeCommitInfo> safeCommitInfoSupplier ) { this( shardId, allocationId, indexSettings, operationPrimaryTerm, globalCheckpoint, onGlobalCheckpointUpdated, currentTimeMillisSupplier, onSyncRetentionLeases, safeCommitInfoSupplier, x -> {} ); } /** * Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. 
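     * <p>
     * A minimal construction sketch (illustrative only; {@code shardId}, {@code allocationId}, {@code indexSettings} and
     * {@code primaryTerm} are placeholders for the values normally supplied by {@link IndexShard}, and the convenience overload
     * without the replication group callback is used):
     * <pre>{@code
     * ReplicationTracker tracker = new ReplicationTracker(
     *     shardId,
     *     allocationId,
     *     indexSettings,
     *     primaryTerm,
     *     SequenceNumbers.UNASSIGNED_SEQ_NO,                                    // no known global checkpoint yet
     *     updatedGlobalCheckpoint -> {},                                        // onGlobalCheckpointUpdated
     *     System::currentTimeMillis,
     *     (leases, listener) -> listener.onResponse(new ReplicationResponse()), // onSyncRetentionLeases
     *     () -> SafeCommitInfo.EMPTY);
     * }</pre>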
* * @param shardId the shard ID * @param allocationId the allocation ID * @param indexSettings the index settings * @param operationPrimaryTerm the current primary term * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} * @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires * @param onReplicationGroupUpdated a callback when the replica group changes */ public ReplicationTracker( final ShardId shardId, final String allocationId, final IndexSettings indexSettings, final long operationPrimaryTerm, final long globalCheckpoint, final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases, final Supplier<SafeCommitInfo> safeCommitInfoSupplier, final Consumer<ReplicationGroup> onReplicationGroupUpdated ) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; this.primaryMode = false; this.operationPrimaryTerm = operationPrimaryTerm; this.handoffInProgress = false; this.appliedClusterStateVersion = -1L; this.globalCheckpoint = globalCheckpoint; this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas()); this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated); this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier); this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases); this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; this.hasAllPeerRecoveryRetentionLeases = indexSettings.isSoftDeleteEnabled() && indexSettings.getIndexMetadata().getState() == IndexMetadata.State.OPEN; this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); this.safeCommitInfoSupplier = safeCommitInfoSupplier; this.onReplicationGroupUpdated = onReplicationGroupUpdated; this.latestReplicationCheckpoint = indexSettings.isSegRepEnabled() ? ReplicationCheckpoint.empty(shardId) : null; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; assert invariant(); } /** * Returns the current replication group for the shard. 
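     * <p>
     * Illustrative only: on the primary, the group determines which shard copies receive replicated requests, e.g.
     * <pre>{@code
     * for (ShardRouting replicationTarget : tracker.getReplicationGroup().getReplicationTargets()) {
     *     // forward the operation to this copy ...
     * }
     * }</pre>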
* * @return the replication group */ public ReplicationGroup getReplicationGroup() { assert primaryMode; return replicationGroup; } private void updateReplicationGroupAndNotify() { assert Thread.holdsLock(this); ReplicationGroup newReplicationGroup = calculateReplicationGroup(); replicationGroup = newReplicationGroup; onReplicationGroupUpdated.accept(newReplicationGroup); } private ReplicationGroup calculateReplicationGroup() { long newVersion; if (replicationGroup == null) { newVersion = 0; } else { newVersion = replicationGroup.getVersion() + 1; } assert indexSettings().isRemoteTranslogStoreEnabled() || checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).allMatch(e -> e.getValue().replicated) : "In absence of remote translog store, all tracked shards must have replication mode as LOGICAL_REPLICATION"; return new ReplicationGroup( routingTable, checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()), checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).map(Map.Entry::getKey).collect(Collectors.toSet()), newVersion ); } /** * Returns the in-memory global checkpoint for the shard. * * @return the global checkpoint */ public long getGlobalCheckpoint() { return globalCheckpoint; } @Override public long getAsLong() { return globalCheckpoint; } /** * Updates the global checkpoint on a replica shard after it has been updated by the primary. * * @param newGlobalCheckpoint the new global checkpoint * @param reason the reason the global checkpoint was updated */ public synchronized void updateGlobalCheckpointOnReplica(final long newGlobalCheckpoint, final String reason) { assert invariant(); assert primaryMode == false; /* * The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary * information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other * replica shards). In these cases, the local knowledge of the global checkpoint could be higher than the sync from the lagging * primary. */ final long previousGlobalCheckpoint = globalCheckpoint; if (newGlobalCheckpoint > previousGlobalCheckpoint) { globalCheckpoint = newGlobalCheckpoint; logger.trace("updated global checkpoint from [{}] to [{}] due to [{}]", previousGlobalCheckpoint, globalCheckpoint, reason); onGlobalCheckpointUpdated.accept(globalCheckpoint); } assert invariant(); } /** * Update the local knowledge of the persisted global checkpoint for the specified allocation ID. * * @param allocationId the allocation ID to update the global checkpoint for * @param globalCheckpoint the global checkpoint */ public synchronized void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) { assert primaryMode; assert handoffInProgress == false; assert invariant(); final CheckpointState cps = checkpoints.get(allocationId); assert !this.shardAllocationId.equals(allocationId) || cps != null; if (cps != null && globalCheckpoint > cps.globalCheckpoint) { final long previousGlobalCheckpoint = cps.globalCheckpoint; cps.globalCheckpoint = globalCheckpoint; logger.trace( "updated local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]", allocationId, previousGlobalCheckpoint, globalCheckpoint ); } assert invariant(); } /** * Update the local knowledge of the visible checkpoint for the specified allocation ID. 
* * This method will also stop timers for each shard and compute replication lag metrics. * * @param allocationId the allocation ID to update the global checkpoint for * @param visibleCheckpoint the visible checkpoint */ public synchronized void updateVisibleCheckpointForShard(final String allocationId, final ReplicationCheckpoint visibleCheckpoint) { assert indexSettings.isSegRepEnabled(); assert primaryMode; assert handoffInProgress == false; assert invariant(); final CheckpointState cps = checkpoints.get(allocationId); assert !this.shardAllocationId.equals(allocationId); // Ignore if the cps is null (i.e. replica shard not in active state). if (cps == null) { logger.warn("Ignoring the checkpoint update for allocation ID {} as its not being tracked by primary", allocationId); return; } if (cps.checkpointTimers.isEmpty() == false) { // stop any timers for checkpoints up to the received cp and remove from cps.checkpointTimers. // Compute the max lag from the set of completed timers. final AtomicLong lastFinished = new AtomicLong(0L); cps.checkpointTimers.entrySet().removeIf((entry) -> { boolean result = entry.getKey().isAheadOf(visibleCheckpoint) == false; if (result) { final ReplicationTimer timer = entry.getValue(); timer.stop(); lastFinished.set(Math.max(lastFinished.get(), timer.time())); } return result; }); cps.lastCompletedReplicationLag = lastFinished.get(); } logger.trace( () -> new ParameterizedMessage( "updated local knowledge for [{}] on the primary of the visible checkpoint from [{}] to [{}], active timers {}", allocationId, cps.visibleReplicationCheckpoint, visibleCheckpoint, cps.checkpointTimers.keySet() ) ); cps.visibleReplicationCheckpoint = visibleCheckpoint; assert invariant(); } /** * After a new checkpoint is published, start a timer for each replica to the checkpoint. * @param checkpoint {@link ReplicationCheckpoint} */ public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint checkpoint) { assert indexSettings.isSegRepEnabled(); if (checkpoint.equals(latestReplicationCheckpoint) == false) { this.latestReplicationCheckpoint = checkpoint; } if (primaryMode) { startReplicationLagTimers(); } } public ReplicationCheckpoint getLatestReplicationCheckpoint() { return this.latestReplicationCheckpoint; } private void startReplicationLagTimers() { for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) { final String allocationId = entry.getKey(); if (allocationId.equals(this.shardAllocationId) == false) { final CheckpointState cps = entry.getValue(); // if the shard is in checkpoints but is unavailable or out of sync we will not track its replication state. // it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event. if (cps.inSync && replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) { cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> { final ReplicationTimer replicationTimer = new ReplicationTimer(); replicationTimer.start(); return replicationTimer; }); logger.trace( () -> new ParameterizedMessage( "updated last published checkpoint for {} at visible cp {} to {} - timers [{}]", allocationId, cps.visibleReplicationCheckpoint, latestReplicationCheckpoint, cps.checkpointTimers.keySet() ) ); } } } } /** * Fetch stats on segment replication. 
     * @return the set of {@link SegmentReplicationShardStats}, one for each in-sync, available shard copy in this primary's replication
     *         group (excluding this primary itself); an empty set if this shard is not in primary mode
     */
    public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats() {
        assert indexSettings.isSegRepEnabled();
        if (primaryMode) {
            return this.checkpoints.entrySet()
                .stream()
                // filter out this shard's allocation id, any shards that are out of sync or unavailable (shard marked in-sync but has not
                // been assigned to a node).
                .filter(
                    entry -> entry.getKey().equals(this.shardAllocationId) == false
                        && entry.getValue().inSync
                        && replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
                )
                .map(entry -> buildShardStats(latestReplicationCheckpoint.getLength(), entry.getKey(), entry.getValue()))
                .collect(Collectors.toUnmodifiableSet());
        }
        return Collections.emptySet();
    }

    private SegmentReplicationShardStats buildShardStats(
        final long latestCheckpointLength,
        final String allocationId,
        final CheckpointState checkpointState
    ) {
        final Map<ReplicationCheckpoint, ReplicationTimer> checkpointTimers = checkpointState.checkpointTimers;
        return new SegmentReplicationShardStats(
            allocationId,
            checkpointTimers.size(),
            checkpointState.visibleReplicationCheckpoint == null
                ? latestCheckpointLength
                : Math.max(latestCheckpointLength - checkpointState.visibleReplicationCheckpoint.getLength(), 0),
            checkpointTimers.values().stream().mapToLong(ReplicationTimer::time).max().orElse(0),
            checkpointState.lastCompletedReplicationLag
        );
    }

    /**
     * Initializes the global checkpoint tracker in primary mode (see {@link #primaryMode}). Called on primary activation or promotion.
     */
    public synchronized void activatePrimaryMode(final long localCheckpoint) {
        assert invariant();
        assert primaryMode == false;
        assert checkpoints.get(shardAllocationId) != null
            && checkpoints.get(shardAllocationId).inSync
            && checkpoints.get(shardAllocationId).localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO : "expected "
                + shardAllocationId
                + " to have initialized entry in "
                + checkpoints
                + " when activating primary";
        assert localCheckpoint >= SequenceNumbers.NO_OPS_PERFORMED;
        primaryMode = true;
        updateLocalCheckpoint(shardAllocationId, checkpoints.get(shardAllocationId), localCheckpoint);
        updateGlobalCheckpointOnPrimary();
        addPeerRecoveryRetentionLeaseForSolePrimary();
        assert invariant();
    }

    /**
     * Creates a peer recovery retention lease for this shard, if one does not already exist and this shard is the sole
     * shard copy with local translog in the replication group. If one does not already exist and yet there are other
     * shard copies in this group then we must have just done a rolling upgrade from a version before {@code LegacyESVersion#V_7_4_0},
     * in which case the missing leases should be created asynchronously by the caller using
     * {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)}.
*/ private void addPeerRecoveryRetentionLeaseForSolePrimary() { assert primaryMode; assert Thread.holdsLock(this); final ShardRouting primaryShard = routingTable.primaryShard(); final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard); if (retentionLeases.get(leaseId) == null) { if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard)) || indexSettings().isRemoteTranslogStoreEnabled()) { assert primaryShard.allocationId().getId().equals(shardAllocationId) : routingTable.assignedShards() + " vs " + shardAllocationId; // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication // group. logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId); innerAddRetentionLease( leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1), PEER_RECOVERY_RETENTION_LEASE_SOURCE ); hasAllPeerRecoveryRetentionLeases = true; } else { /* * We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention * leases for every shard copy, but in this case we do not expect any leases to exist. */ assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases; logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases); } } else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards() .stream() .allMatch( shardRouting -> retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) || checkpoints.get(shardRouting.allocationId().getId()).tracked == false )) { // Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we // don't need to do any more work. hasAllPeerRecoveryRetentionLeases = true; } } /** * Notifies the tracker of the current allocation IDs in the cluster state. * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the cluster-manager * @param inSyncAllocationIds the allocation IDs of the currently in-sync shard copies * @param routingTable the shard routing table */ public synchronized void updateFromClusterManager( final long applyingClusterStateVersion, final Set<String> inSyncAllocationIds, final IndexShardRoutingTable routingTable ) { assert invariant(); if (applyingClusterStateVersion > appliedClusterStateVersion) { // check that the cluster-manager does not fabricate new in-sync entries out of thin air once we are in primary mode assert !primaryMode || inSyncAllocationIds.stream().allMatch(inSyncId -> checkpoints.containsKey(inSyncId) && checkpoints.get(inSyncId).inSync) : "update from cluster-manager in primary mode contains in-sync ids " + inSyncAllocationIds + " that have no matching entries in " + checkpoints; // remove entries which don't exist on cluster-manager Set<String> initializingAllocationIds = routingTable.getAllInitializingShards() .stream() .map(ShardRouting::allocationId) .map(AllocationId::getId) .collect(Collectors.toSet()); boolean removedEntries = checkpoints.keySet() .removeIf(aid -> !inSyncAllocationIds.contains(aid) && !initializingAllocationIds.contains(aid)); final ShardRouting primary = routingTable.primaryShard(); final String primaryAllocationId = primary.allocationId().getId(); final String primaryTargetAllocationId = primary.relocating() ? 
                ? primary.getTargetRelocatingShard().allocationId().getId()
                : null;
            if (primaryMode) {
                // add new initializingIds that are missing locally. These are fresh shard copies - and not in-sync
                for (String initializingId : initializingAllocationIds) {
                    if (checkpoints.containsKey(initializingId) == false) {
                        final boolean inSync = inSyncAllocationIds.contains(initializingId);
                        assert inSync == false : "update from cluster-manager in primary mode has "
                            + initializingId
                            + " as in-sync but it does not exist locally";
                        final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
                        final long globalCheckpoint = localCheckpoint;
                        checkpoints.put(
                            initializingId,
                            new CheckpointState(
                                localCheckpoint,
                                globalCheckpoint,
                                inSync,
                                inSync,
                                isReplicated(initializingId, primaryAllocationId, primaryTargetAllocationId)
                            )
                        );
                    }
                }
                if (removedEntries) {
                    pendingInSync.removeIf(aId -> checkpoints.containsKey(aId) == false);
                }
            } else {
                for (String initializingId : initializingAllocationIds) {
                    final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
                    final long globalCheckpoint = localCheckpoint;
                    checkpoints.put(
                        initializingId,
                        new CheckpointState(
                            localCheckpoint,
                            globalCheckpoint,
                            false,
                            false,
                            isReplicated(initializingId, primaryAllocationId, primaryTargetAllocationId)
                        )
                    );
                }
                for (String inSyncId : inSyncAllocationIds) {
                    final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
                    final long globalCheckpoint = localCheckpoint;
                    checkpoints.put(
                        inSyncId,
                        new CheckpointState(
                            localCheckpoint,
                            globalCheckpoint,
                            true,
                            true,
                            isReplicated(inSyncId, primaryAllocationId, primaryTargetAllocationId)
                        )
                    );
                }
            }
            appliedClusterStateVersion = applyingClusterStateVersion;
            this.routingTable = routingTable;
            updateReplicationGroupAndNotify();
            if (primaryMode && removedEntries) {
                updateGlobalCheckpointOnPrimary();
                // notify any waiter for local checkpoint advancement to recheck that their shard is still being tracked.
                notifyAllWaiters();
            }
        }
        assert invariant();
    }

    /**
     * Returns whether requests are replicated to the given shard copy, considering whether a remote translog store is
     * enabled and the current/primary/primary target allocation ids.
     *
     * @param allocationId given allocation id
     * @param primaryAllocationId primary allocation id
     * @param primaryTargetAllocationId primary target allocation id
     * @return true if requests are replicated to the shard copy with the given allocation id, false otherwise
     */
    private boolean isReplicated(String allocationId, String primaryAllocationId, String primaryTargetAllocationId) {
        // If remote translog is enabled, return true only if the given allocation id matches the primary or its
        // relocation target allocation id.
        if (indexSettings().isRemoteTranslogStoreEnabled()) {
            return (allocationId.equals(primaryAllocationId) || allocationId.equals(primaryTargetAllocationId));
        }
        // Otherwise (local translog), return true as the requests are replicated to all shards in the replication group.
        return true;
    }
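    /*
     * Illustrative example (hypothetical allocation ids, not part of the production flow): with a remote translog
     * store enabled and a primary "p1" relocating to "p2", isReplicated("p1", "p1", "p2") and
     * isReplicated("p2", "p1", "p2") return true while isReplicated("r1", "p1", "p2") returns false; with a local
     * translog every invocation returns true.
     */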
    /**
     * Notifies the tracker of the current allocation IDs in the cluster state.
     * @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the cluster-manager
     * @param inSyncAllocationIds the allocation IDs of the currently in-sync shard copies
     * @param routingTable the shard routing table
     * @deprecated As of 2.2, replaced by {@link #updateFromClusterManager(long, Set, IndexShardRoutingTable)} to support inclusive language
     */
    @Deprecated
    public synchronized void updateFromMaster(
        final long applyingClusterStateVersion,
        final Set<String> inSyncAllocationIds,
        final IndexShardRoutingTable routingTable
    ) {
        updateFromClusterManager(applyingClusterStateVersion, inSyncAllocationIds, routingTable);
    }

    /**
     * Called when the recovery process for a shard has opened the engine on the target shard. Ensures that the right data structures
     * have been set up locally to track local checkpoint information for the shard and that the shard is added to the replication group.
     *
     * @param allocationId the allocation ID of the shard for which recovery was initiated
     */
    public synchronized void initiateTracking(final String allocationId) {
        assert invariant();
        assert primaryMode;
        assert handoffInProgress == false;
        CheckpointState cps = checkpoints.get(allocationId);
        if (cps == null) {
            // can happen if replica was removed from cluster but recovery process is unaware of it yet
            throw new IllegalStateException("no local checkpoint tracking information available");
        }
        cps.tracked = true;
        updateReplicationGroupAndNotify();
        assert invariant();
    }

    /**
     * Marks the shard with the provided allocation ID as in-sync with the primary shard. This method will block until the local checkpoint
     * on the specified shard advances above the current global checkpoint.
     *
     * @param allocationId the allocation ID of the shard to mark as in-sync
     * @param localCheckpoint the current local checkpoint on the shard
     */
    public synchronized void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException {
        assert invariant();
        assert primaryMode;
        assert handoffInProgress == false;
        CheckpointState cps = checkpoints.get(allocationId);
        if (cps == null) {
            // can happen if replica was removed from cluster but recovery process is unaware of it yet
            throw new IllegalStateException("no local checkpoint tracking information available for " + allocationId);
        }
        assert localCheckpoint >= SequenceNumbers.NO_OPS_PERFORMED : "expected known local checkpoint for "
            + allocationId
            + " but was "
            + localCheckpoint;
        assert pendingInSync.contains(allocationId) == false : "shard copy " + allocationId + " is already marked as pending in-sync";
        assert cps.tracked : "shard copy " + allocationId + " cannot be marked as in-sync as it's not tracked";
        updateLocalCheckpoint(allocationId, cps, localCheckpoint);
        // if it was already in-sync (because of a previously failed recovery attempt), global checkpoint must have been
        // stuck from advancing
        assert !cps.inSync || cps.localCheckpoint >= getGlobalCheckpoint() || cps.replicated == false : "shard copy "
            + allocationId
            + " that's already in-sync should have a local checkpoint "
            + cps.localCheckpoint
            + " that's above the global checkpoint "
            + getGlobalCheckpoint()
            + " or it's not replicated";
        if (cps.replicated && cps.localCheckpoint < getGlobalCheckpoint()) {
            pendingInSync.add(allocationId);
            try {
                while (true) {
                    if (pendingInSync.contains(allocationId)) {
                        waitForLocalCheckpointToAdvance();
                    } else {
                        break;
                    }
                }
            } finally {
                pendingInSync.remove(allocationId);
            }
        } else {
            cps.inSync = true;
            updateReplicationGroupAndNotify();
            logger.trace("marked [{}] as in-sync", allocationId);
            updateGlobalCheckpointOnPrimary();
        }
        assert invariant();
    }

    private boolean updateLocalCheckpoint(String allocationId, CheckpointState cps, long localCheckpoint) {
        // a local checkpoint for a shard copy should be a valid sequence number
        assert localCheckpoint >= SequenceNumbers.NO_OPS_PERFORMED : "invalid local checkpoint ["
            + localCheckpoint
            + "] for shard copy ["
            + allocationId
            + "]";
        if (localCheckpoint > cps.localCheckpoint) {
            logger.trace("updated local checkpoint of [{}] from [{}] to [{}]", allocationId, cps.localCheckpoint, localCheckpoint);
            cps.localCheckpoint = localCheckpoint;
            return true;
        } else {
            logger.trace(
                "skipped updating local checkpoint of [{}] from [{}] to [{}], current checkpoint is higher",
                allocationId,
                cps.localCheckpoint,
                localCheckpoint
            );
            return false;
        }
    }

    /**
     * Notifies the service to update the local checkpoint for the shard with the provided allocation ID. If the checkpoint is lower than
     * the currently known one, this is a no-op. If the allocation ID is not tracked, it is ignored.
     *
     * @param allocationId the allocation ID of the shard to update the local checkpoint for
     * @param localCheckpoint the local checkpoint for the shard
     */
    public synchronized void updateLocalCheckpoint(final String allocationId, final long localCheckpoint) {
        assert invariant();
        assert primaryMode;
        assert handoffInProgress == false;
        CheckpointState cps = checkpoints.get(allocationId);
        if (cps == null) {
            // can happen if replica was removed from cluster but replication process is unaware of it yet
            return;
        }
        boolean increasedLocalCheckpoint = updateLocalCheckpoint(allocationId, cps, localCheckpoint);
        boolean pending = pendingInSync.contains(allocationId);
        if (pending && cps.localCheckpoint >= getGlobalCheckpoint()) {
            pendingInSync.remove(allocationId);
            pending = false;
            cps.inSync = true;
            updateReplicationGroupAndNotify();
            logger.trace("marked [{}] as in-sync", allocationId);
            notifyAllWaiters();
        }
        if (cps.replicated && increasedLocalCheckpoint && pending == false) {
            updateGlobalCheckpointOnPrimary();
        }
        assert invariant();
    }

    /**
     * Computes the global checkpoint based on the given local checkpoints. If there are entries preventing the
     * computation (for example, shard copies that are still pending in-sync or have an unassigned local checkpoint),
     * the fallback value is returned.
     */
    private static long computeGlobalCheckpoint(
        final Set<String> pendingInSync,
        final Collection<CheckpointState> localCheckpoints,
        final long fallback
    ) {
        long minLocalCheckpoint = Long.MAX_VALUE;
        if (pendingInSync.isEmpty() == false) {
            return fallback;
        }
        for (final CheckpointState cps : localCheckpoints) {
            if (cps.inSync && cps.replicated) {
                if (cps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
                    // unassigned in-sync replica
                    return fallback;
                } else {
                    minLocalCheckpoint = Math.min(cps.localCheckpoint, minLocalCheckpoint);
                }
            }
        }
        assert minLocalCheckpoint != Long.MAX_VALUE;
        return minLocalCheckpoint;
    }
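    /*
     * Worked example (hypothetical values): with no shard copies pending in-sync and three in-sync, replicated copies
     * at local checkpoints 5, 7 and 9, computeGlobalCheckpoint returns 5. If pendingInSync is non-empty, or any
     * in-sync replicated copy still reports UNASSIGNED_SEQ_NO, the fallback (the current global checkpoint) is
     * returned instead.
     */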
    /**
     * Scans through the currently known local checkpoints and updates the global checkpoint accordingly.
     */
    private synchronized void updateGlobalCheckpointOnPrimary() {
        assert primaryMode;
        final long computedGlobalCheckpoint = computeGlobalCheckpoint(pendingInSync, checkpoints.values(), getGlobalCheckpoint());
        assert computedGlobalCheckpoint >= globalCheckpoint : "new global checkpoint ["
            + computedGlobalCheckpoint
            + "] is lower than previous one ["
            + globalCheckpoint
            + "]";
        if (globalCheckpoint != computedGlobalCheckpoint) {
            globalCheckpoint = computedGlobalCheckpoint;
            logger.trace("updated global checkpoint to [{}]", computedGlobalCheckpoint);
            onGlobalCheckpointUpdated.accept(computedGlobalCheckpoint);
        }
    }

    /**
     * Initiates a relocation handoff and returns the corresponding primary context.
     */
    public synchronized PrimaryContext startRelocationHandoff(String targetAllocationId) {
        assert invariant();
        assert primaryMode;
        assert handoffInProgress == false;
        assert pendingInSync.isEmpty() : "relocation handoff started while there are still shard copies pending in-sync: " + pendingInSync;
        if (checkpoints.containsKey(targetAllocationId) == false) {
            // can happen if the relocation target was removed from cluster but the recovery process isn't aware of that.
            throw new IllegalStateException("relocation target [" + targetAllocationId + "] is no longer part of the replication group");
        }
        handoffInProgress = true;
        // copy the applied cluster state version and the full checkpoints map. We don't need to filter out initializing
        // non-in-sync entries: they will have to undergo a recovery attempt on the relocation target anyway, and will be
        // supplied (or cleaned up) there by cluster state updates once relocation completes, so copying the map as-is is safe.
        Map<String, CheckpointState> localCheckpointsCopy = new HashMap<>();
        for (Map.Entry<String, CheckpointState> entry : checkpoints.entrySet()) {
            localCheckpointsCopy.put(entry.getKey(), entry.getValue().copy());
        }
        assert invariant();
        return new PrimaryContext(appliedClusterStateVersion, localCheckpointsCopy, routingTable);
    }

    /**
     * Fails a relocation handoff attempt.
     */
    public synchronized void abortRelocationHandoff() {
        assert invariant();
        assert primaryMode;
        assert handoffInProgress;
        handoffInProgress = false;
        assert invariant();
    }

    /**
     * Marks a relocation handoff attempt as successful. Moves the tracker into replica mode.
     */
    public synchronized void completeRelocationHandoff() {
        assert invariant();
        assert primaryMode;
        assert handoffInProgress;
        assert relocated == false;
        primaryMode = false;
        handoffInProgress = false;
        relocated = true;
        // forget all checkpoint information
        checkpoints.forEach((key, cps) -> {
            cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
            cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
        });
        assert invariant();
    }
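    /*
     * Simplified call-sequence sketch (derived from the methods in this class, not an additional contract): the
     * relocation source calls startRelocationHandoff(targetAllocationId) and ships the returned PrimaryContext to the
     * target; the target calls activateWithPrimaryContext(context); the source then calls completeRelocationHandoff()
     * on success, or abortRelocationHandoff() if the handoff failed.
     */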
    /**
     * Activates the global checkpoint tracker in primary mode (see {@link #primaryMode}). Called on the primary relocation target during
     * primary relocation handoff.
     *
     * @param primaryContext the primary context used to initialize the state
     */
    public synchronized void activateWithPrimaryContext(PrimaryContext primaryContext) {
        assert invariant();
        assert primaryMode == false;
        if (primaryContext.checkpoints.containsKey(shardAllocationId) == false) {
            // can happen if the old primary was on an old version
            assert indexSettings.getIndexVersionCreated().before(LegacyESVersion.fromId(7000099));
            throw new IllegalStateException("primary context [" + primaryContext + "] does not contain " + shardAllocationId);
        }
        final Runnable runAfter = getClusterManagerUpdateOperationFromCurrentState();
        primaryMode = true;
        // capture current state to possibly replay missed cluster state update
        appliedClusterStateVersion = primaryContext.clusterStateVersion();
        checkpoints.clear();
        for (Map.Entry<String, CheckpointState> entry : primaryContext.checkpoints.entrySet()) {
            checkpoints.put(entry.getKey(), entry.getValue().copy());
        }
        routingTable = primaryContext.getRoutingTable();
        updateReplicationGroupAndNotify();
        updateGlobalCheckpointOnPrimary();
        // reapply missed cluster state update
        // note that if there was no cluster state update between start of the engine of this shard and the call to
        // activateWithPrimaryContext, we might still have missed a cluster state update. This is best effort.
        runAfter.run();

        addPeerRecoveryRetentionLeaseForSolePrimary();

        assert invariant();
    }

    private synchronized void setHasAllPeerRecoveryRetentionLeases() {
        hasAllPeerRecoveryRetentionLeases = true;
        assert invariant();
    }

    public synchronized boolean hasAllPeerRecoveryRetentionLeases() {
        return hasAllPeerRecoveryRetentionLeases;
    }

    /**
     * Creates any required peer-recovery retention leases that do not currently exist because we just did a rolling upgrade from a version
     * prior to {@code LegacyESVersion#V_7_4_0} that does not create peer-recovery retention leases.
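     * <p>
     * Usage note (a sketch of the assumed calling pattern, not an additional API contract): callers typically invoke
     * this after primary activation and wait for the listener, which is completed with {@code null} once every tracked
     * shard copy has a peer-recovery retention lease, or failed if adding any lease fails.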
     */
    public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
        if (hasAllPeerRecoveryRetentionLeases == false) {
            final List<ShardRouting> shardRoutings = routingTable.assignedShards();
            final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
                setHasAllPeerRecoveryRetentionLeases();
                listener.onResponse(null);
            }, listener::onFailure), shardRoutings.size());
            for (ShardRouting shardRouting : shardRoutings) {
                if (retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))) {
                    groupedActionListener.onResponse(null);
                } else {
                    final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
                    if (checkpointState.tracked == false) {
                        groupedActionListener.onResponse(null);
                    } else {
                        logger.trace("createMissingPeerRecoveryRetentionLeases: adding missing lease for {}", shardRouting);
                        try {
                            addPeerRecoveryRetentionLease(
                                shardRouting.currentNodeId(),
                                Math.max(SequenceNumbers.NO_OPS_PERFORMED, checkpointState.globalCheckpoint),
                                groupedActionListener
                            );
                        } catch (Exception e) {
                            groupedActionListener.onFailure(e);
                        }
                    }
                }
            }
        } else {
            logger.trace("createMissingPeerRecoveryRetentionLeases: nothing to do");
            listener.onResponse(null);
        }
    }

    private Runnable getClusterManagerUpdateOperationFromCurrentState() {
        assert primaryMode == false;
        final long lastAppliedClusterStateVersion = appliedClusterStateVersion;
        final Set<String> inSyncAllocationIds = new HashSet<>();
        checkpoints.entrySet().forEach(entry -> {
            if (entry.getValue().inSync) {
                inSyncAllocationIds.add(entry.getKey());
            }
        });
        final IndexShardRoutingTable lastAppliedRoutingTable = routingTable;
        return () -> updateFromClusterManager(lastAppliedClusterStateVersion, inSyncAllocationIds, lastAppliedRoutingTable);
    }

    /**
     * Whether there are shards blocking global checkpoint advancement.
     */
    public synchronized boolean pendingInSync() {
        assert primaryMode;
        return pendingInSync.isEmpty() == false;
    }

    /**
     * Returns the local checkpoint information tracked for a specific shard. Used by tests.
     */
    public synchronized CheckpointState getTrackedLocalCheckpointForShard(String allocationId) {
        assert primaryMode;
        return checkpoints.get(allocationId);
    }

    /**
     * Notify all threads waiting on the monitor on this tracker. These threads should be waiting for the local checkpoint on a specific
     * allocation ID to catch up to the global checkpoint.
     */
    @SuppressForbidden(reason = "Object#notifyAll waiters for local checkpoint advancement")
    private synchronized void notifyAllWaiters() {
        this.notifyAll();
    }

    /**
     * Wait for the local checkpoint to advance to the global checkpoint.
     *
     * @throws InterruptedException if this thread was interrupted before or during waiting
     */
    @SuppressForbidden(reason = "Object#wait for local checkpoint advancement")
    private synchronized void waitForLocalCheckpointToAdvance() throws InterruptedException {
        this.wait();
    }

    /**
     * Represents the sequence number component of the primary context. This is the primary's knowledge of the in-sync and initializing
     * shards and their local checkpoints.
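     * <p>
     * For illustration (an assumed usage sketch, not an additional guarantee): a context serialized on the relocation
     * source via {@link #writeTo(StreamOutput)} can be reconstructed on the target with the {@link StreamInput}
     * constructor and passed to {@link ReplicationTracker#activateWithPrimaryContext(PrimaryContext)}.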
     *
     * @opensearch.internal
     */
    public static class PrimaryContext implements Writeable {

        private final long clusterStateVersion;
        private final Map<String, CheckpointState> checkpoints;
        private final IndexShardRoutingTable routingTable;

        public PrimaryContext(long clusterStateVersion, Map<String, CheckpointState> checkpoints, IndexShardRoutingTable routingTable) {
            this.clusterStateVersion = clusterStateVersion;
            this.checkpoints = checkpoints;
            this.routingTable = routingTable;
        }

        public PrimaryContext(StreamInput in) throws IOException {
            clusterStateVersion = in.readVLong();
            checkpoints = in.readMap(StreamInput::readString, CheckpointState::new);
            routingTable = IndexShardRoutingTable.Builder.readFrom(in);
        }

        public long clusterStateVersion() {
            return clusterStateVersion;
        }

        public Map<String, CheckpointState> getCheckpointStates() {
            return checkpoints;
        }

        public IndexShardRoutingTable getRoutingTable() {
            return routingTable;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeVLong(clusterStateVersion);
            out.writeMap(checkpoints, (streamOutput, s) -> out.writeString(s), (streamOutput, cps) -> cps.writeTo(out));
            IndexShardRoutingTable.Builder.writeTo(routingTable, out);
        }

        @Override
        public String toString() {
            return "PrimaryContext{"
                + "clusterStateVersion="
                + clusterStateVersion
                + ", checkpoints="
                + checkpoints
                + ", routingTable="
                + routingTable
                + '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;

            PrimaryContext that = (PrimaryContext) o;

            if (clusterStateVersion != that.clusterStateVersion) return false;
            if (checkpoints.equals(that.checkpoints) == false) return false;
            return routingTable.equals(that.routingTable);
        }

        @Override
        public int hashCode() {
            int result = Long.hashCode(clusterStateVersion);
            result = 31 * result + checkpoints.hashCode();
            result = 31 * result + routingTable.hashCode();
            return result;
        }
    }
}