/* SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */
/*
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 *
 * Licensed to Elasticsearch B.V. under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch B.V. licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.Linq;
using OpenSearch.Net;

namespace OpenSearch.Client
{
	/// <summary>
	/// A request to the BulkAll helper: repeatedly sends bulk requests built from <see cref="Documents" />
	/// until the whole sequence has been indexed.
	/// </summary>
	public interface IBulkAllRequest<T> where T : class
	{
		/// <summary>
		/// In case of a HTTP 429 (Too Many Requests) response status code, how many times should we automatically
		/// back off before failing
		/// </summary>
		int? BackOffRetries { get; set; }

		/// <summary>
		/// In case of a HTTP 429 (Too Many Requests) response status code, how long should we wait before retrying
		/// </summary>
		Time BackOffTime { get; set; }

		/// <summary>
		/// Simple back pressure implementation that makes sure the minimum max concurrency between producer and consumer
		/// is not amplified by the greedier of the two by more than a given back pressure factor.
		/// When set, each bulk request will call <see cref="ProducerConsumerBackPressure.Release" />
		/// </summary>
		ProducerConsumerBackPressure BackPressure { get; set; }

		/// <summary>
		/// By default, the helper indexes the buffered documents as a bulk of index operations.
		/// There might be cases where you'd like more control over the bulk operation. By setting this callback,
		/// you are in complete control of describing how the buffer should be translated to a bulk operation.
		/// </summary>
		Action<BulkDescriptor, IList<T>> BufferToBulk { get; set; }

		/// <summary>
		/// Halt the bulk all request if any of the documents returned is a failure that can not be retried.
		/// When <c>true</c>, will feed dropped documents to <see cref="DroppedDocumentCallback" />
		/// </summary>
		bool ContinueAfterDroppedDocuments { get; set; }

		/// <summary>
		/// The documents to send to OpenSearch, ideally lazily evaluated by using <c>yield return</c> to
		/// provide each document. The helper will eagerly evaluate each partitioned page when operating on it.
		/// </summary>
		IEnumerable<T> Documents { get; }

		/// <summary>
		/// If a bulk operation fails because it receives documents it can not retry, they will be fed to this callback.
		/// If <see cref="ContinueAfterDroppedDocuments" /> is set to <c>true</c>, processing will continue, so this callback
		/// can be used to feed into a dead letter queue. Otherwise bulk all indexing will be halted.
		/// </summary>
		Action<BulkResponseItemBase, T> DroppedDocumentCallback { get; set; }

		/// <summary>
		/// The index to use for items that don't specify one. By default, will be inferred from <typeparamref name="T" />.
		/// If no default index has been mapped for <typeparamref name="T" />
		/// using <see cref="ConnectionSettingsBase{TConnectionSettings}.DefaultMappingFor{TDocument}(System.Func{ClrTypeMappingDescriptor{TDocument},IClrTypeMapping{TDocument}})" />
		/// on <see cref="ConnectionSettings" />, an exception will be thrown.
		/// </summary>
		IndexName Index { get; set; }

		/// <summary>The maximum number of bulk operations we want to have in flight at a time</summary>
		int? MaxDegreeOfParallelism { get; set; }

		/// <summary>The pipeline id to preprocess all the incoming documents with</summary>
		string Pipeline { get; set; }

		/// <summary>The indices you wish to refresh after the bulk all completes, defaults to <see cref="Index" /></summary>
		Indices RefreshIndices { get; set; }

		/// <summary>
		/// Refresh the index after performing ALL the bulk operations (NOTE this is an additional request)
		/// </summary>
		bool RefreshOnCompleted { get; set; }

		/// <summary>
		/// A predicate to control which documents should be retried.
		/// Defaults to failed bulk items with a HTTP 429 (Too Many Requests) response status code.
		/// </summary>
		Func<BulkResponseItemBase, T, bool> RetryDocumentPredicate { get; set; }

		/// <summary>Specific per bulk operation routing value</summary>
		Routing Routing { get; set; }

		/// <summary>The number of documents to send per bulk</summary>
		int? Size { get; set; }

		/// <summary>Explicit per operation timeout</summary>
		Time Timeout { get; set; }

		/// <summary>
		/// Sets the number of shard copies that must be active before proceeding with the bulk operation.
		/// Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any
		/// non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)
		/// </summary>
		int? WaitForActiveShards { get; set; }

		/// <summary>
		/// Be notified every time a bulk response returns, this includes retries.
		/// <see cref="DroppedDocumentCallback" /> is only called for successful batches.
		/// </summary>
		Action<BulkResponse> BulkResponseCallback { get; set; }
	}

	/// <inheritdoc cref="IBulkAllRequest{T}" />
	public class BulkAllRequest<T> : IBulkAllRequest<T>, IHelperCallable
		where T : class
	{
		/// <param name="documents">The lazily-enumerated documents to index; also sets <see cref="Index" /> from <typeparamref name="T" />.</param>
		public BulkAllRequest(IEnumerable<T> documents)
		{
			Documents = documents;
			Index = typeof(T);
		}

		/// <inheritdoc />
		public int? BackOffRetries { get; set; }

		/// <inheritdoc />
		public Time BackOffTime { get; set; }

		/// <inheritdoc />
		public ProducerConsumerBackPressure BackPressure { get; set; }

		/// <inheritdoc />
		public Action<BulkDescriptor, IList<T>> BufferToBulk { get; set; }

		/// <inheritdoc />
		public bool ContinueAfterDroppedDocuments { get; set; }

		/// <inheritdoc />
		public IEnumerable<T> Documents { get; }

		/// <inheritdoc />
		public Action<BulkResponseItemBase, T> DroppedDocumentCallback { get; set; }

		/// <inheritdoc />
		public IndexName Index { get; set; }

		/// <inheritdoc />
		public int? MaxDegreeOfParallelism { get; set; }

		/// <inheritdoc />
		public string Pipeline { get; set; }

		/// <inheritdoc />
		public Indices RefreshIndices { get; set; }

		/// <inheritdoc />
		public bool RefreshOnCompleted { get; set; }

		/// <inheritdoc />
		public Func<BulkResponseItemBase, T, bool> RetryDocumentPredicate { get; set; }

		/// <inheritdoc />
		public Routing Routing { get; set; }

		/// <inheritdoc />
		public int? Size { get; set; }

		/// <inheritdoc />
		public Time Timeout { get; set; }

		/// <inheritdoc />
		public int? WaitForActiveShards { get; set; }

		/// <inheritdoc />
		public Action<BulkResponse> BulkResponseCallback { get; set; }

		internal RequestMetaData ParentMetaData { get; set; }

		// Explicit implementation so helper plumbing stays off the public surface.
		RequestMetaData IHelperCallable.ParentMetaData
		{
			get => ParentMetaData;
			set => ParentMetaData = value;
		}
	}

	/// <inheritdoc cref="IBulkAllRequest{T}" />
	public class BulkAllDescriptor<T> : DescriptorBase<BulkAllDescriptor<T>, IBulkAllRequest<T>>, IBulkAllRequest<T>, IHelperCallable
		where T : class
	{
		private readonly IEnumerable<T> _documents;

		/// <param name="documents">The lazily-enumerated documents to index; also sets the index from <typeparamref name="T" />.</param>
		public BulkAllDescriptor(IEnumerable<T> documents)
		{
			_documents = documents;
			((IBulkAllRequest<T>)this).Index = typeof(T);
		}

		int? IBulkAllRequest<T>.BackOffRetries { get; set; }
		Time IBulkAllRequest<T>.BackOffTime { get; set; }
		ProducerConsumerBackPressure IBulkAllRequest<T>.BackPressure { get; set; }
		Action<BulkDescriptor, IList<T>> IBulkAllRequest<T>.BufferToBulk { get; set; }
		bool IBulkAllRequest<T>.ContinueAfterDroppedDocuments { get; set; }
		IEnumerable<T> IBulkAllRequest<T>.Documents => _documents;
		Action<BulkResponseItemBase, T> IBulkAllRequest<T>.DroppedDocumentCallback { get; set; }
		IndexName IBulkAllRequest<T>.Index { get; set; }
		int? IBulkAllRequest<T>.MaxDegreeOfParallelism { get; set; }
		string IBulkAllRequest<T>.Pipeline { get; set; }
		Indices IBulkAllRequest<T>.RefreshIndices { get; set; }
		bool IBulkAllRequest<T>.RefreshOnCompleted { get; set; }
		Func<BulkResponseItemBase, T, bool> IBulkAllRequest<T>.RetryDocumentPredicate { get; set; }
		Routing IBulkAllRequest<T>.Routing { get; set; }
		int? IBulkAllRequest<T>.Size { get; set; }
		Time IBulkAllRequest<T>.Timeout { get; set; }
		int? IBulkAllRequest<T>.WaitForActiveShards { get; set; }
		Action<BulkResponse> IBulkAllRequest<T>.BulkResponseCallback { get; set; }
		RequestMetaData IHelperCallable.ParentMetaData { get; set; }

		/// <inheritdoc cref="IBulkAllRequest{T}.MaxDegreeOfParallelism" />
		public BulkAllDescriptor<T> MaxDegreeOfParallelism(int? parallelism) =>
			Assign(parallelism, (a, v) => a.MaxDegreeOfParallelism = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.Size" />
		public BulkAllDescriptor<T> Size(int? size) => Assign(size, (a, v) => a.Size = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.BackOffRetries" />
		public BulkAllDescriptor<T> BackOffRetries(int? backoffs) =>
			Assign(backoffs, (a, v) => a.BackOffRetries = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.BackOffTime" />
		public BulkAllDescriptor<T> BackOffTime(Time time) => Assign(time, (a, v) => a.BackOffTime = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.Index" />
		public BulkAllDescriptor<T> Index(IndexName index) => Assign(index, (a, v) => a.Index = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.Index" />
		public BulkAllDescriptor<T> Index<TOther>() where TOther : class =>
			Assign(typeof(TOther), (a, v) => a.Index = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.RefreshOnCompleted" />
		public BulkAllDescriptor<T> RefreshOnCompleted(bool refresh = true) =>
			Assign(refresh, (a, v) => a.RefreshOnCompleted = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.RefreshIndices" />
		public BulkAllDescriptor<T> RefreshIndices(Indices indicesToRefresh) =>
			Assign(indicesToRefresh, (a, v) => a.RefreshIndices = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.Routing" />
		public BulkAllDescriptor<T> Routing(Routing routing) => Assign(routing, (a, v) => a.Routing = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.Timeout" />
		public BulkAllDescriptor<T> Timeout(Time timeout) => Assign(timeout, (a, v) => a.Timeout = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.Pipeline" />
		public BulkAllDescriptor<T> Pipeline(string pipeline) => Assign(pipeline, (a, v) => a.Pipeline = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.BufferToBulk" />
		public BulkAllDescriptor<T> BufferToBulk(Action<BulkDescriptor, IList<T>> modifier) =>
			Assign(modifier, (a, v) => a.BufferToBulk = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.RetryDocumentPredicate" />
		public BulkAllDescriptor<T> RetryDocumentPredicate(Func<BulkResponseItemBase, T, bool> predicate) =>
			Assign(predicate, (a, v) => a.RetryDocumentPredicate = v);

		/// <summary>
		/// Simple back pressure implementation that makes sure the minimum max concurrency between producer and consumer
		/// is not amplified by the greedier of the two by more than a given back pressure factor.
		/// When set, each scroll request will additionally wait on <see cref="ProducerConsumerBackPressure.WaitAsync" /> as well as
		/// <see cref="MaxDegreeOfParallelism" /> if set. Note that the consumer has to call <see cref="ProducerConsumerBackPressure.Release" />
		/// on the same instance every time it is done.
		/// </summary>
		/// <param name="maxConcurrency">The minimum maximum concurrency which would be the bottleneck of the producer consumer pipeline</param>
		/// <param name="backPressureFactor">The maximum amplification back pressure of the greedier part of the producer consumer pipeline</param>
		public BulkAllDescriptor<T> BackPressure(int maxConcurrency, int? backPressureFactor = null) =>
			Assign(new ProducerConsumerBackPressure(backPressureFactor, maxConcurrency), (a, v) => a.BackPressure = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.ContinueAfterDroppedDocuments" />
		public BulkAllDescriptor<T> ContinueAfterDroppedDocuments(bool proceed = true) =>
			Assign(proceed, (a, v) => a.ContinueAfterDroppedDocuments = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.DroppedDocumentCallback" />
		public BulkAllDescriptor<T> DroppedDocumentCallback(Action<BulkResponseItemBase, T> callback) =>
			Assign(callback, (a, v) => a.DroppedDocumentCallback = v);

		/// <inheritdoc cref="IBulkAllRequest{T}.BulkResponseCallback" />
		public BulkAllDescriptor<T> BulkResponseCallback(Action<BulkResponse> callback) =>
			Assign(callback, (a, v) => a.BulkResponseCallback = v);
	}
}