# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#	http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import errno
import functools
import json
import os
import random
import socket
import time

import pytest

from osbenchmark import client, config, version
from osbenchmark.utils import process

# Names of the benchmark configurations used by the integration tests. Each name maps to a
# "benchmark-<name>.ini" template in the "resources" directory next to this file.
CONFIG_NAMES = ["in-memory-it", "os-it"]
# OpenSearch distribution versions; not referenced in this module
# (presumably consumed by the test modules that import it - verify against callers).
DISTRIBUTIONS = ["1.3.9", "2.5.0"]
# Workload names; not referenced in this module
# (presumably consumed by the test modules that import it - verify against callers).
WORKLOADS = ["geonames", "nyc_taxis", "http_logs", "nested"]
# Absolute path of the parent of the directory that contains this file.
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))


def all_benchmark_configs(t):
    """
    Decorator that runs the test ``t`` once for every configuration name in ``CONFIG_NAMES``.

    The configuration name is passed to the test as its first argument ``cfg``.

    :param t: The test function to parametrize. Its return value is discarded.
    :return: The parametrized wrapper around ``t``.
    """
    parametrize_cfg = pytest.mark.parametrize("cfg", CONFIG_NAMES)

    def wrapper(cfg, *args, **kwargs):
        t(cfg, *args, **kwargs)

    # Mark first, then copy the metadata of ``t`` onto the wrapper (same order as stacked decorators).
    return functools.wraps(t)(parametrize_cfg(wrapper))


def random_benchmark_config(t):
    """
    Decorator that runs the test ``t`` with a single, randomly chosen configuration name.

    The choice is made once, at the time the decorator is applied, from ``CONFIG_NAMES``.

    :param t: The test function to parametrize. Its return value is discarded.
    :return: The parametrized wrapper around ``t``.
    """
    chosen_cfg = random.choice(CONFIG_NAMES)
    parametrize_cfg = pytest.mark.parametrize("cfg", [chosen_cfg])

    def wrapper(cfg, *args, **kwargs):
        t(cfg, *args, **kwargs)

    return functools.wraps(t)(parametrize_cfg(wrapper))


def benchmark_in_mem(t):
    """
    Decorator that runs the test ``t`` with the "in-memory-it" configuration only.

    :param t: The test function to parametrize. Its return value is discarded.
    :return: The parametrized wrapper around ``t``.
    """
    parametrize_cfg = pytest.mark.parametrize("cfg", ["in-memory-it"])

    def wrapper(cfg, *args, **kwargs):
        t(cfg, *args, **kwargs)

    return functools.wraps(t)(parametrize_cfg(wrapper))


def benchmark_os(t):
    """
    Decorator that runs the test ``t`` with the "os-it" configuration only.

    :param t: The test function to parametrize. Its return value is discarded.
    :return: The parametrized wrapper around ``t``.
    """
    parametrize_cfg = pytest.mark.parametrize("cfg", ["os-it"])

    def wrapper(cfg, *args, **kwargs):
        t(cfg, *args, **kwargs)

    return functools.wraps(t)(parametrize_cfg(wrapper))


def osbenchmark_command_line_for(cfg, command_line):
    """
    Builds the full ``opensearch-benchmark`` command line for the given configuration.

    :param cfg: Name of the benchmark configuration; it is appended as ``--configuration-name='<cfg>'``.
    :param command_line: The subcommand and its options, inserted verbatim after the program name.
    :return: (str) the complete command line.
    """
    config_option = f"--configuration-name='{cfg}'"
    return " ".join(("opensearch-benchmark", f"{command_line}", config_option))


def osbenchmark(cfg, command_line):
    """
    Runs an ``opensearch-benchmark`` command with the given configuration.

    Use this for every command except execute-test, which has its own helper because
    its CLI options may differ.

    :param cfg: Name of the benchmark configuration to use.
    :param command_line: The subcommand and its options.
    :return: The return code reported by ``process.run_subprocess_with_stderr``. When it is
             non-zero, the first element of that call's result is printed first.
    """
    full_command = osbenchmark_command_line_for(cfg, command_line)
    captured_output, exit_code = process.run_subprocess_with_stderr(full_command)
    if exit_code == 0:
        return exit_code
    # Surface the captured output so a failing invocation can be diagnosed from the test log.
    print(captured_output)
    return exit_code


def execute_test(cfg, command_line):
    """
    Runs the ``execute-test`` command with the defaults the integration tests rely on.

    The options ``--kill-running-processes`` and ``--on-error='abort'`` are always appended.

    :param cfg: Name of the benchmark configuration to use.
    :param command_line: Additional options for ``execute-test``.
    :return: The return code of the invocation, as returned by ``osbenchmark``.
    """
    full_command = f"execute-test {command_line} --kill-running-processes --on-error='abort'"
    return osbenchmark(cfg, full_command)


def shell_cmd(command_line):
    """
    Runs the given command line in a subshell via ``os.system``.

    :param command_line: (str) The command to execute
    :return: (int) the value returned by ``os.system``; ``0`` means success. Note that the
             encoding of non-zero values is platform-dependent (see the ``os.system`` docs).
    """
    status = os.system(command_line)
    return status


def command_in_docker(command_line, python_version):
    """
    Runs a command with ``bash -c`` inside a throw-away ``python:<python_version>`` container.

    ``ROOT_DIR`` is mounted read-only at ``/benchmark_ro`` and the container is removed afterwards (``--rm``).

    :param command_line: The command to run in the container. It is wrapped in single quotes as-is.
    :param python_version: Tag of the ``python`` Docker image to use.
    :return: The status returned by ``shell_cmd``.
    """
    return shell_cmd(
        f"docker run --rm -v {ROOT_DIR}:/benchmark_ro:ro python:{python_version} bash -c '{command_line}'"
    )


def wait_until_port_is_free(port_number=39200, timeout=120):
    """
    Blocks until connections to ``127.0.0.1:port_number`` are refused, i.e. nothing listens there anymore.

    The port is probed roughly every 0.5 seconds.

    :param port_number: (int) The TCP port to probe on 127.0.0.1.
    :param timeout: Maximum time to wait in seconds.
    :raises TimeoutError: If the port has not been seen as free before the timeout elapsed.
    """
    deadline = time.perf_counter() + timeout
    while time.perf_counter() < deadline:
        # The context manager closes the probe socket on every path, including when connect_ex raises.
        with socket.socket() as probe:
            connect_result = probe.connect_ex(("127.0.0.1", port_number))
        if connect_result == errno.ECONNREFUSED:
            return
        # Something still accepts connections (or the probe failed differently); retry shortly.
        time.sleep(0.5)

    raise TimeoutError(f"Port [{port_number}] is occupied after [{timeout}] seconds")


def check_prerequisites():
    """
    Verifies that the external tools needed by the integration tests can be invoked.

    Runs ``docker ps`` and ``docker-compose --help`` in that order and stops at the first failure.

    :raises AssertionError: If one of the probe commands finishes with a non-zero return code.
    """
    requirements = (
        ("docker ps",
         "Docker must be installed and the daemon must be up and running to run integration tests."),
        ("docker-compose --help",
         "Docker Compose is required to run integration tests."),
    )
    for probe_command, failure_message in requirements:
        if process.run_subprocess_with_logging(probe_command) != 0:
            raise AssertionError(failure_message)


class ConfigFile:
    """
    Computes the locations of a benchmark configuration file used by the integration tests.

    Attributes:
      user_home: value of the ``BENCHMARK_HOME`` environment variable, or the user's home directory if unset.
      benchmark_home: the ``.benchmark`` directory below ``user_home``.
      config_file_name: ``benchmark-<config_name>.ini``, or ``benchmark.ini`` when no name is given.
      source_path: path of that file in the ``resources`` directory next to this module.
      target_path: path of that file in ``benchmark_home``.
    """

    def __init__(self, config_name):
        """
        :param config_name: Name of the configuration or ``None`` for the default configuration file.
        """
        home = os.getenv("BENCHMARK_HOME", os.path.expanduser("~"))
        file_name = "benchmark.ini" if config_name is None else f"benchmark-{config_name}.ini"
        resources_dir = os.path.join(os.path.dirname(__file__), "resources")

        self.user_home = home
        self.benchmark_home = os.path.join(home, ".benchmark")
        self.config_file_name = file_name
        self.source_path = os.path.join(resources_dir, file_name)
        self.target_path = os.path.join(self.benchmark_home, file_name)


class TestCluster:
    """
    Handle for a single-node OpenSearch cluster that is installed, started and stopped
    through the ``opensearch-benchmark`` command line.

    Attributes:
      cfg: name of the benchmark configuration passed to every command.
      installation_id: id reported by the ``install`` command; ``None`` until ``install`` succeeded.
      http_port: HTTP port handed to ``install``; ``None`` until ``install`` was called.
    """

    # This is a helper, not a test case: keep pytest from trying to collect it because of its "Test" prefix.
    __test__ = False

    def __init__(self, cfg):
        """
        :param cfg: Name of the benchmark configuration to use for all commands.
        """
        self.cfg = cfg
        self.installation_id = None
        self.http_port = None

    def install(self, distribution_version, node_name, provision_config_instance, http_port):
        """
        Installs an OpenSearch node via ``opensearch-benchmark install`` and remembers its installation id.

        :param distribution_version: The OpenSearch distribution version to install.
        :param node_name: Name of the node; also passed as the only entry of ``--master-nodes``.
        :param provision_config_instance: Value for ``--provision-config-instance``.
        :param http_port: (int) HTTP port of the node. The seed host uses ``http_port + 100`` as its port.
        :raises AssertionError: If the command fails or its output does not contain an installation id.
        """
        self.http_port = http_port
        transport_port = http_port + 100
        failure_message = "Failed to install OpenSearch {}.".format(distribution_version)
        command = (
            f"opensearch-benchmark install --configuration-name={self.cfg} "
            f"--distribution-version={distribution_version} --build-type=tar "
            f"--http-port={http_port} --node={node_name} --master-nodes={node_name} "
            f"--provision-config-instance={provision_config_instance} "
            f"--seed-hosts=\"127.0.0.1:{transport_port}\""
        )
        try:
            err, retcode = process.run_subprocess_with_stderr(command)
            if retcode != 0:
                raise AssertionError(failure_message, err)
            # The captured output of a successful install is parsed as JSON carrying the installation id.
            self.installation_id = json.loads(err)["installation-id"]
        except AssertionError:
            # Already the error we want to report; do not wrap it a second time.
            raise
        except Exception as e:
            # Only wrap ordinary errors so that KeyboardInterrupt / SystemExit still propagate unchanged.
            raise AssertionError(failure_message, e) from e

    def start(self, test_execution_id):
        """
        Starts the previously installed node and waits via ``client.wait_for_rest_layer``.

        :param test_execution_id: Value for ``--test-execution-id``.
        :raises AssertionError: If the ``start`` command finishes with a non-zero return code.
        """
        cmd = "start --runtime-jdk=\"bundled\" --installation-id={} --test-execution-id={}".format(self.installation_id, test_execution_id)
        if osbenchmark(self.cfg, cmd) != 0:
            raise AssertionError("Failed to start OpenSearch test cluster.")
        opensearch = client.OsClientFactory(hosts=[{"host": "127.0.0.1", "port": self.http_port}], client_options={}).create()
        client.wait_for_rest_layer(opensearch)

    def stop(self):
        """
        Stops the node if it has been installed; does nothing when there is no installation id.

        :raises AssertionError: If the ``stop`` command finishes with a non-zero return code.
        """
        if not self.installation_id:
            return
        if osbenchmark(self.cfg, "stop --installation-id={}".format(self.installation_id)) != 0:
            raise AssertionError("Failed to stop OpenSearch test cluster.")

    def __str__(self):
        return f"TestCluster[installation-id={self.installation_id}]"


class OsMetricsStore:
    """
    An OpenSearch test cluster named "metrics-store", managed with the "in-memory-it" configuration.
    """

    # OpenSearch distribution version that is installed for the metrics store.
    VERSION = "1.3.9"

    def __init__(self):
        self.cluster = TestCluster("in-memory-it")

    def start(self):
        """
        Installs the metrics store node on HTTP port 10200 and starts it.
        """
        install_options = {
            "distribution_version": OsMetricsStore.VERSION,
            "node_name": "metrics-store",
            "provision_config_instance": "defaults",
            "http_port": 10200,
        }
        self.cluster.install(**install_options)
        self.cluster.start(test_execution_id="metrics-store")

    def stop(self):
        """
        Stops the metrics store node.
        """
        self.cluster.stop()


def install_integration_test_config():
    """
    Stores a default configuration for every name in ``CONFIG_NAMES``.

    The template for each configuration is ``benchmark-<name>.ini`` in the ``resources`` directory
    next to this file; it is handed to ``config.ConfigFile.store_default_config``.
    """
    resources_dir = os.path.join(os.path.dirname(__file__), "resources")
    for config_name in CONFIG_NAMES:
        template = os.path.join(resources_dir, f"benchmark-{config_name}.ini")
        config.ConfigFile(config_name).store_default_config(template_path=template)


def remove_integration_test_config():
    """
    Deletes the configuration file of every name in ``CONFIG_NAMES``.

    The file to delete is the ``location`` of the respective ``config.ConfigFile``.
    ``os.remove`` raises if a file does not exist.
    """
    for name in CONFIG_NAMES:
        config_file = config.ConfigFile(name)
        os.remove(config_file.location)


# Module-wide metrics store instance; it is started in setup_module() and stopped in teardown_module().
# Creating it only constructs the objects, nothing is installed or started here.
OS_METRICS_STORE = OsMetricsStore()


def get_license():
    """
    Reads the second line of the ``LICENSE`` file in ``ROOT_DIR``.

    :return: (str) the second line with surrounding whitespace stripped.
    """
    license_path = os.path.join(ROOT_DIR, 'LICENSE')
    with open(license_path) as license_file:
        lines = license_file.readlines()
    return lines[1].strip()


def build_docker_image():
    """
    Builds the ``opensearchproject/benchmark:<version>`` Docker image from ``Dockerfile-dev``.

    The benchmark version and the license line are provided to ``docker build`` through the
    environment variables ``BENCHMARK_VERSION`` and ``BENCHMARK_LICENSE``, which are forwarded
    as build arguments.

    :raises AssertionError: If the build command finishes with a non-zero return code.
    """
    benchmark_version = version.__version__
    dockerfile = f"{ROOT_DIR}/docker/Dockerfiles/Dockerfile-dev"

    # Work on a copy so the environment of the current process is left untouched.
    build_env = dict(os.environ)
    build_env['BENCHMARK_VERSION'] = benchmark_version
    build_env['BENCHMARK_LICENSE'] = get_license()

    build_command = (
        f"docker build -t opensearchproject/benchmark:{benchmark_version}"
        " --build-arg BENCHMARK_VERSION --build-arg BENCHMARK_LICENSE "
        f"-f {dockerfile} {ROOT_DIR}"
    )

    exit_code = process.run_subprocess_with_logging(build_command, env=build_env)
    if exit_code != 0:
        raise AssertionError("It was not possible to build the docker image from Dockerfile-dev")


def setup_module():
    """
    Module-level setup: checks the prerequisites, installs the integration test
    configurations and starts the metrics store, in that order.
    """
    setup_steps = (
        check_prerequisites,
        install_integration_test_config,
        OS_METRICS_STORE.start,
    )
    for step in setup_steps:
        step()


def teardown_module():
    """
    Module-level teardown: stops the metrics store and removes the integration test configurations.

    The configurations are removed even if stopping the metrics store fails; the error from
    stopping is still propagated afterwards.
    """
    try:
        OS_METRICS_STORE.stop()
    finally:
        # Always clean up the config files so a failed stop does not leave them behind.
        remove_integration_test_config()