/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */
package org.opensearch.ad.task;
import static org.opensearch.ad.MemoryTracker.Origin.HISTORICAL_SINGLE_ENTITY_DETECTOR;
import static org.opensearch.ad.constant.ADCommonMessages.DETECTOR_IS_RUNNING;
import static org.opensearch.ad.constant.ADCommonMessages.EXCEED_HISTORICAL_ANALYSIS_LIMIT;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_CACHED_DELETED_TASKS;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.NUM_TREES;
import static org.opensearch.timeseries.util.ParseUtils.isNullOrEmpty;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.ADTaskState;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.timeseries.common.exception.DuplicateTaskException;
import org.opensearch.timeseries.common.exception.LimitExceededException;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.transport.TransportService;
import com.amazon.randomcutforest.RandomCutForest;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;
import com.google.common.collect.ImmutableList;
public class ADTaskCacheManager {
    private final Logger logger = LogManager.getLogger(ADTaskCacheManager.class);
    private volatile Integer maxAdBatchTaskPerNode;
    private volatile Integer maxCachedDeletedTask;
    private final MemoryTracker memoryTracker;
    private final int numberSize = 8;
    public static final int TASK_RETRY_LIMIT = 3;
    private final Semaphore cleanExpiredHCBatchTaskRunStatesSemaphore;
    // ===================================================================
    // Fields below are caches on coordinating node
    // ===================================================================
    /**
     * This field is to cache all detector level tasks which running on the
     * coordinating node to resolve race condition. Will check if detector id
     * exists in cache or not first. If user starts multiple tasks for the same
     * detector, we will put the first task in cache and reject following tasks.
     * 
Node: coordinating node
     * Key: detector id; Value: detector level task id
     */
    private Map detectorTasks;
    /**
     * This field is to cache all HC detector level data on coordinating node, like
     * pending/running entities, check more details in comments of {@link ADHCBatchTaskCache}.
     * Node: coordinating node
     * Key: detector id
     */
    private Map hcBatchTaskCaches;
    /**
     * This field is to cache all detectors' task slot and task lane limit on coordinating
     * node.
     * Node: coordinating node
     * Key: detector id
     */
    private Map detectorTaskSlotLimit;
    /**
     * This field is to cache all realtime tasks on coordinating node.
     * Node: coordinating node
     * Key is detector id
     */
    private Map realtimeTaskCaches;
    /**
     * This field is to cache all deleted detector level tasks on coordinating node.
     * Will try to clean up child task and AD result later.
     * Node: coordinating node
     * Check {@link ADTaskManager#cleanChildTasksAndADResultsOfDeletedTask()}
     */
    private Queue deletedDetectorTasks;
    // ===================================================================
    // Fields below are caches on worker node
    // ===================================================================
    /**
     * This field is to cache all batch tasks running on worker node. Both single
     * entity detector task and HC entity task will be cached in this field.
     * Node: worker node
     * Key: task id
     */
    private final Map batchTaskCaches;
    // ===================================================================
    // Fields below are caches on both coordinating and worker node
    // ===================================================================
    /**
     * This field is to cache HC detector batch task running state on worker node.
     * For example, is detector historical analysis cancelled or not, HC detector
     * level task state.
     * Node: worker node
     * Outer Key: detector Id; Inner Key: detector level task id
     */
    private Map> hcBatchTaskRunState;
    // ===================================================================
    // Fields below are caches on any data node serves delete detector
    // request. Check ADTaskManager#deleteADResultOfDetector
    // ===================================================================
    /**
     * This field is to cache deleted detector IDs. Hourly cron will poll this queue
     * and clean AD results. Check {@link ADTaskManager#cleanADResultOfDeletedDetector()}
     * Node: any data node servers delete detector request
     */
    private Queue deletedDetectors;
    /**
     * Constructor to create AD task cache manager.
     *
     * @param settings ES settings
     * @param clusterService ES cluster service
     * @param memoryTracker AD memory tracker
     */
    public ADTaskCacheManager(Settings settings, ClusterService clusterService, MemoryTracker memoryTracker) {
        this.maxAdBatchTaskPerNode = MAX_BATCH_TASK_PER_NODE.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BATCH_TASK_PER_NODE, it -> maxAdBatchTaskPerNode = it);
        this.maxCachedDeletedTask = MAX_CACHED_DELETED_TASKS.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_CACHED_DELETED_TASKS, it -> maxCachedDeletedTask = it);
        this.batchTaskCaches = new ConcurrentHashMap<>();
        this.memoryTracker = memoryTracker;
        this.detectorTasks = new ConcurrentHashMap<>();
        this.hcBatchTaskCaches = new ConcurrentHashMap<>();
        this.realtimeTaskCaches = new ConcurrentHashMap<>();
        this.deletedDetectorTasks = new ConcurrentLinkedQueue<>();
        this.deletedDetectors = new ConcurrentLinkedQueue<>();
        this.detectorTaskSlotLimit = new ConcurrentHashMap<>();
        this.hcBatchTaskRunState = new ConcurrentHashMap<>();
        this.cleanExpiredHCBatchTaskRunStatesSemaphore = new Semaphore(1);
    }
    /**
     * Put AD task into cache.
     * If AD task is already in cache, will throw {@link IllegalArgumentException}
     * If there is one AD task in cache for detector, will throw {@link IllegalArgumentException}
     * If there is not enough memory for this AD task, will throw {@link LimitExceededException}
     *
     * @param adTask AD task
     */
    public synchronized void add(ADTask adTask) {
        String taskId = adTask.getTaskId();
        String detectorId = adTask.getId();
        if (contains(taskId)) {
            throw new DuplicateTaskException(DETECTOR_IS_RUNNING);
        }
        // It's possible that multiple entity tasks of one detector run on same data node.
        if (!adTask.isEntityTask() && containsTaskOfDetector(detectorId)) {
            throw new DuplicateTaskException(DETECTOR_IS_RUNNING);
        }
        checkRunningTaskLimit();
        long neededCacheSize = calculateADTaskCacheSize(adTask);
        if (!memoryTracker.canAllocateReserved(neededCacheSize)) {
            throw new LimitExceededException("Not enough memory to run detector");
        }
        memoryTracker.consumeMemory(neededCacheSize, true, HISTORICAL_SINGLE_ENTITY_DETECTOR);
        ADBatchTaskCache taskCache = new ADBatchTaskCache(adTask);
        taskCache.getCacheMemorySize().set(neededCacheSize);
        batchTaskCaches.put(taskId, taskCache);
        if (adTask.isEntityTask()) {
            ADHCBatchTaskRunState hcBatchTaskRunState = getHCBatchTaskRunState(detectorId, adTask.getDetectorLevelTaskId());
            if (hcBatchTaskRunState != null) {
                hcBatchTaskRunState.setLastTaskRunTimeInMillis(Instant.now().toEpochMilli());
            }
        }
        // clean expired HC batch task run states when new task starts on worker node.
        cleanExpiredHCBatchTaskRunStates();
    }
    /**
     * Put detector id in running detector cache.
     *
     * @param detectorId detector id
     * @param adTask AD task
     * @throws DuplicateTaskException throw DuplicateTaskException when the detector id already in cache
     */
    public synchronized void add(String detectorId, ADTask adTask) {
        if (detectorTasks.containsKey(detectorId)) {
            logger.warn("detector is already in running detector cache, detectorId: " + detectorId);
            throw new DuplicateTaskException(DETECTOR_IS_RUNNING);
        }
        logger.info("add detector in running detector cache, detectorId: {}, taskId: {}", detectorId, adTask.getTaskId());
        this.detectorTasks.put(detectorId, adTask.getTaskId());
        if (ADTaskType.HISTORICAL_HC_DETECTOR.name().equals(adTask.getTaskType())) {
            ADHCBatchTaskCache adhcBatchTaskCache = new ADHCBatchTaskCache();
            this.hcBatchTaskCaches.put(detectorId, adhcBatchTaskCache);
        }
        // If new historical analysis starts, clean its old batch task run state directly.
        hcBatchTaskRunState.remove(detectorId);
    }
    /**
     * check if current running batch task on current node exceeds
     * max running task limitation.
     * If executing task count exceeds limitation, will throw
     * {@link LimitExceededException}
     */
    public void checkRunningTaskLimit() {
        if (size() >= maxAdBatchTaskPerNode) {
            String error = EXCEED_HISTORICAL_ANALYSIS_LIMIT + ": " + maxAdBatchTaskPerNode;
            throw new LimitExceededException(error);
        }
    }
    /**
     * Get task RCF model.
     * If task doesn't exist in cache, will throw {@link java.lang.IllegalArgumentException}.
     *
     * @param taskId AD task id
     * @return RCF model
     */
    public ThresholdedRandomCutForest getTRcfModel(String taskId) {
        return getBatchTaskCache(taskId).getTRcfModel();
    }
    /**
     * Get threshhold model training data size in bytes.
     *
     * @param taskId task id
     * @return training data size in bytes
     */
    public int getThresholdModelTrainingDataSize(String taskId) {
        return getBatchTaskCache(taskId).getThresholdModelTrainingDataSize().get();
    }
    /**
     * Threshold model trained or not.
     * If task doesn't exist in cache, will throw {@link java.lang.IllegalArgumentException}.
     *
     * @param taskId AD task id
     * @return true if threshold model trained; otherwise, return false
     */
    public boolean isThresholdModelTrained(String taskId) {
        return getBatchTaskCache(taskId).isThresholdModelTrained();
    }
    /**
     * Set threshold model trained or not.
     *
     * @param taskId task id
     * @param trained threshold model trained or not
     */
    protected void setThresholdModelTrained(String taskId, boolean trained) {
        ADBatchTaskCache taskCache = getBatchTaskCache(taskId);
        taskCache.setThresholdModelTrained(trained);
    }
    /**
     * Get shingle data.
     *
     * @param taskId AD task id
     * @return shingle data
     */
    public Deque>> getShingle(String taskId) {
        return getBatchTaskCache(taskId).getShingle();
    }
    /**
     * Check if task exists in cache.
     *
     * @param taskId task id
     * @return true if task exists in cache; otherwise, return false.
     */
    public boolean contains(String taskId) {
        return batchTaskCaches.containsKey(taskId);
    }
    /**
     * Check if there is task in cache for detector.
     *
     * @param detectorId detector id
     * @return true if there is task in cache; otherwise return false
     */
    public boolean containsTaskOfDetector(String detectorId) {
        return batchTaskCaches.values().stream().filter(v -> Objects.equals(detectorId, v.getId())).findAny().isPresent();
    }
    /**
     * Get task id list of detector.
     *
     * @param detectorId detector id
     * @return list of task id
     */
    public List getTasksOfDetector(String detectorId) {
        return batchTaskCaches
            .values()
            .stream()
            .filter(v -> Objects.equals(detectorId, v.getId()))
            .map(c -> c.getTaskId())
            .collect(Collectors.toList());
    }
    /**
     * Get batch task cache. If task doesn't exist in cache, will throw
     * {@link java.lang.IllegalArgumentException}
     * We throw exception rather than return {@code Optional.empty} or null
     * here, so don't need to check task existence by writing duplicate null
     * checking code. All AD task exceptions will be handled in AD task manager.
     *
     * @param taskId task id
     * @return AD batch task cache
     */
    private ADBatchTaskCache getBatchTaskCache(String taskId) {
        if (!contains(taskId)) {
            throw new IllegalArgumentException("AD task not in cache");
        }
        return batchTaskCaches.get(taskId);
    }
    private List getBatchTaskCacheByDetectorId(String detectorId) {
        return batchTaskCaches.values().stream().filter(v -> Objects.equals(detectorId, v.getId())).collect(Collectors.toList());
    }
    /**
     * Calculate AD task cache memory usage.
     *
     * @param adTask AD task
     * @return how many bytes will consume
     */
    private long calculateADTaskCacheSize(ADTask adTask) {
        AnomalyDetector detector = adTask.getDetector();
        int dimension = detector.getEnabledFeatureIds().size() * detector.getShingleSize();
        return memoryTracker
            .estimateTRCFModelSize(
                dimension,
                NUM_TREES,
                AnomalyDetectorSettings.BATCH_BOUNDING_BOX_CACHE_RATIO,
                detector.getShingleSize().intValue(),
                false
            ) + shingleMemorySize(detector.getShingleSize(), detector.getEnabledFeatureIds().size());
    }
    /**
     * Get RCF model size in bytes.
     *
     * @param taskId task id
     * @return model size in bytes
     */
    public long getModelSize(String taskId) {
        ADBatchTaskCache batchTaskCache = getBatchTaskCache(taskId);
        ThresholdedRandomCutForest tRCF = batchTaskCache.getTRcfModel();
        RandomCutForest rcfForest = tRCF.getForest();
        int dimensions = rcfForest.getDimensions();
        int numberOfTrees = rcfForest.getNumberOfTrees();
        return memoryTracker
            .estimateTRCFModelSize(dimensions, numberOfTrees, AnomalyDetectorSettings.BATCH_BOUNDING_BOX_CACHE_RATIO, 1, false);
    }
    /**
     * Remove task from cache and refresh last run time of HC batch task run state.
     * Don't remove all detector cache here as it's possible that some entity task running on other worker nodes
     *
     * @param taskId AD task id
     * @param detectorId detector id
     * @param detectorTaskId detector level task id
     */
    public void remove(String taskId, String detectorId, String detectorTaskId) {
        ADBatchTaskCache taskCache = batchTaskCaches.get(taskId);
        if (taskCache != null) {
            logger.debug("Remove batch task from cache, task id: {}", taskId);
            memoryTracker.releaseMemory(taskCache.getCacheMemorySize().get(), true, HISTORICAL_SINGLE_ENTITY_DETECTOR);
            batchTaskCaches.remove(taskId);
            ADHCBatchTaskRunState hcBatchTaskRunState = getHCBatchTaskRunState(detectorId, detectorTaskId);
            if (hcBatchTaskRunState != null) {
                hcBatchTaskRunState.setLastTaskRunTimeInMillis(Instant.now().toEpochMilli());
            }
        }
    }
    /**
     * Only remove detector cache if no running entities.
     *
     * @param detectorId detector id
     */
    public void removeHistoricalTaskCacheIfNoRunningEntity(String detectorId) {
        ADHCBatchTaskCache taskCache = hcBatchTaskCaches.get(detectorId);
        if (taskCache != null) {
            if (taskCache.hasRunningEntity()) {
                throw new IllegalArgumentException("HC detector still has running entities");
            }
        }
        removeHistoricalTaskCache(detectorId);
    }
    /**
     * Remove detector id from running detector cache
     *
     * @param detectorId detector id
     */
    public void removeHistoricalTaskCache(String detectorId) {
        ADHCBatchTaskCache taskCache = hcBatchTaskCaches.get(detectorId);
        if (taskCache != null) {
            // this will happen only on coordinating node. When worker nodes left,
            // we will reset task state as STOPPED and clean up cache, add this warning
            // to make it easier to debug issue.
            if (hasEntity(detectorId)) {
                logger
                    .warn(
                        "There are still entities for detector. pending: {}, running: {}, temp: {}",
                        Arrays.toString(taskCache.getPendingEntities()),
                        Arrays.toString(taskCache.getRunningEntities()),
                        Arrays.toString(taskCache.getTempEntities())
                    );
            }
            taskCache.clear();
            hcBatchTaskCaches.remove(detectorId);
        }
        List tasksOfDetector = getTasksOfDetector(detectorId);
        for (String taskId : tasksOfDetector) {
            remove(taskId, null, null);
        }
        if (tasksOfDetector.size() > 0) {
            logger
                .warn(
                    "Removed historical AD task from cache for detector {}, taskId: {}",
                    detectorId,
                    Arrays.toString(tasksOfDetector.toArray(new String[0]))
                );
        }
        if (detectorTasks.containsKey(detectorId)) {
            detectorTasks.remove(detectorId);
            logger.info("Removed detector from AD task coordinating node cache, detectorId: " + detectorId);
        }
        detectorTaskSlotLimit.remove(detectorId);
        hcBatchTaskRunState.remove(detectorId);
    }
    /**
     * Cancel AD task by detector id.
     *
     * @param detectorId detector id
     * @param detectorTaskId detector level task id
     * @param reason why need to cancel task
     * @param userName user name
     * @return AD task cancellation state
     */
    public ADTaskCancellationState cancelByDetectorId(String detectorId, String detectorTaskId, String reason, String userName) {
        if (detectorId == null || detectorTaskId == null) {
            throw new IllegalArgumentException("Can't cancel task for null detector id or detector task id");
        }
        ADHCBatchTaskCache hcTaskCache = hcBatchTaskCaches.get(detectorId);
        List taskCaches = getBatchTaskCacheByDetectorId(detectorId);
        if (hcTaskCache != null) {
            // coordinating node
            logger.debug("Set HC historical analysis as cancelled for detector {}", detectorId);
            hcTaskCache.clearPendingEntities();
            hcTaskCache.setEntityTaskLanes(0);
        }
        ADHCBatchTaskRunState taskStateCache = getOrCreateHCDetectorTaskStateCache(detectorId, detectorTaskId);
        taskStateCache.setCancelledTimeInMillis(Instant.now().toEpochMilli());
        taskStateCache.setHistoricalAnalysisCancelled(true);
        taskStateCache.setCancelReason(reason);
        taskStateCache.setCancelledBy(userName);
        if (isNullOrEmpty(taskCaches)) {
            return ADTaskCancellationState.NOT_FOUND;
        }
        ADTaskCancellationState cancellationState = ADTaskCancellationState.ALREADY_CANCELLED;
        for (ADBatchTaskCache cache : taskCaches) {
            if (!cache.isCancelled()) {
                cancellationState = ADTaskCancellationState.CANCELLED;
                cache.cancel(reason, userName);
            }
        }
        return cancellationState;
    }
    /**
     * Check if single entity detector level task or HC entity task is cancelled or not.
     *
     * @param taskId AD task id, should not be HC detector level task
     * @return true if task is cancelled; otherwise return false
     */
    public boolean isCancelled(String taskId) {
        // For HC detector, ADBatchTaskCache is entity task.
        ADBatchTaskCache taskCache = getBatchTaskCache(taskId);
        String detectorId = taskCache.getId();
        String detectorTaskId = taskCache.getDetectorTaskId();
        ADHCBatchTaskRunState taskStateCache = getHCBatchTaskRunState(detectorId, detectorTaskId);
        boolean hcDetectorStopped = false;
        if (taskStateCache != null) {
            hcDetectorStopped = taskStateCache.getHistoricalAnalysisCancelled();
        }
        // If a new entity task comes after cancel event, then we have no chance to set it as cancelled.
        // So we need to check hcDetectorStopped for HC detector to know if it's cancelled or not.
        // For single entity detector, it has just 1 task, just need to check taskCache.isCancelled.
        return taskCache.isCancelled() || hcDetectorStopped;
    }
    /**
     * Get current task count in cache.
     *
     * @return task count
     */
    public int size() {
        return batchTaskCaches.size();
    }
    /**
     * Clear all tasks.
     */
    public void clear() {
        batchTaskCaches.clear();
        detectorTasks.clear();
    }
    /**
     * Estimate max memory usage of model training data.
     * The training data is double and will cache in double array.
     * One double consumes 8 bytes.
     *
     * @param size training data point count
     * @return how many bytes will consume
     */
    public long trainingDataMemorySize(int size) {
        return numberSize * size;
    }
    /**
     * Estimate max memory usage of shingle data.
     * One feature aggregated data point(double) consumes 8 bytes.
     * The shingle data is stored in {@link java.util.Deque}. From testing,
     * other parts except feature data consume 80 bytes.
     *
     * Check {@link ADBatchTaskCache#getShingle()}
     *
     * @param shingleSize shingle data point count
     * @param enabledFeatureSize enabled feature count
     * @return how many bytes will consume
     */
    public long shingleMemorySize(int shingleSize, int enabledFeatureSize) {
        return (80 + numberSize * enabledFeatureSize) * shingleSize;
    }
    /**
     * HC top entity initied or not
     *
     * @param detectorId detector id
     * @return true if top entity inited; otherwise return false
     */
    public synchronized boolean topEntityInited(String detectorId) {
        return hcBatchTaskCaches.containsKey(detectorId) ? hcBatchTaskCaches.get(detectorId).getTopEntitiesInited() : false;
    }
    /**
     * Set top entity inited as true.
     *
     * @param detectorId detector id
     */
    public void setTopEntityInited(String detectorId) {
        getExistingHCTaskCache(detectorId).setTopEntitiesInited(true);
    }
    /**
     * Get pending to run entity count.
     *
     * @param detectorId detector id
     * @return entity count
     */
    public int getPendingEntityCount(String detectorId) {
        return hcBatchTaskCaches.containsKey(detectorId) ? hcBatchTaskCaches.get(detectorId).getPendingEntityCount() : 0;
    }
    /**
     * Get current running entity count in cache of detector.
     *
     * @param detectorId detector id
     * @return count of detector's running entity in cache
     */
    public int getRunningEntityCount(String detectorId) {
        ADHCBatchTaskCache taskCache = hcBatchTaskCaches.get(detectorId);
        if (taskCache != null) {
            return taskCache.getRunningEntityCount();
        }
        return 0;
    }
    public int getTempEntityCount(String detectorId) {
        ADHCBatchTaskCache taskCache = hcBatchTaskCaches.get(detectorId);
        if (taskCache != null) {
            return taskCache.getTempEntityCount();
        }
        return 0;
    }
    /**
     * Get total top entity count for detector.
     *
     * @param detectorId detector id
     * @return total top entity count
     */
    public Integer getTopEntityCount(String detectorId) {
        ADHCBatchTaskCache batchTaskCache = hcBatchTaskCaches.get(detectorId);
        if (batchTaskCache != null) {
            return batchTaskCache.getTopEntityCount();
        } else {
            return 0;
        }
    }
    /**
     * Get current running entities of detector.
     * Profile API will call this method.
     *
     * @param detectorId detector id
     * @return detector's running entities in cache
     */
    public List getRunningEntities(String detectorId) {
        if (hcBatchTaskCaches.containsKey(detectorId)) {
            ADHCBatchTaskCache hcTaskCache = getExistingHCTaskCache(detectorId);
            return Arrays.asList(hcTaskCache.getRunningEntities());
        }
        return null;
    }
    /**
     * Set max allowed running entities for HC detector.
     *
     * @param detectorId detector id
     * @param allowedRunningEntities max allowed running entities
     */
    public void setAllowedRunningEntities(String detectorId, int allowedRunningEntities) {
        logger.debug("Set allowed running entities of detector {} as {}", detectorId, allowedRunningEntities);
        getExistingHCTaskCache(detectorId).setEntityTaskLanes(allowedRunningEntities);
    }
    /**
     * Set detector task slots. We cache task slots assigned to detector on coordinating node.
     * When start new historical analysis, will gather detector task slots on all nodes and
     * check how many task slots available for new historical analysis.
     *
     * @param detectorId detector id
     * @param taskSlots task slots
     */
    public synchronized void setDetectorTaskSlots(String detectorId, int taskSlots) {
        logger.debug("Set task slots of detector {} as {}", detectorId, taskSlots);
        ADTaskSlotLimit adTaskSlotLimit = detectorTaskSlotLimit
            .computeIfAbsent(detectorId, key -> new ADTaskSlotLimit(taskSlots, taskSlots));
        adTaskSlotLimit.setDetectorTaskSlots(taskSlots);
    }
    /**
     * Scale up detector task slots.
     * @param detectorId detector id
     * @param delta scale delta
     */
    public synchronized void scaleUpDetectorTaskSlots(String detectorId, int delta) {
        ADTaskSlotLimit adTaskSlotLimit = detectorTaskSlotLimit.get(detectorId);
        int taskSlots = this.getDetectorTaskSlots(detectorId);
        if (adTaskSlotLimit != null && delta > 0) {
            int newTaskSlots = adTaskSlotLimit.getDetectorTaskSlots() + delta;
            logger.info("Scale up task slots of detector {} from {} to {}", detectorId, taskSlots, newTaskSlots);
            adTaskSlotLimit.setDetectorTaskSlots(newTaskSlots);
        }
    }
    /**
     * Check how many unfinished entities in cache. If it's less than detector task slots, we
     * can scale down detector task slots to same as unfinished entities count. We can save
     * task slots in this way. The released task slots can be reused for other task run.
     * @param detectorId detector id
     * @param delta scale delta
     * @return new task slots
     */
    public synchronized int scaleDownHCDetectorTaskSlots(String detectorId, int delta) {
        ADTaskSlotLimit adTaskSlotLimit = this.detectorTaskSlotLimit.get(detectorId);
        int taskSlots = this.getDetectorTaskSlots(detectorId);
        if (adTaskSlotLimit != null && delta > 0) {
            int newTaskSlots = taskSlots - delta;
            if (newTaskSlots > 0) {
                logger.info("Scale down task slots of detector {} from {} to {}", detectorId, taskSlots, newTaskSlots);
                adTaskSlotLimit.setDetectorTaskSlots(newTaskSlots);
                return newTaskSlots;
            }
        }
        return taskSlots;
    }
    /**
     * Set detector task lane limit.
     * @param detectorId detector id
     * @param taskLaneLimit task lane limit
     */
    public synchronized void setDetectorTaskLaneLimit(String detectorId, int taskLaneLimit) {
        ADTaskSlotLimit adTaskSlotLimit = detectorTaskSlotLimit.get(detectorId);
        if (adTaskSlotLimit != null) {
            adTaskSlotLimit.setDetectorTaskLaneLimit(taskLaneLimit);
        }
    }
    /**
     * Get how many task slots assigned to detector
     * @param detectorId detector id
     * @return detector task slot count
     */
    public int getDetectorTaskSlots(String detectorId) {
        ADTaskSlotLimit taskSlotLimit = detectorTaskSlotLimit.get(detectorId);
        if (taskSlotLimit != null) {
            return taskSlotLimit.getDetectorTaskSlots();
        }
        return 0;
    }
    public int getUnfinishedEntityCount(String detectorId) {
        ADHCBatchTaskCache taskCache = hcBatchTaskCaches.get(detectorId);
        if (taskCache != null) {
            return taskCache.getUnfinishedEntityCount();
        }
        return 0;
    }
    /**
     * Get total task slots on this node.
     * @return total task slots
     */
    public int getTotalDetectorTaskSlots() {
        int totalTaskSLots = 0;
        for (Map.Entry entry : detectorTaskSlotLimit.entrySet()) {
            totalTaskSLots += entry.getValue().getDetectorTaskSlots();
        }
        return totalTaskSLots;
    }
    public int getTotalBatchTaskCount() {
        return batchTaskCaches.size();
    }
    /**
     * Get current allowed entity task lanes and decrease it by 1.
     *
     * @param detectorId detector id
     * @return current allowed entity task lane count
     */
    public synchronized int getAndDecreaseEntityTaskLanes(String detectorId) {
        return getExistingHCTaskCache(detectorId).getAndDecreaseEntityTaskLanes();
    }
    /**
     * Get current available new entity task lanes.
     * @param detectorId detector id
     * @return how many task lane available now
     */
    public int getAvailableNewEntityTaskLanes(String detectorId) {
        return getExistingHCTaskCache(detectorId).getEntityTaskLanes();
    }
    private ADHCBatchTaskCache getExistingHCTaskCache(String detectorId) {
        ADHCBatchTaskCache taskCache = hcBatchTaskCaches.get(detectorId);
        if (taskCache != null) {
            return taskCache;
        } else {
            throw new IllegalArgumentException("Can't find HC detector in cache");
        }
    }
    /**
     * Add list of entities into pending entities queue. And will remove these entities
     * from temp entities queue.
     *
     * @param detectorId detector id
     * @param entities list of entities
     */
    public void addPendingEntities(String detectorId, List entities) {
        getExistingHCTaskCache(detectorId).addPendingEntities(entities);
    }
    /**
     * Check if there is any HC task running on current node.
     * @param detectorId detector id
     * @return true if find detector id in any entity task or HC cache
     */
    public boolean isHCTaskRunning(String detectorId) {
        if (isHCTaskCoordinatingNode(detectorId)) {
            return true;
        }
        // Only running tasks will be in cache.
        Optional entityTask = this.batchTaskCaches
            .values()
            .stream()
            .filter(cache -> Objects.equals(detectorId, cache.getId()) && cache.getEntity() != null)
            .findFirst();
        return entityTask.isPresent();
    }
    /**
     * Check if current node is coordianting node of HC detector.
     * @param detectorId detector id
     * @return true if find detector id in HC cache
     */
    public boolean isHCTaskCoordinatingNode(String detectorId) {
        return hcBatchTaskCaches.containsKey(detectorId);
    }
    /**
     * Set top entity count.
     *
     * @param detectorId detector id
     * @param count top entity count
     */
    public void setTopEntityCount(String detectorId, Integer count) {
        ADHCBatchTaskCache hcTaskCache = getExistingHCTaskCache(detectorId);
        hcTaskCache.setTopEntityCount(count);
        ADTaskSlotLimit adTaskSlotLimit = detectorTaskSlotLimit.get(detectorId);
        if (count != null && adTaskSlotLimit != null) {
            Integer detectorTaskSlots = adTaskSlotLimit.getDetectorTaskSlots();
            if (detectorTaskSlots != null && detectorTaskSlots > count) {
                logger.debug("Scale down task slots from {} to the same as top entity count {}", detectorTaskSlots, count);
                adTaskSlotLimit.setDetectorTaskSlots(count);
            }
        }
    }
    /**
     * Poll one entity from HC detector entities cache. If entity exists, will move
     * entity to temp entites cache; otherwise return null.
     *
     * @param detectorId detector id
     * @return one entity
     */
    public synchronized String pollEntity(String detectorId) {
        if (this.hcBatchTaskCaches.containsKey(detectorId)) {
            ADHCBatchTaskCache hcTaskCache = this.hcBatchTaskCaches.get(detectorId);
            String entity = hcTaskCache.pollEntity();
            return entity;
        } else {
            return null;
        }
    }
    /**
     * Add entity into pending entities queue. And will remove the entity from temp
     * and running entities queue.
     *
     * @param detectorId detector id
     * @param entity entity value
     */
    public void addPendingEntity(String detectorId, String entity) {
        addPendingEntities(detectorId, ImmutableList.of(entity));
    }
    /**
     * Move one entity to running entity queue.
     *
     * @param detectorId detector id
     * @param entity entity value
     */
    public void moveToRunningEntity(String detectorId, String entity) {
        ADHCBatchTaskCache hcTaskCache = hcBatchTaskCaches.get(detectorId);
        if (hcTaskCache != null) {
            hcTaskCache.moveToRunningEntity(entity);
        }
    }
    /**
     * Task exceeds max retry limit or not.
     *
     * @param detectorId detector id
     * @param taskId task id
     * @return true if exceed retry limit; otherwise return false
     */
    public boolean exceedRetryLimit(String detectorId, String taskId) {
        return getExistingHCTaskCache(detectorId).getTaskRetryTimes(taskId) > TASK_RETRY_LIMIT;
    }
    /**
     * Push entity back to the end of pending entity queue.
     *
     * @param taskId task id
     * @param detectorId detector id
     * @param entity entity value
     */
    public void pushBackEntity(String taskId, String detectorId, String entity) {
        addPendingEntity(detectorId, entity);
        increaseEntityTaskRetry(detectorId, taskId);
    }
    /**
     * Increase entity task retry times.
     *
     * @param detectorId detector id
     * @param taskId task id
     * @return how many times retried
     */
    public int increaseEntityTaskRetry(String detectorId, String taskId) {
        return getExistingHCTaskCache(detectorId).increaseTaskRetry(taskId);
    }
    /**
     * Remove entity from cache.
     *
     * @param detectorId detector id
     * @param entity entity value
     */
    public void removeEntity(String detectorId, String entity) {
        if (hcBatchTaskCaches.containsKey(detectorId)) {
            hcBatchTaskCaches.get(detectorId).removeEntity(entity);
        }
    }
    /**
     * Return AD task's entity list.
     *
     * @param taskId AD task id
     * @return entity
     */
    public Entity getEntity(String taskId) {
        return getBatchTaskCache(taskId).getEntity();
    }
    /**
     * Check if detector still has entity in cache.
     *
     * @param detectorId detector id
     * @return true if detector still has entity in cache
     */
    public synchronized boolean hasEntity(String detectorId) {
        return hcBatchTaskCaches.containsKey(detectorId) && hcBatchTaskCaches.get(detectorId).hasEntity();
    }
    /**
     * Remove entity from HC task running entity cache.
     *
     * @param detectorId detector id
     * @param entity entity
     * @return true if entity was removed as a result of this call
     */
    public boolean removeRunningEntity(String detectorId, String entity) {
        ADHCBatchTaskCache hcTaskCache = hcBatchTaskCaches.get(detectorId);
        if (hcTaskCache != null) {
            boolean removed = hcTaskCache.removeRunningEntity(entity);
            logger.debug("Remove entity from running entities cache: {}: {}", entity, removed);
            return removed;
        }
        return false;
    }
    /**
     * Try to get semaphore to update detector task.
     *
     * If the timeout is less than or equal to zero, will not wait at all to get 1 permit.
     * If permit is available, will acquire 1 permit and return true immediately. If no permit,
     * will wait for other thread release. If no permit available until timeout elapses, will
     * return false.
     *
     * @param detectorId detector id
     * @param timeoutInMillis timeout in milliseconds to wait for a permit, zero or negative means don't wait at all
     * @return true if can get semaphore
     * @throws InterruptedException if the current thread is interrupted
     */
    public boolean tryAcquireTaskUpdatingSemaphore(String detectorId, long timeoutInMillis) throws InterruptedException {
        ADHCBatchTaskCache taskCache = hcBatchTaskCaches.get(detectorId);
        if (taskCache != null) {
            return taskCache.tryAcquireTaskUpdatingSemaphore(timeoutInMillis);
        }
        return false;
    }
    /**
     * Try to release semaphore of updating detector task.
     * @param detectorId detector id
     */
    public void releaseTaskUpdatingSemaphore(String detectorId) {
        ADHCBatchTaskCache taskCache = hcBatchTaskCaches.get(detectorId);
        if (taskCache != null) {
            taskCache.releaseTaskUpdatingSemaphore();
        }
    }
    /**
     * Clear pending entities of HC detector.
     *
     * @param detectorId detector id
     */
    public void clearPendingEntities(String detectorId) {
        ADHCBatchTaskCache taskCache = hcBatchTaskCaches.get(detectorId);
        if (taskCache != null) {
            taskCache.clearPendingEntities();
        }
    }
    /**
     * Check if realtime task field value change needed or not by comparing with cache.
     * 1. If new field value is null, will consider changed needed to this field.
     * 2. will consider the real time task change needed if
     * 1) init progress is larger or the old init progress is null, or
     * 2) if the state is different, and it is not changing from running to init.
     *  for other fields, as long as field values changed, will consider the realtime
     *  task change needed. We did this so that the init progress or state won't go backwards.
     * 3. If realtime task cache not found, will consider the realtime task change needed.
     *
     * @param detectorId detector id
     * @param newState new task state
     * @param newInitProgress new init progress
     * @param newError new error
     * @return true if realtime task change needed.
     */
    public boolean isRealtimeTaskChangeNeeded(String detectorId, String newState, Float newInitProgress, String newError) {
        if (realtimeTaskCaches.containsKey(detectorId)) {
            ADRealtimeTaskCache realtimeTaskCache = realtimeTaskCaches.get(detectorId);
            boolean stateChangeNeeded = false;
            String oldState = realtimeTaskCache.getState();
            if (newState != null
                && !newState.equals(oldState)
                && !(ADTaskState.INIT.name().equals(newState) && ADTaskState.RUNNING.name().equals(oldState))) {
                stateChangeNeeded = true;
            }
            boolean initProgressChangeNeeded = false;
            Float existingProgress = realtimeTaskCache.getInitProgress();
            if (newInitProgress != null
                && !newInitProgress.equals(existingProgress)
                && (existingProgress == null || newInitProgress > existingProgress)) {
                initProgressChangeNeeded = true;
            }
            boolean errorChanged = false;
            if (newError != null && !newError.equals(realtimeTaskCache.getError())) {
                errorChanged = true;
            }
            if (stateChangeNeeded || initProgressChangeNeeded || errorChanged) {
                return true;
            }
            return false;
        } else {
            return true;
        }
    }
    /**
     * Update realtime task cache with new field values. If realtime task cache exist, update it
     * directly if task is not done; if task is done, remove the detector's realtime task cache.
     *
     * If realtime task cache doesn't exist, will do nothing. Next realtime job run will re-init
     * realtime task cache when it finds task cache not inited yet.
     * Check {@link ADTaskManager#initRealtimeTaskCacheAndCleanupStaleCache(String, AnomalyDetector, TransportService, ActionListener)},
     * {@link ADTaskManager#updateLatestRealtimeTaskOnCoordinatingNode(String, String, Long, Long, String, ActionListener)}
     *
     * @param detectorId detector id
     * @param newState new task state
     * @param newInitProgress new init progress
     * @param newError new error
     */
    public void updateRealtimeTaskCache(String detectorId, String newState, Float newInitProgress, String newError) {
        ADRealtimeTaskCache realtimeTaskCache = realtimeTaskCaches.get(detectorId);
        if (realtimeTaskCache != null) {
            if (newState != null) {
                realtimeTaskCache.setState(newState);
            }
            if (newInitProgress != null) {
                realtimeTaskCache.setInitProgress(newInitProgress);
            }
            if (newError != null) {
                realtimeTaskCache.setError(newError);
            }
            if (newState != null && !ADTaskState.NOT_ENDED_STATES.contains(newState)) {
                // If task is done, will remove its realtime task cache.
                logger.info("Realtime task done with state {}, remove RT task cache for detector ", newState, detectorId);
                removeRealtimeTaskCache(detectorId);
            }
        } else {
            logger.debug("Realtime task cache is not inited yet for detector {}", detectorId);
        }
    }
    public void initRealtimeTaskCache(String detectorId, long detectorIntervalInMillis) {
        realtimeTaskCaches.put(detectorId, new ADRealtimeTaskCache(null, null, null, detectorIntervalInMillis));
        logger.debug("Realtime task cache inited");
    }
    public void refreshRealtimeJobRunTime(String detectorId) {
        ADRealtimeTaskCache taskCache = realtimeTaskCaches.get(detectorId);
        if (taskCache != null) {
            taskCache.setLastJobRunTime(Instant.now().toEpochMilli());
        }
    }
    /**
     * Get detector IDs from realtime task cache.
     * @return array of detector id
     */
    public String[] getDetectorIdsInRealtimeTaskCache() {
        return realtimeTaskCaches.keySet().toArray(new String[0]);
    }
    /**
     * Remove detector's realtime task from cache.
     * @param detectorId detector id
     */
    public void removeRealtimeTaskCache(String detectorId) {
        if (realtimeTaskCaches.containsKey(detectorId)) {
            logger.info("Delete realtime cache for detector {}", detectorId);
            realtimeTaskCaches.remove(detectorId);
        }
    }
    public ADRealtimeTaskCache getRealtimeTaskCache(String detectorId) {
        return realtimeTaskCaches.get(detectorId);
    }
    /**
     * Clear realtime task cache.
     */
    public void clearRealtimeTaskCache() {
        realtimeTaskCaches.clear();
    }
    /**
     * Add deleted task's id to deleted detector tasks queue.
     * @param taskId task id
     */
    public void addDeletedDetectorTask(String taskId) {
        if (deletedDetectorTasks.size() < maxCachedDeletedTask) {
            deletedDetectorTasks.add(taskId);
        }
    }
    /**
     * Check if deleted task queue has items.
     * @return true if has deleted detector task in cache
     */
    public boolean hasDeletedDetectorTask() {
        return !deletedDetectorTasks.isEmpty();
    }
    /**
     * Poll one deleted detector task.
     * @return task id
     */
    public String pollDeletedDetectorTask() {
        return this.deletedDetectorTasks.poll();
    }
    /**
     * Add deleted detector's id to deleted detector queue.
     * @param detectorId detector id
     */
    public void addDeletedDetector(String detectorId) {
        if (deletedDetectors.size() < maxCachedDeletedTask) {
            deletedDetectors.add(detectorId);
        }
    }
    /**
     * Poll one deleted detector.
     * @return detector id
     */
    public String pollDeletedDetector() {
        return this.deletedDetectors.poll();
    }
    public String getDetectorTaskId(String detectorId) {
        return detectorTasks.get(detectorId);
    }
    public Instant getLastScaleEntityTaskLaneTime(String detectorId) {
        ADHCBatchTaskCache taskCache = hcBatchTaskCaches.get(detectorId);
        if (taskCache != null) {
            return taskCache.getLastScaleEntityTaskSlotsTime();
        }
        return null;
    }
    public void refreshLastScaleEntityTaskLaneTime(String detectorId) {
        ADHCBatchTaskCache taskCache = hcBatchTaskCaches.get(detectorId);
        if (taskCache != null) {
            taskCache.setLastScaleEntityTaskSlotsTime(Instant.now());
        }
    }
    public Instant getLatestHCTaskRunTime(String detectorId) {
        ADHCBatchTaskCache taskCache = hcBatchTaskCaches.get(detectorId);
        if (taskCache != null) {
            return taskCache.getLatestTaskRunTime();
        }
        return null;
    }
    public void refreshLatestHCTaskRunTime(String detectorId) {
        ADHCBatchTaskCache taskCache = hcBatchTaskCaches.get(detectorId);
        if (taskCache != null) {
            taskCache.refreshLatestTaskRunTime();
        }
    }
    /**
     * Update detector level task's state in cache.
     * @param detectorId detector id
     * @param detectorTaskId detector level task id
     * @param newState new state
     */
    public synchronized void updateDetectorTaskState(String detectorId, String detectorTaskId, String newState) {
        ADHCBatchTaskRunState cache = getOrCreateHCDetectorTaskStateCache(detectorId, detectorTaskId);
        if (cache != null) {
            cache.setDetectorTaskState(newState);
            cache.setLastTaskRunTimeInMillis(Instant.now().toEpochMilli());
        }
    }
    public ADHCBatchTaskRunState getOrCreateHCDetectorTaskStateCache(String detectorId, String detectorTaskId) {
        Map states = hcBatchTaskRunState.computeIfAbsent(detectorId, it -> new ConcurrentHashMap<>());
        return states.computeIfAbsent(detectorTaskId, it -> new ADHCBatchTaskRunState());
    }
    public String getDetectorTaskState(String detectorId, String detectorTaskId) {
        ADHCBatchTaskRunState batchTaskRunStates = getHCBatchTaskRunState(detectorId, detectorTaskId);
        if (batchTaskRunStates != null) {
            return batchTaskRunStates.getDetectorTaskState();
        }
        return null;
    }
    public boolean detectorTaskStateExists(String detectorId, String detectorTaskId) {
        Map taskStateCache = hcBatchTaskRunState.get(detectorId);
        return taskStateCache != null && taskStateCache.containsKey(detectorTaskId);
    }
    private ADHCBatchTaskRunState getHCBatchTaskRunState(String detectorId, String detectorTaskId) {
        if (detectorId == null || detectorTaskId == null) {
            return null;
        }
        Map batchTaskRunStates = hcBatchTaskRunState.get(detectorId);
        if (batchTaskRunStates != null) {
            return batchTaskRunStates.get(detectorTaskId);
        }
        return null;
    }
    /**
     * Check if HC detector's historical analysis cancelled or not.
     *
     * @param detectorId detector id
     * @param detectorTaskId detector level task id
     * @return true if HC detector historical analysis cancelled; otherwise return false
     */
    public boolean isHistoricalAnalysisCancelledForHC(String detectorId, String detectorTaskId) {
        ADHCBatchTaskRunState taskStateCache = getHCBatchTaskRunState(detectorId, detectorTaskId);
        if (taskStateCache != null) {
            return taskStateCache.getHistoricalAnalysisCancelled();
        }
        return false;
    }
    /**
     * Get why task cancelled.
     *
     * @param taskId AD task id
     * @return task cancellation reason
     */
    public String getCancelReason(String taskId) {
        return getBatchTaskCache(taskId).getCancelReason();
    }
    /**
     * Get task cancelled by which user.
     *
     * @param taskId AD task id
     * @return user name
     */
    public String getCancelledBy(String taskId) {
        return getBatchTaskCache(taskId).getCancelledBy();
    }
    public String getCancelledByForHC(String detectorId, String detectorTaskId) {
        ADHCBatchTaskRunState taskCache = getHCBatchTaskRunState(detectorId, detectorTaskId);
        if (taskCache != null) {
            return taskCache.getCancelledBy();
        }
        return null;
    }
    public String getCancelReasonForHC(String detectorId, String detectorTaskId) {
        ADHCBatchTaskRunState taskCache = getHCBatchTaskRunState(detectorId, detectorTaskId);
        if (taskCache != null) {
            return taskCache.getCancelReason();
        }
        return null;
    }
    public void cleanExpiredHCBatchTaskRunStates() {
        if (!cleanExpiredHCBatchTaskRunStatesSemaphore.tryAcquire()) {
            return;
        }
        try {
            List detectorIdOfEmptyStates = new ArrayList<>();
            for (Map.Entry> detectorRunStates : hcBatchTaskRunState.entrySet()) {
                List taskIdOfExpiredStates = new ArrayList<>();
                String detectorId = detectorRunStates.getKey();
                boolean noRunningTask = isNullOrEmpty(getTasksOfDetector(detectorId));
                Map taskRunStates = detectorRunStates.getValue();
                if (taskRunStates == null) {
                    // If detector's task run state is null, add detector id to detectorIdOfEmptyStates and remove it from
                    // hcBatchTaskRunState later.
                    detectorIdOfEmptyStates.add(detectorId);
                    continue;
                }
                if (!noRunningTask) {
                    // If a detector has running task, we should not clean up task run state cache for it.
                    // It's possible that some entity task is on the way to worker node. So we should not
                    // remove detector level state if no running task found. Otherwise the task may arrive
                    // after run state cache deleted, then it can run on work node. We should delete cache
                    // if no running task and run state expired.
                    continue;
                }
                for (Map.Entry taskRunState : taskRunStates.entrySet()) {
                    ADHCBatchTaskRunState state = taskRunState.getValue();
                    if (state != null && noRunningTask && state.expired()) {
                        taskIdOfExpiredStates.add(taskRunState.getKey());
                    }
                }
                logger
                    .debug(
                        "Remove expired HC batch task run states for these tasks: {}",
                        Arrays.toString(taskIdOfExpiredStates.toArray(new String[0]))
                    );
                taskIdOfExpiredStates.forEach(id -> taskRunStates.remove(id));
                if (taskRunStates.isEmpty()) {
                    detectorIdOfEmptyStates.add(detectorId);
                }
            }
            logger
                .debug(
                    "Remove empty HC batch task run states for these detectors : {}",
                    Arrays.toString(detectorIdOfEmptyStates.toArray(new String[0]))
                );
            detectorIdOfEmptyStates.forEach(id -> hcBatchTaskRunState.remove(id));
        } catch (Exception e) {
            logger.error("Failed to clean expired HC batch task run states", e);
        } finally {
            cleanExpiredHCBatchTaskRunStatesSemaphore.release();
        }
    }
    /**
     * We query result index to check if there are any result generated for detector to tell whether it passed initialization of not.
     * To avoid repeated query when there is no data, record whether we have done that or not.
     * @param id detector id
     */
    public void markResultIndexQueried(String id) {
        ADRealtimeTaskCache realtimeTaskCache = realtimeTaskCaches.get(id);
        // we initialize a real time cache at the beginning of AnomalyResultTransportAction if it
        // cannot be found. If the cache is empty, we will return early and wait it for it to be
        // initialized.
        if (realtimeTaskCache != null) {
            realtimeTaskCache.setQueriedResultIndex(true);
        }
    }
    /**
     * We query result index to check if there are any result generated for detector to tell whether it passed initialization of not.
     *
     * @param id detector id
     * @return whether we have queried result index or not.
     */
    public boolean hasQueriedResultIndex(String id) {
        ADRealtimeTaskCache realtimeTaskCache = realtimeTaskCaches.get(id);
        if (realtimeTaskCache != null) {
            return realtimeTaskCache.hasQueriedResultIndex();
        }
        return false;
    }
}