/* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
*  Licensed to Elasticsearch B.V. under one or more contributor
*  license agreements. See the NOTICE file distributed with
*  this work for additional information regarding copyright
*  ownership. Elasticsearch B.V. licenses this file to you under
*  the Apache License, Version 2.0 (the "License"); you may
*  not use this file except in compliance with the License.
*  You may obtain a copy of the License at
*
* 	http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing,
*  software distributed under the License is distributed on an
*  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
*  KIND, either express or implied.  See the License for the
*  specific language governing permissions and limitations
*  under the License.
*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using OpenSearch.OpenSearch.Xunit.XunitPlumbing;
using OpenSearch.Net;
using FluentAssertions;
using OpenSearch.Client;
using OpenSearch.Client.Specification.SnapshotApi;

namespace Tests.MetaHeader
{
    /// <summary>
    /// Checks that every request issued by the client's helper observables
    /// (BulkAll, ScrollAll, Reindex, Snapshot) carries exactly one "helper"
    /// entry in <c>RequestData.RequestMetaData</c> with the expected value.
    /// All requests are served by an in-memory connection with canned responses.
    /// </summary>
    public class MetaHeaderHelperTests
    {
        /// <summary>BulkAll requests must carry the helper meta data value "b".</summary>
        [U] public void BulkAllHelperRequestsIncludeExpectedHelperMetaData()
        {
            var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));

            // We can avoid specifying response bodies and this still exercises all requests.
            var responses = new List<(int, string)>
            {
                (200, "{}"),
                (200, "{}"),
                (200, "{}")
            };

            var connection = new TestableInMemoryConnection(
                a => a.RequestMetaData.Single(x => x.Key == "helper").Value.Should().Be("b"),
                responses);
            var settings = new ConnectionSettings(pool, connection);
            var client = new OpenSearchClient(settings);

            var documents = CreateLazyStreamOfDocuments(20);

            var observableBulk = client.BulkAll(documents, f => f
                .MaxDegreeOfParallelism(8)
                .BackOffTime(TimeSpan.FromSeconds(10))
                .BackOffRetries(2)
                .Size(10)
                .RefreshOnCompleted()
                .Index("an-index")
            );

            observableBulk.Wait(TimeSpan.FromMinutes(5), b =>
            {
                foreach (var item in b.Items)
                {
                    item.IsValid.Should().BeTrue();
                    item.Id.Should().NotBeNullOrEmpty();
                }
            });

            connection.AssertExpectedCallCount();
        }

        /// <summary>ScrollAll requests must carry the helper meta data value "s".</summary>
        [U] public void ScrollAllHelperRequestsIncludeExpectedHelperMetaData()
        {
            var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));

            var responses = new List<(int, string)>
            {
                (200, "{\"_scroll_id\":\"SCROLLID\",\"took\":0,\"timed_out\":false,\"_shards\":{\"total\":1,\"successful\":1,\"skipped\":0,\"failed\":0},\"hits\":{\"total\":{\"value\":0,\"relation\":\"eq\"},\"max_score\":null,\"hits\":[]}}"),
                (200, "{\"_scroll_id\":\"SCROLLID\",\"took\":0,\"timed_out\":false,\"_shards\":{\"total\":1,\"successful\":1,\"skipped\":0,\"failed\":0},\"hits\":{\"total\":{\"value\":1,\"relation\":\"eq\"},\"max_score\":null,\"hits\":[{\"_index\":\"index-a\",\"_type\":\"_doc\",\"_id\":\"ISXw0HYBAJbnbq7-Utq6\",\"_score\":null,\"_source\":{\"name\": \"name-a\"},\"sort\":[0]}]}}"),
                (200, "{\"_scroll_id\":\"SCROLLID\",\"took\":1,\"timed_out\":false,\"terminated_early\":false,\"_shards\":{\"total\":1,\"successful\":1,\"skipped\":0,\"failed\":0},\"hits\":{\"total\":{\"value\":1,\"relation\":\"eq\"},\"max_score\":null,\"hits\":[]}}")
            };

            var connection = new TestableInMemoryConnection(
                a => a.RequestMetaData.Single(x => x.Key == "helper").Value.Should().Be("s"),
                responses);
            var settings = new ConnectionSettings(pool, connection);
            var client = new OpenSearchClient(settings);

            var observableScroll = client.ScrollAll<SmallObject>("5s", 2, s => s.Search(ss => ss.Size(2).Index("index-a")));

            observableScroll.Wait(TimeSpan.FromMinutes(5), _ => { });

            connection.AssertExpectedCallCount();
        }

        /// <summary>Reindex requests must carry the helper meta data value "r".</summary>
        [U] public void ReindexHelperRequestsIncludeExpectedHelperMetaData()
        {
            var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));

            // Canned responses are handed out in request order; the helper is expected
            // to issue exactly as many requests as there are entries here.
            var responses = new List<(int, string)>
            {
                (404, "{}"),
                (200, "{\"index-a\":{\"aliases\":{},\"mappings\":{\"properties\":{\"name\":{\"type\":\"keyword\"}}},\"settings\":{\"index\":{\"routing\":{\"allocation\":{\"include\":{\"_tier_preference\":\"data_content\"}}},\"number_of_shards\":\"1\",\"provided_name\":\"index-a\",\"creation_date\":\"1609823178261\",\"number_of_replicas\":\"1\",\"uuid\":\"2R4H1VfTR5imfmIPkNIIxw\",\"version\":{\"created\":\"7100099\"}}}}}"),
                (200, "{\"acknowledged\":true,\"shards_acknowledged\":true,\"index\":\"index-b\"}"),
                (200, "{\"_scroll_id\":\"SCROLLID\",\"took\":0,\"timed_out\":false,\"_shards\":{\"total\":1,\"successful\":1,\"skipped\":0,\"failed\":0},\"hits\":{\"total\":{\"value\":0,\"relation\":\"eq\"},\"max_score\":null,\"hits\":[]}}"),
                (200, "{\"_scroll_id\":\"SCROLLID\",\"took\":0,\"timed_out\":false,\"_shards\":{\"total\":1,\"successful\":1,\"skipped\":0,\"failed\":0},\"hits\":{\"total\":{\"value\":1,\"relation\":\"eq\"},\"max_score\":null,\"hits\":[{\"_index\":\"index-a\",\"_type\":\"_doc\",\"_id\":\"ISXw0HYBAJbnbq7-Utq6\",\"_score\":null,\"_source\":{\"name\": \"name-a\"},\"sort\":[0]}]}}"),
                (200, "{\"_scroll_id\":\"SCROLLID\",\"took\":1,\"timed_out\":false,\"terminated_early\":false,\"_shards\":{\"total\":1,\"successful\":1,\"skipped\":0,\"failed\":0},\"hits\":{\"total\":{\"value\":1,\"relation\":\"eq\"},\"max_score\":null,\"hits\":[]}}"),
                (200, "{\"took\":4,\"errors\":false,\"items\":[{\"index\":{\"_index\":\"index-b\",\"_type\":\"_doc\",\"_id\":\"ISXw0HYBAJbnbq7-Utq6\",\"_version\":1,\"result\":\"created\",\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":0,\"_primary_term\":1,\"status\":201}}]}")
            };

            var connection = new TestableInMemoryConnection(
                a => a.RequestMetaData.Single(x => x.Key == "helper").Value.Should().Be("r"),
                responses);
            var settings = new ConnectionSettings(pool, connection);
            var client = new OpenSearchClient(settings);

            client.Reindex<SmallObject>(r => r
                    .ScrollAll("5s", 2, s => s.Search(ss => ss.Size(2).Index("index-a")))
                    .BulkAll(b => b.Size(1).Index("index-b")))
                .Wait(TimeSpan.FromMinutes(1), _ => { });

            connection.AssertExpectedCallCount();
        }

        /// <summary>Snapshot requests must carry the helper meta data value "sn".</summary>
        [U] public void SnapshotHelperRequestsIncludeExpectedHelperMetaData()
        {
            var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));

            // We can avoid specifying response bodies and this still exercises all requests.
            var responses = new List<(int, string)>
            {
                (200, "{}"),
                (200, "{}")
            };

            var connection = new TestableInMemoryConnection(
                a => a.RequestMetaData.Single(x => x.Key == "helper").Value.Should().Be("sn"),
                responses);
            var settings = new ConnectionSettings(pool, connection);
            var client = new OpenSearchClient(settings);

            var observableSnapshot = new SnapshotObservable(client, new SnapshotRequest("repository-a", "snapshot-a"));
            var observer = new SnapshotObserver(connection);

            // The subscription is disposed when this method returns. The call-count
            // assertion lives in SnapshotObserver.OnCompleted.
            // NOTE(review): if the observable never signals completion before disposal,
            // the call count is never asserted - confirm this is intended.
            using var subscription = observableSnapshot.Subscribe(observer);
        }

        /// <summary>
        /// Observer that asserts the connection's call count once the snapshot
        /// observable signals completion.
        /// </summary>
        private class SnapshotObserver : IObserver<SnapshotStatusResponse>
        {
            private readonly TestableInMemoryConnection _connection;

            public SnapshotObserver(TestableInMemoryConnection connection) => _connection = connection;

            public void OnCompleted() => _connection.AssertExpectedCallCount();

            public void OnError(Exception error) => throw new NotImplementedException();

            public void OnNext(SnapshotStatusResponse value) { }
        }

        /// <summary>
        /// Lazily yields <paramref name="count"/> documents whose <c>Name</c> is the
        /// zero-based position of the document rendered as a string.
        /// </summary>
        /// <param name="count">Number of documents to yield.</param>
        protected static IEnumerable<SmallObject> CreateLazyStreamOfDocuments(int count)
        {
            for (var i = 0; i < count; i++)
                yield return new SmallObject { Name = i.ToString() };
        }

        /// <summary>Minimal document type used by the helper tests.</summary>
        protected class SmallObject
        {
            public string Name { get; set; }
        }

        /// <summary>
        /// <see cref="IConnection"/> that never touches the network. Each request runs
        /// the supplied assertion against its <see cref="RequestData"/> and is answered
        /// with the next canned (status code, body) pair.
        /// </summary>
        protected class TestableInMemoryConnection : IConnection
        {
            internal static readonly byte[] EmptyBody = Encoding.UTF8.GetBytes("");

            private readonly Action<RequestData> _perRequestAssertion;
            private readonly List<(int, string)> _responses;

            // Starts at -1 so that the value returned by the first increment is 0,
            // i.e. the index of the first canned response.
            private int _requestCounter = -1;

            /// <param name="assertion">Invoked once per request, before a response is produced.</param>
            /// <param name="responses">Canned (status code, body) pairs, served in request order.</param>
            public TestableInMemoryConnection(Action<RequestData> assertion, List<(int, string)> responses)
            {
                _perRequestAssertion = assertion;
                _responses = responses;
            }

            /// <summary>
            /// Asserts that the number of requests seen equals the number of canned responses
            /// (the counter is zero-based, hence the comparison against <c>Count - 1</c>).
            /// </summary>
            public void AssertExpectedCallCount() =>
                Volatile.Read(ref _requestCounter).Should().Be(_responses.Count - 1);

            async Task<TResponse> IConnection.RequestAsync<TResponse>(RequestData requestData, CancellationToken cancellationToken)
            {
                // Use the value returned by Increment rather than re-reading the field:
                // helpers may issue requests concurrently, and a second read could observe
                // another request's increment (duplicate response or out-of-range index).
                var requestIndex = Interlocked.Increment(ref _requestCounter);

                _perRequestAssertion(requestData);

                await Task.Yield(); // avoids test deadlocks

                // NOTE(review): the async path falls back to 500 while the sync path falls
                // back to 200 when the canned responses run out - confirm this is intended.
                var (statusCode, response) = ResponseAt(requestIndex, 500);

                var stream = requestData.MemoryStreamFactory.Create(ToBodyBytes(response));

                return await ResponseBuilder
                    .ToResponseAsync<TResponse>(requestData, null, statusCode, null, stream, RequestData.MimeType, cancellationToken)
                    .ConfigureAwait(false);
            }

            TResponse IConnection.Request<TResponse>(RequestData requestData)
            {
                // See RequestAsync: the atomically obtained index is the only safe one to use.
                var requestIndex = Interlocked.Increment(ref _requestCounter);

                _perRequestAssertion(requestData);

                var (statusCode, response) = ResponseAt(requestIndex, 200);

                var stream = requestData.MemoryStreamFactory.Create(ToBodyBytes(response));

                return ResponseBuilder.ToResponse<TResponse>(requestData, null, statusCode, null, stream, RequestData.MimeType);
            }

            public void Dispose() { }

            /// <summary>
            /// Returns the canned response at <paramref name="requestIndex"/>, or
            /// (<paramref name="fallbackStatusCode"/>, null) once the list is exhausted.
            /// </summary>
            private (int, string) ResponseAt(int requestIndex, int fallbackStatusCode)
            {
                if (requestIndex < _responses.Count)
                    return _responses[requestIndex];

                return (fallbackStatusCode, (string)null);
            }

            /// <summary>UTF-8 bytes of <paramref name="response"/>, or the shared empty body when it is null or empty.</summary>
            private static byte[] ToBodyBytes(string response) =>
                string.IsNullOrEmpty(response) ? EmptyBody : Encoding.UTF8.GetBytes(response);
        }
    }
}