/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package org.opensearch.ad.ratelimit;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Random;
import java.util.concurrent.Semaphore;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;

/**
 * A queue to run concurrent requests (either batch or single request).
 * The concurrency is configurable. The callers use the put method to put requests
 * in and the queue tries to execute them if there are concurrency slots.
 *
 * @param <RequestType> Individual request type that is a subtype of ADRequest
 */
public abstract class ConcurrentWorker<RequestType extends QueuedRequest> extends RateLimitedRequestWorker<RequestType> {
    private static final Logger LOG = LogManager.getLogger(ConcurrentWorker.class);

    private Semaphore permits;

    private Instant lastExecuteTime;
    private Duration executionTtl;

    /**
     *
     * Constructor with dependencies and configuration.
     *
     * @param queueName queue's name
     * @param heapSizeInBytes ES heap size
     * @param singleRequestSizeInBytes single request's size in bytes
     * @param maxHeapPercentForQueueSetting max heap size used for the queue. Used for
     *   rate AD's usage on ES threadpools.
     * @param clusterService Cluster service accessor
     * @param random Random number generator
     * @param adCircuitBreakerService AD Circuit breaker service
     * @param threadPool threadpool accessor
     * @param settings Cluster settings getter
     * @param maxQueuedTaskRatio maximum queued tasks ratio in ES threadpools
     * @param clock Clock to get current time
     * @param mediumSegmentPruneRatio the percent of medium priority requests to prune when the queue is full
     * @param lowSegmentPruneRatio the percent of low priority requests to prune when the queue is full
     * @param maintenanceFreqConstant a constant help define the frequency of maintenance.  We cannot do
     *   the expensive maintenance too often.
     * @param concurrencySetting Max concurrent processing of the queued events
     * @param executionTtl Max execution time of a single request
     * @param stateTtl max idle state duration.  Used to clean unused states.
     * @param nodeStateManager node state accessor
     */
    public ConcurrentWorker(
        String queueName,
        long heapSizeInBytes,
        int singleRequestSizeInBytes,
        Setting<Float> maxHeapPercentForQueueSetting,
        ClusterService clusterService,
        Random random,
        ADCircuitBreakerService adCircuitBreakerService,
        ThreadPool threadPool,
        Settings settings,
        float maxQueuedTaskRatio,
        Clock clock,
        float mediumSegmentPruneRatio,
        float lowSegmentPruneRatio,
        int maintenanceFreqConstant,
        Setting<Integer> concurrencySetting,
        Duration executionTtl,
        Duration stateTtl,
        NodeStateManager nodeStateManager
    ) {
        super(
            queueName,
            heapSizeInBytes,
            singleRequestSizeInBytes,
            maxHeapPercentForQueueSetting,
            clusterService,
            random,
            adCircuitBreakerService,
            threadPool,
            settings,
            maxQueuedTaskRatio,
            clock,
            mediumSegmentPruneRatio,
            lowSegmentPruneRatio,
            maintenanceFreqConstant,
            stateTtl,
            nodeStateManager
        );

        this.permits = new Semaphore(concurrencySetting.get(settings));
        clusterService.getClusterSettings().addSettingsUpdateConsumer(concurrencySetting, it -> permits = new Semaphore(it));

        this.lastExecuteTime = clock.instant();
        this.executionTtl = executionTtl;
    }

    @Override
    public void maintenance() {
        super.maintenance();

        if (lastExecuteTime.plus(executionTtl).isBefore(clock.instant()) && permits.availablePermits() == 0 && false == isQueueEmpty()) {
            LOG.warn("previous execution has been running for too long.  Maybe there are bugs.");

            // Release one permit. This is a stop gap solution as I don't know
            // whether the system is under heavy workload or not. Release multiple
            // permits might cause the situation even worse. So I am conservative here.
            permits.release();
        }
    }

    /**
     * try to execute queued requests if there are concurrency slots and return right away.
     */
    @Override
    protected void triggerProcess() {
        threadPool.executor(TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME).execute(() -> {
            if (permits.tryAcquire()) {
                try {
                    lastExecuteTime = clock.instant();
                    execute(() -> {
                        permits.release();
                        process();
                    }, () -> { permits.release(); });
                } catch (Exception e) {
                    permits.release();
                    // throw to the root level to catch
                    throw e;
                }
            }
        });
    }

    /**
     * Execute requests in toProcess.  The implementation needs to call cleanUp after done.
     * The 1st callback is executed after processing one request. So we keep looking for
     * new requests if there is any after finishing one request. Otherwise, just release
     * (the 2nd callback) without calling process.
     * @param afterProcessCallback callback after processing requests
     * @param emptyQueueCallback callback for empty queues
     */
    protected abstract void execute(Runnable afterProcessCallback, Runnable emptyQueueCallback);
}