/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package org.opensearch.index.shard;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.resync.ResyncReplicationRequest;
import org.opensearch.action.resync.ResyncReplicationResponse;
import org.opensearch.action.resync.TransportResyncReplicationAction;
import org.opensearch.common.Strings;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.Translog;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskManager;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Objects.requireNonNull;

/**
 * Syncer for primary replica
 *
 * @opensearch.internal
 */
public class PrimaryReplicaSyncer {

    private static final Logger logger = LogManager.getLogger(PrimaryReplicaSyncer.class);

    private final TaskManager taskManager;
    private final SyncAction syncAction;

    public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);

    private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;

    @Inject
    public PrimaryReplicaSyncer(TransportService transportService, TransportResyncReplicationAction syncAction) {
        this(transportService.getTaskManager(), syncAction);
    }

    // for tests
    public PrimaryReplicaSyncer(TaskManager taskManager, SyncAction syncAction) {
        this.taskManager = taskManager;
        this.syncAction = syncAction;
    }

    void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
        if (chunkSize.bytesAsInt() <= 0) {
            throw new IllegalArgumentException("chunkSize must be > 0");
        }
        this.chunkSize = chunkSize;
    }
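    /**
     * Performs a primary-replica resync on the given shard: operations above the last known global checkpoint are read
     * from a new changes snapshot and replayed through the configured {@link SyncAction}. The snapshot is wrapped so that
     * access is synchronized and the resync fails fast if the shard is closed, and it is closed before the supplied
     * listener is notified of success or failure.
     */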
    public void resync(final IndexShard indexShard, final ActionListener<ResyncTask> listener) {
        Translog.Snapshot snapshot = null;
        try {
            final long startingSeqNo = indexShard.getLastKnownGlobalCheckpoint() + 1;
            assert startingSeqNo >= 0 : "startingSeqNo must be non-negative; got [" + startingSeqNo + "]";
            final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
            final ShardId shardId = indexShard.shardId();
            // Wrap the translog snapshot to make it synchronized, as it is accessed by different threads through SnapshotSender.
            // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible.
            // Also fail the resync early if the shard is shutting down.
            snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false, true);
            final Translog.Snapshot originalSnapshot = snapshot;
            final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
                @Override
                public synchronized void close() throws IOException {
                    originalSnapshot.close();
                }

                @Override
                public synchronized int totalOperations() {
                    return originalSnapshot.totalOperations();
                }

                @Override
                public synchronized Translog.Operation next() throws IOException {
                    IndexShardState state = indexShard.state();
                    if (state == IndexShardState.CLOSED) {
                        throw new IndexShardClosedException(shardId);
                    } else {
                        assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state;
                    }
                    return originalSnapshot.next();
                }
            };
            final ActionListener<ResyncTask> resyncListener = new ActionListener<ResyncTask>() {
                @Override
                public void onResponse(final ResyncTask resyncTask) {
                    try {
                        wrappedSnapshot.close();
                        listener.onResponse(resyncTask);
                    } catch (final Exception e) {
                        onFailure(e);
                    }
                }

                @Override
                public void onFailure(final Exception e) {
                    try {
                        wrappedSnapshot.close();
                    } catch (final Exception inner) {
                        e.addSuppressed(inner);
                    } finally {
                        listener.onFailure(e);
                    }
                }
            };
            // We must capture the timestamp after taking the snapshot of operations to make sure
            // that the auto_id_timestamp of every operation in the snapshot is at most this value.
            final long maxSeenAutoIdTimestamp = indexShard.getMaxSeenAutoIdTimestamp();
            resync(
                shardId,
                indexShard.routingEntry().allocationId().getId(),
                indexShard.getPendingPrimaryTerm(),
                wrappedSnapshot,
                startingSeqNo,
                maxSeqNo,
                maxSeenAutoIdTimestamp,
                resyncListener
            );
        } catch (Exception e) {
            try {
                IOUtils.close(snapshot);
            } catch (IOException inner) {
                e.addSuppressed(inner);
            } finally {
                listener.onFailure(e);
            }
        }
    }

    private void resync(
        final ShardId shardId,
        final String primaryAllocationId,
        final long primaryTerm,
        final Translog.Snapshot snapshot,
        long startingSeqNo,
        long maxSeqNo,
        long maxSeenAutoIdTimestamp,
        ActionListener<ResyncTask> listener
    ) {
        ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
        ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
        ActionListener<Void> wrappedListener = new ActionListener<Void>() {
            @Override
            public void onResponse(Void ignore) {
                resyncTask.setPhase("finished");
                taskManager.unregister(resyncTask);
                listener.onResponse(resyncTask);
            }

            @Override
            public void onFailure(Exception e) {
                resyncTask.setPhase("finished");
                taskManager.unregister(resyncTask);
                listener.onFailure(e);
            }
        };
        try {
            new SnapshotSender(
                syncAction,
                resyncTask,
                shardId,
                primaryAllocationId,
                primaryTerm,
                snapshot,
                chunkSize.bytesAsInt(),
                startingSeqNo,
                maxSeqNo,
                maxSeenAutoIdTimestamp,
                wrappedListener
            ).run();
        } catch (Exception e) {
            wrappedListener.onFailure(e);
        }
    }

    /**
     * Action used to send resync operations to the replicas
     *
     * @opensearch.internal
     */
    public interface SyncAction {
        void sync(
            ResyncReplicationRequest request,
            Task parentTask,
            String primaryAllocationId,
            long primaryTerm,
            ActionListener<ResyncReplicationResponse> listener
        );
    }

    /**
     * Sends a snapshot
     *
     * @opensearch.internal
     */
    static class SnapshotSender extends AbstractRunnable implements ActionListener<ResyncReplicationResponse> {
        private final Logger logger;
        private final SyncAction syncAction;
        private final ResyncTask task; // to track progress
        private final String primaryAllocationId;
        private final long primaryTerm;
        private final ShardId shardId;
        private final Translog.Snapshot snapshot;
        private final long startingSeqNo;
        private final long maxSeqNo;
        private final long maxSeenAutoIdTimestamp;
        private final int chunkSizeInBytes;
        private final ActionListener<Void> listener;

        private final AtomicBoolean firstMessage = new AtomicBoolean(true);
        private final AtomicInteger totalSentOps = new AtomicInteger();
        private final AtomicInteger totalSkippedOps = new AtomicInteger();
        private final AtomicBoolean closed = new AtomicBoolean();

        SnapshotSender(
            SyncAction syncAction,
            ResyncTask task,
            ShardId shardId,
            String primaryAllocationId,
            long primaryTerm,
            Translog.Snapshot snapshot,
            int chunkSizeInBytes,
            long startingSeqNo,
            long maxSeqNo,
            long maxSeenAutoIdTimestamp,
            ActionListener<Void> listener
        ) {
            this.logger = PrimaryReplicaSyncer.logger;
            this.syncAction = syncAction;
            this.task = task;
            this.shardId = shardId;
            this.primaryAllocationId = primaryAllocationId;
            this.primaryTerm = primaryTerm;
            this.snapshot = snapshot;
            this.chunkSizeInBytes = chunkSizeInBytes;
            this.startingSeqNo = startingSeqNo;
            this.maxSeqNo = maxSeqNo;
            this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp;
            this.listener = listener;
            task.setTotalOperations(snapshot.totalOperations());
        }

        @Override
        public void onResponse(ResyncReplicationResponse response) {
            run();
        }

        @Override
        public void onFailure(Exception e) {
            if (closed.compareAndSet(false, true)) {
                listener.onFailure(e);
            }
        }
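
        // doRun() drains the snapshot into batches: operations below startingSeqNo (or without an assigned sequence
        // number) are skipped, and collected operations are sent once their estimated size reaches chunkSizeInBytes.
        // Each successful response re-invokes run(), so batches are sent one at a time until the snapshot is exhausted.
        // The first request additionally carries maxSeqNo as trimmedAboveSeqNo (used on the replica side to trim
        // operations above it); follow-up requests pass UNASSIGNED_SEQ_NO instead.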
        private static final Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0];

        @Override
        protected void doRun() throws Exception {
            long size = 0;
            final List<Translog.Operation> operations = new ArrayList<>();

            task.setPhase("collecting_ops");
            task.setResyncedOperations(totalSentOps.get());
            task.setSkippedOperations(totalSkippedOps.get());

            Translog.Operation operation;
            while ((operation = snapshot.next()) != null) {
                final long seqNo = operation.seqNo();
                if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || seqNo < startingSeqNo) {
                    totalSkippedOps.incrementAndGet();
                    continue;
                }
                assert operation.seqNo() >= 0 : "sending operation with unassigned sequence number [" + operation + "]";
                operations.add(operation);
                size += operation.estimateSize();
                totalSentOps.incrementAndGet();

                // check if this request is past the bytes threshold, and if so, send it off
                if (size >= chunkSizeInBytes) {
                    break;
                }
            }

            final long trimmedAboveSeqNo = firstMessage.get() ? maxSeqNo : SequenceNumbers.UNASSIGNED_SEQ_NO;
            // a sync request has to be sent even if there are no operations to sync - at least trimmedAboveSeqNo has to be synced
            if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                task.setPhase("sending_ops");
                ResyncReplicationRequest request = new ResyncReplicationRequest(
                    shardId,
                    trimmedAboveSeqNo,
                    maxSeenAutoIdTimestamp,
                    operations.toArray(EMPTY_ARRAY)
                );
                logger.trace(
                    "{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])",
                    shardId,
                    operations.size(),
                    new ByteSizeValue(size),
                    totalSentOps.get(),
                    totalSkippedOps.get()
                );
                firstMessage.set(false);
                syncAction.sync(request, task, primaryAllocationId, primaryTerm, this);
            } else if (closed.compareAndSet(false, true)) {
                logger.trace("{} resync completed (total sent: [{}], skipped: [{}])", shardId, totalSentOps.get(), totalSkippedOps.get());
                listener.onResponse(null);
            }
        }
    }
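
    // ResyncRequest and ResyncTask below exist for task-manager integration: resync() registers a ResyncRequest with the
    // TaskManager, which calls ResyncRequest#createTask to produce the ResyncTask that SnapshotSender updates as it runs
    // and that is ultimately handed back to the caller; its Status is what task listings report.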
    /**
     * Request to resync primary and replica
     *
     * @opensearch.internal
     */
    public static class ResyncRequest extends ActionRequest {

        private final ShardId shardId;
        private final String allocationId;

        public ResyncRequest(ShardId shardId, String allocationId) {
            this.shardId = shardId;
            this.allocationId = allocationId;
        }

        @Override
        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
            return new ResyncTask(id, type, action, getDescription(), parentTaskId, headers);
        }

        @Override
        public String getDescription() {
            return toString();
        }

        @Override
        public String toString() {
            return "ResyncRequest{ " + shardId + ", " + allocationId + " }";
        }

        @Override
        public ActionRequestValidationException validate() {
            return null;
        }
    }

    /**
     * Task to resync primary and replica
     *
     * @opensearch.internal
     */
    public static class ResyncTask extends Task {
        private volatile String phase = "starting";
        private volatile int totalOperations;
        private volatile int resyncedOperations;
        private volatile int skippedOperations;

        public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
            super(id, type, action, description, parentTaskId, headers);
        }

        /**
         * Set the current phase of the task.
         */
        public void setPhase(String phase) {
            this.phase = phase;
        }

        /**
         * Get the current phase of the task.
         */
        public String getPhase() {
            return phase;
        }

        /**
         * total number of translog operations that were captured by translog snapshot
         */
        public int getTotalOperations() {
            return totalOperations;
        }

        public void setTotalOperations(int totalOperations) {
            this.totalOperations = totalOperations;
        }

        /**
         * number of operations that have been successfully replicated
         */
        public int getResyncedOperations() {
            return resyncedOperations;
        }

        public void setResyncedOperations(int resyncedOperations) {
            this.resyncedOperations = resyncedOperations;
        }

        /**
         * number of translog operations that have been skipped
         */
        public int getSkippedOperations() {
            return skippedOperations;
        }

        public void setSkippedOperations(int skippedOperations) {
            this.skippedOperations = skippedOperations;
        }

        @Override
        public ResyncTask.Status getStatus() {
            return new ResyncTask.Status(phase, totalOperations, resyncedOperations, skippedOperations);
        }

        /**
         * Status for primary replica syncer
         *
         * @opensearch.internal
         */
        public static class Status implements Task.Status {
            public static final String NAME = "resync";

            private final String phase;
            private final int totalOperations;
            private final int resyncedOperations;
            private final int skippedOperations;

            public Status(StreamInput in) throws IOException {
                phase = in.readString();
                totalOperations = in.readVInt();
                resyncedOperations = in.readVInt();
                skippedOperations = in.readVInt();
            }

            public Status(String phase, int totalOperations, int resyncedOperations, int skippedOperations) {
                this.phase = requireNonNull(phase, "Phase cannot be null");
                this.totalOperations = totalOperations;
                this.resyncedOperations = resyncedOperations;
                this.skippedOperations = skippedOperations;
            }

            @Override
            public String getWriteableName() {
                return NAME;
            }

            @Override
            public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
                builder.startObject();
                builder.field("phase", phase);
                builder.field("totalOperations", totalOperations);
                builder.field("resyncedOperations", resyncedOperations);
                builder.field("skippedOperations", skippedOperations);
                builder.endObject();
                return builder;
            }

            @Override
            public void writeTo(StreamOutput out) throws IOException {
                out.writeString(phase);
                out.writeVLong(totalOperations);
                out.writeVLong(resyncedOperations);
                out.writeVLong(skippedOperations);
            }

            @Override
            public String toString() {
                return Strings.toString(XContentType.JSON, this);
            }

            @Override
            public boolean equals(Object o) {
                if (this == o) return true;
                if (o == null || getClass() != o.getClass()) return false;

                Status status = (Status) o;

                if (totalOperations != status.totalOperations) return false;
                if (resyncedOperations != status.resyncedOperations) return false;
                if (skippedOperations != status.skippedOperations) return false;
                return phase.equals(status.phase);
            }

            @Override
            public int hashCode() {
                int result = phase.hashCode();
                result = 31 * result + totalOperations;
                result = 31 * result + resyncedOperations;
                result = 31 * result + skippedOperations;
                return result;
            }
        }
    }
}