/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package org.opensearch.monitor.fs;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.lucene.tests.mockfile.FilterFileChannel;
import org.apache.lucene.tests.mockfile.FilterFileSystemProvider;
import org.opensearch.cluster.coordination.DeterministicTaskQueue;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.io.PathUtilsForTesting;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.junit.annotations.TestLogging;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.OpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.monitor.StatusInfo.Status.HEALTHY;
import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;
import static org.opensearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.is;

/**
 * Tests for {@link FsHealthService}: scheduling of the periodic health check, and how the reported {@link StatusInfo}
 * reacts to injected file-system faults (write/fsync {@link IOException}s, hung fsyncs, and an unexpected node lock
 * file size).
 * <p>
 * Faults are injected by wrapping the default file system in one of the {@link FilterFileSystemProvider} subclasses
 * declared at the bottom of this class and installing it via {@link PathUtilsForTesting#installMock}. Every test that
 * installs such a mock clears its injection flag and tears the mock down in a {@code finally} block, so a failing
 * assertion cannot leave a disrupted file system (or a hung fsync) behind.
 */
public class FsHealthServiceTests extends OpenSearchTestCase {

    private DeterministicTaskQueue deterministicTaskQueue;

    @Before
    public void createObjects() {
        Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
        deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
    }

    /**
     * Starts the service on a {@link DeterministicTaskQueue} and checks three things. Right after start, nothing is
     * runnable but a task is deferred. Each deferred execution is scheduled exactly one refresh interval after the
     * previous one. Stopping the service and draining the queue leaves no runnable or deferred tasks behind.
     */
    public void testSchedulesHealthCheckAtRefreshIntervals() throws Exception {
        long refreshInterval = randomLongBetween(1000, 12000);
        final Settings settings = Settings.builder().put(FsHealthService.REFRESH_INTERVAL_SETTING.getKey(), refreshInterval + "ms").build();
        final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
        try (NodeEnvironment env = newNodeEnvironment()) {
            FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, deterministicTaskQueue.getThreadPool(), env);
            final long startTimeMillis = deterministicTaskQueue.getCurrentTimeMillis();
            fsHealthService.doStart();
            assertFalse(deterministicTaskQueue.hasRunnableTasks());
            assertTrue(deterministicTaskQueue.hasDeferredTasks());
            // Draw the iteration count once: a randomIntBetween(...) in the loop condition would be re-rolled on every pass.
            final int iterations = randomIntBetween(5, 10);
            int rescheduledCount = 0;
            for (int i = 1; i <= iterations; i++) {
                if (deterministicTaskQueue.hasRunnableTasks()) {
                    deterministicTaskQueue.runRandomTask();
                } else {
                    // The deferred execution time is absolute, so measure it from the queue's start time,
                    // just like the elapsed-time check below.
                    assertThat(
                        deterministicTaskQueue.getLatestDeferredExecutionTime(),
                        is(startTimeMillis + refreshInterval * (rescheduledCount + 1))
                    );
                    deterministicTaskQueue.advanceTime();
                    rescheduledCount++;
                }
                assertThat(deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis, is(refreshInterval * rescheduledCount));
            }

            fsHealthService.doStop();
            deterministicTaskQueue.runAllTasksInTimeOrder();

            assertFalse(deterministicTaskQueue.hasRunnableTasks());
            assertFalse(deterministicTaskQueue.hasDeferredTasks());
        }
    }

    /**
     * A monitor run reports HEALTHY on an undisturbed file system. Once opening an output stream on the health-check
     * temp file throws for every path, the run reports UNHEALTHY, mentions every node data path in its info, and
     * triggers exactly one injected failure per data path.
     */
    public void testFailsHealthOnIOException() throws IOException {
        FileSystem fileSystem = PathUtils.getDefaultFileSystem();
        FileSystemIOExceptionProvider disruptFileSystemProvider = new FileSystemIOExceptionProvider(fileSystem);
        fileSystem = disruptFileSystemProvider.getFileSystem(null);
        PathUtilsForTesting.installMock(fileSystem);
        final Settings settings = Settings.EMPTY;
        final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
        TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
        try (NodeEnvironment env = newNodeEnvironment()) {
            FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
            fsHealthService.new FsHealthMonitor().run();
            assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
            assertEquals("health check passed", fsHealthService.getHealth().getInfo());

            // disrupt file system: the prefix must be set before the injection flag is raised
            disruptFileSystemProvider.restrictPathPrefix(""); // disrupt all paths
            disruptFileSystemProvider.injectIOException.set(true);
            fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
            fsHealthService.new FsHealthMonitor().run();
            assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
            for (Path path : env.nodeDataPaths()) {
                assertTrue(fsHealthService.getHealth().getInfo().contains(path.toString()));
            }
            assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount());
        } finally {
            disruptFileSystemProvider.injectIOException.set(false);
            PathUtilsForTesting.teardown();
            ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Delays fsync of the health-check temp file by more than the slow-path threshold. The test then expects one WARN
     * log line per node data path reporting that the check took longer than the warn threshold.
     */
    @TestLogging(value = "org.opensearch.monitor.fs:WARN", reason = "to ensure that we log on hung IO at WARN level")
    public void testLoggingOnHungIO() throws Exception {
        long slowLogThreshold = randomLongBetween(100, 200);
        final Settings settings = Settings.builder()
            .put(FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING.getKey(), slowLogThreshold + "ms")
            .build();
        FileSystem fileSystem = PathUtils.getDefaultFileSystem();
        TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
        FileSystemFsyncHungProvider disruptFileSystemProvider = new FileSystemFsyncHungProvider(
            fileSystem,
            randomLongBetween(slowLogThreshold + 1, 400),
            testThreadPool
        );
        fileSystem = disruptFileSystemProvider.getFileSystem(null);
        PathUtilsForTesting.installMock(fileSystem);
        final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

        try (
            MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(FsHealthService.class));
            NodeEnvironment env = newNodeEnvironment()
        ) {
            FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
            int counter = 0;
            for (Path path : env.nodeDataPaths()) {
                mockAppender.addExpectation(
                    new MockLogAppender.SeenEventExpectation(
                        "test" + ++counter,
                        FsHealthService.class.getCanonicalName(),
                        Level.WARN,
                        "health check of [" + path + "] took [*ms] which is above the warn threshold*"
                    )
                );
            }

            // disrupt file system
            disruptFileSystemProvider.injectIODelay.set(true);
            fsHealthService.new FsHealthMonitor().run();
            assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount());
            assertBusy(mockAppender::assertAllExpectationsMatched);
        } finally {
            // release any fsync still parked in the hung provider before tearing the mock down
            disruptFileSystemProvider.injectIODelay.set(false);
            PathUtilsForTesting.teardown();
            ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * With fsync of the health-check temp file hung indefinitely, a started service flips to UNHEALTHY
     * ("healthy threshold breached") after only one injected fsync. Once the disruption is lifted, the service returns
     * to HEALTHY without any further injections.
     */
    public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception {
        long healthyTimeoutThreshold = randomLongBetween(500, 1000);
        long refreshInterval = randomLongBetween(500, 1000);
        long slowLogThreshold = randomLongBetween(100, 200);
        long delayBetweenChecks = 100;
        final Settings settings = Settings.builder()
            .put(FsHealthService.HEALTHY_TIMEOUT_SETTING.getKey(), healthyTimeoutThreshold + "ms")
            .put(FsHealthService.REFRESH_INTERVAL_SETTING.getKey(), refreshInterval + "ms")
            .put(FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING.getKey(), slowLogThreshold + "ms")
            .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0)// we need to verify exact time
            .build();
        FileSystem fileSystem = PathUtils.getDefaultFileSystem();
        TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
        FileSystemFsyncHungProvider disruptFileSystemProvider = new FileSystemFsyncHungProvider(fileSystem, testThreadPool);
        fileSystem = disruptFileSystemProvider.getFileSystem(null);
        PathUtilsForTesting.installMock(fileSystem);
        final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
        try (NodeEnvironment env = newNodeEnvironment()) {
            FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
            logger.info("--> Initial health status prior to the first monitor run");
            StatusInfo fsHealth = fsHealthService.getHealth();
            assertEquals(HEALTHY, fsHealth.getStatus());
            assertEquals("health check passed", fsHealth.getInfo());
            logger.info("--> First monitor run");
            fsHealthService.new FsHealthMonitor().run();
            fsHealth = fsHealthService.getHealth();
            assertEquals(HEALTHY, fsHealth.getStatus());
            assertEquals("health check passed", fsHealth.getInfo());
            logger.info("--> Disrupt file system");
            disruptFileSystemProvider.injectIODelay.set(true);
            final FsHealthService fsHealthSrvc = new FsHealthService(settings, clusterSettings, testThreadPool, env);
            fsHealthSrvc.doStart();
            waitUntil(
                () -> fsHealthSrvc.getHealth().getStatus() == UNHEALTHY,
                healthyTimeoutThreshold + (2 * refreshInterval),
                TimeUnit.MILLISECONDS
            );
            fsHealth = fsHealthSrvc.getHealth();
            assertEquals(UNHEALTHY, fsHealth.getStatus());
            assertEquals("healthy threshold breached", fsHealth.getInfo());
            int disruptedPathCount = disruptFileSystemProvider.getInjectedPathCount();
            assertThat(disruptedPathCount, equalTo(1));
            logger.info("--> Fix file system disruption");
            disruptFileSystemProvider.injectIODelay.set(false);
            waitUntil(
                () -> fsHealthSrvc.getHealth().getStatus() == HEALTHY,
                delayBetweenChecks + (4 * refreshInterval),
                TimeUnit.MILLISECONDS
            );
            fsHealth = fsHealthSrvc.getHealth();
            assertEquals(HEALTHY, fsHealth.getStatus());
            assertEquals("health check passed", fsHealth.getInfo());
            assertEquals(disruptedPathCount, disruptFileSystemProvider.getInjectedPathCount());
            fsHealthSrvc.doStop();
        } finally {
            // The delay here is unbounded, so a failed assertion above would otherwise leave a pool thread
            // spinning inside force() until the pool is forcibly terminated.
            disruptFileSystemProvider.injectIODelay.set(false);
            PathUtilsForTesting.teardown();
            ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Makes fsync of the health-check temp file throw on one randomly chosen data path. The monitor run must report
     * UNHEALTHY, name exactly that path, and trigger a single injected failure.
     */
    public void testFailsHealthOnSinglePathFsyncFailure() throws IOException {
        FileSystem fileSystem = PathUtils.getDefaultFileSystem();
        FileSystemFsyncIOExceptionProvider disruptFsyncFileSystemProvider = new FileSystemFsyncIOExceptionProvider(fileSystem);
        fileSystem = disruptFsyncFileSystemProvider.getFileSystem(null);
        PathUtilsForTesting.installMock(fileSystem);
        final Settings settings = Settings.EMPTY;
        final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
        TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
        try (NodeEnvironment env = newNodeEnvironment()) {
            Path[] paths = env.nodeDataPaths();
            FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
            fsHealthService.new FsHealthMonitor().run();
            assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
            assertEquals("health check passed", fsHealthService.getHealth().getInfo());

            // disrupt file system fsync on single path; restrict the prefix before enabling injection so the
            // provider's "pathPrefix must be set" assertion can never observe the flag without a prefix
            String disruptedPath = randomFrom(paths).toString();
            disruptFsyncFileSystemProvider.restrictPathPrefix(disruptedPath);
            disruptFsyncFileSystemProvider.injectIOException.set(true);
            fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
            fsHealthService.new FsHealthMonitor().run();
            assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
            assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on [" + disruptedPath + "]"));
            assertEquals(1, disruptFsyncFileSystemProvider.getInjectedPathCount());
        } finally {
            disruptFsyncFileSystemProvider.injectIOException.set(false);
            PathUtilsForTesting.teardown();
            ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Makes opening the health-check temp file for writing throw on one randomly chosen data path. The monitor run
     * must report UNHEALTHY, name exactly that path, and trigger a single injected failure.
     */
    public void testFailsHealthOnSinglePathWriteFailure() throws IOException {
        FileSystem fileSystem = PathUtils.getDefaultFileSystem();
        FileSystemIOExceptionProvider disruptWritesFileSystemProvider = new FileSystemIOExceptionProvider(fileSystem);
        fileSystem = disruptWritesFileSystemProvider.getFileSystem(null);
        PathUtilsForTesting.installMock(fileSystem);
        final Settings settings = Settings.EMPTY;
        final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
        TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
        try (NodeEnvironment env = newNodeEnvironment()) {
            Path[] paths = env.nodeDataPaths();
            FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
            fsHealthService.new FsHealthMonitor().run();
            assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
            assertEquals("health check passed", fsHealthService.getHealth().getInfo());

            // disrupt file system writes on single path
            String disruptedPath = randomFrom(paths).toString();
            disruptWritesFileSystemProvider.restrictPathPrefix(disruptedPath);
            disruptWritesFileSystemProvider.injectIOException.set(true);
            fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
            fsHealthService.new FsHealthMonitor().run();
            assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
            assertThat(fsHealthService.getHealth().getInfo(), is("health check failed on [" + disruptedPath + "]"));
            assertEquals(1, disruptWritesFileSystemProvider.getInjectedPathCount());
        } finally {
            disruptWritesFileSystemProvider.injectIOException.set(false);
            PathUtilsForTesting.teardown();
            ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Makes {@link FileChannel#size()} report 1 byte for the node lock file. The monitor run must report UNHEALTHY
     * with "health check failed due to broken node lock" and trigger a single injection.
     */
    public void testFailsHealthOnUnexpectedLockFileSize() throws IOException {
        FileSystem fileSystem = PathUtils.getDefaultFileSystem();
        final Settings settings = Settings.EMPTY;
        TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings);
        FileSystemUnexpectedLockFileSizeProvider unexpectedLockFileSizeFileSystemProvider = new FileSystemUnexpectedLockFileSizeProvider(
            fileSystem,
            1,
            testThreadPool
        );
        fileSystem = unexpectedLockFileSizeFileSystemProvider.getFileSystem(null);
        PathUtilsForTesting.installMock(fileSystem);
        final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
        try (NodeEnvironment env = newNodeEnvironment()) {
            FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
            fsHealthService.new FsHealthMonitor().run();
            assertEquals(HEALTHY, fsHealthService.getHealth().getStatus());
            assertEquals("health check passed", fsHealthService.getHealth().getInfo());

            // enabling unexpected file size injection
            unexpectedLockFileSizeFileSystemProvider.injectUnexpectedFileSize.set(true);

            fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
            fsHealthService.new FsHealthMonitor().run();
            assertEquals(UNHEALTHY, fsHealthService.getHealth().getStatus());
            assertThat(fsHealthService.getHealth().getInfo(), is("health check failed due to broken node lock"));
            assertEquals(1, unexpectedLockFileSizeFileSystemProvider.getInjectedPathCount());
        } finally {
            unexpectedLockFileSizeFileSystemProvider.injectUnexpectedFileSize.set(false);
            PathUtilsForTesting.teardown();
            ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * While {@link #injectIOException} is set, {@code newOutputStream} throws an {@link IOException} for any path that
     * starts with the configured prefix and ends with {@code FsHealthMonitor.TEMP_FILE_NAME}. Each such failure is
     * counted. {@link #restrictPathPrefix} must be called before the flag is raised.
     */
    private static class FileSystemIOExceptionProvider extends FilterFileSystemProvider {

        final AtomicBoolean injectIOException = new AtomicBoolean();
        final AtomicInteger injectedPaths = new AtomicInteger();

        private String pathPrefix;

        FileSystemIOExceptionProvider(FileSystem inner) {
            super("disrupt_fs_health://", inner);
        }

        public void restrictPathPrefix(String pathPrefix) {
            this.pathPrefix = pathPrefix;
        }

        public int getInjectedPathCount() {
            return injectedPaths.get();
        }

        @Override
        public OutputStream newOutputStream(Path path, OpenOption... options) throws IOException {
            if (injectIOException.get()) {
                assert pathPrefix != null : "must set pathPrefix before starting disruptions";
                if (path.toString().startsWith(pathPrefix) && path.toString().endsWith(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
                    injectedPaths.incrementAndGet();
                    throw new IOException("fake IOException");
                }
            }
            return super.newOutputStream(path, options);
        }
    }

    /**
     * While {@link #injectIOException} is set, {@link FileChannel#force(boolean)} throws an {@link IOException} for
     * channels opened on a path that starts with the configured prefix and ends with
     * {@code FsHealthMonitor.TEMP_FILE_NAME}. Each such failure is counted. {@link #restrictPathPrefix} must be called
     * before the flag is raised.
     */
    private static class FileSystemFsyncIOExceptionProvider extends FilterFileSystemProvider {

        final AtomicBoolean injectIOException = new AtomicBoolean();
        final AtomicInteger injectedPaths = new AtomicInteger();

        private String pathPrefix = null;

        FileSystemFsyncIOExceptionProvider(FileSystem inner) {
            super("disrupt_fs_health://", inner);
        }

        public void restrictPathPrefix(String pathPrefix) {
            this.pathPrefix = pathPrefix;
        }

        public int getInjectedPathCount() {
            return injectedPaths.get();
        }

        @Override
        public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
            return new FilterFileChannel(super.newFileChannel(path, options, attrs)) {
                @Override
                public void force(boolean metaData) throws IOException {
                    if (injectIOException.get()) {
                        assert pathPrefix != null : "must set pathPrefix before starting disruptions";
                        if (path.toString().startsWith(pathPrefix)
                            && path.toString().endsWith(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
                            injectedPaths.incrementAndGet();
                            throw new IOException("fake IOException");
                        }
                    }
                    super.force(metaData);
                }
            };
        }
    }

    /**
     * While {@link #injectIODelay} is set, {@link FileChannel#force(boolean)} on a file named
     * {@code FsHealthMonitor.TEMP_FILE_NAME} blocks until one of two things happens: {@code delay} milliseconds have
     * elapsed on the thread pool's relative clock, or the flag is cleared. The wait polls with an exponential sleep
     * capped at {@link #AWAIT_BUSY_THRESHOLD} ms. The delegate fsync then proceeds. The two-argument constructor uses
     * an unbounded delay, so the block lasts until the flag is cleared.
     */
    private static class FileSystemFsyncHungProvider extends FilterFileSystemProvider {

        final AtomicBoolean injectIODelay = new AtomicBoolean();
        final AtomicInteger injectedPaths = new AtomicInteger();

        private final long delay;
        private final ThreadPool threadPool;
        private static final long AWAIT_BUSY_THRESHOLD = 100L;

        FileSystemFsyncHungProvider(FileSystem inner, long delay, ThreadPool threadPool) {
            super("disrupt_fs_health://", inner);
            this.delay = delay;
            this.threadPool = threadPool;
        }

        FileSystemFsyncHungProvider(FileSystem inner, ThreadPool threadPool) {
            super("disrupt_fs_health://", inner);
            this.threadPool = threadPool;
            this.delay = Long.MAX_VALUE;
        }

        public int getInjectedPathCount() {
            return injectedPaths.get();
        }

        @Override
        public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
            return new FilterFileChannel(super.newFileChannel(path, options, attrs)) {
                @Override
                public void force(boolean metaData) throws IOException {
                    if (injectIODelay.get()) {
                        if (path.getFileName().toString().equals(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) {
                            injectedPaths.incrementAndGet();
                            final long startTimeMillis = threadPool.relativeTimeInMillis();
                            long timeInMillis = 1;
                            // With delay == Long.MAX_VALUE the sum can overflow to a negative value; saturate instead.
                            final long maxWaitTimeMillis = startTimeMillis + delay >= 0 ? startTimeMillis + delay : Long.MAX_VALUE;
                            do {
                                try {
                                    Thread.sleep(timeInMillis);
                                } catch (InterruptedException e) {
                                    // keep the interrupt visible to the pool thread before failing
                                    Thread.currentThread().interrupt();
                                    throw new AssertionError(e);
                                }
                                timeInMillis = Math.min(AWAIT_BUSY_THRESHOLD, timeInMillis * 2);
                            } while (threadPool.relativeTimeInMillis() <= maxWaitTimeMillis && injectIODelay.get());
                        }
                    }
                    super.force(metaData);
                }
            };
        }
    }

    /**
     * While {@link #injectUnexpectedFileSize} is set, {@link FileChannel#size()} returns the configured {@code size}
     * for channels opened on a file named {@code NodeEnvironment.NODE_LOCK_FILENAME}. Each such call is counted.
     */
    private static class FileSystemUnexpectedLockFileSizeProvider extends FilterFileSystemProvider {

        final AtomicBoolean injectUnexpectedFileSize = new AtomicBoolean();
        final AtomicInteger injectedPaths = new AtomicInteger();

        private final long size;
        private final ThreadPool threadPool;

        FileSystemUnexpectedLockFileSizeProvider(FileSystem inner, long size, ThreadPool threadPool) {
            super("disrupt_fs_health://", inner);
            this.size = size;
            this.threadPool = threadPool;
        }

        public int getInjectedPathCount() {
            return injectedPaths.get();
        }

        @Override
        public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
            return new FilterFileChannel(super.newFileChannel(path, options, attrs)) {
                @Override
                public long size() throws IOException {
                    if (injectUnexpectedFileSize.get()) {
                        if (path.getFileName().toString().equals(NodeEnvironment.NODE_LOCK_FILENAME)) {
                            injectedPaths.incrementAndGet();
                            return size;
                        }
                    }
                    return super.size();
                }
            };
        }
    }
}