/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package org.opensearch.action.bulk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SparseFixedBitSet;
import org.opensearch.ExceptionsHelper;
import org.opensearch.core.Assertions;
import org.opensearch.OpenSearchParseException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.RoutingMissingException;
import org.opensearch.action.admin.indices.create.AutoCreateAction;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.ingest.IngestActionForwarder;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.update.TransportUpdateAction;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.cluster.metadata.IndexAbstraction;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.common.lease.Releasable;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.VersionType;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.IndexClosedException;
import org.opensearch.indices.SystemIndices;
import org.opensearch.ingest.IngestService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static org.opensearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

/**
 * Groups bulk request items by shard, optionally creating non-existent indices, and
 * delegates to {@link TransportShardBulkAction} for shard-level bulk execution.
 *
 * @opensearch.internal
 */
public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {

    private static final Logger logger = LogManager.getLogger(TransportBulkAction.class);

    private final ThreadPool threadPool;
    private final AutoCreateIndex autoCreateIndex;
    private final ClusterService clusterService;
    private final IngestService ingestService;
    private final TransportShardBulkAction shardBulkAction;
    private final LongSupplier relativeTimeProvider;
    private final IngestActionForwarder ingestForwarder;
    private final NodeClient client;
    private final IndexNameExpressionResolver indexNameExpressionResolver;
    private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
    private final IndexingPressureService indexingPressureService;
    private final SystemIndices systemIndices;

    @Inject
    public TransportBulkAction(
        ThreadPool threadPool,
        TransportService transportService,
        ClusterService clusterService,
        IngestService ingestService,
        TransportShardBulkAction shardBulkAction,
        NodeClient client,
        ActionFilters actionFilters,
        IndexNameExpressionResolver indexNameExpressionResolver,
        AutoCreateIndex autoCreateIndex,
        IndexingPressureService indexingPressureService,
        SystemIndices systemIndices
    ) {
        this(
            threadPool,
            transportService,
            clusterService,
            ingestService,
            shardBulkAction,
            client,
            actionFilters,
            indexNameExpressionResolver,
            autoCreateIndex,
            indexingPressureService,
            systemIndices,
            System::nanoTime
        );
    }

    public TransportBulkAction(
        ThreadPool threadPool,
        TransportService transportService,
        ClusterService clusterService,
        IngestService ingestService,
        TransportShardBulkAction shardBulkAction,
        NodeClient client,
        ActionFilters actionFilters,
        IndexNameExpressionResolver indexNameExpressionResolver,
        AutoCreateIndex autoCreateIndex,
        IndexingPressureService indexingPressureService,
        SystemIndices systemIndices,
        LongSupplier relativeTimeProvider
    ) {
        super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.SAME);
        Objects.requireNonNull(relativeTimeProvider);
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.ingestService = ingestService;
        this.shardBulkAction = shardBulkAction;
        this.autoCreateIndex = autoCreateIndex;
        this.relativeTimeProvider = relativeTimeProvider;
        this.ingestForwarder = new IngestActionForwarder(transportService);
        this.client = client;
        this.indexNameExpressionResolver = indexNameExpressionResolver;
        this.indexingPressureService = indexingPressureService;
        this.systemIndices = systemIndices;
        clusterService.addStateApplier(this.ingestForwarder);
    }

    /**
     * Retrieves the {@link IndexRequest} from the provided {@link DocWriteRequest} for index or upsert actions. Upserts are
     * modeled as {@link IndexRequest} inside the {@link UpdateRequest}. Ignores {@link org.opensearch.action.delete.DeleteRequest}s.
     *
     * @param docWriteRequest The request in which to find the {@link IndexRequest}
     * @return the found {@link IndexRequest} or {@code null} if one cannot be found.
     */
    public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteRequest) {
        IndexRequest indexRequest = null;
        if (docWriteRequest instanceof IndexRequest) {
            indexRequest = (IndexRequest) docWriteRequest;
        } else if (docWriteRequest instanceof UpdateRequest) {
            UpdateRequest updateRequest = (UpdateRequest) docWriteRequest;
            indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
        }
        return indexRequest;
    }

    @Override
    protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
        final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
        final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(bulkRequest::ramBytesUsed, isOnlySystem);
        final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
        final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
        try {
            doInternalExecute(task, bulkRequest, executorName, releasingListener);
        } catch (Exception e) {
            releasingListener.onFailure(e);
        }
    }

    protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
        final long startTime = relativeTime();
        final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());

        boolean hasIndexRequestsWithPipelines = false;
        final Metadata metadata = clusterService.state().getMetadata();
        final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
        for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
            IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
            if (indexRequest != null) {
                // Each index request needs to be evaluated, because this method also modifies the IndexRequest
                boolean indexRequestHasPipeline = IngestService.resolvePipelines(actionRequest, indexRequest, metadata);
                hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
            }

            if (actionRequest instanceof IndexRequest) {
                IndexRequest ir = (IndexRequest) actionRequest;
                ir.checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
                if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
                    throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
                }
            }
        }

        if (hasIndexRequestsWithPipelines) {
            // This method (doExecute) will be called again, but with the bulk requests updated by the ingest node processing
            // and with IngestService.NOOP_PIPELINE_NAME set on each request. This ensures that, the second time through this
            // method, this path is never taken.
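            // Run the ingest pipelines on this node when it is itself an ingest node; otherwise forward
            // the whole bulk request to a node that is.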
            try {
                if (Assertions.ENABLED) {
                    final boolean arePipelinesResolved = bulkRequest.requests()
                        .stream()
                        .map(TransportBulkAction::getIndexWriteRequest)
                        .filter(Objects::nonNull)
                        .allMatch(IndexRequest::isPipelineResolved);
                    assert arePipelinesResolved : bulkRequest;
                }
                if (clusterService.localNode().isIngestNode()) {
                    processBulkIndexIngestRequest(task, bulkRequest, executorName, listener);
                } else {
                    ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
                }
            } catch (Exception e) {
                listener.onFailure(e);
            }
            return;
        }

        final boolean includesSystem = includesSystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);

        if (includesSystem || needToCheck()) {
            // Attempt to create all the indices that we're going to need during the bulk before we start.
            // Step 1: collect all the indices in the request
            final Map<String, Boolean> indices = bulkRequest.requests.stream()
                // delete requests should not attempt to create the index (if the index does not
                // exist), unless external versioning is used
                .filter(
                    request -> request.opType() != DocWriteRequest.OpType.DELETE
                        || request.versionType() == VersionType.EXTERNAL
                        || request.versionType() == VersionType.EXTERNAL_GTE
                )
                .collect(Collectors.toMap(DocWriteRequest::index, DocWriteRequest::isRequireAlias, (v1, v2) -> v1 || v2));

            /* Step 2: filter that down to the indices that don't exist and that we can create. At the same time build a map of
             * indices we can't create, which we'll use when we try to run the requests. */
            final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
            Set<String> autoCreateIndices = new HashSet<>();
            ClusterState state = clusterService.state();
            for (Map.Entry<String, Boolean> indexAndFlag : indices.entrySet()) {
                boolean shouldAutoCreate;
                final String index = indexAndFlag.getKey();
                try {
                    shouldAutoCreate = shouldAutoCreate(index, state);
                } catch (IndexNotFoundException e) {
                    shouldAutoCreate = false;
                    indicesThatCannotBeCreated.put(index, e);
                }
                // We should only auto-create if we are not requiring it to be an alias
                if (shouldAutoCreate && (indexAndFlag.getValue() == false)) {
                    autoCreateIndices.add(index);
                }
            }

            // Step 3: create all the indices that are missing, if there are any missing. Start the bulk after all the creates come back.
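            // The create requests below are issued in parallel; the shared counter ensures the bulk itself
            // only starts once the last create response (or failure) has come back.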
            if (autoCreateIndices.isEmpty()) {
                executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
            } else {
                final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
                for (String index : autoCreateIndices) {
                    createIndex(index, bulkRequest.timeout(), minNodeVersion, new ActionListener<CreateIndexResponse>() {
                        @Override
                        public void onResponse(CreateIndexResponse result) {
                            if (counter.decrementAndGet() == 0) {
                                threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(listener) {
                                    @Override
                                    protected void doRun() {
                                        executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
                                    }
                                });
                            }
                        }

                        @Override
                        public void onFailure(Exception e) {
                            if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
                                // fail all requests involving this index, if create didn't work
                                for (int i = 0; i < bulkRequest.requests.size(); i++) {
                                    DocWriteRequest<?> request = bulkRequest.requests.get(i);
                                    if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
                                        bulkRequest.requests.set(i, null);
                                    }
                                }
                            }
                            if (counter.decrementAndGet() == 0) {
                                final ActionListener<BulkResponse> wrappedListener = ActionListener.wrap(listener::onResponse, inner -> {
                                    inner.addSuppressed(e);
                                    listener.onFailure(inner);
                                });
                                threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(wrappedListener) {
                                    @Override
                                    protected void doRun() {
                                        executeBulk(task, bulkRequest, startTime, wrappedListener, responses, indicesThatCannotBeCreated);
                                    }

                                    @Override
                                    public void onRejection(Exception rejectedException) {
                                        rejectedException.addSuppressed(e);
                                        super.onRejection(rejectedException);
                                    }
                                });
                            }
                        }
                    });
                }
            }
        } else {
            executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
        }
    }

    /** Fails append-only write ops that directly target a backing index of a data stream. */
    static void prohibitAppendWritesInBackingIndices(DocWriteRequest<?> writeRequest, Metadata metadata) {
        IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(writeRequest.index());
        if (indexAbstraction == null) {
            return;
        }
        if (indexAbstraction.getType() != IndexAbstraction.Type.CONCRETE_INDEX) {
            return;
        }
        if (indexAbstraction.getParentDataStream() == null) {
            return;
        }

        DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream();

        // At this point the write op is targeting a backing index of a data stream directly,
        // so check whether the write op is append-only and, if so, fail.
        // (Updates and deletes are allowed to target a backing index.)

        DocWriteRequest.OpType opType = writeRequest.opType();
        // CREATE op_type is considered append-only, and
        // INDEX op_type is considered append-only when no if_primary_term and if_seq_no are specified.
        // (The latter may be an update, but at this stage we can't determine that; determining it would
        // need an engine-level change, so for now this check is sufficient.)
        if (opType == DocWriteRequest.OpType.CREATE) {
            throw new IllegalArgumentException(
                "index request with op_type=create targeting backing indices is disallowed, "
                    + "target corresponding data stream ["
                    + dataStream.getName()
                    + "] instead"
            );
        }
        if (opType == DocWriteRequest.OpType.INDEX
            && writeRequest.ifPrimaryTerm() == UNASSIGNED_PRIMARY_TERM
            && writeRequest.ifSeqNo() == UNASSIGNED_SEQ_NO) {
            throw new IllegalArgumentException(
                "index request with op_type=index and no if_primary_term and if_seq_no set "
                    + "targeting backing indices is disallowed, target corresponding data stream ["
                    + dataStream.getName()
                    + "] instead"
            );
        }
    }

    /** Fails write ops that specify custom routing while targeting a data stream. */
    static void prohibitCustomRoutingOnDataStream(DocWriteRequest<?> writeRequest, Metadata metadata) {
        IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(writeRequest.index());
        if (indexAbstraction == null) {
            return;
        }
        if (indexAbstraction.getType() != IndexAbstraction.Type.DATA_STREAM) {
            return;
        }

        if (writeRequest.routing() != null) {
            IndexAbstraction.DataStream dataStream = (IndexAbstraction.DataStream) indexAbstraction;
            throw new IllegalArgumentException(
                "index request targeting data stream ["
                    + dataStream.getName()
                    + "] specifies a custom "
                    + "routing. target the backing indices directly or remove the custom routing."
            );
        }
    }

    boolean isOnlySystem(BulkRequest request, SortedMap<String, IndexAbstraction> indicesLookup, SystemIndices systemIndices) {
        return request.getIndices().stream().allMatch(indexName -> isSystemIndex(indicesLookup, systemIndices, indexName));
    }

    boolean includesSystem(BulkRequest request, SortedMap<String, IndexAbstraction> indicesLookup, SystemIndices systemIndices) {
        return request.getIndices().stream().anyMatch(indexName -> isSystemIndex(indicesLookup, systemIndices, indexName));
    }

    private boolean isSystemIndex(SortedMap<String, IndexAbstraction> indicesLookup, SystemIndices systemIndices, String indexName) {
        final IndexAbstraction abstraction = indicesLookup.get(indexName);
        if (abstraction != null) {
            return abstraction.isSystem();
        } else {
            return systemIndices.isSystemIndex(indexName);
        }
    }

    boolean needToCheck() {
        return autoCreateIndex.needToCheck();
    }

    boolean shouldAutoCreate(String index, ClusterState state) {
        return autoCreateIndex.shouldAutoCreate(index, state);
    }

    /** Auto-creates a single missing index on behalf of the bulk request. */
    void createIndex(String index, TimeValue timeout, Version minNodeVersion, ActionListener<CreateIndexResponse> listener) {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest();
        createIndexRequest.index(index);
        createIndexRequest.cause("auto(bulk api)");
        createIndexRequest.clusterManagerNodeTimeout(timeout);
        client.execute(AutoCreateAction.INSTANCE, createIndexRequest, listener);
    }

    private boolean setResponseFailureIfIndexMatches(
        AtomicArray<BulkItemResponse> responses,
        int idx,
        DocWriteRequest<?> request,
        String index,
        Exception e
    ) {
        if (index.equals(request.index())) {
            responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.id(), e)));
            return true;
        }
        return false;
    }

    private long buildTookInMillis(long startTimeNanos) {
        return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos);
    }

    /**
     * Retries on retryable cluster blocks, resolves item requests,
     * constructs shard bulk requests and delegates execution to shard bulk action
     *
     * @opensearch.internal
     */
    private final class BulkOperation extends ActionRunnable<BulkResponse> {
        private final Task task;
        private BulkRequest bulkRequest; // set to null once all requests are sent out
        private final AtomicArray<BulkItemResponse> responses;
        private final long startTimeNanos;
        private final ClusterStateObserver observer;
        private final Map<String, IndexNotFoundException> indicesThatCannotBeCreated;

        BulkOperation(
            Task task,
            BulkRequest bulkRequest,
            ActionListener<BulkResponse> listener,
            AtomicArray<BulkItemResponse> responses,
            long startTimeNanos,
            Map<String, IndexNotFoundException> indicesThatCannotBeCreated
        ) {
            super(listener);
            this.task = task;
            this.bulkRequest = bulkRequest;
            this.responses = responses;
            this.startTimeNanos = startTimeNanos;
            this.indicesThatCannotBeCreated = indicesThatCannotBeCreated;
            this.observer = new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext());
        }

        @Override
        protected void doRun() {
            assert bulkRequest != null;
            final ClusterState clusterState = observer.setAndGetObservedState();
            if (handleBlockExceptions(clusterState)) {
                return;
            }
            final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
            Metadata metadata = clusterState.metadata();
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
                // the request can only be null because we set it to null in the previous step, so it gets ignored
                if (docWriteRequest == null) {
                    continue;
                }
                if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) {
                    continue;
                }
                if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metadata)) {
                    continue;
                }
                Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
                try {
                    // The ConcreteIndices#resolveIfAbsent(...) method validates via IndexNameExpressionResolver whether
                    // an operation is allowed to index into a data stream, but this isn't done when the resolve call is
                    // cached, so the validation needs to be performed here too.
                    IndexAbstraction indexAbstraction = clusterState.getMetadata().getIndicesLookup().get(concreteIndex.getName());
                    if (indexAbstraction.getParentDataStream() != null
                        // avoid valid cases when directly indexing into a backing index
                        // (for example when directly indexing into .ds-logs-foobar-000001)
                        && concreteIndex.getName().equals(docWriteRequest.index()) == false
                        && docWriteRequest.opType() != DocWriteRequest.OpType.CREATE) {
                        throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
                    }

                    switch (docWriteRequest.opType()) {
                        case CREATE:
                        case INDEX:
                            prohibitAppendWritesInBackingIndices(docWriteRequest, metadata);
                            prohibitCustomRoutingOnDataStream(docWriteRequest, metadata);
                            IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                            final IndexMetadata indexMetadata = metadata.index(concreteIndex);
                            MappingMetadata mappingMd = indexMetadata.mapping();
                            Version indexCreated = indexMetadata.getCreationVersion();
                            indexRequest.resolveRouting(metadata);
                            indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
                            break;
                        case UPDATE:
                            TransportUpdateAction.resolveAndValidateRouting(
                                metadata,
                                concreteIndex.getName(),
                                (UpdateRequest) docWriteRequest
                            );
                            break;
                        case DELETE:
                            docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
                            // check if routing is required, if so, throw error if routing wasn't specified
                            if (docWriteRequest.routing() == null && metadata.routingRequired(concreteIndex.getName())) {
                                throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.id());
                            }
                            break;
                        default:
                            throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
                    }
                } catch (OpenSearchParseException | IllegalArgumentException | RoutingMissingException e) {
                    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.id(), e);
                    BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
                    responses.set(i, bulkItemResponse);
                    // make sure the request never gets processed again
                    bulkRequest.requests.set(i, null);
                }
            }

            // first, go over all the requests and create a ShardId -> Operations mapping
            Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest<?> request = bulkRequest.requests.get(i);
                if (request == null) {
                    continue;
                }
                String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
                ShardId shardId = clusterService.operationRouting()
                    .indexShards(clusterState, concreteIndex, request.id(), request.routing())
                    .shardId();
                List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
                shardRequests.add(new BulkItemRequest(i, request));
            }

            if (requestsByShard.isEmpty()) {
                listener.onResponse(
                    new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
                );
                return;
            }

            final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
            String nodeId = clusterService.localNode().getId();
            for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
                final ShardId shardId = entry.getKey();
                final List<BulkItemRequest> requests = entry.getValue();
                BulkShardRequest bulkShardRequest = new BulkShardRequest(
                    shardId,
                    bulkRequest.getRefreshPolicy(),
                    requests.toArray(new BulkItemRequest[0])
                );
                bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
                bulkShardRequest.timeout(bulkRequest.timeout());
                bulkShardRequest.routedBasedOnClusterVersion(clusterState.version());
                if (task != null) {
                    bulkShardRequest.setParentTask(nodeId, task.getId());
                }
                // Add the shard-level accounting for coordinating and supply the listener
                final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
                final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(
                    shardId,
                    bulkShardRequest::ramBytesUsed,
                    isOnlySystem
                );
                shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
                    @Override
                    public void onResponse(BulkShardResponse bulkShardResponse) {
                        for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                            // we may have no response if item failed
                            if (bulkItemResponse.getResponse() != null) {
                                bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                            }
                            responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                        }
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    @Override
                    public void onFailure(Exception e) {
                        // create failures for all relevant requests
                        for (BulkItemRequest request : requests) {
                            final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
                            DocWriteRequest<?> docWriteRequest = request.request();
                            responses.set(
                                request.id(),
                                new BulkItemResponse(
                                    request.id(),
                                    docWriteRequest.opType(),
                                    new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
                                )
                            );
                        }
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    private void finishHim() {
                        listener.onResponse(
                            new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
                        );
                    }
                }, releasable::close));
            }
            bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
        }

        private boolean handleBlockExceptions(ClusterState state) {
            ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
            if (blockException != null) {
                if (blockException.retryable()) {
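                    // a retryable block (for example, the one applied while there is no elected cluster-manager)
                    // may clear with a newer cluster state, so schedule a retry instead of failing outright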
logger.trace("cluster is blocked, scheduling a retry", blockException); retry(blockException); } else { onFailure(blockException); } return true; } return false; } void retry(Exception failure) { assert failure != null; if (observer.isTimedOut()) { // we running as a last attempt after a timeout has happened. don't retry onFailure(failure); return; } observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { run(); } @Override public void onClusterServiceClose() { onFailure(new NodeClosedException(clusterService.localNode())); } @Override public void onTimeout(TimeValue timeout) { // Try one more time... run(); } }); } private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest request, int idx, final Metadata metadata) { if (request.isRequireAlias() && (metadata.hasAlias(request.index()) == false)) { Exception exception = new IndexNotFoundException( "[" + DocWriteRequest.REQUIRE_ALIAS + "] request flag is [true] and [" + request.index() + "] is not an alias", request.index() ); addFailure(request, idx, exception); return true; } return false; } private boolean addFailureIfIndexIsUnavailable( DocWriteRequest request, int idx, final ConcreteIndices concreteIndices, final Metadata metadata ) { IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index()); if (cannotCreate != null) { addFailure(request, idx, cannotCreate); return true; } Index concreteIndex = concreteIndices.getConcreteIndex(request.index()); if (concreteIndex == null) { try { concreteIndex = concreteIndices.resolveIfAbsent(request); } catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) { addFailure(request, idx, ex); return true; } } IndexMetadata indexMetadata = metadata.getIndexSafe(concreteIndex); if (indexMetadata.getState() == IndexMetadata.State.CLOSE) { addFailure(request, idx, new IndexClosedException(concreteIndex)); return true; } return false; } private void addFailure(DocWriteRequest request, int idx, Exception unavailableException) { BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.id(), unavailableException); BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, request.opType(), failure); responses.set(idx, bulkItemResponse); // make sure the request gets never processed again bulkRequest.requests.set(idx, null); } } void executeBulk( Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener listener, final AtomicArray responses, Map indicesThatCannotBeCreated ) { new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run(); } /** * Concrete indices * * @opensearch.internal */ private static class ConcreteIndices { private final ClusterState state; private final IndexNameExpressionResolver indexNameExpressionResolver; private final Map indices = new HashMap<>(); ConcreteIndices(ClusterState state, IndexNameExpressionResolver indexNameExpressionResolver) { this.state = state; this.indexNameExpressionResolver = indexNameExpressionResolver; } Index getConcreteIndex(String indexOrAlias) { return indices.get(indexOrAlias); } Index resolveIfAbsent(DocWriteRequest request) { Index concreteIndex = indices.get(request.index()); if (concreteIndex == null) { boolean includeDataStreams = request.opType() == DocWriteRequest.OpType.CREATE; try { concreteIndex = indexNameExpressionResolver.concreteWriteIndex( state, request.indicesOptions(), request.indices()[0], 
                        false,
                        includeDataStreams
                    );
                } catch (IndexNotFoundException e) {
                    if (includeDataStreams == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) {
                        throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
                    } else {
                        throw e;
                    }
                }
                indices.put(request.index(), concreteIndex);
            }
            return concreteIndex;
        }
    }

    private long relativeTime() {
        return relativeTimeProvider.getAsLong();
    }

    private void processBulkIndexIngestRequest(
        Task task,
        BulkRequest original,
        String executorName,
        ActionListener<BulkResponse> listener
    ) {
        final long ingestStartTimeInNanos = System.nanoTime();
        final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
        ingestService.executeBulkRequest(
            original.numberOfActions(),
            () -> bulkRequestModifier,
            bulkRequestModifier::markItemAsFailed,
            (originalThread, exception) -> {
                if (exception != null) {
                    logger.debug("failed to execute pipeline for a bulk request", exception);
                    listener.onFailure(exception);
                } else {
                    long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
                    BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
                    ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(
                        ingestTookInMillis,
                        listener
                    );
                    if (bulkRequest.requests().isEmpty()) {
                        // at this stage, the transport bulk action can't deal with a bulk request with no requests,
                        // so we stop and send an empty response back to the client.
                        // (this will happen if pre-processing all items in the bulk failed)
                        actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
                    } else {
                        // If a processor went async and returned a response on a different thread, then
                        // before we continue the bulk request we should fork back onto a write thread:
                        if (originalThread == Thread.currentThread()) {
                            assert Thread.currentThread().getName().contains(executorName);
                            doInternalExecute(task, bulkRequest, executorName, actionListener);
                        } else {
                            threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(listener) {
                                @Override
                                protected void doRun() {
                                    doInternalExecute(task, bulkRequest, executorName, actionListener);
                                }

                                @Override
                                public boolean isForceExecution() {
                                    // If we fork back onto a write thread we should **not** fail because the thread pool
                                    // queue is full. (Otherwise the work done during ingest will be lost.)
                                    // It is okay to force execution here. Throttling of write requests happens prior to
                                    // ingest when a node receives a bulk request.
                                    return true;
                                }
                            });
                        }
                    }
                }
            },
            bulkRequestModifier::markItemAsDropped,
            executorName
        );
    }

    /**
     * A modifier for a bulk request
     *
     * @opensearch.internal
     */
    static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {

        final BulkRequest bulkRequest;
        final SparseFixedBitSet failedSlots;
        final List<BulkItemResponse> itemResponses;
        final AtomicIntegerArray originalSlots;

        volatile int currentSlot = -1;

        BulkRequestModifier(BulkRequest bulkRequest) {
            this.bulkRequest = bulkRequest;
            this.failedSlots = new SparseFixedBitSet(bulkRequest.requests().size());
            this.itemResponses = new ArrayList<>(bulkRequest.requests().size());
            this.originalSlots = new AtomicIntegerArray(bulkRequest.requests().size()); // oversize, but that's ok
        }

        @Override
        public DocWriteRequest<?> next() {
            return bulkRequest.requests().get(++currentSlot);
        }

        @Override
        public boolean hasNext() {
            return (currentSlot + 1) < bulkRequest.requests().size();
        }

        BulkRequest getBulkRequest() {
            if (itemResponses.isEmpty()) {
                return bulkRequest;
            } else {
                BulkRequest modifiedBulkRequest = new BulkRequest();
                modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy());
                modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
                modifiedBulkRequest.timeout(bulkRequest.timeout());

                int slot = 0;
                List<DocWriteRequest<?>> requests = bulkRequest.requests();
                for (int i = 0; i < requests.size(); i++) {
                    DocWriteRequest<?> request = requests.get(i);
                    if (failedSlots.get(i) == false) {
                        modifiedBulkRequest.add(request);
                        originalSlots.set(slot++, i);
                    }
                }
                return modifiedBulkRequest;
            }
        }

        ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
            if (itemResponses.isEmpty()) {
                return ActionListener.map(
                    actionListener,
                    response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis)
                );
            } else {
                return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> {
                    BulkItemResponse[] items = response.getItems();
                    for (int i = 0; i < items.length; i++) {
                        itemResponses.add(originalSlots.get(i), response.getItems()[i]);
                    }
                    delegatedListener.onResponse(
                        new BulkResponse(itemResponses.toArray(new BulkItemResponse[0]), response.getTook().getMillis(), ingestTookInMillis)
                    );
                });
            }
        }

        synchronized void markItemAsDropped(int slot) {
            IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
            failedSlots.set(slot);
            final String id = indexRequest.id() == null ? DROPPED_ITEM_WITH_AUTO_GENERATED_ID : indexRequest.id();
            itemResponses.add(
                new BulkItemResponse(
                    slot,
                    indexRequest.opType(),
                    new UpdateResponse(
                        new ShardId(indexRequest.index(), IndexMetadata.INDEX_UUID_NA_VALUE, 0),
                        id,
                        SequenceNumbers.UNASSIGNED_SEQ_NO,
                        SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
                        indexRequest.version(),
                        DocWriteResponse.Result.NOOP
                    )
                )
            );
        }

        synchronized void markItemAsFailed(int slot, Exception e) {
            IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
            logger.debug(
                String.format(
                    Locale.ROOT,
                    "failed to execute pipeline [%s] for document [%s/%s]",
                    indexRequest.getPipeline(),
                    indexRequest.index(),
                    indexRequest.id()
                ),
                e
            );

            // We hit an error while preprocessing a request, so we:
            // 1) Remember the request item slot from the bulk, so that when we're done processing all requests we know what failed
            // 2) Add a bulk item failure for this request
            // 3) Continue with the next request in the bulk.
            failedSlots.set(slot);
            BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.id(), e);
            itemResponses.add(new BulkItemResponse(slot, indexRequest.opType(), failure));
        }
    }
}