/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package org.opensearch.discovery;

import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.NoClusterManagerBlockService;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.BlockClusterManagerServiceOnClusterManager;
import org.opensearch.test.disruption.IntermittentLongGCDisruption;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.opensearch.test.disruption.ServiceDisruptionScheme;
import org.opensearch.test.disruption.SingleNodeDisruption;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.junit.Assume.assumeThat;

/**
 * Tests relating to the loss of the cluster-manager.
 * <p>
 * Every test starts its own cluster (test scope, no pre-started data nodes), disrupts the elected
 * cluster-manager or the network around it, and then checks how the remaining nodes and the
 * disrupted node behave both during and after the disruption.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ClusterManagerDisruptionIT extends AbstractDisruptionTestCase {

    /**
     * Test that cluster recovers from a long GC on cluster-manager that causes other nodes to elect a new one.
     * <p>
     * The test is skipped on runtimes version 20 and later, where Thread::suspend / Thread::resume
     * are no longer supported.
     */
    public void testClusterManagerNodeGCs() throws Exception {
        // check the precondition first so that a skipped test does not pay for starting a three node cluster
        assumeThat("Thread::resume / Thread::suspend are not supported anymore", Runtime.version(), lessThan(Runtime.Version.parse("20")));
        List<String> nodes = startCluster(3);

        String oldClusterManagerNode = internalCluster().getClusterManagerName();
        // a very long GC, but it's OK as we remove the disruption when it has had an effect
        SingleNodeDisruption clusterManagerNodeDisruption = new IntermittentLongGCDisruption(
            random(),
            oldClusterManagerNode,
            100,
            200,
            30000,
            60000
        );

        internalCluster().setDisruptionScheme(clusterManagerNodeDisruption);
        clusterManagerNodeDisruption.startDisrupting();

        Set<String> oldNonClusterManagerNodesSet = new HashSet<>(nodes);
        oldNonClusterManagerNodesSet.remove(oldClusterManagerNode);

        List<String> oldNonClusterManagerNodes = new ArrayList<>(oldNonClusterManagerNodesSet);

        logger.info("waiting for nodes to de-elect cluster-manager [{}]", oldClusterManagerNode);
        for (String node : oldNonClusterManagerNodesSet) {
            assertDifferentClusterManager(node, oldClusterManagerNode);
        }

        logger.info("waiting for nodes to elect a new cluster-manager");
        ensureStableCluster(2, oldNonClusterManagerNodes.get(0));

        // restore GC
        clusterManagerNodeDisruption.stopDisrupting();
        final TimeValue waitTime = new TimeValue(
            DISRUPTION_HEALING_OVERHEAD.millis() + clusterManagerNodeDisruption.expectedTimeToHeal().millis()
        );
        ensureStableCluster(3, waitTime, false, oldNonClusterManagerNodes.get(0));

        // make sure all nodes agree on cluster-manager
        String newClusterManager = internalCluster().getClusterManagerName();
        assertThat(newClusterManager, not(equalTo(oldClusterManagerNode)));
        assertClusterManager(newClusterManager, nodes);
    }

    /**
     * This test isolates the cluster-manager from the rest of the cluster, waits for a new cluster-manager to be elected,
     * restores the partition and verifies that all nodes agree on the new cluster state.
     */
    public void testIsolateClusterManagerAndVerifyClusterStateConsensus() throws Exception {
        final List<String> nodes = startCluster(3);

        assertAcked(
            prepareCreate("test").setSettings(
                Settings.builder()
                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
            )
        );

        ensureGreen();
        String isolatedNode = internalCluster().getClusterManagerName();
        TwoPartitions partitions = isolateNode(isolatedNode);
        NetworkDisruption networkDisruption = addRandomDisruptionType(partitions);
        networkDisruption.startDisrupting();

        String nonIsolatedNode = partitions.getMajoritySide().iterator().next();

        // make sure cluster reforms
        ensureStableCluster(2, nonIsolatedNode);

        // make sure the isolated node picks up on things.
        assertNoClusterManager(isolatedNode, TimeValue.timeValueSeconds(40));

        // restore isolation
        networkDisruption.stopDisrupting();

        for (String node : nodes) {
            ensureStableCluster(
                3,
                new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkDisruption.expectedTimeToHeal().millis()),
                true,
                node
            );
        }

        logger.info("issue a reroute");
        // trigger a reroute now, instead of waiting for the background reroute of RerouteService
        assertAcked(client().admin().cluster().prepareReroute());
        // and wait for it to finish and for the cluster to stabilize
        ensureGreen("test");

        // verify all cluster states are the same
        // use assert busy to wait for cluster states to be applied (as publish_timeout has low value)
        assertBusy(() -> {
            // the state of the first node in the list is the reference every other node is compared against
            ClusterState state = null;
            for (String node : nodes) {
                ClusterState nodeState = getNodeClusterState(node);
                if (state == null) {
                    state = nodeState;
                    continue;
                }
                // assert nodes are identical
                try {
                    assertEquals("unequal versions", state.version(), nodeState.version());
                    assertEquals("unequal node count", state.nodes().getSize(), nodeState.nodes().getSize());
                    assertEquals(
                        "different cluster-managers ",
                        state.nodes().getClusterManagerNodeId(),
                        nodeState.nodes().getClusterManagerNodeId()
                    );
                    assertEquals("different meta data version", state.metadata().version(), nodeState.metadata().version());
                    assertEquals("different routing", state.routingTable().toString(), nodeState.routingTable().toString());
                } catch (AssertionError t) {
                    // rethrow with both state dumps attached, keeping the original assertion (and its stack trace) as the cause
                    throw new AssertionError(
                        "failed comparing cluster state: "
                            + t.getMessage()
                            + "\n"
                            + "--- cluster state of node ["
                            + nodes.get(0)
                            + "]: ---\n"
                            + state
                            + "\n--- cluster state ["
                            + node
                            + "]: ---\n"
                            + nodeState,
                        t
                    );
                }

            }
        });
    }

    /**
     * Verify that the proper block is applied when nodes lose their cluster-manager.
     * <p>
     * The partition is applied twice: first with the no-cluster-manager block setting left unset, where the isolated node
     * is expected to carry the metadata-writes block, and then with the setting switched to {@code all}, where the
     * isolated node is expected to carry the all-operations block.
     */
    public void testVerifyApiBlocksDuringPartition() throws Exception {
        internalCluster().startNodes(
            3,
            Settings.builder().putNull(NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_SETTING.getKey()).build()
        );

        // Makes sure that the get request can be executed on each node locally:
        assertAcked(
            prepareCreate("test").setSettings(
                Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
            )
        );

        // Everything is stable now, it is now time to simulate evil...
        // but first make sure we have no initializing shards and all is green
        // (waiting for green here, because indexing / search in a yellow index is fine as long as no other nodes go down)
        ensureGreen("test");

        TwoPartitions partitions = TwoPartitions.random(random(), internalCluster().getNodeNames());
        NetworkDisruption networkDisruption = addRandomDisruptionType(partitions);

        assertEquals(1, partitions.getMinoritySide().size());
        final String isolatedNode = partitions.getMinoritySide().iterator().next();
        assertEquals(2, partitions.getMajoritySide().size());
        final String nonIsolatedNode = partitions.getMajoritySide().iterator().next();

        // Simulate a network issue between the unlucky node and the rest of the cluster.
        networkDisruption.startDisrupting();

        // The unlucky node must report *no* cluster-manager node, since it can't connect to cluster-manager and in fact it should
        // continuously ping until network failures have been resolved. However
        // it may take a bit before the node detects it has been cut off from the elected cluster-manager
        logger.info("waiting for isolated node [{}] to have no cluster-manager", isolatedNode);
        assertNoClusterManager(
            isolatedNode,
            NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_METADATA_WRITES,
            TimeValue.timeValueSeconds(30)
        );

        // the stability check below is executed through a node on the majority side, so that is the node to log
        logger.info(
            "wait until elected cluster-manager has been removed and a new 2 node cluster was formed (via [{}])",
            nonIsolatedNode
        );
        ensureStableCluster(2, nonIsolatedNode);

        for (String node : partitions.getMajoritySide()) {
            ClusterState nodeState = getNodeClusterState(node);
            final boolean hasClusterManager = nodeState.nodes().getClusterManagerNode() != null;
            final boolean hasNoGlobalBlocks = nodeState.blocks().global().isEmpty();
            if (hasClusterManager == false || hasNoGlobalBlocks == false) {
                fail(
                    "node ["
                        + node
                        + "] has no cluster-manager or has blocks, despite of being on the right side of the partition. State dump:\n"
                        + nodeState
                );
            }
        }

        networkDisruption.stopDisrupting();

        // Wait until the cluster-manager node sees all 3 nodes again.
        ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkDisruption.expectedTimeToHeal().millis()));

        logger.info(
            "Verify no cluster-manager block with {} set to {}",
            NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_SETTING.getKey(),
            "all"
        );
        // the second half of the test relies on this setting, so make sure the update was acknowledged
        assertAcked(
            client().admin()
                .cluster()
                .prepareUpdateSettings()
                .setTransientSettings(Settings.builder().put(NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_SETTING.getKey(), "all"))
        );

        networkDisruption.startDisrupting();

        // The unlucky node must report *no* cluster-manager node, since it can't connect to cluster-manager and in fact it should
        // continuously ping until network failures have been resolved. However
        // it may take a bit before the node detects it has been cut off from the elected cluster-manager
        logger.info("waiting for isolated node [{}] to have no cluster-manager", isolatedNode);
        assertNoClusterManager(isolatedNode, NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ALL, TimeValue.timeValueSeconds(30));

        // make sure we have stable cluster & cross partition recoveries are canceled by the removal of the missing node
        // the unresponsive partition causes recoveries to only time out after 15m (default) and these will cause
        // the test to fail due to unfreed resources
        ensureStableCluster(2, nonIsolatedNode);

    }

    /**
     * Verify that a bulk request needing a dynamic mapping update reports failures when the cluster-manager service is
     * blocked and the dynamic mapping timeout is very short, and that afterwards the global checkpoint of every shard
     * of the index catches up with its local checkpoint.
     */
    public void testMappingTimeout() throws Exception {
        startCluster(3);
        createIndex(
            "test",
            Settings.builder()
                .put("index.number_of_shards", 1)
                .put("index.number_of_replicas", 1)
                .put("index.routing.allocation.exclude._name", internalCluster().getClusterManagerName())
                .build()
        );

        // create one field
        index("test", "doc", "1", "{ \"f\": 1 }");

        ensureGreen();

        assertAcked(
            client().admin()
                .cluster()
                .prepareUpdateSettings()
                .setTransientSettings(Settings.builder().put("indices.mapping.dynamic_timeout", "1ms"))
        );

        ServiceDisruptionScheme disruption = new BlockClusterManagerServiceOnClusterManager(random());
        setDisruptionScheme(disruption);

        disruption.startDisrupting();

        // documents 2 and 4 reuse the existing field "f", document 3 introduces the new field "g"
        BulkRequestBuilder bulk = client().prepareBulk();
        bulk.add(client().prepareIndex("test").setId("2").setSource("{ \"f\": 1 }", XContentType.JSON));
        bulk.add(client().prepareIndex("test").setId("3").setSource("{ \"g\": 1 }", XContentType.JSON));
        bulk.add(client().prepareIndex("test").setId("4").setSource("{ \"f\": 1 }", XContentType.JSON));
        BulkResponse bulkResponse = bulk.get();
        assertTrue(bulkResponse.hasFailures());

        disruption.stopDisrupting();

        // once the disruption is lifted, every shard copy must report a global checkpoint equal to its local checkpoint
        assertBusy(() -> {
            IndicesStatsResponse stats = client().admin().indices().prepareStats("test").clear().get();
            for (ShardStats shardStats : stats.getShards()) {
                assertThat(
                    shardStats.getShardRouting().toString(),
                    shardStats.getSeqNoStats().getGlobalCheckpoint(),
                    equalTo(shardStats.getSeqNoStats().getLocalCheckpoint())
                );
            }
        });

    }
}