/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. * * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. */ package org.opensearch.ad.ratelimit; import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_CHECKPOINT_WRITE_QUEUE_BATCH_SIZE; import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_CHECKPOINT_WRITE_QUEUE_CONCURRENCY; import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.update.UpdateRequest; import org.opensearch.ad.breaker.ADCircuitBreakerService; import org.opensearch.ad.ml.CheckpointDao; import org.opensearch.ad.ml.EntityModel; import org.opensearch.ad.ml.ModelState; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.Strings; import org.opensearch.threadpool.ThreadPool; import org.opensearch.timeseries.AnalysisType; import org.opensearch.timeseries.NodeStateManager; import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.util.ExceptionUtil; public class CheckpointWriteWorker extends BatchWorker { private static final Logger LOG = LogManager.getLogger(CheckpointWriteWorker.class); public static final String WORKER_NAME = "checkpoint-write"; private final CheckpointDao checkpoint; private final String indexName; private final Duration checkpointInterval; public CheckpointWriteWorker( long heapSizeInBytes, int singleRequestSizeInBytes, Setting maxHeapPercentForQueueSetting, ClusterService clusterService, Random random, ADCircuitBreakerService adCircuitBreakerService, ThreadPool threadPool, Settings settings, float maxQueuedTaskRatio, Clock clock, float mediumSegmentPruneRatio, float lowSegmentPruneRatio, int maintenanceFreqConstant, Duration executionTtl, CheckpointDao checkpoint, String indexName, Duration checkpointInterval, NodeStateManager stateManager, Duration stateTtl ) { super( WORKER_NAME, heapSizeInBytes, singleRequestSizeInBytes, maxHeapPercentForQueueSetting, clusterService, random, adCircuitBreakerService, threadPool, settings, maxQueuedTaskRatio, clock, mediumSegmentPruneRatio, lowSegmentPruneRatio, maintenanceFreqConstant, AD_CHECKPOINT_WRITE_QUEUE_CONCURRENCY, executionTtl, AD_CHECKPOINT_WRITE_QUEUE_BATCH_SIZE, stateTtl, stateManager ); this.checkpoint = checkpoint; this.indexName = indexName; this.checkpointInterval = checkpointInterval; } @Override protected void executeBatchRequest(BulkRequest request, ActionListener listener) { checkpoint.batchWrite(request, listener); } @Override protected BulkRequest toBatchRequest(List toProcess) { final BulkRequest bulkRequest = new BulkRequest(); for (CheckpointWriteRequest request : toProcess) { bulkRequest.add(request.getUpdateRequest()); } return bulkRequest; } @Override protected ActionListener getResponseListener(List toProcess, BulkRequest batchRequest) { return ActionListener.wrap(response -> { for (BulkItemResponse r : response.getItems()) { if (r.getFailureMessage() != null) { // maybe indicating a bug // don't retry failed requests since checkpoints are too large (250KB+) // Later maintenance window or cold start or cache remove will retry saving LOG.error(r.getFailureMessage()); } } }, exception -> { if (ExceptionUtil.isOverloaded(exception)) { LOG.error("too many get AD model checkpoint requests or shard not avialble"); setCoolDownStart(); } for (CheckpointWriteRequest request : toProcess) { nodeStateManager.setException(request.getId(), exception); } // don't retry failed requests since checkpoints are too large (250KB+) // Later maintenance window or cold start or cache remove will retry saving LOG.error("Fail to save models", exception); }); } /** * Prepare bulking the input model state to the checkpoint index. * We don't save checkpoints within checkpointInterval again, except this * is a high priority request (e.g., from cold start). * This method will update the input state's last checkpoint time if the * checkpoint is staged (ready to be written in the next batch). * @param modelState Model state * @param forceWrite whether we should write no matter what * @param priority how urgent the write is */ public void write(ModelState modelState, boolean forceWrite, RequestPriority priority) { Instant instant = modelState.getLastCheckpointTime(); if (!checkpoint.shouldSave(instant, forceWrite, checkpointInterval, clock)) { return; } if (modelState.getModel() != null) { String detectorId = modelState.getId(); String modelId = modelState.getModelId(); if (modelId == null || detectorId == null) { return; } nodeStateManager.getConfig(detectorId, AnalysisType.AD, onGetDetector(detectorId, modelId, modelState, priority)); } } private ActionListener> onGetDetector( String detectorId, String modelId, ModelState modelState, RequestPriority priority ) { return ActionListener.wrap(detectorOptional -> { if (false == detectorOptional.isPresent()) { LOG.warn(new ParameterizedMessage("AnomalyDetector [{}] is not available.", detectorId)); return; } AnomalyDetector detector = (AnomalyDetector) detectorOptional.get(); try { Map source = checkpoint.toIndexSource(modelState); // the model state is bloated or we have bugs, skip if (source == null || source.isEmpty()) { return; } modelState.setLastCheckpointTime(clock.instant()); CheckpointWriteRequest request = new CheckpointWriteRequest( System.currentTimeMillis() + detector.getIntervalInMilliseconds(), detectorId, priority, // If the document does not already exist, the contents of the upsert element // are inserted as a new document. // If the document exists, update fields in the map new UpdateRequest(indexName, modelId).docAsUpsert(true).doc(source) ); put(request); } catch (Exception e) { // Example exception: // ConcurrentModificationException when calling toCheckpoint // and updating rcf model at the same time. To prevent this, // we need to have a deep copy of models or have a lock. Both // options are costly. // As we are gonna retry serializing either when the entity is // evicted out of cache or during the next maintenance period, // don't do anything when the exception happens. LOG.error(new ParameterizedMessage("Exception while serializing models for [{}]", modelId), e); } }, exception -> { LOG.error(new ParameterizedMessage("fail to get detector [{}]", detectorId), exception); }); } public void writeAll(List> modelStates, String detectorId, boolean forceWrite, RequestPriority priority) { ActionListener> onGetForAll = ActionListener.wrap(detectorOptional -> { if (false == detectorOptional.isPresent()) { LOG.warn(new ParameterizedMessage("AnomalyDetector [{}] is not available.", detectorId)); return; } AnomalyDetector detector = (AnomalyDetector) detectorOptional.get(); try { List allRequests = new ArrayList<>(); for (ModelState state : modelStates) { Instant instant = state.getLastCheckpointTime(); if (!checkpoint.shouldSave(instant, forceWrite, checkpointInterval, clock)) { continue; } Map source = checkpoint.toIndexSource(state); String modelId = state.getModelId(); // the model state is bloated or empty (empty samples and models), skip if (source == null || source.isEmpty() || Strings.isEmpty(modelId)) { continue; } state.setLastCheckpointTime(clock.instant()); allRequests .add( new CheckpointWriteRequest( System.currentTimeMillis() + detector.getIntervalInMilliseconds(), detectorId, priority, // If the document does not already exist, the contents of the upsert element // are inserted as a new document. // If the document exists, update fields in the map new UpdateRequest(indexName, modelId).docAsUpsert(true).doc(source) ) ); } putAll(allRequests); } catch (Exception e) { // Example exception: // ConcurrentModificationException when calling toCheckpoint // and updating rcf model at the same time. To prevent this, // we need to have a deep copy of models or have a lock. Both // options are costly. // As we are gonna retry serializing either when the entity is // evicted out of cache or during the next maintenance period, // don't do anything when the exception happens. LOG.info(new ParameterizedMessage("Exception while serializing models for [{}]", detectorId), e); } }, exception -> { LOG.error(new ParameterizedMessage("fail to get detector [{}]", detectorId), exception); }); nodeStateManager.getConfig(detectorId, AnalysisType.AD, onGetForAll); } }