/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package org.opensearch.discovery;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.NoShardAvailableActionException;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.coordination.ClusterBootstrapService;
import org.opensearch.cluster.coordination.LagDetector;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.Murmur3HashFunction;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.VersionType;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.indices.IndicesService;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.disruption.NetworkDisruption.Bridge;
import org.opensearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.opensearch.test.disruption.ServiceDisruptionScheme;
import org.opensearch.test.junit.annotations.TestIssueLogging;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.oneOf;

/**
 * Tests various cluster operations (e.g., indexing) during disruptions.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
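    /**
     * Controls how the indexing requests issued by this test are shaped: {@code none} sends plain index
     * requests, {@code external} uses external versioning with a random version, and {@code create} sends
     * create-only requests that fail if the id already exists.
     */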
    private enum ConflictMode {
        none,
        external,
        create;

        static ConflictMode randomMode() {
            ConflictMode[] values = values();
            return values[randomInt(values.length - 1)];
        }
    }

    /**
     * Test that we do not lose documents whose indexing request was successful, under a randomly selected disruption scheme.
     * We also collect & report the type of indexing failures that occur.
     *
     * This test is a superset of tests run in the Jepsen test suite, with the exception of versioned updates.
     */
    @TestIssueLogging(
        value = "_root:DEBUG,org.opensearch.action.bulk:TRACE,org.opensearch.action.get:TRACE,"
            + "org.opensearch.discovery:TRACE,org.opensearch.action.support.replication:TRACE,"
            + "org.opensearch.cluster.service:TRACE,org.opensearch.indices.recovery:TRACE,"
            + "org.opensearch.indices.cluster:TRACE,org.opensearch.index.shard:TRACE",
        issueUrl = "https://github.com/elastic/elasticsearch/issues/41068"
    )
    public void testAckedIndexing() throws Exception {
        final int seconds = (TEST_NIGHTLY && rarely()) ? 5 : 1;
        final String timeout = seconds + "s";

        final List<String> nodes = startCluster(rarely() ? 5 : 3);

        assertAcked(
            prepareCreate("test").setSettings(
                Settings.builder()
                    .put(indexSettings())
                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
            )
        );
        ensureGreen();

        ServiceDisruptionScheme disruptionScheme = addRandomDisruptionScheme();
        logger.info("disruption scheme [{}] added", disruptionScheme);

        final ConcurrentHashMap<String, String> ackedDocs = new ConcurrentHashMap<>(); // id -> node sent.

        final AtomicBoolean stop = new AtomicBoolean(false);
        List<Thread> indexers = new ArrayList<>(nodes.size());
        List<Semaphore> semaphores = new ArrayList<>(nodes.size());
        final AtomicInteger idGenerator = new AtomicInteger(0);
        final AtomicReference<CountDownLatch> countDownLatchRef = new AtomicReference<>();
        final List<Exception> exceptedExceptions = new CopyOnWriteArrayList<>();

        final ConflictMode conflictMode = ConflictMode.randomMode();
        final List<String> fieldNames = IntStream.rangeClosed(0, randomInt(10)).mapToObj(n -> "f" + n).collect(Collectors.toList());

        logger.info("starting indexers using conflict mode [{}]", conflictMode);
        try {
            for (final String node : nodes) {
                final Semaphore semaphore = new Semaphore(0);
                semaphores.add(semaphore);
                final Client client = client(node);
                final String name = "indexer_" + indexers.size();
                final int numPrimaries = getNumShards("test").numPrimaries;
                Thread thread = new Thread(() -> {
                    while (!stop.get()) {
                        String id = null;
                        try {
                            if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
                                continue;
                            }
                            logger.info("[{}] Acquired semaphore and it has {} permits left", name, semaphore.availablePermits());
                            try {
                                id = Integer.toString(idGenerator.incrementAndGet());
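                                // Predict the target primary for the trace log below: murmur3 hash of the
                                // id modulo the primary count (how default routing picks a shard).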
                                int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
                                logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
                                IndexRequestBuilder indexRequestBuilder = client.prepareIndex("test")
                                    .setId(id)
                                    .setSource(Collections.singletonMap(randomFrom(fieldNames), randomNonNegativeLong()), XContentType.JSON)
                                    .setTimeout(timeout);

                                if (conflictMode == ConflictMode.external) {
                                    indexRequestBuilder.setVersion(randomIntBetween(1, 10)).setVersionType(VersionType.EXTERNAL);
                                } else if (conflictMode == ConflictMode.create) {
                                    indexRequestBuilder.setCreate(true);
                                }

                                IndexResponse response = indexRequestBuilder.get(timeout);
                                assertThat(response.getResult(), is(oneOf(CREATED, UPDATED)));
                                ackedDocs.put(id, node);
                                logger.trace("[{}] indexed id [{}] through node [{}], response [{}]", name, id, node, response);
                            } catch (OpenSearchException e) {
                                exceptedExceptions.add(e);
                                final String docId = id;
                                logger.trace(() -> new ParameterizedMessage("[{}] failed id [{}] through node [{}]", name, docId, node), e);
                            } finally {
                                countDownLatchRef.get().countDown();
                                logger.trace("[{}] decreased counter : {}", name, countDownLatchRef.get().getCount());
                            }
                        } catch (InterruptedException e) {
                            // fine - semaphore interrupt
                        } catch (AssertionError | Exception e) {
                            logger.info(() -> new ParameterizedMessage("unexpected exception in background thread of [{}]", node), e);
                        }
                    }
                });
                thread.setName(name);
                thread.start();
                indexers.add(thread);
            }

            int docsPerIndexer = randomInt(3);
            logger.info("indexing {} docs per indexer before partition", docsPerIndexer);
            countDownLatchRef.set(new CountDownLatch(docsPerIndexer * indexers.size()));
            for (Semaphore semaphore : semaphores) {
                semaphore.release(docsPerIndexer);
            }
            assertTrue(countDownLatchRef.get().await(1, TimeUnit.MINUTES));

            for (int iter = 1 + randomInt(2); iter > 0; iter--) {
                logger.info("starting disruptions & indexing (iteration [{}])", iter);
                disruptionScheme.startDisrupting();

                docsPerIndexer = 1 + randomInt(5);
                logger.info("indexing {} docs per indexer during partition", docsPerIndexer);
                countDownLatchRef.set(new CountDownLatch(docsPerIndexer * indexers.size()));
                Collections.shuffle(semaphores, random());
                for (Semaphore semaphore : semaphores) {
                    assertThat(semaphore.availablePermits(), equalTo(0));
                    semaphore.release(docsPerIndexer);
                }
                logger.info("waiting for indexing requests to complete");
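                // Each indexer works through its permits sequentially and every request is bounded by the
                // per-request timeout, so the batch should finish within docsPerIndexer * seconds plus slack.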
                assertTrue(countDownLatchRef.get().await(docsPerIndexer * seconds * 1000 + 2000, TimeUnit.MILLISECONDS));

                logger.info("stopping disruption");
                disruptionScheme.stopDisrupting();
                for (String node : internalCluster().getNodeNames()) {
                    ensureStableCluster(
                        nodes.size(),
                        TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() + DISRUPTION_HEALING_OVERHEAD.millis()),
                        true,
                        node
                    );
                }
                // in case of a bridge partition, shard allocation can fail "index.allocation.max_retries" times if the cluster-manager
                // is the super-connected node and recovery source and target are on opposite sides of the bridge
                if (disruptionScheme instanceof NetworkDisruption
                    && ((NetworkDisruption) disruptionScheme).getDisruptedLinks() instanceof Bridge) {
                    assertBusy(() -> assertAcked(client().admin().cluster().prepareReroute().setRetryFailed(true)));
                }
                ensureGreen("test");

                logger.info("validating successful docs");
                assertBusy(() -> {
                    for (String node : nodes) {
                        try {
                            logger.debug("validating through node [{}] ([{}] acked docs)", node, ackedDocs.size());
                            for (String id : ackedDocs.keySet()) {
                                assertTrue(
                                    "doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found",
                                    client(node).prepareGet("test", id).setPreference("_local").get().isExists()
                                );
                            }
                        } catch (AssertionError | NoShardAvailableActionException e) {
                            throw new AssertionError(e.getMessage() + " (checked via node [" + node + "])", e);
                        }
                    }
                }, 30, TimeUnit.SECONDS);
                logger.info("done validating (iteration [{}])", iter);
            }
        } finally {
            logger.info("shutting down indexers");
            stop.set(true);
            for (Thread indexer : indexers) {
                indexer.interrupt();
                indexer.join(60000);
            }
            if (exceptedExceptions.size() > 0) {
                StringBuilder sb = new StringBuilder();
                for (Exception e : exceptedExceptions) {
                    sb.append("\n").append(e.getMessage());
                }
                logger.debug("Indexing exceptions during disruption: {}", sb);
            }
        }
    }

    /**
     * Test that a document which is indexed on the majority side of a partition is available from the
     * minority side once the partition is healed.
     */
    public void testRejoinDocumentExistsInAllShardCopies() throws Exception {
        List<String> nodes = startCluster(3);

        assertAcked(
            prepareCreate("test").setSettings(
                Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
            ).get()
        );
        ensureGreen("test");

        nodes = new ArrayList<>(nodes);
        Collections.shuffle(nodes, random());
        String isolatedNode = nodes.get(0);
        String notIsolatedNode = nodes.get(1);

        TwoPartitions partitions = isolateNode(isolatedNode);
        NetworkDisruption scheme = addRandomDisruptionType(partitions);
        scheme.startDisrupting();
        ensureStableCluster(2, notIsolatedNode);
        assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut());

        IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test").setSource("field", "value").get();
        assertThat(indexResponse.getVersion(), equalTo(1L));

        logger.info("Verifying if document exists via node[{}]", notIsolatedNode);
        GetResponse getResponse = internalCluster().client(notIsolatedNode)
            .prepareGet("test", indexResponse.getId())
            .setPreference("_local")
            .get();
        assertThat(getResponse.isExists(), is(true));
        assertThat(getResponse.getVersion(), equalTo(1L));
        assertThat(getResponse.getId(), equalTo(indexResponse.getId()));

        scheme.stopDisrupting();
        ensureStableCluster(3);
        ensureGreen("test");
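        // After healing, every node (including the one that was isolated) must be able to serve the
        // document from its local shard copy.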
        for (String node : nodes) {
            logger.info("Verifying if document exists after isolating node[{}] via node[{}]", isolatedNode, node);
            getResponse = internalCluster().client(node).prepareGet("test", indexResponse.getId()).setPreference("_local").get();
            assertThat(getResponse.isExists(), is(true));
            assertThat(getResponse.getVersion(), equalTo(1L));
            assertThat(getResponse.getId(), equalTo(indexResponse.getId()));
        }
    }

    // simulate handling of sending shard failure during an isolation
    public void testSendingShardFailure() throws Exception {
        List<String> nodes = startCluster(3);
        String clusterManagerNode = internalCluster().getClusterManagerName();
        List<String> nonClusterManagerNodes = nodes.stream()
            .filter(node -> !node.equals(clusterManagerNode))
            .collect(Collectors.toList());
        String nonClusterManagerNode = randomFrom(nonClusterManagerNodes);
        assertAcked(
            prepareCreate("test").setSettings(
                Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
            )
        );
        ensureGreen();
        String nonClusterManagerNodeId = internalCluster().clusterService(nonClusterManagerNode).localNode().getId();

        // fail a random shard
        ShardRouting failedShard = randomFrom(
            clusterService().state().getRoutingNodes().node(nonClusterManagerNodeId).shardsWithState(ShardRoutingState.STARTED)
        );
        ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonClusterManagerNode);
        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean success = new AtomicBoolean();
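        // Isolate either the current cluster-manager or the node reporting the failure; either way the
        // listener must eventually be notified of a successful outcome (asserted below after healing).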
        String isolatedNode = randomBoolean() ? clusterManagerNode : nonClusterManagerNode;
        TwoPartitions partitions = isolateNode(isolatedNode);
        // we cannot use the NetworkUnresponsive disruption type here as it will swallow the "shard failed" request, calling neither
        // onSuccess nor onFailure on the provided listener.
        NetworkDisruption networkDisruption = new NetworkDisruption(partitions, NetworkDisruption.DISCONNECT);
        setDisruptionScheme(networkDisruption);
        networkDisruption.startDisrupting();

        service.localShardFailed(
            failedShard,
            "simulated",
            new CorruptIndexException("simulated", (String) null),
            new ActionListener<Void>() {
                @Override
                public void onResponse(final Void aVoid) {
                    success.set(true);
                    latch.countDown();
                }

                @Override
                public void onFailure(Exception e) {
                    success.set(false);
                    latch.countDown();
                    assert false;
                }
            }
        );

        if (isolatedNode.equals(nonClusterManagerNode)) {
            assertNoClusterManager(nonClusterManagerNode);
        } else {
            ensureStableCluster(2, nonClusterManagerNode);
        }

        // heal the partition
        networkDisruption.removeAndEnsureHealthy(internalCluster());

        // the cluster should stabilize
        ensureStableCluster(3);

        latch.await();

        // the listener should be notified
        assertTrue(success.get());

        // the failed shard should be gone
        List<ShardRouting> shards = clusterService().state().getRoutingTable().allShards("test");
        for (ShardRouting shard : shards) {
            assertThat(shard.allocationId(), not(equalTo(failedShard.allocationId())));
        }
    }

    public void testCannotJoinIfClusterManagerLostDataFolder() throws Exception {
        String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
        String dataNode = internalCluster().startDataOnlyNode();

        internalCluster().restartNode(clusterManagerNode, new InternalTestCluster.RestartCallback() {
            @Override
            public boolean clearData(String nodeName) {
                return true;
            }

            @Override
            public Settings onNodeStopped(String nodeName) {
                return Settings.builder()
                    .put(ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING.getKey(), nodeName)
                    /*
                     * the data node might join while the cluster-manager is not yet fully established as cluster-manager, bypassing
                     * the join validation that is done before adding the node to the cluster. Only the join validation when handling
                     * the publish request takes place, but at this point the cluster state has been successfully committed, and will
                     * subsequently be exposed to the applier. The health check below therefore sees the cluster state with the 2
                     * nodes and thinks all is good, even though the data node never accepted this state. What's worse is that it
                     * takes 90 seconds for the data node to be kicked out of the cluster (lag detection). We speed this up here.
                     */
                    .put(LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.getKey(), "10s")
                    .build();
            }

            @Override
            public boolean validateClusterForming() {
                return false;
            }
        });
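        // The restarted cluster-manager bootstrapped a brand-new cluster (its data folder, including the
        // persisted cluster UUID, was wiped), so the data node, which still belongs to the old cluster,
        // must never manage to join it: waiting for two nodes has to keep timing out.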
        assertBusy(() -> {
            assertFalse(internalCluster().client(clusterManagerNode).admin().cluster().prepareHealth().get().isTimedOut());
            assertTrue(
                internalCluster().client(clusterManagerNode)
                    .admin()
                    .cluster()
                    .prepareHealth()
                    .setWaitForNodes("2")
                    .setTimeout("2s")
                    .get()
                    .isTimedOut()
            );
        }, 30, TimeUnit.SECONDS);

        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataNode)); // otherwise we will fail during clean-up
    }

    /**
     * Tests that indices are properly deleted even if there is a cluster-manager transition in between.
     * Test for https://github.com/elastic/elasticsearch/issues/11665
     */
    public void testIndicesDeleted() throws Exception {
        final String idxName = "test";
        final List<String> allClusterManagerEligibleNodes = internalCluster().startClusterManagerOnlyNodes(2);
        final String dataNode = internalCluster().startDataOnlyNode();
        ensureStableCluster(3);
        assertAcked(prepareCreate("test"));

        final String clusterManagerNode1 = internalCluster().getClusterManagerName();
        NetworkDisruption networkDisruption = new NetworkDisruption(
            new TwoPartitions(clusterManagerNode1, dataNode),
            NetworkDisruption.UNRESPONSIVE
        );
        internalCluster().setDisruptionScheme(networkDisruption);
        networkDisruption.startDisrupting();
        // We know this will time out due to the partition; we check manually below to not proceed until
        // the delete has been applied to the cluster-manager node and the cluster-manager eligible node.
        internalCluster().client(clusterManagerNode1).admin().indices().prepareDelete(idxName).setTimeout("0s").get();
        // Don't restart the cluster-manager node until we know the index deletion has taken effect on cluster-manager and the
        // cluster-manager eligible node.
        assertBusy(() -> {
            for (String clusterManagerNode : allClusterManagerEligibleNodes) {
                final ClusterState clusterManagerState = internalCluster().clusterService(clusterManagerNode).state();
                assertTrue("index not deleted on " + clusterManagerNode, clusterManagerState.metadata().hasIndex(idxName) == false);
            }
        });
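        // Restarting the old cluster-manager forces a cluster-manager transition; the deletion must
        // survive it, i.e. the index must not reappear once the node comes back.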
        internalCluster().restartNode(clusterManagerNode1, InternalTestCluster.EMPTY_CALLBACK);
        ensureYellow();
        assertFalse(client().admin().indices().prepareExists(idxName).get().isExists());
    }

    public void testRestartNodeWhileIndexing() throws Exception {
        startCluster(3);
        String index = "restart_while_indexing";
        assertAcked(
            client().admin()
                .indices()
                .prepareCreate(index)
                .setSettings(
                    Settings.builder()
                        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(1, 2))
                )
        );
        AtomicBoolean stopped = new AtomicBoolean();
        Thread[] threads = new Thread[between(1, 4)];
        AtomicInteger docID = new AtomicInteger();
        Set<String> ackedDocs = ConcurrentCollections.newConcurrentSet();
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                while (stopped.get() == false && docID.get() < 5000) {
                    String id = Integer.toString(docID.incrementAndGet());
                    try {
                        IndexResponse response = client().prepareIndex(index)
                            .setId(id)
                            .setSource(Collections.singletonMap("f" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON)
                            .get();
                        assertThat(response.getResult(), is(oneOf(CREATED, UPDATED)));
                        logger.info("--> index id={} seq_no={}", response.getId(), response.getSeqNo());
                        ackedDocs.add(response.getId());
                    } catch (OpenSearchException ignore) {
                        logger.info("--> failed to index id={}", id);
                    }
                }
            });
            threads[i].start();
        }
        ensureGreen(index);
        assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(100)), 1L, TimeUnit.MINUTES);
        internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback());
        ensureGreen(index);
        assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(200)), 1L, TimeUnit.MINUTES);
        stopped.set(true);
        for (Thread thread : threads) {
            thread.join();
        }
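        // Every acknowledged doc must be present in the Lucene index of every shard copy, on whichever
        // node that copy now lives.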
        ClusterState clusterState = internalCluster().clusterService().state();
        for (ShardRouting shardRouting : clusterState.routingTable().allShards(index)) {
            String nodeName = clusterState.nodes().get(shardRouting.currentNodeId()).getName();
            IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
            IndexShard shard = indicesService.getShardOrNull(shardRouting.shardId());
            Set<String> docs = IndexShardTestCase.getShardDocUIDs(shard);
            assertThat(
                "shard [" + shard.routingEntry() + "] docIds [" + docs + "] vs " + " acked docIds [" + ackedDocs + "]",
                ackedDocs,
                everyItem(is(in(docs)))
            );
        }
    }
}