/* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
using System;
using System.Collections.Generic;
using OpenSearch.Client.Specification.IndicesApi;
using OpenSearch.Net;
namespace OpenSearch.Client
{
/// <summary>
/// A reindex implementation that uses ScrollAll() and BulkAll() to compose a reindex pipeline.
/// <para>
/// This differs from ReindexOnServer() in that documents are fetched from OpenSearch, transformed on the client side,
/// then sent back to OpenSearch.
/// </para>
/// <para>
/// This will create the target index if it doesn't exist already. If <see cref="CreateIndexRequest" /> is not specified
/// and the source of the reindex points to a single index we try and reuse the settings from source.
/// You can completely opt out of all of this using <see cref="OmitIndexCreation" />.
/// </para>
/// </summary>
public interface IReindexRequest
where TSource : class
where TTarget : class
{
/// <summary>
/// The scroll typically outperforms the bulk operations by a long shot. If we'd leave things unbounded you'd quickly
/// have way too many pending scroll requests. What this property allows you to express is that for each bucket in
/// the max concurrency of the minimum max concurrency between producer and consumer, amplify the maximum overall
/// pending of the other side by this factor. Typically the concurrency of the consumer (bulk all) will be lower and
/// with this factor we can dampen the overall pending scroll requests while we are still processing bulk requests.
/// Defaults to 4 if not provided.
/// </summary>
int? BackPressureFactor { get; set; }
// NOTE(review): the original comment named the request types this factory creates and the bulk all members
// that reindex always overrides; those references are missing from the text and should be restored.
/// <summary>
/// Provide a factory for the bulk all request. The first argument is the lazy collection of scroll results, which is
/// a mandatory argument to create the bulk all request.
/// <para>
/// Note that some settings of the returned bulk all request are always overridden by the reindex operation.
/// </para>
/// </summary>
Func>, IBulkAllRequest>> BulkAll { get; set; }
/// <summary>
/// Describe how the newly created index should be created. Remember you can also register Index Templates for more
/// dynamic use cases.
/// </summary>
ICreateIndexRequest CreateIndexRequest { get; set; }
/// <summary>
/// A function that converts from a source document to a target document.
/// </summary>
Func Map { get; set; }
/// <summary>
/// Do not send a create index call on the target index, assume the index has been created outside of the reindex.
/// Reindex will never create the index if it already exists, however this will also omit the IndexExists call.
/// </summary>
bool OmitIndexCreation { get; set; }
// NOTE(review): the original comment named the scroll member that reindex may override; that reference is
// missing from the text and should be restored.
/// <summary>
/// Describes the scroll operation where we need to fetch the documents from.
/// <para>
/// Note that part of this scroll configuration can be overridden by the reindex operation itself.
/// </para>
/// </summary>
IScrollAllRequest ScrollAll { get; set; }
}
// NOTE(review): the type arguments of the base interface are not visible here; presumably the target document
// type is the same as the source document type — confirm.
/// <summary>
/// Reindex request contract constrained only on the source document type. Declares no members of its own;
/// everything comes from the base reindex request interface.
/// </summary>
public interface IReindexRequest : IReindexRequest where TSource : class { }
/// <summary>
/// Request object for a client-side reindex. It combines a scroll source, a function that maps each source
/// document to a target document, and a factory for the bulk all operation that writes the mapped documents.
/// </summary>
public class ReindexRequest : IReindexRequest
    where TSource : class
    where TTarget : class
{
    /// <summary>
    /// Creates a reindex request from its three mandatory parts. Each argument is checked with
    /// <c>ThrowIfNull</c> before any of them is assigned.
    /// </summary>
    /// <param name="scrollSource">The scroll operation yielding the source documents for the reindex operation</param>
    /// <param name="map">A function that converts from a source document to a target document</param>
    /// <param name="bulkAllTarget">A factory that instantiates the bulk all operation over the lazy stream of search result hits</param>
    public ReindexRequest(IScrollAllRequest scrollSource, Func map,
        Func>, IBulkAllRequest>> bulkAllTarget
    )
    {
        scrollSource.ThrowIfNull(nameof(scrollSource), "scrollSource must be set in order to get the source of a Reindex operation");
        bulkAllTarget.ThrowIfNull(nameof(bulkAllTarget), "bulkAllTarget must be set in order to get the target of a Reindex operation");
        map.ThrowIfNull(nameof(map), "map must be set to know how to take TSource and transform it into TTarget");

        // ScrollAll, BulkAll and Map are explicit interface implementations on this class, so they can
        // only be assigned through the interface view of the instance.
        var i = (IReindexRequest)this;
        i.ScrollAll = scrollSource;
        i.BulkAll = bulkAllTarget;
        i.Map = map;
    }

    /// <inheritdoc />
    public int? BackPressureFactor { get; set; }

    /// <inheritdoc />
    public ICreateIndexRequest CreateIndexRequest { get; set; }

    /// <inheritdoc />
    public bool OmitIndexCreation { get; set; }

    /// <inheritdoc />
    Func>, IBulkAllRequest>> IReindexRequest.BulkAll { get; set; }

    /// <inheritdoc />
    Func IReindexRequest.Map { get; set; }

    /// <inheritdoc />
    IScrollAllRequest IReindexRequest.ScrollAll { get; set; }
}
/// <summary>
/// Reindex request for the case where no document transformation is needed: the mapping function handed to the
/// base request is the identity function, so each document is passed through unchanged.
/// </summary>
public class ReindexRequest : ReindexRequest
where TSource : class
{
/// <summary>
/// Creates a reindex request from a scroll source and a bulk all factory, using <c>s => s</c> as the map.
/// </summary>
/// <param name="scrollSource">The scroll operation yielding the source documents for the reindex operation</param>
/// <param name="bulkAllTarget">A factory that instantiates the bulk all operation over the lazy stream of search result hits</param>
public ReindexRequest(IScrollAllRequest scrollSource,
Func>, IBulkAllRequest>> bulkAllTarget
)
: base(scrollSource, s => s, bulkAllTarget) { }
}
/// <summary>
/// Fluent descriptor for a client-side reindex request. Each configuration method returns the descriptor so
/// calls can be chained; the configured values are exposed through the explicitly implemented reindex request
/// interface members.
/// </summary>
public class ReindexDescriptor
: DescriptorBase, IReindexRequest>, IReindexRequest
where TSource : class
where TTarget : class
{
// Optional customization of the bulk all request, registered through BulkAll(...) and applied each time the
// interface-level BulkAll factory is invoked.
private Func>, IBulkAllRequest>> _createBulkAll;
/// <summary>
/// Creates the descriptor with the function that maps a source document to a target document, and wires the
/// bulk all factory so that it wraps the scroll results in a new <c>BulkAllDescriptor</c> and hands it to the
/// selector registered through <c>BulkAll(...)</c>.
/// </summary>
/// <param name="mapper">A function that converts from a source document to a target document</param>
public ReindexDescriptor(Func mapper)
{
var i = (IReindexRequest)this;
// The lambda reads _createBulkAll when it is invoked, not when it is created, so a selector registered
// after construction is still picked up.
// NOTE(review): InvokeOrDefault presumably returns the fresh descriptor unchanged when no selector was
// registered — confirm against the helper.
i.BulkAll = d => _createBulkAll.InvokeOrDefault(new BulkAllDescriptor>(d));
i.Map = mapper;
}
int? IReindexRequest.BackPressureFactor { get; set; }
Func>, IBulkAllRequest>> IReindexRequest.BulkAll { get; set; }
ICreateIndexRequest IReindexRequest.CreateIndexRequest { get; set; }
Func IReindexRequest.Map { get; set; }
bool IReindexRequest.OmitIndexCreation { get; set; }
IScrollAllRequest IReindexRequest.ScrollAll { get; set; }
/// <summary>
/// Describes the scroll operation the source documents are fetched from. A new <c>ScrollAllDescriptor</c> is
/// created from <paramref name="scrollTime" /> and <paramref name="slices" /> and passed to
/// <paramref name="selector" /> for further customization.
/// </summary>
/// <param name="scrollTime">The scroll time passed to the scroll all descriptor</param>
/// <param name="slices">The number of slices passed to the scroll all descriptor</param>
/// <param name="selector">Optional customization of the scroll all descriptor</param>
public ReindexDescriptor ScrollAll(Time scrollTime, int slices,
Func, IScrollAllRequest> selector = null
) =>
Assign(selector, (a, v) => a.ScrollAll = v.InvokeOrDefault(new ScrollAllDescriptor(scrollTime, slices)));
/// <summary>
/// Sets the back pressure factor of the reindex request.
/// </summary>
/// <param name="maximum">The back pressure factor to assign; may be null</param>
public ReindexDescriptor BackPressureFactor(int? maximum) =>
Assign(maximum, (a, v) => a.BackPressureFactor = v);
/// <summary>
/// Registers a selector that customizes the bulk all request. The selector is stored and applied later, when
/// the bulk all factory is invoked with the scroll results.
/// </summary>
/// <param name="selector">Customization applied to the bulk all descriptor</param>
public ReindexDescriptor BulkAll(
Func>, IBulkAllRequest>> selector
)
{
_createBulkAll = selector;
return this;
}
/// <summary>
/// Do not send a create index call on the target index, assume the index has been created outside of the reindex.
/// </summary>
/// <param name="omit">Whether to omit index creation; defaults to true</param>
public ReindexDescriptor OmitIndexCreation(bool omit = true) => Assign(omit, (a, v) => a.OmitIndexCreation = v);
/// <summary>
/// Describes how the newly created index should be created. The selector receives a new
/// <c>CreateIndexDescriptor</c> and its result is stored as the create index request.
/// </summary>
/// <param name="createIndexSelector">Customization applied to the create index descriptor</param>
// NOTE(review): the descriptor is constructed with the placeholder index name "ignored"; presumably the real
// target index name is applied elsewhere before the request is sent — confirm.
public ReindexDescriptor CreateIndex(Func createIndexSelector) =>
Assign(createIndexSelector.InvokeOrDefault(new CreateIndexDescriptor("ignored")), (a, v) => a.CreateIndexRequest = v);
}
}