/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.backwards;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.client.RestClient;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.support.XContentMapValues;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.opensearch.test.rest.yaml.ObjectPath;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
public class IndexingIT extends OpenSearchRestTestCase {
/**
 * Version parsed from the {@code tests.upgrade_from_version} system property. The segment replication tests in this
 * class compare it against {@code Version.V_2_4_0} to decide whether they should run.
 */
protected static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));
/**
 * Writes {@code numDocs} documents into {@code index} via {@code client()}, using consecutive ids that begin at
 * {@code idStart}. Every document carries a single {@code test} field whose value ends in a two character random
 * suffix, and every PUT response is checked with {@code assertOK}.
 *
 * @param index   index to write to
 * @param idStart id of the first document
 * @param numDocs how many documents to write
 * @return {@code numDocs}
 * @throws IOException if a request fails
 */
private int indexDocs(String index, final int idStart, final int numDocs) throws IOException {
    int written = 0;
    while (written < numDocs) {
        final String endpoint = index + "/_doc/" + (idStart + written);
        final Request put = new Request("PUT", endpoint);
        put.setJsonEntity("{\"test\": \"test_" + randomAlphaOfLength(2) + "\"}");
        assertOK(client().performRequest(put));
        written++;
    }
    return numDocs;
}
/**
 * Indexes a document in {@code index} with {@code docId}, then concurrently re-indexes the same document
 * {@code nUpdates} times, one thread per update, and waits for all threads to finish.
 * <p>
 * An {@link IOException} raised by a worker is recorded and rethrown on the calling thread (wrapped in an
 * {@link AssertionError} that keeps the original exception as its cause) once every worker has been joined.
 *
 * @param index    index to write to
 * @param docId    id of the document that is written repeatedly
 * @param nUpdates number of concurrent updates issued after the initial write
 * @return the document version after updates, i.e. {@code nUpdates + 1}
 * @throws IOException          if the initial write fails
 * @throws InterruptedException if the calling thread is interrupted while joining the workers
 */
private int indexDocWithConcurrentUpdates(String index, final int docId, int nUpdates) throws IOException, InterruptedException {
    indexDocs(index, docId, 1);
    final Thread[] indexThreads = new Thread[nUpdates];
    // One slot per worker: each thread writes only its own slot, and join() below makes those writes visible here.
    final IOException[] failures = new IOException[nUpdates];
    for (int i = 0; i < nUpdates; i++) {
        final int slot = i;
        indexThreads[i] = new Thread(() -> {
            try {
                indexDocs(index, docId, 1);
            } catch (IOException e) {
                failures[slot] = e;
            }
        });
        indexThreads[i].start();
    }
    for (Thread indexThread : indexThreads) {
        indexThread.join();
    }
    // Surface the first worker failure on the calling thread, keeping the underlying exception as the cause.
    for (IOException failure : failures) {
        if (failure != null) {
            throw new AssertionError("failed while indexing [" + failure.getMessage() + "]", failure);
        }
    }
    return nUpdates + 1;
}
/**
 * Requests {@code _cluster/state/routing_nodes?pretty} through {@code client()} and logs the trimmed response body
 * at info level.
 *
 * @throws IOException    if the request fails or the body cannot be read
 * @throws ParseException if the response entity cannot be converted to a string
 */
private void printClusterRouting() throws IOException, ParseException {
    final Request routingRequest = new Request("GET", "_cluster/state/routing_nodes?pretty");
    final Response routingResponse = client().performRequest(routingRequest);
    final String routingBody = EntityUtils.toString(routingResponse.getEntity());
    logger.info("cluster nodes: {}", routingBody.trim());
}
/**
 * This test verifies that segment replication does not break when primary shards are on a lower OS version. It does
 * this by creating a segment replication index whose single primary is pinned to the BWC nodes, lifting the pin,
 * adding replicas, indexing documents and then checking the sequence number stats reported by every shard copy.
 * <p>
 * The test returns early (after logging) when the upgrade-from version is before 2.4.0.
 */
public void testIndexingWithPrimaryOnBwcNodes() throws Exception {
    if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
        logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
        return;
    }
    Nodes nodes = buildNodeAndVersions();
    assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
    logger.info("cluster discovered:\n {}", nodes.toString());
    // Comma separated names of the nodes running the older version
    final String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(","));
    // Update allocation settings so that primaries get allocated only on nodes running on older version
    Settings.Builder settings = Settings.builder()
        .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
        .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
        .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
        .put("index.routing.allocation.include._name", bwcNames);
    final String index = "test-index";
    createIndex(index, settings.build());
    ensureNoInitializingShards(); // wait for all other shard activity to finish

    final int docCount = 200;
    try (RestClient nodeClient = buildClient(restClientSettings(),
        nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {

        logger.info("Remove allocation include settings so that shards can be allocated on current version nodes");
        updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name"));
        // Add replicas so that they can be assigned on higher OS version nodes.
        updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 2));
        printClusterRouting();
        ensureGreen(index);

        // Index docs
        indexDocs(index, 0, docCount);

        // flush the index (note: this is a _flush request, not a _refresh)
        assertOK(client().performRequest(new Request("POST", index + "/_flush")));

        // verify replicas catch up with the primary
        assertSeqNoOnShards(index, nodes, docCount, nodeClient);
    }
}
/**
 * This test creates a cluster with the primary on the higher version. Because of
 * {@link org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider}, replica shard allocation
 * on the lower OpenSearch version is prevented. The test therefore covers the use case where the nodes holding the
 * primary shard run the higher OS version while the replicas are unassigned.
 * <p>
 * The test returns early (after logging) when the upgrade-from version is before 2.4.0.
 */
public void testIndexingWithReplicaOnBwcNodes() throws Exception {
    if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
        logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
        return;
    }
    Nodes nodes = buildNodeAndVersions();
    assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
    logger.info("cluster discovered:\n {}", nodes.toString());
    // Comma separated names of the nodes running the older version
    final String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(","));
    // Exclude bwc nodes from allocation so that primaries get allocated on current/higher version
    Settings.Builder settings = Settings.builder()
        .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
        .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
        .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
        .put("index.routing.allocation.exclude._name", bwcNames);
    final String index = "test-index";
    createIndex(index, settings.build());
    ensureNoInitializingShards(); // wait for all other shard activity to finish
    printClusterRouting();

    final int docCount = 200;
    try (RestClient nodeClient = buildClient(restClientSettings(),
        nodes.values().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {

        logger.info("allowing replica shards assignment on bwc nodes");
        updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.exclude._name"));
        // Add replicas so that they can be assigned on lower OS version nodes, but it doesn't work as called out in test overview
        updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 2));
        printClusterRouting();

        // Index docs
        indexDocs(index, 0, docCount);

        // flush the index (note: this is a _flush request, not a _refresh)
        assertOK(client().performRequest(new Request("POST", index + "/_flush")));

        // verify the sequence number stats on every shard copy that is reported
        assertSeqNoOnShards(index, nodes, docCount, nodeClient);
    }
}
/**
 * Verifies that the version of a concurrently updated document, and the index document count, are the same when
 * read from the node of every shard copy. The check is repeated after each of these steps: shards restricted to
 * BWC nodes are allowed on all nodes, more updates are applied, the primary is moved off its current node, the
 * replica count is dropped to 0, and the replica count is raised back to 1.
 */
public void testIndexVersionPropagation() throws Exception {
    Nodes nodes = buildNodeAndVersions();
    assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
    logger.info("cluster discovered: {}", nodes.toString());
    final String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(","));
    Settings.Builder settings = Settings.builder()
        .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
        .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)
        .put("index.routing.allocation.include._name", bwcNames);

    final String index = "indexversionprop";
    final int minUpdates = 5;
    final int maxUpdates = 10;
    createIndex(index, settings.build());
    try (RestClient newNodeClient = buildClient(restClientSettings(),
        nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {

        int nUpdates = randomIntBetween(minUpdates, maxUpdates);
        logger.info("indexing docs with [{}] concurrent updates initially", nUpdates);
        final int finalVersionForDoc1 = indexDocWithConcurrentUpdates(index, 1, nUpdates);
        logger.info("allowing shards on all nodes");
        updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name"));
        ensureGreen(index);
        assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
        // Resolve the primary from the same shard listing that is asserted on, instead of fetching the stats twice.
        List<Shard> shards = buildShards(index, nodes, newNodeClient);
        Shard primary = shards.stream()
            .filter(Shard::isPrimary)
            .findFirst()
            .orElseThrow(() -> new AssertionError("failed to find primary shard"));
        logger.info("primary resolved to: {}", primary.getNode().getNodeName());
        assertDocVersionAndCountOnShards(index, shards, 1, finalVersionForDoc1, 1);

        nUpdates = randomIntBetween(minUpdates, maxUpdates);
        logger.info("indexing docs with [{}] concurrent updates after allowing shards on all nodes", nUpdates);
        final int finalVersionForDoc2 = indexDocWithConcurrentUpdates(index, 2, nUpdates);
        assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
        shards = buildShards(index, nodes, newNodeClient);
        primary = shards.stream()
            .filter(Shard::isPrimary)
            .findFirst()
            .orElseThrow(() -> new AssertionError("failed to find primary shard"));
        logger.info("primary resolved to: {}", primary.getNode().getNodeName());
        assertDocVersionAndCountOnShards(index, shards, 2, finalVersionForDoc2, 2);

        // The primary resolved just above is still the one to move; no need to look it up again.
        logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName());
        updateIndexSettings(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName()));
        ensureGreen(index);
        nUpdates = randomIntBetween(minUpdates, maxUpdates);
        logger.info("indexing docs with [{}] concurrent updates after moving primary", nUpdates);
        final int finalVersionForDoc3 = indexDocWithConcurrentUpdates(index, 3, nUpdates);
        assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
        shards = buildShards(index, nodes, newNodeClient);
        assertDocVersionAndCountOnShards(index, shards, 3, finalVersionForDoc3, 3);

        logger.info("setting number of replicas to 0");
        updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 0));
        ensureGreen(index);
        nUpdates = randomIntBetween(minUpdates, maxUpdates);
        logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 0", nUpdates);
        final int finalVersionForDoc4 = indexDocWithConcurrentUpdates(index, 4, nUpdates);
        assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
        shards = buildShards(index, nodes, newNodeClient);
        assertDocVersionAndCountOnShards(index, shards, 4, finalVersionForDoc4, 4);

        logger.info("setting number of replicas to 1");
        updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 1));
        ensureGreen(index);
        nUpdates = randomIntBetween(minUpdates, maxUpdates);
        logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 1", nUpdates);
        final int finalVersionForDoc5 = indexDocWithConcurrentUpdates(index, 5, nUpdates);
        assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
        shards = buildShards(index, nodes, newNodeClient);
        assertDocVersionAndCountOnShards(index, shards, 5, finalVersionForDoc5, 5);
    }
}

/**
 * For every given shard copy, reads through the {@code _only_nodes:<node name>} preference of the copy's node and
 * asserts that document {@code docId} has {@code expectedVersion} and that the index holds {@code expectedCount}
 * documents.
 */
private void assertDocVersionAndCountOnShards(String index, List<Shard> shards, int docId, int expectedVersion, int expectedCount)
    throws IOException {
    for (Shard shard : shards) {
        final String preference = "_only_nodes:" + shard.getNode().getNodeName();
        assertVersion(index, docId, preference, expectedVersion);
        assertCount(index, preference, expectedCount);
    }
}
/**
 * Indexes batches of documents while the index moves through several allocation states (BWC nodes only, all
 * nodes, primary moved off its node, replicas dropped to 0 and raised back to 1) and checks the sequence number
 * stats of every shard copy, plus per-node document counts, along the way.
 */
public void testSeqNoCheckpoints() throws Exception {
    Nodes nodes = buildNodeAndVersions();
    assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
    logger.info("cluster discovered: {}", nodes.toString());
    final List<String> bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList());
    final String bwcNames = String.join(",", bwcNamesList);
    Settings.Builder settings = Settings.builder()
        .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
        .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)
        .put("index.routing.allocation.include._name", bwcNames);

    final String index = "test";
    createIndex(index, settings.build());
    try (RestClient newNodeClient = buildClient(restClientSettings(),
        nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
        // Evaluated once: nodes is built at the top of the test and is not rebuilt afterwards.
        final boolean compat = nodes.getBWCVersion().compareTo(LegacyESVersion.fromId(6000000)) >= 0;

        int numDocs = 0;
        final int numberOfInitialDocs = 1 + randomInt(5);
        logger.info("indexing [{}] docs initially", numberOfInitialDocs);
        numDocs += indexDocs(index, 0, numberOfInitialDocs);
        assertSeqNoOnShards(index, nodes, compat ? numDocs : 0, newNodeClient);

        logger.info("allowing shards on all nodes");
        updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name"));
        ensureGreen(index);
        assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
        for (final String bwcName : bwcNamesList) {
            assertCount(index, "_only_nodes:" + bwcName, numDocs);
        }

        final int numberOfDocsAfterAllowingShardsOnAllNodes = 1 + randomInt(5);
        logger.info("indexing [{}] docs after allowing shards on all nodes", numberOfDocsAfterAllowingShardsOnAllNodes);
        numDocs += indexDocs(index, numDocs, numberOfDocsAfterAllowingShardsOnAllNodes);
        assertSeqNoOnShards(index, nodes, compat ? numDocs : 0, newNodeClient);

        final List<Shard> shardsBeforeMove = buildShards(index, nodes, newNodeClient);
        final Shard primary = shardsBeforeMove.stream()
            .filter(Shard::isPrimary)
            .findFirst()
            .orElseThrow(() -> new AssertionError("failed to find primary shard"));
        logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName());
        updateIndexSettings(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName()));
        ensureGreen(index);

        int numDocsOnNewPrimary = 0;
        final int numberOfDocsAfterMovingPrimary = 1 + randomInt(5);
        logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary);
        numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterMovingPrimary);
        numDocs += numberOfDocsAfterMovingPrimary;
        assertSeqNoOnShards(index, nodes, compat ? numDocs : numDocsOnNewPrimary, newNodeClient);

        /*
         * Dropping the number of replicas to zero, and then increasing it to one triggers a recovery thus exercising any BWC-logic in
         * the recovery code.
         */
        logger.info("setting number of replicas to 0");
        updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 0));
        final int numberOfDocsAfterDroppingReplicas = 1 + randomInt(5);
        logger.info("indexing [{}] docs after setting number of replicas to 0", numberOfDocsAfterDroppingReplicas);
        numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterDroppingReplicas);
        numDocs += numberOfDocsAfterDroppingReplicas;
        logger.info("setting number of replicas to 1");
        updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 1));
        ensureGreen(index);
        assertOK(client().performRequest(new Request("POST", index + "/_refresh")));

        final List<Shard> shardsAfterRecovery = buildShards(index, nodes, newNodeClient);
        for (Shard shard : shardsAfterRecovery) {
            assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), numDocs);
        }
        assertSeqNoOnShards(index, nodes, compat ? numDocs : numDocsOnNewPrimary, newNodeClient);
    }
}
/**
 * Registers an {@code fs} snapshot repository, then snapshots an index twice: once while its shards are restricted
 * to the BWC nodes ({@code bwc-snapshot}) and once after the restriction is lifted ({@code mixed-snapshot}). Both
 * snapshot requests wait for completion and must return a successful response.
 */
public void testUpdateSnapshotStatus() throws Exception {
    Nodes nodes = buildNodeAndVersions();
    assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
    logger.info("cluster discovered: {}", nodes.toString());

    // Create the repository before taking the snapshot.
    Request request = new Request("PUT", "/_snapshot/repo");
    request.setJsonEntity(Strings
        .toString(JsonXContent.contentBuilder()
            .startObject()
                .field("type", "fs")
                .startObject("settings")
                    .field("compress", randomBoolean())
                    .field("location", System.getProperty("tests.path.repo"))
                .endObject()
            .endObject()));

    assertOK(client().performRequest(request));

    String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(","));

    // Allocating shards on the BWC nodes to make sure that taking snapshot happens on those nodes.
    Settings.Builder settings = Settings.builder()
        .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(5, 10))
        .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
        .put("index.routing.allocation.include._name", bwcNames);

    final String index = "test-snapshot-index";
    createIndex(index, settings.build());
    indexDocs(index, 0, between(50, 100));
    ensureGreen(index);
    assertOK(client().performRequest(new Request("POST", index + "/_refresh")));

    request = new Request("PUT", "/_snapshot/repo/bwc-snapshot");
    request.addParameter("wait_for_completion", "true");
    request.setJsonEntity("{\"indices\": \"" + index + "\"}");
    assertOK(client().performRequest(request));

    // Allocating shards on all nodes, taking snapshots should happen on all nodes.
    updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name"));
    ensureGreen(index);
    assertOK(client().performRequest(new Request("POST", index + "/_refresh")));

    request = new Request("PUT", "/_snapshot/repo/mixed-snapshot");
    request.addParameter("wait_for_completion", "true");
    request.setJsonEntity("{\"indices\": \"" + index + "\"}");
    // Send the request: without this call the mixed-cluster snapshot is never taken and nothing is verified.
    assertOK(client().performRequest(request));
}
/**
 * Exercises {@code POST {index}/_flush/synced} against a mixed cluster. The test only runs when the BWC version is
 * before 2.0.0 and both BWC and new nodes are present.
 * <p>
 * An index is created on the new nodes only. A synced flush sent through the BWC nodes is expected to fail with
 * {@code 409 CONFLICT}, a deprecation warning, and every shard copy reported as failed. A synced flush sent through
 * the new nodes is expected to succeed on every shard copy while emitting the "synced flush was removed" warning.
 * After each phase the index must report zero uncommitted translog operations.
 */
public void testSyncedFlushTransition() throws Exception {
    Nodes nodes = buildNodeAndVersions();
    assumeTrue("bwc version is on 1.x or Legacy 7.x", nodes.getBWCVersion().before(Version.V_2_0_0));
    assumeFalse("no new node found", nodes.getNewNodes().isEmpty());
    assumeFalse("no bwc node found", nodes.getBWCNodes().isEmpty());
    // Allocate shards to new nodes then verify synced flush requests processed by old nodes/new nodes
    String newNodes = nodes.getNewNodes().stream().map(Node::getNodeName).collect(Collectors.joining(","));
    int numShards = randomIntBetween(1, 10);
    int numOfReplicas = randomIntBetween(0, nodes.getNewNodes().size() - 1);
    int totalShards = numShards * (numOfReplicas + 1);
    final String index = "test_synced_flush";
    createIndex(index, Settings.builder()
        .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), numShards)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
        .put("index.routing.allocation.include._name", newNodes).build());
    ensureGreen(index);
    indexDocs(index, randomIntBetween(0, 100), between(1, 100));

    // Shared by both phases below
    final String syncedFlushEndpoint = index + "/_flush/synced";
    final String shardStatsEndpoint = index + "/_stats?level=shards";
    final String uncommittedOpsPath = "indices." + index + ".total.translog.uncommitted_operations";

    try (RestClient oldNodeClient = buildClient(restClientSettings(),
        nodes.getBWCNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
        Request request = new Request("POST", syncedFlushEndpoint);
        assertBusy(() -> {
            ResponseException responseException = expectThrows(ResponseException.class, () -> oldNodeClient.performRequest(request));
            assertThat(responseException.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
            assertThat(responseException.getResponse().getWarnings(),
                contains("Synced flush is deprecated and will be removed in 3.0. Use flush at _/flush or /{index}/_flush instead."));
            Map<String, Object> result = ObjectPath.createFromResponse(responseException.getResponse()).evaluate("_shards");
            assertThat(result.get("total"), equalTo(totalShards));
            assertThat(result.get("successful"), equalTo(0));
            assertThat(result.get("failed"), equalTo(totalShards));
        });
        Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", shardStatsEndpoint)));
        assertThat(XContentMapValues.extractValue(uncommittedOpsPath, stats), equalTo(0));
    }

    indexDocs(index, randomIntBetween(0, 100), between(1, 100));
    try (RestClient newNodeClient = buildClient(restClientSettings(),
        nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
        Request request = new Request("POST", syncedFlushEndpoint);
        List<String> warningMsg = Arrays.asList("Synced flush was removed and a normal flush was performed instead. " +
            "This transition will be removed in a future version.");
        RequestOptions.Builder requestOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
        // The handler returns true whenever the received warnings differ from the single expected message.
        requestOptionsBuilder.setWarningsHandler(warnings -> warnings.equals(warningMsg) == false);
        request.setOptions(requestOptionsBuilder);
        assertBusy(() -> {
            Map<String, Object> result = ObjectPath.createFromResponse(newNodeClient.performRequest(request)).evaluate("_shards");
            assertThat(result.get("total"), equalTo(totalShards));
            assertThat(result.get("successful"), equalTo(totalShards));
            assertThat(result.get("failed"), equalTo(0));
        });
        Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", shardStatsEndpoint)));
        assertThat(XContentMapValues.extractValue(uncommittedOpsPath, stats), equalTo(0));
    }
}
/**
 * Issues {@code GET {index}/_count} with the given {@code preference} parameter and asserts that the request
 * succeeded and that the {@code count} field of the response equals {@code expectedCount}.
 *
 * @param index         index to count documents in
 * @param preference    value passed as the {@code preference} request parameter
 * @param expectedCount document count the response must report
 * @throws IOException if the request fails or the response cannot be parsed
 */
private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
    final Request countRequest = new Request("GET", index + "/_count");
    countRequest.addParameter("preference", preference);

    final Response countResponse = client().performRequest(countRequest);
    assertOK(countResponse);

    final Object reportedCount = ObjectPath.createFromResponse(countResponse).evaluate("count");
    assertThat(Integer.parseInt(reportedCount.toString()), equalTo(expectedCount));
}
/**
 * Issues {@code GET {index}/_doc/{docId}} with the given {@code preference} parameter and asserts that the request
 * succeeded and that the {@code _version} field of the response equals {@code expectedVersion}.
 *
 * @param index           index holding the document
 * @param docId           id of the document to fetch
 * @param preference      value passed as the {@code preference} request parameter
 * @param expectedVersion version the document must report
 * @throws IOException if the request fails or the response cannot be parsed
 */
private void assertVersion(final String index, final int docId, final String preference, final int expectedVersion) throws IOException {
    final Request getRequest = new Request("GET", index + "/_doc/" + docId);
    getRequest.addParameter("preference", preference);

    final Response getResponse = client().performRequest(getRequest);
    assertOK(getResponse);

    final Object reportedVersion = ObjectPath.createFromResponse(getResponse).evaluate("_version");
    final String mismatchMessage = "version mismatch for doc [" + docId + "] preference [" + preference + "]";
    assertThat(mismatchMessage, Integer.parseInt(reportedVersion.toString()), equalTo(expectedVersion));
}
/**
 * Repeatedly (via {@code assertBusy}) loads the shard copies of {@code index} and asserts that each of them reports
 * a max sequence number, local checkpoint and global checkpoint equal to {@code numDocs - 1}.
 * <p>
 * A missing primary is reported as an {@link AssertionError}, the same failure type the other checks in the
 * {@code assertBusy} body produce, rather than as a {@code NoSuchElementException}.
 *
 * @param index   index whose shards are inspected
 * @param nodes   nodes of the cluster, passed through to {@code buildShards}
 * @param numDocs number of operations the shards are expected to have processed
 * @param client  client used to fetch the shard stats
 */
private void assertSeqNoOnShards(String index, Nodes nodes, int numDocs, RestClient client)
    throws Exception {
    // Both expectations depend only on numDocs, so compute them once outside the retried body.
    final long expectMaxSeqNo = numDocs - 1;
    final long expectedGlobalCkp = numDocs - 1;
    assertBusy(() -> {
        try {
            final List<Shard> shards = buildShards(index, nodes, client);
            final Shard primaryShard = shards.stream()
                .filter(Shard::isPrimary)
                .findFirst()
                .orElseThrow(() -> new AssertionError("failed to find primary shard"));
            logger.info("primary resolved to node {}", primaryShard.getNode());
            for (Shard shard : shards) {
                final SeqNoStats seqNoStats = shard.getSeqNoStats();
                logger.info("stats for {}, primary [{}]: [{}]", shard.getNode(), shard.isPrimary(), seqNoStats);
                assertThat("max_seq no on " + shard.getNode() + " is wrong", seqNoStats.getMaxSeqNo(), equalTo(expectMaxSeqNo));
                assertThat("localCheckpoint no on " + shard.getNode() + " is wrong",
                    seqNoStats.getLocalCheckpoint(), equalTo(expectMaxSeqNo));
                assertThat("globalCheckpoint no on " + shard.getNode() + " is wrong",
                    seqNoStats.getGlobalCheckpoint(), equalTo(expectedGlobalCkp));
            }
        } catch (IOException e) {
            throw new AssertionError("unexpected io exception", e);
        }
    });
}
private List buildShards(String index, Nodes nodes, RestClient client) throws IOException {
Request request = new Request("GET", index + "/_stats");
request.addParameter("level", "shards");
Response response = client.performRequest(request);
List