/* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
*  Licensed to Elasticsearch B.V. under one or more contributor
*  license agreements. See the NOTICE file distributed with
*  this work for additional information regarding copyright
*  ownership. Elasticsearch B.V. licenses this file to you under
*  the Apache License, Version 2.0 (the "License"); you may
*  not use this file except in compliance with the License.
*  You may obtain a copy of the License at
*
* 	http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing,
*  software distributed under the License is distributed on an
*  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
*  KIND, either express or implied.  See the License for the
*  specific language governing permissions and limitations
*  under the License.
*/

using System;
using System.Collections.Generic;
using OpenSearch.Client.Specification.IndicesApi;
using OpenSearch.Net;

namespace OpenSearch.Client
{
	/// <summary>
	/// A reindex implementation that uses ScrollAll() BulkAll() to compose a reindex pipeline.
	/// <para>
	/// This differs from ReindexOnServer() in that documents are fetched from OpenSearch, transformed on the client side,
	/// then sent back to OpenSearch.
	/// </para>
	/// <para>
	/// This will create the target index if it doesn't exist already. If
	/// <see cref="IReindexRequest{TSource,TTarget}.CreateIndexRequest" /> is not specified
	/// and the source of the reindex points to a single index we try and reuse the settings from source.
	/// You can completely opt out of all of this using <see cref="IReindexRequest{TSource,TTarget}.OmitIndexCreation" />.
	/// </para>
	/// </summary>
	/// <typeparam name="TSource">The type of the documents read from the source by the scroll operation.</typeparam>
	/// <typeparam name="TTarget">The type each source document is mapped to before it is sent to the bulk operation.</typeparam>
	public interface IReindexRequest<TSource, TTarget>
		where TSource : class
		where TTarget : class
	{
		/// <summary>
		/// The scroll typically outperforms the bulk operations by a long shot. If we left things unbounded you'd quickly
		/// have way too many pending scroll requests.
		/// <para>
		/// What this property allows you to express is that, for each bucket in the max concurrency of the minimum max concurrency
		/// between producer and consumer, the maximum overall pending requests of the other side are amplified by this factor.
		/// Typically the concurrency of the consumer (BulkAll) will be lower, and with this factor we can dampen the overall
		/// number of pending scroll requests while we are still processing bulk requests.
		/// </para>
		/// <para>Defaults to 4 if not provided.</para>
		/// </summary>
		int? BackPressureFactor { get; set; }

		/// <summary>
		/// Provide a factory for the bulk all request. The first argument is the lazy collection of scroll results, which is a
		/// mandatory argument to create <see cref="BulkAllRequest{T}" /> or <see cref="BulkAllDescriptor{T}" />.
		/// <para>
		/// Note that <see cref="IBulkAllRequest{T}.BufferToBulk" /> is always overridden as well as
		/// <see cref="IBulkAllRequest{T}.BackPressure" />.
		/// </para>
		/// </summary>
		Func<IEnumerable<IHitMetadata<TTarget>>, IBulkAllRequest<IHitMetadata<TTarget>>> BulkAll { get; set; }

		/// <summary>
		/// Describe how the newly created index should be created. Remember you can also register Index Templates for more
		/// dynamic use cases.
		/// </summary>
		ICreateIndexRequest CreateIndexRequest { get; set; }

		/// <summary>
		/// A function that converts from a source document to a target document.
		/// </summary>
		Func<TSource, TTarget> Map { get; set; }

		/// <summary>
		/// Do not send a create index call on the target index; assume the index has been created outside of the reindex.
		/// Reindex will never create the index if it already exists; however, this will also omit the IndexExists call.
		/// </summary>
		bool OmitIndexCreation { get; set; }

		/// <summary>
		/// Describes the scroll operation from which the documents are fetched.
		/// <para>
		/// Note that <see cref="IScrollAllRequest.BackPressure" /> can be overridden by our own.
		/// </para>
		/// </summary>
		IScrollAllRequest ScrollAll { get; set; }
	}

	/// <inheritdoc cref="IReindexRequest{TSource,TTarget}" />
	public interface IReindexRequest<TSource> : IReindexRequest<TSource, TSource>
		where TSource : class { }

	/// <inheritdoc cref="IReindexRequest{TSource,TTarget}" />
	public class ReindexRequest<TSource, TTarget> : IReindexRequest<TSource, TTarget>
		where TSource : class
		where TTarget : class
	{
		/// <summary>
		/// Creates a client-side reindex request from its three mandatory parts.
		/// </summary>
		/// <param name="scrollSource">The scroll operation yielding the source documents for the reindex operation</param>
		/// <param name="map">A function that converts from a source document to a target document</param>
		/// <param name="bulkAllTarget">
		/// A factory that instantiates the bulk all operation over the lazy stream of search result hits
		/// </param>
		public ReindexRequest(
			IScrollAllRequest scrollSource,
			Func<TSource, TTarget> map,
			Func<IEnumerable<IHitMetadata<TTarget>>, IBulkAllRequest<IHitMetadata<TTarget>>> bulkAllTarget
		)
		{
			scrollSource.ThrowIfNull(nameof(scrollSource), "scrollSource must be set in order to get the source of a Reindex operation");
			bulkAllTarget.ThrowIfNull(nameof(bulkAllTarget), "bulkAllTarget must set in order to get the target of a Reindex operation");
			map.ThrowIfNull(nameof(map), "map must be set to know how to take TSource and transform it into TTarget");

			// ScrollAll, BulkAll and Map are implemented explicitly, so they can only be assigned through the interface.
			var i = (IReindexRequest<TSource, TTarget>)this;
			i.ScrollAll = scrollSource;
			i.BulkAll = bulkAllTarget;
			i.Map = map;
		}

		/// <inheritdoc />
		public int? BackPressureFactor { get; set; }

		/// <inheritdoc />
		public ICreateIndexRequest CreateIndexRequest { get; set; }

		/// <inheritdoc />
		public bool OmitIndexCreation { get; set; }

		/// <inheritdoc />
		Func<IEnumerable<IHitMetadata<TTarget>>, IBulkAllRequest<IHitMetadata<TTarget>>> IReindexRequest<TSource, TTarget>.BulkAll { get; set; }

		/// <inheritdoc />
		Func<TSource, TTarget> IReindexRequest<TSource, TTarget>.Map { get; set; }

		/// <inheritdoc />
		IScrollAllRequest IReindexRequest<TSource, TTarget>.ScrollAll { get; set; }
	}

	/// <summary>
	/// A <see cref="ReindexRequest{TSource,TTarget}" /> whose source and target document types are the same.
	/// Documents are passed through unchanged (the map is the identity function).
	/// </summary>
	public class ReindexRequest<TSource> : ReindexRequest<TSource, TSource>
		where TSource : class
	{
		/// <param name="scrollSource">The scroll operation yielding the source documents for the reindex operation</param>
		/// <param name="bulkAllTarget">
		/// A factory that instantiates the bulk all operation over the lazy stream of search result hits
		/// </param>
		public ReindexRequest(
			IScrollAllRequest scrollSource,
			Func<IEnumerable<IHitMetadata<TSource>>, IBulkAllRequest<IHitMetadata<TSource>>> bulkAllTarget
		)
			: base(scrollSource, s => s, bulkAllTarget) { }
	}

	/// <inheritdoc cref="IReindexRequest{TSource,TTarget}" />
	public class ReindexDescriptor<TSource, TTarget>
		: DescriptorBase<ReindexDescriptor<TSource, TTarget>, IReindexRequest<TSource, TTarget>>, IReindexRequest<TSource, TTarget>
		where TSource : class
		where TTarget : class
	{
		// Optional user customisation of the bulk all descriptor, registered through BulkAll(...).
		private Func<BulkAllDescriptor<IHitMetadata<TTarget>>, IBulkAllRequest<IHitMetadata<TTarget>>> _createBulkAll;

		/// <param name="mapper">A function that converts from a source document to a target document</param>
		public ReindexDescriptor(Func<TSource, TTarget> mapper)
		{
			var i = (IReindexRequest<TSource, TTarget>)this;
			// The lambda reads _createBulkAll when it is invoked, not here, so a selector registered later
			// through BulkAll(...) still takes effect.
			i.BulkAll = d => _createBulkAll.InvokeOrDefault(new BulkAllDescriptor<IHitMetadata<TTarget>>(d));
			i.Map = mapper;
		}

		int? IReindexRequest<TSource, TTarget>.BackPressureFactor { get; set; }

		Func<IEnumerable<IHitMetadata<TTarget>>, IBulkAllRequest<IHitMetadata<TTarget>>> IReindexRequest<TSource, TTarget>.BulkAll { get; set; }

		ICreateIndexRequest IReindexRequest<TSource, TTarget>.CreateIndexRequest { get; set; }

		Func<TSource, TTarget> IReindexRequest<TSource, TTarget>.Map { get; set; }

		bool IReindexRequest<TSource, TTarget>.OmitIndexCreation { get; set; }

		IScrollAllRequest IReindexRequest<TSource, TTarget>.ScrollAll { get; set; }

		/// <inheritdoc cref="IReindexRequest{TSource,TTarget}.ScrollAll" />
		public ReindexDescriptor<TSource, TTarget> ScrollAll(Time scrollTime, int slices,
			Func<ScrollAllDescriptor<TSource>, IScrollAllRequest> selector = null
		) =>
			Assign(selector, (a, v) => a.ScrollAll = v.InvokeOrDefault(new ScrollAllDescriptor<TSource>(scrollTime, slices)));

		/// <inheritdoc cref="IReindexRequest{TSource,TTarget}.BackPressureFactor" />
		public ReindexDescriptor<TSource, TTarget> BackPressureFactor(int? maximum) =>
			Assign(maximum, (a, v) => a.BackPressureFactor = v);

		/// <inheritdoc cref="IReindexRequest{TSource,TTarget}.BulkAll" />
		public ReindexDescriptor<TSource, TTarget> BulkAll(
			Func<BulkAllDescriptor<IHitMetadata<TTarget>>, IBulkAllRequest<IHitMetadata<TTarget>>> selector
		)
		{
			// Only stored here; the BulkAll factory installed by the constructor picks it up when the reindex runs.
			_createBulkAll = selector;
			return this;
		}

		/// <inheritdoc cref="IReindexRequest{TSource,TTarget}.OmitIndexCreation" />
		public ReindexDescriptor<TSource, TTarget> OmitIndexCreation(bool omit = true) =>
			Assign(omit, (a, v) => a.OmitIndexCreation = v);

		/// <inheritdoc cref="IReindexRequest{TSource,TTarget}.CreateIndexRequest" />
		public ReindexDescriptor<TSource, TTarget> CreateIndex(Func<CreateIndexDescriptor, ICreateIndexRequest> createIndexSelector) =>
			// NOTE(review): "ignored" looks like a placeholder index name, presumably replaced by the reindex target — confirm.
			Assign(createIndexSelector.InvokeOrDefault(new CreateIndexDescriptor("ignored")), (a, v) => a.CreateIndexRequest = v);
	}
}