package org.opensearch.replication.bwc;

import org.apache.hc.core5.http.io.entity.EntityUtils
import org.assertj.core.api.Assertions
import org.junit.Assert
import org.junit.BeforeClass
import org.opensearch.action.DocWriteResponse
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.get.GetRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.client.Request
import org.opensearch.client.RequestOptions
import org.opensearch.client.RestHighLevelClient
import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.replication.MultiClusterAnnotations
import org.opensearch.replication.MultiClusterRestTestCase
import org.opensearch.replication.StartReplicationRequest
import org.opensearch.replication.startReplication
import org.opensearch.test.OpenSearchTestCase.assertBusy
import org.opensearch.test.rest.OpenSearchRestTestCase
import java.util.Collections
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors


const val LEADER = "bwcLeader"
const val FOLLOWER = "bwcFollower"
const val NUM_NODES = 3
const val REPLICATION_PLUGIN_NAME = "opensearch-cross-cluster-replication"
const val LEADER_INDEX = "bwc_test_index"
const val FOLLOWER_INDEX = "bwc_test_index"
const val CONNECTION_NAME = "bwc_connection"

/*
    Verifies that replication is working with following upgrade scenarios:
    - mixed cluster: where only one of the nodes in cluster has upgraded to the new version.
    - rolling restart: where all nodes have upgraded to the latest version one by one.
    - full cluster restart: where all nodes have simultaneously upgraded to the latest version.
    - bwcTestSuite: Runs all 3 scenarios above.

    Here is the tasks order for each scenario:
    - mixed cluster:  oldVersionClusterTask0 --> mixedClusterTask
    - rolling restart:  oldVersionClusterTask0 --> mixedClusterTask -> twoThirdsUpgradedClusterTask -> rollingUpgradeClusterTask
    - full cluster restart:  oldVersionClusterTask1 --> fullRestartClusterTask
 */
class BackwardsCompatibilityIT : MultiClusterRestTestCase() {
    private val clusterSuffix = System.getProperty("tests.cluster_suffix")
    private val leaderName = "${LEADER}$clusterSuffix"
    private val followerName = "${FOLLOWER}$clusterSuffix"

    companion object {
        @BeforeClass
        @JvmStatic
        fun setupTestClusters() {
            val suffix = System.getProperty("tests.cluster_suffix")
            val leader = "${LEADER}$suffix"
            val follower = "${FOLLOWER}$suffix"
            val clusters = HashMap<String, TestCluster>()
            clusters.put(leader, createTestCluster(leader, true, true, true, false))
            clusters.put(follower, createTestCluster(follower, true, true, true, false))
            testClusters = clusters
        }

        enum class ClusterStatus(val value: String) {
            OLD("oldVersionClusterTask"),
            ONE_THIRD_UPGRADED("mixedClusterTask"),
            TWO_THIRD_UPGRADED("twoThirdsUpgradedClusterTask"),
            ROLLING_UPGRADED("rollingUpgradeClusterTask"),
            FULL_CLUSTER_RESTART("fullRestartClusterTask"),
            COMPLETE_SUITE("bwcTestSuite");

            companion object {
                fun from(s: String): ClusterStatus? = values().find { it.value == s }
            }
        }
    }
    @Throws(Exception::class)
    fun testReplicationPlugin() {
        when(ClusterStatus.from(System.getProperty("tests.bwcTask"))) {
            ClusterStatus.OLD -> setupReplication()
            ClusterStatus.ONE_THIRD_UPGRADED, ClusterStatus.TWO_THIRD_UPGRADED, ClusterStatus.ROLLING_UPGRADED,
                ClusterStatus.FULL_CLUSTER_RESTART -> verifyReplication()
            ClusterStatus.COMPLETE_SUITE -> {} // Do nothing as all tests have run already
            else -> {throw AssertionError("${ClusterStatus.from(System.getProperty("tests.bwcTask"))} is not a valid option for ClusterStatus")}
        }
    }

    // Set up the replication between two clusters.
    private fun setupReplication() {
        val follower = getClientForCluster(followerName)
        val leader = getClientForCluster(leaderName)

        // Verify that both clusters are up.
        leader.cluster().health(ClusterHealthRequest(), RequestOptions.DEFAULT)
        follower.cluster().health(ClusterHealthRequest(), RequestOptions.DEFAULT)

        createConnectionBetweenClusters(followerName, leaderName, CONNECTION_NAME)

        try {
            // Create an empty index on the leader and trigger replication on it
            val createIndexResponse = leader.indices().create(CreateIndexRequest(LEADER_INDEX), RequestOptions.DEFAULT)
            Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue()

            follower.startReplication(StartReplicationRequest(CONNECTION_NAME, LEADER_INDEX, FOLLOWER_INDEX), waitForRestore=true)

            val source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString())
            var response = leader.index(IndexRequest(LEADER_INDEX).id("1").source(source), RequestOptions.DEFAULT)
            Assertions.assertThat(response.result).isEqualTo(DocWriteResponse.Result.CREATED)

            assertBusy({
                val getResponse = follower.get(GetRequest(FOLLOWER_INDEX, "1"), RequestOptions.DEFAULT)
                Assertions.assertThat(getResponse.isExists).isTrue()
                Assertions.assertThat(getResponse.sourceAsMap).isEqualTo(source)
            }, 60, TimeUnit.SECONDS)
        } catch (e: Exception) {
            logger.info("Exception while starting the replication ${e.printStackTrace()}")
            throw e
        }
    }

    // Verifies that replication is still ongoing.
    private fun verifyReplication() {
        verifyReplicationPluginInstalled(leaderName)
        verifyReplicationPluginInstalled(followerName)

        val follower = getClientForCluster(followerName)
        val leader = getClientForCluster(leaderName)

        // Update the seed nodes.
        createConnectionBetweenClusters(followerName, leaderName, CONNECTION_NAME)

        try {
            val id = randomInt().toString()
            // Ensures that this document ID isn't already present
            leader.delete(DeleteRequest(LEADER_INDEX).id(id), RequestOptions.DEFAULT)

            val source = mapOf("name" to randomAlphaOfLength(20), "age" to randomInt().toString())
            var response = leader.index(IndexRequest(LEADER_INDEX).id(id).source(source), RequestOptions.DEFAULT)
            Assertions.assertThat(response.result).isEqualTo(DocWriteResponse.Result.CREATED)

            assertBusy({
                val getResponse = follower.get(GetRequest(FOLLOWER_INDEX, id), RequestOptions.DEFAULT)
                Assertions.assertThat(getResponse.isExists).isTrue()
                Assertions.assertThat(getResponse.sourceAsMap).isEqualTo(source)
            }, 60, TimeUnit.SECONDS)

            //Check for latest retention lease when full cluster restart is done
            if (ClusterStatus.from(System.getProperty("tests.bwcTask")) == ClusterStatus.FULL_CLUSTER_RESTART || ClusterStatus.from(
                    System.getProperty("tests.bwcTask")) == ClusterStatus.ROLLING_UPGRADED) {
                validateNewRetentionLeaseId(follower, leader)
            }

        } catch (e: Exception) {
            logger.info("Exception while verifying the replication ${e.printStackTrace()}")
            throw e
        }
    }

    private fun validateNewRetentionLeaseId(
        follower: RestHighLevelClient,
        leader: RestHighLevelClient
    ) {
            assertBusy({
                val followerClusterInfo: Map<String, Any> =
                    OpenSearchRestTestCase.entityAsMap(follower.lowLevelClient.performRequest(Request("GET", "/")))
                val clusterUUID = (followerClusterInfo["cluster_uuid"] as String)
                val clusterName = (followerClusterInfo["cluster_name"] as String)
                assert(clusterUUID.isNotEmpty())
                assert(clusterName.isNotEmpty())
                val expectedRetentionLeaseId =
                    "replication" + ":" + clusterName + ":" + clusterUUID + ":[" + LEADER_INDEX + "]"

                val retentionLeaseinfo =
                    leader.lowLevelClient.performRequest(Request("GET", "/$LEADER_INDEX/_stats/docs?level=shards"))
                val retentionLeaseInfoString = EntityUtils.toString(retentionLeaseinfo.entity)
                assertTrue(retentionLeaseInfoString.contains(expectedRetentionLeaseId))
            }, 60, TimeUnit.SECONDS)
    }

    // Verifies that replication plugin is installed on all the nodes og the cluster.
    @Throws(java.lang.Exception::class)
    private fun verifyReplicationPluginInstalled(clusterName: String) {
        val restClient = getClientForCluster(clusterName)
        for (i in 0 until NUM_NODES) {
            val responseMap = getAsMap(restClient.lowLevelClient, "_nodes/$clusterName-$i/plugins")["nodes"]
                    as Map<String, Map<String, Any>>?
            Assert.assertTrue(responseMap!!.values.isNotEmpty())
            for (response in responseMap!!.values) {
                val plugins = response["plugins"] as List<Map<String, Any>>?
                val pluginNames: Set<Any?> = plugins!!.stream().map { map: Map<String, Any> ->
                    map["name"]
                }.collect(Collectors.toSet()).orEmpty()
                Assert.assertTrue(pluginNames.contains(REPLICATION_PLUGIN_NAME))
            }
        }
    }
}