/* SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */
/*
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 *
 * Licensed to Elasticsearch B.V. under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch B.V. licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using OpenSearch.Net;

namespace OpenSearch.Client
{
    public class BulkAllObservable<T> : IDisposable, IObservable<BulkAllResponse> where T : class
    {
        private readonly int _backOffRetries;
        private readonly TimeSpan _backOffTime;
        private readonly int _bulkSize;
        private readonly IOpenSearchClient _client;
        private readonly CancellationToken _compositeCancelToken;
        private readonly CancellationTokenSource _compositeCancelTokenSource;
        private readonly Action<BulkResponseItemBase, T> _droppedDocumentCallBack;
        private readonly int _maxDegreeOfParallelism;
        private readonly IBulkAllRequest<T> _partitionedBulkRequest;
        private readonly Func<BulkResponseItemBase, T, bool> _retryPredicate;
        private Action _incrementFailed = () => { };
        private Action _incrementRetries = () => { };
        private readonly Action<BulkResponse> _bulkResponseCallback;

        public BulkAllObservable(
            IOpenSearchClient client,
            IBulkAllRequest<T> partitionedBulkRequest,
            CancellationToken cancellationToken = default
        )
        {
            _client = client;
            _partitionedBulkRequest = partitionedBulkRequest;
            _backOffRetries = _partitionedBulkRequest.BackOffRetries.GetValueOrDefault(CoordinatedRequestDefaults.BulkAllBackOffRetriesDefault);
            _backOffTime = _partitionedBulkRequest?.BackOffTime?.ToTimeSpan() ?? CoordinatedRequestDefaults.BulkAllBackOffTimeDefault;
            _bulkSize = _partitionedBulkRequest.Size ?? CoordinatedRequestDefaults.BulkAllSizeDefault;
            _retryPredicate = _partitionedBulkRequest.RetryDocumentPredicate ?? RetryBulkActionPredicate;
            _droppedDocumentCallBack = _partitionedBulkRequest.DroppedDocumentCallback ?? DroppedDocumentCallbackDefault;
            _bulkResponseCallback = _partitionedBulkRequest.BulkResponseCallback;
            _maxDegreeOfParallelism = _partitionedBulkRequest.MaxDegreeOfParallelism ??
                CoordinatedRequestDefaults.BulkAllMaxDegreeOfParallelismDefault;
            _compositeCancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            _compositeCancelToken = _compositeCancelTokenSource.Token;
        }

        public void Dispose()
        {
            _compositeCancelTokenSource?.Cancel();
            _compositeCancelTokenSource?.Dispose();
        }

        public IDisposable Subscribe(IObserver<BulkAllResponse> observer)
        {
            observer.ThrowIfNull(nameof(observer));
            BulkAll(observer);
            return this;
        }

        public IDisposable Subscribe(BulkAllObserver observer)
        {
            _incrementFailed = observer.IncrementTotalNumberOfFailedBuffers;
            _incrementRetries = observer.IncrementTotalNumberOfRetries;
            return Subscribe((IObserver<BulkAllResponse>)observer);
        }

        // Partitions the documents into buffers of _bulkSize and indexes them with bounded parallelism.
        private void BulkAll(IObserver<BulkAllResponse> observer)
        {
            var documents = _partitionedBulkRequest.Documents;
            var partitioned = new PartitionHelper<T>(documents, _bulkSize);
#pragma warning disable 4014
            partitioned.ForEachAsync(
#pragma warning restore 4014
                (buffer, page) => BulkAsync(buffer, page, 0),
                (buffer, response) => observer.OnNext(response),
                ex => OnCompleted(ex, observer),
                _maxDegreeOfParallelism
            );
        }

        private void OnCompleted(Exception exception, IObserver<BulkAllResponse> observer)
        {
            if (exception != null)
                observer.OnError(exception);
            else
            {
                try
                {
                    RefreshOnCompleted();
                    observer.OnCompleted();
                }
                catch (Exception e)
                {
                    observer.OnError(e);
                }
            }
        }

        // Optionally refreshes the target indices once all documents have been indexed.
        private void RefreshOnCompleted()
        {
            if (!_partitionedBulkRequest.RefreshOnCompleted) return;

            var indices = _partitionedBulkRequest.RefreshIndices ?? _partitionedBulkRequest.Index;
            if (indices == null) return;

            var refresh = _client.Indices.Refresh(indices, r => r.RequestConfiguration(rc =>
            {
                switch (_partitionedBulkRequest)
                {
                    case IHelperCallable helperCallable when helperCallable.ParentMetaData is object:
                        rc.RequestMetaData(helperCallable.ParentMetaData);
                        break;
                    default:
                        rc.RequestMetaData(RequestMetaDataFactory.BulkHelperRequestMetaData());
                        break;
                }
                return rc;
            }));
            if (!refresh.IsValid) throw Throw($"Refreshing after all documents have indexed failed", refresh.ApiCall);
        }

        // Issues a single _bulk request for a buffer, retrying or dropping failed items as configured.
        private async Task<BulkAllResponse> BulkAsync(IList<T> buffer, long page, int backOffRetries)
        {
            _compositeCancelToken.ThrowIfCancellationRequested();

            var request = _partitionedBulkRequest;
            var response = await _client.BulkAsync(s =>
                {
                    s.Index(request.Index);
                    s.Timeout(request.Timeout);
                    if (request.BufferToBulk != null) request.BufferToBulk(s, buffer);
                    else s.IndexMany(buffer);
                    if (!string.IsNullOrEmpty(request.Pipeline)) s.Pipeline(request.Pipeline);
                    if (request.Routing != null) s.Routing(request.Routing);
                    if (request.WaitForActiveShards.HasValue) s.WaitForActiveShards(request.WaitForActiveShards.ToString());

                    switch (_partitionedBulkRequest)
                    {
                        case IHelperCallable helperCallable when helperCallable.ParentMetaData is object:
                            s.RequestConfiguration(rc => rc.RequestMetaData(helperCallable.ParentMetaData));
                            break;
                        default:
                            s.RequestConfiguration(rc => rc.RequestMetaData(RequestMetaDataFactory.BulkHelperRequestMetaData()));
                            break;
                    }

                    return s;
                }, _compositeCancelToken)
                .ConfigureAwait(false);

            _compositeCancelToken.ThrowIfCancellationRequested();
            _bulkResponseCallback?.Invoke(response);

            if (!response.ApiCall.Success)
                return await HandleBulkRequest(buffer, page, backOffRetries, response).ConfigureAwait(false);

            var retryableDocuments = new List<T>();
            var droppedDocuments = new List<Tuple<BulkResponseItemBase, T>>();

            foreach (var documentWithResponse in response.Items.Zip(buffer, Tuple.Create))
            {
                if (documentWithResponse.Item1.IsValid) continue;

                if (_retryPredicate(documentWithResponse.Item1, documentWithResponse.Item2))
                    retryableDocuments.Add(documentWithResponse.Item2);
                else
                    droppedDocuments.Add(documentWithResponse);
            }

            HandleDroppedDocuments(droppedDocuments, response);

            if (retryableDocuments.Count > 0 && backOffRetries < _backOffRetries)
                return await RetryDocuments(page, ++backOffRetries, retryableDocuments).ConfigureAwait(false);

            if (retryableDocuments.Count > 0)
                throw ThrowOnBadBulk(response, $"Bulk indexing failed and after retrying {backOffRetries} times");

            request.BackPressure?.Release();

            return new BulkAllResponse { Retries = backOffRetries, Page = page, Items = response.Items };
        }

        private void HandleDroppedDocuments(List<Tuple<BulkResponseItemBase, T>> droppedDocuments, BulkResponse response)
        {
            if (droppedDocuments.Count <= 0) return;

            foreach (var dropped in droppedDocuments) _droppedDocumentCallBack(dropped.Item1, dropped.Item2);

            if (!_partitionedBulkRequest.ContinueAfterDroppedDocuments)
                throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after receiving failures that can not be retried from _bulk");
        }

        // Decides, based on the transport-level failure, whether the buffer can be retried or the whole operation must halt.
        private async Task<BulkAllResponse> HandleBulkRequest(IList<T> buffer, long page, int backOffRetries, BulkResponse response)
        {
            var clientException = response.ApiCall.OriginalException as OpenSearchClientException;
            var failureReason = clientException?.FailureReason;
            var reason = failureReason?.GetStringValue() ?? nameof(PipelineFailure.BadRequest);
            switch (failureReason)
            {
                case PipelineFailure.MaxRetriesReached:
                    if (response.ApiCall.AuditTrail.Last().Event == AuditEvent.FailedOverAllNodes)
                        throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after attempted bulk failed over all the active nodes");

                    ThrowOnExhaustedRetries();
                    return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false);
                case PipelineFailure.CouldNotStartSniffOnStartup:
                case PipelineFailure.BadAuthentication:
                case PipelineFailure.NoNodesAttempted:
                case PipelineFailure.SniffFailure:
                case PipelineFailure.Unexpected:
                    throw ThrowOnBadBulk(response, $"{nameof(BulkAll)} halted after {nameof(PipelineFailure)}.{reason} from _bulk");
                case PipelineFailure.BadResponse:
                case PipelineFailure.PingFailure:
                case PipelineFailure.MaxTimeoutReached:
                case PipelineFailure.BadRequest:
                default:
                    ThrowOnExhaustedRetries();
                    return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false);
            }

            void ThrowOnExhaustedRetries()
            {
                if (backOffRetries < _backOffRetries) return;

                throw ThrowOnBadBulk(response,
                    $"{nameof(BulkAll)} halted after {nameof(PipelineFailure)}.{reason} from _bulk and exhausting retries ({backOffRetries})"
                );
            }
        }

        // Waits for the configured back-off time before re-issuing the buffer.
        private async Task<BulkAllResponse> RetryDocuments(long page, int backOffRetries, IList<T> retryDocuments)
        {
            _incrementRetries();
            await Task.Delay(_backOffTime, _compositeCancelToken).ConfigureAwait(false);
            return await BulkAsync(retryDocuments, page, backOffRetries).ConfigureAwait(false);
        }

        private Exception ThrowOnBadBulk(IOpenSearchResponse response, string message)
        {
            _incrementFailed();
            _partitionedBulkRequest.BackPressure?.Release();
            return Throw(message, response.ApiCall);
        }

        private static OpenSearchClientException Throw(string message, IApiCallDetails details) =>
            new OpenSearchClientException(PipelineFailure.BadResponse, message, details);

        // By default, only documents rejected with HTTP 429 (Too Many Requests) are retried.
        private static bool RetryBulkActionPredicate(BulkResponseItemBase bulkResponseItem, T d) => bulkResponseItem.Status == 429;

        private static void DroppedDocumentCallbackDefault(BulkResponseItemBase bulkResponseItem, T d) { }
    }
}
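
/*
 * Example usage (an illustrative sketch, not part of this file): BulkAllObservable<T> is normally
 * obtained through IOpenSearchClient.BulkAll rather than constructed directly. The document
 * collection, index name, and tuning values below are assumptions made for the example.
 *
 *     var observable = client.BulkAll(documents, b => b
 *         .Index("my-index")            // hypothetical target index
 *         .Size(1000)                   // documents per _bulk request
 *         .MaxDegreeOfParallelism(4)    // concurrent _bulk requests
 *         .BackOffRetries(2)
 *         .BackOffTime(TimeSpan.FromSeconds(30))
 *     );
 *
 *     var done = new ManualResetEvent(false);
 *     observable.Subscribe(new BulkAllObserver(
 *         onNext: r => Console.WriteLine($"Indexed page {r.Page} after {r.Retries} retries"),
 *         onError: e => { Console.Error.WriteLine(e); done.Set(); },
 *         onCompleted: () => done.Set()
 *     ));
 *     done.WaitOne();
 */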