/* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. * * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. */ package org.opensearch.timeseries.feature; import static java.util.Arrays.asList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.AnyOf.anyOf; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.time.Clock; import java.time.ZoneId; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.ExecutorService; import org.apache.lucene.search.TotalHits; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opensearch.action.ActionFuture; import org.opensearch.action.ActionListener; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.MultiSearchResponse.Item; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchResponseSections; import org.opensearch.action.search.ShardSearchFailure; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.settings.AnomalyDetectorSettings; import org.opensearch.client.Client; import org.opensearch.common.settings.Settings; import org.opensearch.common.time.DateFormatter; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.index.mapper.DateFieldMapper; import org.opensearch.index.query.QueryBuilders; import org.opensearch.script.ScriptService; import org.opensearch.script.TemplateScript; import org.opensearch.script.TemplateScript.Factory; import org.opensearch.search.DocValueFormat; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; import org.opensearch.search.aggregations.Aggregation; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.AggregationBuilders; import org.opensearch.search.aggregations.Aggregations; import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation; import org.opensearch.search.aggregations.metrics.InternalMin; import org.opensearch.search.aggregations.metrics.InternalTDigestPercentiles; import org.opensearch.search.aggregations.metrics.Max; import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder; import org.opensearch.search.aggregations.metrics.MinAggregationBuilder; import org.opensearch.search.aggregations.metrics.Percentile; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.timeseries.AnalysisType; import org.opensearch.timeseries.NodeStateManager; import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin; import org.opensearch.timeseries.constant.CommonName; import org.opensearch.timeseries.dataprocessor.Imputer; import org.opensearch.timeseries.dataprocessor.LinearUniformImputer; import org.opensearch.timeseries.model.Entity; import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.util.ParseUtils; import org.opensearch.timeseries.util.SecurityClientUtil; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @PowerMockIgnore("javax.management.*") @RunWith(PowerMockRunner.class) @PrepareForTest({ ParseUtils.class }) public class SearchFeatureDaoTests { private SearchFeatureDao searchFeatureDao; @Mock private Client client; @Mock private ScriptService scriptService; @Mock private NamedXContentRegistry xContent; private SecurityClientUtil clientUtil; @Mock private Factory factory; @Mock private TemplateScript templateScript; @Mock private ActionFuture searchResponseFuture; @Mock private ActionFuture multiSearchResponseFuture; @Mock private SearchResponse searchResponse; @Mock private MultiSearchResponse multiSearchResponse; @Mock private Item multiSearchResponseItem; @Mock private Aggregations aggs; @Mock private Max max; @Mock private NodeStateManager stateManager; @Mock private AnomalyDetector detector; @Mock private ThreadPool threadPool; @Mock private Clock clock; private SearchRequest searchRequest; private SearchSourceBuilder searchSourceBuilder; private MultiSearchRequest multiSearchRequest; private Map aggsMap; private IntervalTimeConfiguration detectionInterval; private String detectorId; private Imputer imputer; private Settings settings; @Before public void setup() throws Exception { MockitoAnnotations.initMocks(this); PowerMockito.mockStatic(ParseUtils.class); imputer = new LinearUniformImputer(false); ExecutorService executorService = mock(ExecutorService.class); when(threadPool.executor(TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME)).thenReturn(executorService); doAnswer(invocation -> { Runnable runnable = invocation.getArgument(0); runnable.run(); return null; }).when(executorService).execute(any(Runnable.class)); settings = Settings.EMPTY; when(client.threadPool()).thenReturn(threadPool); NodeStateManager nodeStateManager = mock(NodeStateManager.class); doAnswer(invocation -> { ActionListener> listener = invocation.getArgument(2); listener.onResponse(Optional.of(detector)); return null; }).when(nodeStateManager).getConfig(any(String.class), eq(AnalysisType.AD), any(ActionListener.class)); clientUtil = new SecurityClientUtil(nodeStateManager, settings); searchFeatureDao = spy( new SearchFeatureDao(client, xContent, imputer, clientUtil, settings, null, AnomalyDetectorSettings.NUM_SAMPLES_PER_TREE) ); detectionInterval = new IntervalTimeConfiguration(1, ChronoUnit.MINUTES); detectorId = "123"; when(detector.getId()).thenReturn(detectorId); when(detector.getTimeField()).thenReturn("testTimeField"); when(detector.getIndices()).thenReturn(Arrays.asList("testIndices")); when(detector.getInterval()).thenReturn(detectionInterval); when(detector.getFilterQuery()).thenReturn(QueryBuilders.matchAllQuery()); when(detector.getCategoryFields()).thenReturn(Collections.singletonList("a")); searchSourceBuilder = SearchSourceBuilder .fromXContent(XContentType.JSON.xContent().createParser(xContent, LoggingDeprecationHandler.INSTANCE, "{}")); searchRequest = new SearchRequest(detector.getIndices().toArray(new String[0])); aggsMap = new HashMap<>(); when(max.getName()).thenReturn(CommonName.AGG_NAME_MAX_TIME); List list = new ArrayList<>(); list.add(max); Aggregations aggregations = new Aggregations(list); SearchHits hits = new SearchHits(new SearchHit[0], new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1f); when(searchResponse.getHits()).thenReturn(hits); doAnswer(invocation -> { Object[] args = invocation.getArguments(); ActionListener listener = (ActionListener) args[1]; listener.onResponse(searchResponse); return null; }).when(client).search(eq(searchRequest), any()); when(searchResponse.getAggregations()).thenReturn(aggregations); multiSearchRequest = new MultiSearchRequest(); SearchRequest request = new SearchRequest(detector.getIndices().toArray(new String[0])); multiSearchRequest.add(request); doAnswer(invocation -> { Object[] args = invocation.getArguments(); ActionListener listener = (ActionListener) args[1]; listener.onResponse(multiSearchResponse); return null; }).when(client).multiSearch(eq(multiSearchRequest), any()); when(multiSearchResponse.getResponses()).thenReturn(new Item[] { multiSearchResponseItem }); when(multiSearchResponseItem.getResponse()).thenReturn(searchResponse); // gson = PowerMockito.mock(Gson.class); } @SuppressWarnings("unchecked") private Object[] getFeaturesForPeriodThrowIllegalStateData() { String aggName = "aggName"; InternalTDigestPercentiles empty = mock(InternalTDigestPercentiles.class); Iterator emptyIterator = mock(Iterator.class); when(empty.iterator()).thenReturn(emptyIterator); when(emptyIterator.hasNext()).thenReturn(false); when(empty.getName()).thenReturn(aggName); MultiBucketsAggregation multiBucket = mock(MultiBucketsAggregation.class); when(multiBucket.getName()).thenReturn(aggName); return new Object[] { new Object[] { asList(empty), asList(aggName), null }, new Object[] { asList(multiBucket), asList(aggName), null }, }; } @Test @SuppressWarnings("unchecked") public void getLatestDataTime_returnExpectedToListener() { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() .aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(detector.getTimeField())) .size(0); searchRequest.source(searchSourceBuilder); long epochTime = 100L; aggsMap.put(CommonName.AGG_NAME_MAX_TIME, max); when(max.getValue()).thenReturn((double) epochTime); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(1); listener.onResponse(searchResponse); return null; }).when(client).search(eq(searchRequest), any(ActionListener.class)); when(ParseUtils.getLatestDataTime(eq(searchResponse))).thenReturn(Optional.of(epochTime)); ActionListener> listener = mock(ActionListener.class); searchFeatureDao.getLatestDataTime(detector, listener); ArgumentCaptor> captor = ArgumentCaptor.forClass(Optional.class); verify(listener).onResponse(captor.capture()); Optional result = captor.getValue(); assertEquals(epochTime, result.get().longValue()); } @Test @SuppressWarnings("unchecked") public void getFeaturesForSampledPeriods_throwToListener_whenSamplingFail() { doAnswer(invocation -> { ActionListener> listener = invocation.getArgument(3); listener.onFailure(new RuntimeException()); return null; }).when(searchFeatureDao).getFeaturesForPeriod(any(), anyLong(), anyLong(), any(ActionListener.class)); ActionListener>> listener = mock(ActionListener.class); searchFeatureDao.getFeaturesForSampledPeriods(detector, 1, 1, 0, listener); verify(listener).onFailure(any(Exception.class)); } @Test @SuppressWarnings("unchecked") public void getFeaturesForPeriod_throwToListener_whenResponseParsingFails() throws Exception { long start = 100L; long end = 200L; when(ParseUtils.generateInternalFeatureQuery(eq(detector), eq(start), eq(end), eq(xContent))).thenReturn(searchSourceBuilder); when(detector.getEnabledFeatureIds()).thenReturn(null); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(1); listener.onResponse(searchResponse); return null; }).when(client).search(eq(searchRequest), any(ActionListener.class)); ActionListener> listener = mock(ActionListener.class); searchFeatureDao.getFeaturesForPeriod(detector, start, end, listener); verify(listener).onFailure(any(Exception.class)); } @Test @SuppressWarnings("unchecked") public void getFeaturesForPeriod_throwToListener_whenSearchFails() throws Exception { long start = 100L; long end = 200L; when(ParseUtils.generateInternalFeatureQuery(eq(detector), eq(start), eq(end), eq(xContent))).thenReturn(searchSourceBuilder); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(1); listener.onFailure(new RuntimeException()); return null; }).when(client).search(eq(searchRequest), any(ActionListener.class)); ActionListener> listener = mock(ActionListener.class); searchFeatureDao.getFeaturesForPeriod(detector, start, end, listener); verify(listener).onFailure(any(Exception.class)); } @SuppressWarnings("unchecked") @Test public void testGetEntityMinDataTime() { // simulate response {"took":11,"timed_out":false,"_shards":{"total":1, // "successful":1,"skipped":0,"failed":0},"hits":{"max_score":null,"hits":[]}, // "aggregations":{"min_timefield":{"value":1.602211285E12, // "value_as_string":"2020-10-09T02:41:25.000Z"}, // "max_timefield":{"value":1.602348325E12,"value_as_string":"2020-10-10T16:45:25.000Z"}}} DocValueFormat dateFormat = new DocValueFormat.DateTime( DateFormatter.forPattern("strict_date_optional_time||epoch_millis"), ZoneId.of("UTC"), DateFieldMapper.Resolution.MILLISECONDS ); double earliest = 1.602211285E12; InternalMin minInternal = new InternalMin("min_timefield", earliest, dateFormat, new HashMap<>()); InternalAggregations internalAggregations = InternalAggregations.from(Arrays.asList(minInternal)); SearchHits hits = new SearchHits(new SearchHit[] {}, null, Float.NaN); SearchResponseSections searchSections = new SearchResponseSections(hits, internalAggregations, null, false, false, null, 1); SearchResponse searchResponse = new SearchResponse( searchSections, null, 1, 1, 0, 11, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY ); doAnswer(invocation -> { SearchRequest request = invocation.getArgument(0); assertEquals(1, request.indices().length); assertTrue(detector.getIndices().contains(request.indices()[0])); AggregatorFactories.Builder aggs = request.source().aggregations(); assertEquals(1, aggs.count()); Collection factory = aggs.getAggregatorFactories(); assertTrue(!factory.isEmpty()); Iterator iterator = factory.iterator(); while (iterator.hasNext()) { assertThat(iterator.next(), anyOf(instanceOf(MaxAggregationBuilder.class), instanceOf(MinAggregationBuilder.class))); } ActionListener listener = invocation.getArgument(1); listener.onResponse(searchResponse); return null; }).when(client).search(any(SearchRequest.class), any(ActionListener.class)); ActionListener> listener = mock(ActionListener.class); Entity entity = Entity.createSingleAttributeEntity("field", "app_1"); searchFeatureDao.getMinDataTime(detector, Optional.ofNullable(entity), AnalysisType.AD, listener); ArgumentCaptor> captor = ArgumentCaptor.forClass(Optional.class); verify(listener).onResponse(captor.capture()); Optional result = captor.getValue(); assertEquals((long) earliest, result.get().longValue()); } }