/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.search.asynchronous.integTests;

import org.opensearch.search.asynchronous.commons.AsynchronousSearchSingleNodeTestCase;
import org.opensearch.search.asynchronous.request.DeleteAsynchronousSearchRequest;
import org.opensearch.search.asynchronous.request.SubmitAsynchronousSearchRequest;
import org.opensearch.search.asynchronous.response.AcknowledgedResponse;
import org.opensearch.search.asynchronous.response.AsynchronousSearchResponse;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.common.TriConsumer;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.MatchQueryBuilder;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.search.asynchronous.commons.AsynchronousSearchSingleNodeTestCase.SearchDelayPlugin.SCRIPT_NAME;
import static org.opensearch.index.query.QueryBuilders.scriptQuery;

public class DeleteAsynchronousSearchSingleNodeIT extends AsynchronousSearchSingleNodeTestCase {

    public void testDeleteAsynchronousSearchForRetainedResponseRandomTime() throws InterruptedException {
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("index");
        searchRequest.source(new SearchSourceBuilder().query(new MatchQueryBuilder("field", "value0")));
        SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(searchRequest);
        submitAsynchronousSearchRequest.keepOnCompletion(true);
        submitAsynchronousSearchRequest.waitForCompletionTimeout(TimeValue.timeValueMillis(randomLongBetween(1, 500)));
        AsynchronousSearchResponse submitResponse = executeSubmitAsynchronousSearch(client(), submitAsynchronousSearchRequest)
        .actionGet();
        assertNotNull(submitResponse);
        int concurrentRuns = randomIntBetween(20, 50);
        assertConcurrentDeletes(submitResponse.getId(),
                (numDeleteAcknowledged, numDeleteUnAcknowledged, numResourceNotFound) -> {
                    assertThat(numDeleteAcknowledged.get(), Matchers.greaterThan(0));
                    assertEquals(concurrentRuns, numDeleteAcknowledged.get() + numResourceNotFound.get() + numDeleteUnAcknowledged.get());
                }, concurrentRuns);
        assertAsynchronousSearchResourceCleanUp(submitResponse.getId());
    }

    public void testDeleteAsynchronousSearchNoRetainedResponseRandomTime() throws InterruptedException {
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("index");
        searchRequest.source(new SearchSourceBuilder().query(new MatchQueryBuilder("field", "value0")));
        SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(searchRequest);
        submitAsynchronousSearchRequest.keepOnCompletion(false);
        submitAsynchronousSearchRequest.waitForCompletionTimeout(TimeValue.timeValueMillis(randomLongBetween(1, 500)));
        AsynchronousSearchResponse submitResponse = executeSubmitAsynchronousSearch(client(), submitAsynchronousSearchRequest)
        .actionGet();
        assertNotNull(submitResponse);
        int concurrentRuns = randomIntBetween(20, 50);
        assertConcurrentDeletes(submitResponse.getId(),
                (numDeleteAcknowledged, numDeleteUnAcknowledged, numResourceNotFound) -> {
                    assertEquals(concurrentRuns, numDeleteAcknowledged.get() + numResourceNotFound.get());
                    assertEquals(0, numDeleteUnAcknowledged.get());
                }, concurrentRuns);
        assertAsynchronousSearchResourceCleanUp(submitResponse.getId());
    }

    public void testDeleteAsynchronousSearchPostCompletionNoRetainedResponse() throws InterruptedException {
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("index");
        searchRequest.source(new SearchSourceBuilder().query(new MatchQueryBuilder("field", "value0")));
        SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(searchRequest);
        submitAsynchronousSearchRequest.keepOnCompletion(false);
        submitAsynchronousSearchRequest.waitForCompletionTimeout(TimeValue.timeValueMillis(5000));
        AsynchronousSearchResponse submitResponse = executeSubmitAsynchronousSearch(client(), submitAsynchronousSearchRequest)
        .actionGet();
        assertNotNull(submitResponse);
        int concurrentRuns = randomIntBetween(20, 50);
        assertConcurrentDeletes(submitResponse.getId(),
                (numDeleteAcknowledged, numDeleteUnAcknowledged, numResourceNotFound) -> {
                    assertEquals(0, numDeleteAcknowledged.get());
                    assertEquals(0, numDeleteUnAcknowledged.get());
                    assertEquals(concurrentRuns, numResourceNotFound.get());
                }, concurrentRuns);
        assertAsynchronousSearchResourceCleanUp(submitResponse.getId());
    }

    public void testDeleteAsynchronousSearchPostCompletionForRetainedResponse() throws InterruptedException {
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("index");
        searchRequest.source(new SearchSourceBuilder().query(new MatchQueryBuilder("field", "value0")));
        SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(searchRequest);
        submitAsynchronousSearchRequest.keepOnCompletion(true);
        submitAsynchronousSearchRequest.waitForCompletionTimeout(TimeValue.timeValueMillis(5000));
        AsynchronousSearchResponse submitResponse = executeSubmitAsynchronousSearch(client(), submitAsynchronousSearchRequest)
        .actionGet();
        assertNotNull(submitResponse);
        int concurrentRuns = randomIntBetween(20, 50);
        assertConcurrentDeletes(submitResponse.getId(),
                (numDeleteAcknowledged, numDeleteUnAcknowledged, numResourceNotFound) -> {
                    assertThat(numDeleteAcknowledged.get(), Matchers.greaterThan(0));
                    assertEquals(0, numDeleteUnAcknowledged.get());
                    assertEquals(concurrentRuns, numDeleteAcknowledged.get() + numResourceNotFound.get() + numDeleteUnAcknowledged.get());
                }, concurrentRuns);
        assertAsynchronousSearchResourceCleanUp(submitResponse.getId());
    }

    public void testDeleteAsynchronousSearchInBlockedStateForRetainedResponse() throws Exception {
        List<SearchDelayPlugin> plugins = initPluginFactory();
        SearchRequest searchRequest = new SearchRequest("index");
        searchRequest.source(new SearchSourceBuilder());
        searchRequest.source().query(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())));
        SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(searchRequest);
        submitAsynchronousSearchRequest.keepOnCompletion(true);
        submitAsynchronousSearchRequest.waitForCompletionTimeout(TimeValue.timeValueMillis(randomLongBetween(1, 500)));
        AsynchronousSearchResponse submitResponse = executeSubmitAsynchronousSearch(client(), submitAsynchronousSearchRequest)
        .actionGet();
        assertNotNull(submitResponse);
        int concurrentRuns = randomIntBetween(10, 20);
        assertConcurrentDeletesForBlockedSearch(submitResponse.getId(),
                (numDeleteAcknowledged, numDeleteUnAcknowledged, numResourceNotFound) -> {
                    assertThat(numDeleteAcknowledged.get(), Matchers.greaterThan(0));
                    assertEquals(concurrentRuns, numDeleteAcknowledged.get() + numResourceNotFound.get() + numDeleteUnAcknowledged.get());
                }, concurrentRuns, plugins);
        assertAsynchronousSearchResourceCleanUp(submitResponse.getId());
    }

    public void testDeleteAsynchronousSearchInBlockedStateForNoRetainedResponse() throws Exception {
        List<SearchDelayPlugin> plugins = initPluginFactory();
        SearchRequest searchRequest = new SearchRequest("index");
        searchRequest.source(new SearchSourceBuilder());
        searchRequest.source().query(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())));
        SubmitAsynchronousSearchRequest submitAsynchronousSearchRequest = new SubmitAsynchronousSearchRequest(searchRequest);
        submitAsynchronousSearchRequest.keepOnCompletion(false);
        submitAsynchronousSearchRequest.waitForCompletionTimeout(TimeValue.timeValueMillis(randomLongBetween(1, 500)));
        AsynchronousSearchResponse submitResponse = executeSubmitAsynchronousSearch(client(), submitAsynchronousSearchRequest)
        .actionGet();
        assertNotNull(submitResponse);
        int concurrentRuns = randomIntBetween(10, 20);
        assertConcurrentDeletesForBlockedSearch(submitResponse.getId(),
                (numDeleteAcknowledged, numDeleteUnAcknowledged, numResourceNotFound) -> {
                    assertThat(numDeleteAcknowledged.get(), Matchers.greaterThan(0));
                    assertEquals(concurrentRuns, numDeleteAcknowledged.get() + numResourceNotFound.get() + numDeleteUnAcknowledged.get());
                }, concurrentRuns, plugins);
        assertAsynchronousSearchResourceCleanUp(submitResponse.getId());
    }

    private void assertConcurrentDeletesForBlockedSearch(String id, TriConsumer<AtomicInteger, AtomicInteger,
            AtomicInteger> assertionConsumer, int concurrentRuns, List<SearchDelayPlugin> plugins) throws Exception {
        AtomicInteger numDeleteAcknowledged = new AtomicInteger();
        AtomicInteger numDeleteUnAcknowledged = new AtomicInteger();
        AtomicInteger numResourceNotFound = new AtomicInteger();
        TestThreadPool testThreadPool = null;
        try {
            testThreadPool = new TestThreadPool(DeleteAsynchronousSearchSingleNodeIT.class.getName());
            int numThreads = concurrentRuns;
            List<Runnable> operationThreads = new ArrayList<>();
            CountDownLatch countDownLatch = new CountDownLatch(numThreads);
            for (int i = 0; i < numThreads; i++) {
                Runnable thread = () -> {
                    logger.info("Triggering asynchronous search delete --->");
                    DeleteAsynchronousSearchRequest deleteAsynchronousSearchRequest = new DeleteAsynchronousSearchRequest(id);
                    executeDeleteAsynchronousSearch(client(), deleteAsynchronousSearchRequest, new LatchedActionListener<>
                            (new ActionListener<AcknowledgedResponse>() {
                                @Override
                                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
                                    if (acknowledgedResponse.isAcknowledged()) {
                                        numDeleteAcknowledged.incrementAndGet();
                                    } else {
                                        numDeleteUnAcknowledged.incrementAndGet();
                                    }
                                }

                                @Override
                                public void onFailure(Exception e) {
                                    if (e instanceof ResourceNotFoundException) {
                                        numResourceNotFound.incrementAndGet();
                                    }
                                }
                            }, countDownLatch));
                };
                operationThreads.add(thread);
            }
            TestThreadPool finalTestThreadPool = testThreadPool;
            operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable));
            countDownLatch.await();
            disableBlocks(plugins);
            assertionConsumer.apply(numDeleteAcknowledged, numDeleteUnAcknowledged, numResourceNotFound);
        } finally {
            ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
        }
    }

    private void assertConcurrentDeletes(String id, TriConsumer<AtomicInteger, AtomicInteger, AtomicInteger> assertionConsumer,
                                         int concurrentRuns) throws InterruptedException {
        AtomicInteger numDeleteAcknowledged = new AtomicInteger();
        AtomicInteger numDeleteUnAcknowledged = new AtomicInteger();
        AtomicInteger numResourceNotFound = new AtomicInteger();
        TestThreadPool testThreadPool = null;
        try {
            testThreadPool = new TestThreadPool(DeleteAsynchronousSearchSingleNodeIT.class.getName());
            int numThreads = concurrentRuns;
            List<Runnable> operationThreads = new ArrayList<>();
            CountDownLatch countDownLatch = new CountDownLatch(numThreads);
            for (int i = 0; i < numThreads; i++) {
                Runnable thread = () -> {
                    logger.info("Triggering asynchronous search delete --->");
                    DeleteAsynchronousSearchRequest deleteAsynchronousSearchRequest = new DeleteAsynchronousSearchRequest(id);
                    executeDeleteAsynchronousSearch(client(), deleteAsynchronousSearchRequest, new LatchedActionListener<>
                            (new ActionListener<AcknowledgedResponse>() {
                                @Override
                                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
                                    if (acknowledgedResponse.isAcknowledged()) {
                                        numDeleteAcknowledged.incrementAndGet();
                                    } else {
                                        numDeleteUnAcknowledged.incrementAndGet();
                                    }
                                }

                                @Override
                                public void onFailure(Exception e) {
                                    if (e instanceof ResourceNotFoundException) {
                                        numResourceNotFound.incrementAndGet();
                                    }
                                }
                            }, countDownLatch));
                };
                operationThreads.add(thread);
            }
            TestThreadPool finalTestThreadPool = testThreadPool;
            operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable));
            countDownLatch.await();
            assertionConsumer.apply(numDeleteAcknowledged, numDeleteUnAcknowledged, numResourceNotFound);
        } finally {
            ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
        }
    }
}