/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. * * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. */ package org.opensearch.ad; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.search.join.ScoreMode; import org.opensearch.action.ActionListener; import org.opensearch.action.get.GetRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.ad.constant.ADCommonMessages; import org.opensearch.ad.constant.ADCommonName; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyResult; import org.opensearch.ad.model.EntityProfile; import org.opensearch.ad.model.EntityProfileName; import org.opensearch.ad.model.EntityState; import org.opensearch.ad.model.InitProgressProfile; import org.opensearch.ad.settings.ADNumericSetting; import org.opensearch.ad.transport.EntityProfileAction; import org.opensearch.ad.transport.EntityProfileRequest; import org.opensearch.ad.transport.EntityProfileResponse; import org.opensearch.client.Client; import org.opensearch.cluster.routing.Preference; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.NestedQueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.search.aggregations.AggregationBuilders; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.timeseries.AnalysisType; import org.opensearch.timeseries.constant.CommonMessages; import org.opensearch.timeseries.constant.CommonName; import org.opensearch.timeseries.model.Entity; import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.model.Job; import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener; import org.opensearch.timeseries.util.ParseUtils; import org.opensearch.timeseries.util.SecurityClientUtil; public class EntityProfileRunner extends AbstractProfileRunner { private final Logger logger = LogManager.getLogger(EntityProfileRunner.class); static final String NOT_HC_DETECTOR_ERR_MSG = "This is not a high cardinality detector"; static final String EMPTY_ENTITY_ATTRIBUTES = "Empty entity attributes"; static final String NO_ENTITY = "Cannot find entity"; private Client client; private SecurityClientUtil clientUtil; private NamedXContentRegistry xContentRegistry; public EntityProfileRunner(Client client, SecurityClientUtil clientUtil, NamedXContentRegistry xContentRegistry, long requiredSamples) { super(requiredSamples); this.client = client; this.clientUtil = clientUtil; this.xContentRegistry = xContentRegistry; } /** * Get profile info of specific entity. * * @param detectorId detector identifier * @param entityValue entity value * @param profilesToCollect profiles to collect * @param listener action listener to handle exception and process entity profile response */ public void profile( String detectorId, Entity entityValue, Set<EntityProfileName> profilesToCollect, ActionListener<EntityProfile> listener ) { if (profilesToCollect == null || profilesToCollect.size() == 0) { listener.onFailure(new IllegalArgumentException(ADCommonMessages.EMPTY_PROFILES_COLLECT)); return; } GetRequest getDetectorRequest = new GetRequest(CommonName.CONFIG_INDEX, detectorId); client.get(getDetectorRequest, ActionListener.wrap(getResponse -> { if (getResponse != null && getResponse.isExists()) { try ( XContentParser parser = XContentType.JSON .xContent() .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString()) ) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); AnomalyDetector detector = AnomalyDetector.parse(parser, detectorId); List<String> categoryFields = detector.getCategoryFields(); int maxCategoryFields = ADNumericSetting.maxCategoricalFields(); if (categoryFields == null || categoryFields.size() == 0) { listener.onFailure(new IllegalArgumentException(NOT_HC_DETECTOR_ERR_MSG)); } else if (categoryFields.size() > maxCategoryFields) { listener.onFailure(new IllegalArgumentException(CommonMessages.getTooManyCategoricalFieldErr(maxCategoryFields))); } else { validateEntity(entityValue, categoryFields, detectorId, profilesToCollect, detector, listener); } } catch (Exception t) { listener.onFailure(t); } } else { listener.onFailure(new IllegalArgumentException(CommonMessages.FAIL_TO_FIND_CONFIG_MSG + detectorId)); } }, listener::onFailure)); } /** * Verify if the input entity exists or not in case of typos. * * If a user deletes the entity after job start, then we will not be able to * get this entity in the index. For this case, we will not return a profile * for this entity even if it's running on some data node. the entity's model * will be deleted by another entity or by maintenance due to long inactivity. * * @param entity Entity accessor * @param categoryFields category fields defined for a detector * @param detectorId Detector Id * @param profilesToCollect Profile to collect from the input * @param detector Detector config accessor * @param listener Callback to send responses. */ private void validateEntity( Entity entity, List<String> categoryFields, String detectorId, Set<EntityProfileName> profilesToCollect, AnomalyDetector detector, ActionListener<EntityProfile> listener ) { Map<String, String> attributes = entity.getAttributes(); if (attributes == null || attributes.size() != categoryFields.size()) { listener.onFailure(new IllegalArgumentException(EMPTY_ENTITY_ATTRIBUTES)); return; } for (String field : categoryFields) { if (false == attributes.containsKey(field)) { listener.onFailure(new IllegalArgumentException("Cannot find " + field)); return; } } BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().filter(detector.getFilterQuery()); for (TermQueryBuilder term : entity.getTermQueryBuilders()) { internalFilterQuery.filter(term); } SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(internalFilterQuery).size(1); SearchRequest searchRequest = new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder) .preference(Preference.LOCAL.toString()); final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(searchResponse -> { try { if (searchResponse.getHits().getHits().length == 0) { listener.onFailure(new IllegalArgumentException(NO_ENTITY)); return; } prepareEntityProfile(listener, detectorId, entity, profilesToCollect, detector, categoryFields.get(0)); } catch (Exception e) { listener.onFailure(new IllegalArgumentException(NO_ENTITY)); return; } }, e -> listener.onFailure(new IllegalArgumentException(NO_ENTITY))); // using the original context in listener as user roles have no permissions for internal operations like fetching a // checkpoint clientUtil .<SearchRequest, SearchResponse>asyncRequestWithInjectedSecurity( searchRequest, client::search, detector.getId(), client, AnalysisType.AD, searchResponseListener ); } private void prepareEntityProfile( ActionListener<EntityProfile> listener, String detectorId, Entity entityValue, Set<EntityProfileName> profilesToCollect, AnomalyDetector detector, String categoryField ) { EntityProfileRequest request = new EntityProfileRequest(detectorId, entityValue, profilesToCollect); client .execute( EntityProfileAction.INSTANCE, request, ActionListener.wrap(r -> getJob(detectorId, entityValue, profilesToCollect, detector, r, listener), listener::onFailure) ); } private void getJob( String detectorId, Entity entityValue, Set<EntityProfileName> profilesToCollect, AnomalyDetector detector, EntityProfileResponse entityProfileResponse, ActionListener<EntityProfile> listener ) { GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX, detectorId); client.get(getRequest, ActionListener.wrap(getResponse -> { if (getResponse != null && getResponse.isExists()) { try ( XContentParser parser = XContentType.JSON .xContent() .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString()) ) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); Job job = Job.parse(parser); int totalResponsesToWait = 0; if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS) || profilesToCollect.contains(EntityProfileName.STATE)) { totalResponsesToWait++; } if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) { totalResponsesToWait++; } if (profilesToCollect.contains(EntityProfileName.MODELS)) { totalResponsesToWait++; } MultiResponsesDelegateActionListener<EntityProfile> delegateListener = new MultiResponsesDelegateActionListener<EntityProfile>( listener, totalResponsesToWait, ADCommonMessages.FAIL_FETCH_ERR_MSG + entityValue + " of detector " + detectorId, false ); if (profilesToCollect.contains(EntityProfileName.MODELS)) { EntityProfile.Builder builder = new EntityProfile.Builder(); if (false == job.isEnabled()) { delegateListener.onResponse(builder.build()); } else { delegateListener.onResponse(builder.modelProfile(entityProfileResponse.getModelProfile()).build()); } } if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS) || profilesToCollect.contains(EntityProfileName.STATE)) { profileStateRelated( entityProfileResponse.getTotalUpdates(), detectorId, entityValue, profilesToCollect, detector, job, delegateListener ); } if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) { long enabledTimeMs = job.getEnabledTime().toEpochMilli(); SearchRequest lastSampleTimeRequest = createLastSampleTimeRequest( detectorId, enabledTimeMs, entityValue, detector.getCustomResultIndex() ); EntityProfile.Builder builder = new EntityProfile.Builder(); Optional<Boolean> isActiveOp = entityProfileResponse.isActive(); if (isActiveOp.isPresent()) { builder.isActive(isActiveOp.get()); } builder.lastActiveTimestampMs(entityProfileResponse.getLastActiveMs()); client.search(lastSampleTimeRequest, ActionListener.wrap(searchResponse -> { Optional<Long> latestSampleTimeMs = ParseUtils.getLatestDataTime(searchResponse); if (latestSampleTimeMs.isPresent()) { builder.lastSampleTimestampMs(latestSampleTimeMs.get()); } delegateListener.onResponse(builder.build()); }, exception -> { // sth wrong like result index not created. Return what we have if (exception instanceof IndexNotFoundException) { // don't print out stack trace since it is not helpful logger.info("Result index hasn't been created", exception.getMessage()); } else { logger.warn("fail to get last sample time", exception); } delegateListener.onResponse(builder.build()); })); } } catch (Exception e) { logger.error(ADCommonMessages.FAIL_TO_GET_PROFILE_MSG, e); listener.onFailure(e); } } else { sendUnknownState(profilesToCollect, entityValue, true, listener); } }, exception -> { if (exception instanceof IndexNotFoundException) { logger.info(exception.getMessage()); sendUnknownState(profilesToCollect, entityValue, true, listener); } else { logger.error(ADCommonMessages.FAIL_TO_GET_PROFILE_MSG + detectorId, exception); listener.onFailure(exception); } })); } private void profileStateRelated( long totalUpdates, String detectorId, Entity entityValue, Set<EntityProfileName> profilesToCollect, AnomalyDetector detector, Job job, MultiResponsesDelegateActionListener<EntityProfile> delegateListener ) { if (totalUpdates == 0) { sendUnknownState(profilesToCollect, entityValue, false, delegateListener); } else if (false == job.isEnabled()) { sendUnknownState(profilesToCollect, entityValue, false, delegateListener); } else if (totalUpdates >= requiredSamples) { sendRunningState(profilesToCollect, entityValue, delegateListener); } else { sendInitState(profilesToCollect, entityValue, detector, totalUpdates, delegateListener); } } /** * Send unknown state back * @param profilesToCollect Profiles to Collect * @param entityValue Entity value * @param immediate whether we should terminate workflow and respond immediately * @param delegateListener Delegate listener */ private void sendUnknownState( Set<EntityProfileName> profilesToCollect, Entity entityValue, boolean immediate, ActionListener<EntityProfile> delegateListener ) { EntityProfile.Builder builder = new EntityProfile.Builder(); if (profilesToCollect.contains(EntityProfileName.STATE)) { builder.state(EntityState.UNKNOWN); } if (immediate) { delegateListener.onResponse(builder.build()); } else { delegateListener.onResponse(builder.build()); } } private void sendRunningState( Set<EntityProfileName> profilesToCollect, Entity entityValue, MultiResponsesDelegateActionListener<EntityProfile> delegateListener ) { EntityProfile.Builder builder = new EntityProfile.Builder(); if (profilesToCollect.contains(EntityProfileName.STATE)) { builder.state(EntityState.RUNNING); } if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)) { InitProgressProfile initProgress = new InitProgressProfile("100%", 0, 0); builder.initProgress(initProgress); } delegateListener.onResponse(builder.build()); } private void sendInitState( Set<EntityProfileName> profilesToCollect, Entity entityValue, AnomalyDetector detector, long updates, MultiResponsesDelegateActionListener<EntityProfile> delegateListener ) { EntityProfile.Builder builder = new EntityProfile.Builder(); if (profilesToCollect.contains(EntityProfileName.STATE)) { builder.state(EntityState.INIT); } if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)) { long intervalMins = ((IntervalTimeConfiguration) detector.getInterval()).toDuration().toMinutes(); InitProgressProfile initProgress = computeInitProgressProfile(updates, intervalMins); builder.initProgress(initProgress); } delegateListener.onResponse(builder.build()); } private SearchRequest createLastSampleTimeRequest(String detectorId, long enabledTime, Entity entity, String resultIndex) { BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); String path = "entity"; String entityName = path + ".name"; String entityValue = path + ".value"; for (Map.Entry<String, String> attribute : entity.getAttributes().entrySet()) { /* * each attribute pair corresponds to a nested query like "nested": { "query": { "bool": { "filter": [ { "term": { "entity.name": { "value": "turkey4", "boost": 1 } } }, { "term": { "entity.value": { "value": "Turkey", "boost": 1 } } } ] } }, "path": "entity", "ignore_unmapped": false, "score_mode": "none", "boost": 1 } },*/ BoolQueryBuilder nestedBoolQueryBuilder = new BoolQueryBuilder(); TermQueryBuilder entityNameFilterQuery = QueryBuilders.termQuery(entityName, attribute.getKey()); nestedBoolQueryBuilder.filter(entityNameFilterQuery); TermQueryBuilder entityValueFilterQuery = QueryBuilders.termQuery(entityValue, attribute.getValue()); nestedBoolQueryBuilder.filter(entityValueFilterQuery); NestedQueryBuilder nestedNameQueryBuilder = new NestedQueryBuilder(path, nestedBoolQueryBuilder, ScoreMode.None); boolQueryBuilder.filter(nestedNameQueryBuilder); } boolQueryBuilder.filter(QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, detectorId)); boolQueryBuilder.filter(QueryBuilders.rangeQuery(CommonName.EXECUTION_END_TIME_FIELD).gte(enabledTime)); SearchSourceBuilder source = new SearchSourceBuilder() .query(boolQueryBuilder) .aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(CommonName.EXECUTION_END_TIME_FIELD)) .trackTotalHits(false) .size(0); SearchRequest request = new SearchRequest(ADCommonName.ANOMALY_RESULT_INDEX_ALIAS); request.source(source); if (resultIndex != null) { request.indices(resultIndex); } return request; } }