/*
* Copyright 2015-2021 floragunn GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.test.framework.cluster;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableList;
import com.google.common.net.InetAddresses;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.http.BindHttpException;
import org.opensearch.node.PluginAwareNode;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.framework.certificate.TestCertificates;
import org.opensearch.test.framework.cluster.ClusterManager.NodeSettings;
import org.opensearch.transport.BindTransportException;

import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertEquals;
import static org.opensearch.test.framework.cluster.NodeType.CLIENT;
import static org.opensearch.test.framework.cluster.NodeType.CLUSTER_MANAGER;
import static org.opensearch.test.framework.cluster.NodeType.DATA;
import static org.opensearch.test.framework.cluster.PortAllocator.TCP;

/**
* Encapsulates all the logic to start a local OpenSearch cluster - without any configuration of the security plugin.
*
* The security plugin configuration is the job of LocalCluster, which uses this class under the hood. Thus, test code
* for the security plugin should always use LocalCluster.
*/
public class LocalOpenSearchCluster {

    static {
        System.setProperty("opensearch.enforce.bootstrap.checks", "true");
    }

    private static final Logger log = LogManager.getLogger(LocalOpenSearchCluster.class);

    private final String clusterName;
    private final ClusterManager clusterManager;
    private final NodeSettingsSupplier nodeSettingsSupplier;
    private final List<Class<? extends Plugin>> additionalPlugins;
    private final List<Node> nodes = new ArrayList<>();
    private final TestCertificates testCertificates;

    private File clusterHomeDir;
    private List<String> seedHosts;
    private List<String> initialClusterManagerHosts;
    private int retry = 0;
    private boolean started;
    private Random random = new Random();

    private File snapshotDir;

    public LocalOpenSearchCluster(
        String clusterName,
        ClusterManager clusterManager,
        NodeSettingsSupplier nodeSettingsSupplier,
        List<Class<? extends Plugin>> additionalPlugins,
        TestCertificates testCertificates
    ) {
        this.clusterName = clusterName;
        this.clusterManager = clusterManager;
        this.nodeSettingsSupplier = nodeSettingsSupplier;
        this.additionalPlugins = additionalPlugins;
        this.testCertificates = testCertificates;
        try {
            createClusterDirectory(clusterName);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public String getSnapshotDirPath() {
        return snapshotDir.getAbsolutePath();
    }

    private void createClusterDirectory(String clusterName) throws IOException {
        this.clusterHomeDir = Files.createTempDirectory("local_cluster_" + clusterName).toFile();
        log.debug("Cluster home directory '{}'.", clusterHomeDir.getAbsolutePath());
        this.snapshotDir = new File(this.clusterHomeDir, "snapshots");
        this.snapshotDir.mkdir();
    }

    private List<Node> getNodesByType(NodeType nodeType) {
        return nodes.stream().filter(currentNode -> currentNode.hasAssignedType(nodeType)).collect(Collectors.toList());
    }

    private long countNodesByType(NodeType nodeType) {
        return getNodesByType(nodeType).stream().count();
    }

    public void start() throws Exception {
        log.info("Starting {}", clusterName);

        int clusterManagerNodeCount = clusterManager.getClusterManagerNodes();
        int nonClusterManagerNodeCount = clusterManager.getDataNodes() + clusterManager.getClientNodes();

        SortedSet<Integer> clusterManagerNodeTransportPorts = TCP.allocate(
            clusterName,
            Math.max(clusterManagerNodeCount, 4),
            5000 + 42 * 1000 + 300
        );
        SortedSet<Integer> clusterManagerNodeHttpPorts = TCP.allocate(clusterName, clusterManagerNodeCount, 5000 + 42 * 1000 + 200);

        this.seedHosts = toHostList(clusterManagerNodeTransportPorts);
        Set<Integer> clusterManagerPorts = clusterManagerNodeTransportPorts.stream()
            .limit(clusterManagerNodeCount)
            .collect(Collectors.toSet());
        this.initialClusterManagerHosts = toHostList(clusterManagerPorts);

        started = true;

        CompletableFuture<Void> clusterManagerNodeFuture = startNodes(
            clusterManager.getClusterManagerNodeSettings(),
            clusterManagerNodeTransportPorts,
            clusterManagerNodeHttpPorts
        );

        SortedSet<Integer> nonClusterManagerNodeTransportPorts = TCP.allocate(
            clusterName,
            nonClusterManagerNodeCount,
            5000 + 42 * 1000 + 310
        );
        SortedSet<Integer> nonClusterManagerNodeHttpPorts = TCP.allocate(clusterName, nonClusterManagerNodeCount, 5000 + 42 * 1000 + 210);

        CompletableFuture<Void> nonClusterManagerNodeFuture = startNodes(
            clusterManager.getNonClusterManagerNodeSettings(),
            nonClusterManagerNodeTransportPorts,
            nonClusterManagerNodeHttpPorts
        );

        CompletableFuture.allOf(clusterManagerNodeFuture, nonClusterManagerNodeFuture).join();

        if (isNodeFailedWithPortCollision()) {
            log.info("Detected port collision for cluster manager node. Retrying.");

            retry();
            return;
        }

        log.info("Startup finished. Waiting for GREEN");

        waitForCluster(ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(10), nodes.size());

        log.info("Started: {}", this);

    }

    public String getClusterName() {
        return clusterName;
    }

    public boolean isStarted() {
        return started;
    }

    public void stop() {
        List<CompletableFuture<Boolean>> stopFutures = new ArrayList<>();
        for (Node node : nodes) {
            stopFutures.add(node.stop(2, TimeUnit.SECONDS));
        }
        CompletableFuture.allOf(stopFutures.toArray(size -> new CompletableFuture[size])).join();
    }

    public void destroy() {
        stop();
        nodes.clear();

        try {
            FileUtils.deleteDirectory(clusterHomeDir);
        } catch (IOException e) {
            log.warn("Error while deleting " + clusterHomeDir, e);
        }
    }

    public Node clientNode() {
        return findRunningNode(getNodesByType(CLIENT), getNodesByType(DATA), getNodesByType(CLUSTER_MANAGER));
    }

    public Node clusterManagerNode() {
        return findRunningNode(getNodesByType(CLUSTER_MANAGER));
    }

    public List<Node> getNodes() {
        return Collections.unmodifiableList(nodes);
    }

    public Node getNodeByName(String name) {
        return nodes.stream()
            .filter(node -> node.getNodeName().equals(name))
            .findAny()
            .orElseThrow(
                () -> new RuntimeException(
                    "No such node with name: " + name + "; available: " + nodes.stream().map(Node::getNodeName).collect(Collectors.toList())
                )
            );
    }

    private boolean isNodeFailedWithPortCollision() {
        return nodes.stream().anyMatch(Node::isPortCollision);
    }

    private void retry() throws Exception {
        retry++;

        if (retry > 10) {
            throw new RuntimeException("Detected port collisions for cluster manager node. Giving up.");
        }

        stop();

        this.nodes.clear();
        this.seedHosts = null;
        this.initialClusterManagerHosts = null;
        createClusterDirectory("local_cluster_" + clusterName + "_retry_" + retry);
        start();
    }

    @SafeVarargs
    private final Node findRunningNode(List<Node> nodes, List<Node>... moreNodes) {
        for (Node node : nodes) {
            if (node.isRunning()) {
                return node;
            }
        }

        if (moreNodes != null && moreNodes.length > 0) {
            for (List<Node> nodesList : moreNodes) {
                for (Node node : nodesList) {
                    if (node.isRunning()) {
                        return node;
                    }
                }
            }
        }

        return null;
    }

    private CompletableFuture<Void> startNodes(
        List<NodeSettings> nodeSettingList,
        SortedSet<Integer> transportPorts,
        SortedSet<Integer> httpPorts
    ) {
        Iterator<Integer> transportPortIterator = transportPorts.iterator();
        Iterator<Integer> httpPortIterator = httpPorts.iterator();
        List<CompletableFuture<StartStage>> futures = new ArrayList<>();

        for (NodeSettings nodeSettings : nodeSettingList) {
            Node node = new Node(nodeSettings, transportPortIterator.next(), httpPortIterator.next());
            futures.add(node.start());
        }

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    public void waitForCluster(ClusterHealthStatus status, TimeValue timeout, int expectedNodeCount) throws IOException {
        Client client = clientNode().getInternalNodeClient();

        log.debug("waiting for cluster state {} and {} nodes", status.name(), expectedNodeCount);
        AdminClient adminClient = client.admin();

        final ClusterHealthResponse healthResponse = adminClient.cluster()
            .prepareHealth()
            .setWaitForStatus(status)
            .setTimeout(timeout)
            .setClusterManagerNodeTimeout(timeout)
            .setWaitForNodes("" + expectedNodeCount)
            .execute()
            .actionGet();

        if (log.isDebugEnabled()) {
            log.debug("Current ClusterState:\n{}", Strings.toString(XContentType.JSON, healthResponse));
        }

        if (healthResponse.isTimedOut()) {
            throw new IOException(
                "cluster state is " + healthResponse.getStatus().name() + " with " + healthResponse.getNumberOfNodes() + " nodes"
            );
        } else {
            log.debug("... cluster state ok {} with {} nodes", healthResponse.getStatus().name(), healthResponse.getNumberOfNodes());
        }

        assertEquals(expectedNodeCount, healthResponse.getNumberOfNodes());

    }

    @Override
    public String toString() {
        String clusterManagerNodes = nodeByTypeToString(CLUSTER_MANAGER);
        String dataNodes = nodeByTypeToString(DATA);
        String clientNodes = nodeByTypeToString(CLIENT);
        return "\nES Cluster "
            + clusterName
            + "\ncluster manager nodes: "
            + clusterManagerNodes
            + "\n  data nodes: "
            + dataNodes
            + "\nclient nodes: "
            + clientNodes
            + "\n";
    }

    private String nodeByTypeToString(NodeType type) {
        return getNodesByType(type).stream().map(Objects::toString).collect(Collectors.joining(", "));
    }

    private static List<String> toHostList(Collection<Integer> ports) {
        return ports.stream().map(port -> "127.0.0.1:" + port).collect(Collectors.toList());
    }

    private String createNextNodeName(NodeSettings nodeSettings) {
        NodeType type = nodeSettings.recognizeNodeType();
        long nodeTypeCount = countNodesByType(type);
        String nodeType = type.name().toLowerCase(Locale.ROOT);
        return nodeType + "_" + nodeTypeCount;
    }

    public class Node implements OpenSearchClientProvider {
        private final NodeType nodeType;
        private final String nodeName;
        private final NodeSettings nodeSettings;
        private final File nodeHomeDir;
        private final File dataDir;
        private final File logsDir;
        private final int transportPort;
        private final int httpPort;
        private final InetSocketAddress httpAddress;
        private final InetSocketAddress transportAddress;
        private PluginAwareNode node;
        private boolean running = false;
        private boolean portCollision = false;

        Node(NodeSettings nodeSettings, int transportPort, int httpPort) {
            this.nodeName = createNextNodeName(requireNonNull(nodeSettings, "Node settings are required."));
            this.nodeSettings = nodeSettings;
            this.nodeHomeDir = new File(clusterHomeDir, nodeName);
            this.dataDir = new File(this.nodeHomeDir, "data");
            this.logsDir = new File(this.nodeHomeDir, "logs");
            this.transportPort = transportPort;
            this.httpPort = httpPort;
            InetAddress hostAddress = InetAddresses.forString("127.0.0.1");
            this.httpAddress = new InetSocketAddress(hostAddress, httpPort);
            this.transportAddress = new InetSocketAddress(hostAddress, transportPort);

            this.nodeType = nodeSettings.recognizeNodeType();
            nodes.add(this);
        }

        boolean hasAssignedType(NodeType type) {
            return requireNonNull(type, "Node type is required.").equals(this.nodeType);
        }

        CompletableFuture<StartStage> start() {
            CompletableFuture<StartStage> completableFuture = new CompletableFuture<>();
            Class<? extends Plugin>[] mergedPlugins = nodeSettings.pluginsWithAddition(additionalPlugins);
            this.node = new PluginAwareNode(nodeSettings.containRole(NodeRole.CLUSTER_MANAGER), getOpenSearchSettings(), mergedPlugins);

            new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        node.start();
                        running = true;
                        completableFuture.complete(StartStage.INITIALIZED);
                    } catch (BindTransportException | BindHttpException e) {
                        log.warn("Port collision detected for {}", this, e);
                        portCollision = true;
                        try {
                            node.close();
                        } catch (IOException e1) {
                            log.error(e1);
                        }

                        node = null;
                        TCP.reserve(transportPort, httpPort);

                        completableFuture.complete(StartStage.RETRY);

                    } catch (Throwable e) {
                        log.error("Unable to start {}", this, e);
                        node = null;
                        completableFuture.completeExceptionally(e);
                    }
                }
            }).start();

            return completableFuture;
        }

        public Client getInternalNodeClient() {
            return node.client();
        }

        public PluginAwareNode esNode() {
            return node;
        }

        public boolean isRunning() {
            return running;
        }

        public <X> X getInjectable(Class<X> clazz) {
            return node.injector().getInstance(clazz);
        }

        public CompletableFuture<Boolean> stop(long timeout, TimeUnit timeUnit) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    log.info("Stopping {}", this);

                    running = false;

                    if (node != null) {
                        node.close();
                        boolean stopped = node.awaitClose(timeout, timeUnit);
                        node = null;
                        return stopped;
                    } else {
                        return false;
                    }
                } catch (Throwable e) {
                    String message = "Error while stopping " + this;
                    log.warn(message, e);
                    throw new RuntimeException(message, e);
                }
            });
        }

        @Override
        public String toString() {
            String state = running ? "RUNNING" : node != null ? "INITIALIZING" : "STOPPED";

            return nodeName + " " + state + " [" + transportPort + ", " + httpPort + "]";
        }

        public boolean isPortCollision() {
            return portCollision;
        }

        public String getNodeName() {
            return nodeName;
        }

        @Override
        public InetSocketAddress getHttpAddress() {
            return httpAddress;
        }

        @Override
        public InetSocketAddress getTransportAddress() {
            return transportAddress;
        }

        private Settings getOpenSearchSettings() {
            Settings settings = Settings.builder()
                .put(getMinimalOpenSearchSettings())
                .putList("path.repo", List.of(getSnapshotDirPath()))
                .build();

            if (nodeSettingsSupplier != null) {
                // TODO node number
                return Settings.builder().put(settings).put(nodeSettingsSupplier.get(0)).build();
            }
            return settings;
        }

        private Settings getMinimalOpenSearchSettings() {
            return Settings.builder()
                .put("node.name", nodeName)
                .putList("node.roles", createNodeRolesSettings())
                .put("cluster.name", clusterName)
                .put("path.home", nodeHomeDir.toPath())
                .put("path.data", dataDir.toPath())
                .put("path.logs", logsDir.toPath())
                .putList("cluster.initial_cluster_manager_nodes", initialClusterManagerHosts)
                .put("discovery.initial_state_timeout", "8s")
                .putList("discovery.seed_hosts", seedHosts)
                .put("transport.tcp.port", transportPort)
                .put("http.port", httpPort)
                .put("cluster.routing.allocation.disk.threshold_enabled", false)
                .put("discovery.probe.connect_timeout", "10s")
                .put("discovery.probe.handshake_timeout", "10s")
                .put("http.cors.enabled", true)
                .put("gateway.auto_import_dangling_indices", "true")
                .build();
        }

        private List<String> createNodeRolesSettings() {
            final ImmutableList.Builder<String> nodeRolesBuilder = ImmutableList.<String>builder();
            if (nodeSettings.containRole(NodeRole.DATA)) {
                nodeRolesBuilder.add("data");
            }
            if (nodeSettings.containRole(NodeRole.CLUSTER_MANAGER)) {
                nodeRolesBuilder.add("cluster_manager");
            }
            if (nodeSettings.containRole(NodeRole.REMOTE_CLUSTER_CLIENT)) {
                nodeRolesBuilder.add("remote_cluster_client");
            }
            return nodeRolesBuilder.build();
        }

        @Override
        public String getClusterName() {
            return clusterName;
        }

        @Override
        public TestCertificates getTestCertificates() {
            return testCertificates;
        }
    }

    public Random getRandom() {
        return random;
    }

}