/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.indices.recovery;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.core.Assertions;
import org.opensearch.action.ActionListener;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.util.concurrent.AsyncIOProcessor;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

/**
 * File chunks are sent/requested sequentially by at most one thread at any time. However, the sender/requestor won't wait for the response
 * before processing the next file chunk request, in order to reduce recovery time, especially over secure, compressed, or high-latency
 * connections.
*
 * The sender/requestor can send up to {@code maxConcurrentFileChunks} file chunk requests without waiting for responses. Since the recovery
 * target can receive file chunks out of order, it has to buffer those file chunks in memory and can only flush them to disk when there is
 * no gap. To ensure the recovery target never buffers more than {@code maxConcurrentFileChunks} file chunks, we allow the sender/requestor
 * to send only up to {@code maxConcurrentFileChunks} file chunk requests beyond the last flushed (and acknowledged) file chunk. We leverage
 * the local checkpoint tracker for this purpose: we generate a new sequence number and assign it to each file chunk request before sending,
 * then mark that sequence number as processed when we receive a response for the corresponding file chunk request. With the local checkpoint
 * tracker, we know that the last acknowledged and flushed file chunk is the one whose {@code requestSeqId} equals the local checkpoint,
 * because the recovery target can flush all file chunks up to the local checkpoint.
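 *
 * For example, with {@code maxConcurrentFileChunks = 5}, the sender may issue the requests with {@code requestSeqId} 0 through 4 and must
 * then stop, since max_seq_no (4) minus the local checkpoint (-1) has reached 5. Once the response for seqId 0 arrives, the local
 * checkpoint advances to 0 and the request with seqId 5 can be sent.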
*
 * When the number of outstanding (un-replied) file chunk requests reaches the limit (i.e. the gap between the max_seq_no and the local
 * checkpoint reaches {@code maxConcurrentFileChunks}), the sending/requesting thread stops its execution. The process is resumed by one of
 * the networking threads that receive/handle the responses to the pending file chunk requests, and it continues until all chunk requests
 * have been sent and responded to.
*
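 * A minimal sketch of a concrete subclass, for illustration only. The types {@code FileSource} and {@code FileChunk} (the latter
 * implementing {@link ChunkRequest}) and the field {@code transportClient} are hypothetical names, not part of this class:
 *
 * <pre>{@code
 * class FileTransfer extends MultiChunkTransfer<FileSource, FileChunk> {
 *     FileTransfer(Logger logger, ThreadContext ctx, ActionListener<Void> listener, int maxConcurrentChunks, List<FileSource> files) {
 *         super(logger, ctx, listener, maxConcurrentChunks, files);
 *     }
 *     protected FileChunk nextChunkRequest(FileSource file) throws IOException {
 *         return file.readNextChunk(); // the returned request answers lastChunk() == true on the file's final chunk
 *     }
 *     protected void executeChunkRequest(FileChunk chunk, ActionListener<Void> listener) {
 *         transportClient.sendChunk(chunk, listener); // completing the listener acknowledges the chunk
 *     }
 *     protected void handleError(FileSource file, Exception e) throws Exception {
 *         throw e; // or record a per-file failure before rethrowing
 *     }
 *     public void close() {}
 * }
 * // usage: new FileTransfer(logger, threadContext, listener, 5, files).start();
 * }</pre>
 *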
* @opensearch.internal
*/
public abstract class MultiChunkTransfer<Source, Request extends MultiChunkTransfer.ChunkRequest> implements Closeable {
private Status status = Status.PROCESSING;
private final Logger logger;
    private final ActionListener<Void> listener;
    // assigns a fresh sequence number to every chunk request and tracks which requests have been acknowledged
    private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
    private final AsyncIOProcessor<FileChunkResponseItem<Source>> processor;
    private final int maxConcurrentChunks;
    private Source currentSource = null;
    private final Iterator<Source> remainingSources;
    private Tuple<Source, Request> readAheadRequest = null;
protected MultiChunkTransfer(
Logger logger,
ThreadContext threadContext,
        ActionListener<Void> listener,
        int maxConcurrentChunks,
        List<Source> sources
) {
this.logger = logger;
this.maxConcurrentChunks = maxConcurrentChunks;
this.listener = listener;
        this.processor = new AsyncIOProcessor<FileChunkResponseItem<Source>>(logger, maxConcurrentChunks, threadContext) {
            @Override
            protected void write(List<Tuple<FileChunkResponseItem<Source>, Consumer<Exception>>> items) throws IOException {
handleItems(items);
}
};
this.remainingSources = sources.iterator();
}
public final void start() {
addItem(UNASSIGNED_SEQ_NO, null, null); // put a dummy item to start the processor
}
private void addItem(long requestSeqId, Source resource, Exception failure) {
processor.put(new FileChunkResponseItem<>(requestSeqId, resource, failure), e -> { assert e == null : e; });
}
    private void handleItems(List<Tuple<FileChunkResponseItem<Source>, Consumer<Exception>>> items) {
if (status != Status.PROCESSING) {
assert status == Status.FAILED : "must not receive any response after the transfer was completed";
            // These exceptions are ignored because we record only the first failure; log them for debugging purposes.
items.stream()
.filter(item -> item.v1().failure != null)
.forEach(
item -> logger.debug(
new ParameterizedMessage("failed to transfer a chunk request {}", item.v1().source),
item.v1().failure
)
);
return;
}
try {
            for (Tuple<FileChunkResponseItem<Source>, Consumer<Exception>> item : items) {
                final FileChunkResponseItem<Source> resp = item.v1();
if (resp.requestSeqId == UNASSIGNED_SEQ_NO) {
continue; // not an actual item
}
requestSeqIdTracker.markSeqNoAsProcessed(resp.requestSeqId);
if (resp.failure != null) {
handleError(resp.source, resp.failure);
throw resp.failure;
}
}
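            // the number of in-flight requests equals maxSeqNo minus the processed checkpoint; keep dispatching while it is below the limit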
while (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getProcessedCheckpoint() < maxConcurrentChunks) {
                final Tuple<Source, Request> request = readAheadRequest != null ? readAheadRequest : getNextRequest();
readAheadRequest = null;
if (request == null) {
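                    // all requests have been sent; complete only when every outstanding request has been acknowledged,
                    // otherwise the handler of the last pending response will re-enter this method and finish the transfer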
assert currentSource == null && remainingSources.hasNext() == false;
if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getProcessedCheckpoint()) {
onCompleted(null);
}
return;
}
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
executeChunkRequest(
request.v2(),
ActionListener.wrap(r -> addItem(requestSeqId, request.v1(), null), e -> addItem(requestSeqId, request.v1(), e))
);
}
// While we are waiting for the responses, we can prepare the next request in advance
// so we can send it immediately when the responses arrive to reduce the transfer time.
if (readAheadRequest == null) {
readAheadRequest = getNextRequest();
}
} catch (Exception e) {
onCompleted(e);
}
}
private void onCompleted(Exception failure) {
if (Assertions.ENABLED && status != Status.PROCESSING) {
throw new AssertionError("invalid status: expected [" + Status.PROCESSING + "] actual [" + status + "]", failure);
}
status = failure == null ? Status.SUCCESS : Status.FAILED;
try {
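            // closes this transfer; if a failure was passed in, IOUtils rethrows it with any exception from close() added as suppressed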
IOUtils.close(failure, this);
} catch (Exception e) {
listener.onFailure(e);
return;
}
listener.onResponse(null);
}
    private Tuple<Source, Request> getNextRequest() throws Exception {
try {
if (currentSource == null) {
if (remainingSources.hasNext()) {
currentSource = remainingSources.next();
onNewResource(currentSource);
} else {
return null;
}
}
final Source md = currentSource;
final Request request = nextChunkRequest(md);
if (request.lastChunk()) {
currentSource = null;
}
return Tuple.tuple(md, request);
} catch (Exception e) {
handleError(currentSource, e);
throw e;
}
}
/**
     * This method is called when the transfer starts sending/requesting a new source. Subclasses should override it to reset the file
     * offset, or to close the previous file and open a new one, if needed.
*/
protected void onNewResource(Source resource) throws IOException {
}
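
    /**
     * Returns the request for the next chunk of the given resource. The request for the final chunk of a resource must answer
     * {@code true} from {@link ChunkRequest#lastChunk()} so that the transfer advances to the next source.
     */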
protected abstract Request nextChunkRequest(Source resource) throws IOException;
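
    /**
     * Sends/executes the given chunk request. Implementations must complete the listener once the corresponding response (or failure)
     * has been received; completing it is what advances the local checkpoint and frees a slot in the sending window.
     */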
    protected abstract void executeChunkRequest(Request request, ActionListener<Void> listener);
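
    /**
     * Called when preparing or executing a chunk request for the given resource has failed; the failure is rethrown afterwards.
     */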
protected abstract void handleError(Source resource, Exception e) throws Exception;
/**
     * A file chunk response item, as queued for processing
*
* @opensearch.internal
*/
    private static class FileChunkResponseItem<Source> {
final long requestSeqId;
final Source source;
final Exception failure;
FileChunkResponseItem(long requestSeqId, Source source, Exception failure) {
this.requestSeqId = requestSeqId;
this.source = source;
this.failure = failure;
}
}
/**
* A chunk request
*
* @opensearch.internal
*/
public interface ChunkRequest {
/**
* @return {@code true} if this chunk request is the last chunk of the current file
*/
boolean lastChunk();
}
private enum Status {
PROCESSING,
SUCCESS,
FAILED
}
}