/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 * Modifications Copyright OpenSearch Contributors. See
 * GitHub history for details.
 */

package org.opensearch.ad.transport.handler;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.transport.ADResultBulkAction;
import org.opensearch.ad.transport.ADResultBulkRequest;
import org.opensearch.ad.transport.ADResultBulkResponse;
import org.opensearch.ad.util.IndexUtils;
import org.opensearch.client.Client;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.util.ClientUtil;

/**
 * EntityResultTransportAction depends on this class. I cannot use
 * AnomalyIndexHandler < AnomalyResult > . All transport actions
 * need dependency injection. Guice has a hard time initializing the generic
 * class AnomalyIndexHandler < AnomalyResult > due to type erasure.
 * To avoid that, I create a class with the generic type built in so
 * that Guice would be able to work out the details.
* */ public class MultiEntityResultHandler extends AnomalyIndexHandler { private static final Logger LOG = LogManager.getLogger(MultiEntityResultHandler.class); // package private for testing static final String SUCCESS_SAVING_RESULT_MSG = "Result saved successfully."; static final String CANNOT_SAVE_RESULT_ERR_MSG = "Cannot save results due to write block."; @Inject public MultiEntityResultHandler( Client client, Settings settings, ThreadPool threadPool, ADIndexManagement anomalyDetectionIndices, ClientUtil clientUtil, IndexUtils indexUtils, ClusterService clusterService ) { super( client, settings, threadPool, ADCommonName.ANOMALY_RESULT_INDEX_ALIAS, anomalyDetectionIndices, clientUtil, indexUtils, clusterService ); } /** * Execute the bulk request * @param currentBulkRequest The bulk request * @param listener callback after flushing */ public void flush(ADResultBulkRequest currentBulkRequest, ActionListener listener) { if (indexUtils.checkIndicesBlocked(clusterService.state(), ClusterBlockLevel.WRITE, this.indexName)) { listener.onFailure(new TimeSeriesException(CANNOT_SAVE_RESULT_ERR_MSG)); return; } try { if (!anomalyDetectionIndices.doesDefaultResultIndexExist()) { anomalyDetectionIndices.initDefaultResultIndexDirectly(ActionListener.wrap(initResponse -> { if (initResponse.isAcknowledged()) { bulk(currentBulkRequest, listener); } else { LOG.warn("Creating result index with mappings call not acknowledged."); listener.onFailure(new TimeSeriesException("", "Creating result index with mappings call not acknowledged.")); } }, exception -> { if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { // It is possible the index has been created while we sending the create request bulk(currentBulkRequest, listener); } else { LOG.warn("Unexpected error creating result index", exception); listener.onFailure(exception); } })); } else { bulk(currentBulkRequest, listener); } } catch (Exception e) { LOG.warn("Error in bulking results", e); 
listener.onFailure(e); } } private void bulk(ADResultBulkRequest currentBulkRequest, ActionListener listener) { if (currentBulkRequest.numberOfActions() <= 0) { listener.onFailure(new TimeSeriesException("no result to save")); return; } client.execute(ADResultBulkAction.INSTANCE, currentBulkRequest, ActionListener.wrap(response -> { LOG.debug(SUCCESS_SAVING_RESULT_MSG); listener.onResponse(response); }, exception -> { LOG.error("Error in bulking results", exception); listener.onFailure(exception); })); } }