/* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ package org.opensearch.performanceanalyzer.rest; import com.google.common.annotations.VisibleForTesting; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import java.io.IOException; import java.io.OutputStream; import java.net.HttpURLConnection; import java.security.InvalidParameterException; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jooq.Record; import org.jooq.Result; import org.jooq.exception.DataAccessException; import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; import org.opensearch.performanceanalyzer.commons.config.PluginSettings; import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics; import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; import org.opensearch.performanceanalyzer.metrics.MetricsRestUtil; import org.opensearch.performanceanalyzer.metricsdb.MetricsDB; import org.opensearch.performanceanalyzer.model.MetricsModel; import org.opensearch.performanceanalyzer.net.NetClient; import org.opensearch.performanceanalyzer.rca.framework.metrics.ReaderMetrics; import org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor; /** * Request handler that supports querying batch metrics from an EC2 instance * *

Return 1 minute of CPU_Utilization metrics sampled at a 5s sampling period: * "http://localhost:9600/_plugins/_performanceanalyzer/batch?metrics=CPU_Utilization&starttime=1566413975000&endtime=1566413980000" * *

Return 1 minute of CPU_Utilization and Latency metrics sampled at a 10s sampling period: * "http://localhost:9600/_plugins/_performanceanalyzer/batch?metrics=CPU_Utilization,Latency&starttime=1566413975000&endtime=1566413980000&samplingperiod=10" * *

Return format: { "1594412650000": { "CPU_Utilization": { "fields": [ { "name: "IndexName", * "type": "VARCHAR" }, <...> ] "records": [ [ "pmc", <...> ], <...> ] } } } */ public class QueryBatchRequestHandler extends MetricsHandler implements HttpHandler { private static final Logger LOG = LogManager.getLogger(QueryBatchRequestHandler.class); private static final int TIME_OUT_VALUE = 2; private static final TimeUnit TIME_OUT_UNIT = TimeUnit.SECONDS; private NetClient netClient; MetricsRestUtil metricsRestUtil; public static final int DEFAULT_MAX_DATAPOINTS = 100800; // Must be non-negative public static final long DEFAULT_SAMPLING_PERIOD_MILLIS = 5000; // Must be a multiple of 5000 public QueryBatchRequestHandler(NetClient netClient, MetricsRestUtil metricsRestUtil) { this.netClient = netClient; this.metricsRestUtil = metricsRestUtil; } @Override public void handle(HttpExchange exchange) throws IOException { String requestMethod = exchange.getRequestMethod(); if (!requestMethod.equalsIgnoreCase("GET")) { exchange.sendResponseHeaders(HttpURLConnection.HTTP_NOT_FOUND, -1); exchange.close(); return; } ReaderMetricsProcessor mp = ReaderMetricsProcessor.getInstance(); if (mp == null) { sendResponse( exchange, "{\"error\":\"Metrics Processor is not initialized. The reader has run into an issue or has just started.\"}", HttpURLConnection.HTTP_UNAVAILABLE); LOG.warn( "Metrics Processor is not initialized. The reader has run into an issue or has just started."); return; } NavigableSet batchMetrics = mp.getBatchMetrics(); long currentTime = System.currentTimeMillis(); if (batchMetrics == null) { sendResponse( exchange, "{\"error\":\"The batch metrics api has not been enabled for this node.\"}", HttpURLConnection.HTTP_UNAVAILABLE); LOG.warn("The batch metrics api has not been enabled for this node."); return; } if (batchMetrics.isEmpty()) { sendResponse( exchange, "{\"error\":\"There are no metrics databases. The reader has run into an issue or has just started.\"}", HttpURLConnection.HTTP_UNAVAILABLE); LOG.warn( "There are no metrics databases. The reader has run into an issue or has just started."); return; } exchange.getResponseHeaders().set("Content-Type", "application/json"); Map params = getParamsMap(exchange.getRequestURI().getQuery()); try { // Parse and validate parameters String[] validParamsTmp = {"", "metrics", "starttime", "endtime", "samplingperiod"}; Set validParams = new HashSet<>(Arrays.asList(validParamsTmp)); for (String param : params.keySet()) { if (!validParams.contains(param)) { throw new InvalidParameterException( String.format("%s is an invalid parameter", param)); } } List metrics = metricsRestUtil.parseArrayParam(params, "metrics", false); String startTimeParam = params.get("starttime"); String endTimeParam = params.get("endtime"); String samplingPeriodParam = params.get("samplingperiod"); for (String metric : metrics) { if (!MetricsModel.ALL_METRICS.containsKey(metric)) { throw new InvalidParameterException( String.format("%s is an invalid metric", metric)); } } if (startTimeParam == null || startTimeParam.isEmpty()) { throw new InvalidParameterException("starttime parameter must be set"); } long startTime; try { startTime = Long.parseUnsignedLong(startTimeParam); } catch (NumberFormatException e) { throw new InvalidParameterException( String.format("%s is an invalid starttime", startTimeParam)); } if (endTimeParam == null || endTimeParam.isEmpty()) { throw new InvalidParameterException("endtime parameter must be set"); } long endTime; try { endTime = Long.parseUnsignedLong(endTimeParam); } catch (NumberFormatException e) { throw new InvalidParameterException( String.format("%s is an invalid endtime", endTimeParam)); } long samplingPeriod = DEFAULT_SAMPLING_PERIOD_MILLIS; if (samplingPeriodParam != null && !samplingPeriodParam.isEmpty()) { samplingPeriod = Long.parseLong(samplingPeriodParam); if (samplingPeriod < 5 || samplingPeriod % 5 != 0) { throw new InvalidParameterException( String.format("%s is an invalid sampling period", samplingPeriodParam)); } if (samplingPeriod >= PluginSettings.instance().getBatchMetricsRetentionPeriodMinutes() * 60) { throw new InvalidParameterException( "sampling period must be less than the retention period"); } samplingPeriod *= 1000; } if (startTime >= endTime) { throw new InvalidParameterException("starttime must be less than the endtime"); } startTime -= startTime % samplingPeriod; endTime -= endTime % samplingPeriod; if (startTime == endTime) { throw new InvalidParameterException( "starttime and endtime must be at least one sampling period apart"); } if (endTime > currentTime) { throw new InvalidParameterException( "endtime can be no greater than the system time at the node"); } if (startTime < currentTime - PluginSettings.instance().getBatchMetricsRetentionPeriodMinutes() * 60 * 1000) { throw new InvalidParameterException( "starttime must be within the retention period"); } long processingStartTime = System.currentTimeMillis(); String queryResponse = queryFromBatchMetrics( batchMetrics, metrics, startTime, endTime, samplingPeriod, DEFAULT_MAX_DATAPOINTS); ServiceMetrics.READER_METRICS_AGGREGATOR.updateStat( ReaderMetrics.BATCH_METRICS_QUERY_PROCESSING_TIME, System.currentTimeMillis() - processingStartTime); ServiceMetrics.READER_METRICS_AGGREGATOR.updateStat( ReaderMetrics.BATCH_METRICS_HTTP_SUCCESS, 1); sendResponse(exchange, queryResponse, HttpURLConnection.HTTP_OK); } catch (DataAccessException e) { StatsCollector.instance() .logException(StatExceptionCode.READER_METRICSDB_ACCESS_ERRORS); LOG.error( "QueryException {} ExceptionCode: {}.", e, StatExceptionCode.BATCH_METRICS_HTTP_HOST_ERROR, e); StatsCollector.instance().logException(StatExceptionCode.BATCH_METRICS_HTTP_HOST_ERROR); String response = "{\"error\":\"" + e.toString() + "\"}"; sendResponse(exchange, response, HttpURLConnection.HTTP_INTERNAL_ERROR); } catch (InvalidParameterException e) { LOG.error( "QueryException {} ExceptionCode: {}.", e, StatExceptionCode.BATCH_METRICS_HTTP_CLIENT_ERROR, e); StatsCollector.instance() .logException(StatExceptionCode.BATCH_METRICS_HTTP_CLIENT_ERROR); String response = "{\"error\":\"" + e.getMessage() + ".\"}"; sendResponse(exchange, response, HttpURLConnection.HTTP_BAD_REQUEST); } catch (Exception e) { LOG.error( "QueryException {} ExceptionCode: {}.", e, StatExceptionCode.BATCH_METRICS_HTTP_HOST_ERROR, e); StatsCollector.instance().logException(StatExceptionCode.BATCH_METRICS_HTTP_HOST_ERROR); String response = "{\"error\":\"" + e.toString() + "\"}"; sendResponse(exchange, response, HttpURLConnection.HTTP_INTERNAL_ERROR); } } private int appendMetrics( Long timestamp, List metrics, StringBuilder builder, int maxDatapoints) throws Exception { maxDatapoints += 1; builder.append("\""); builder.append(timestamp); builder.append("\":{"); MetricsDB db = MetricsDB.fetchExisting(timestamp); for (int metricIndex = 0, numMetrics = metrics.size(); metricIndex < numMetrics; metricIndex++) { String metric = metrics.get(metricIndex); Result results = db.queryMetric( metric, MetricsModel.ALL_METRICS.get(metric).dimensionNames, maxDatapoints); if (results != null) { maxDatapoints -= results.size(); if (maxDatapoints <= 0) { StatsCollector.instance() .logException(StatExceptionCode.BATCH_METRICS_EXCEEDED_MAX_DATAPOINTS); throw new InvalidParameterException( String.format( "requested data exceeds the %d datapoints limit", DEFAULT_MAX_DATAPOINTS)); } builder.append("\""); builder.append(metric); builder.append("\":"); builder.append(results.formatJSON()); for (metricIndex += 1; metricIndex < numMetrics; metricIndex++) { metric = metrics.get(metricIndex); results = db.queryMetric( metric, MetricsModel.ALL_METRICS.get(metric).dimensionNames, maxDatapoints); if (results != null) { maxDatapoints -= results.size(); if (maxDatapoints <= 0) { StatsCollector.instance() .logException( StatExceptionCode .BATCH_METRICS_EXCEEDED_MAX_DATAPOINTS); throw new InvalidParameterException( String.format( "requested data exceeds the %d datapoints limit", DEFAULT_MAX_DATAPOINTS)); } builder.append(",\""); builder.append(metric); builder.append("\":"); builder.append(results.formatJSON()); } } } } builder.append("}"); db.remove(); return maxDatapoints - 1; } /** * Requires non-empty batchMetrics, valid non-empty metrics, valid startTime, valid endTime, * valid samplingPeriod (in milliseconds), and non-negative maxDatapoints. */ private String queryFromBatchMetrics( NavigableSet batchMetrics, List metrics, long startTime, long endTime, long samplingPeriod, int maxDatapoints) throws Exception { StringBuilder responseJson = new StringBuilder(); responseJson.append("{"); Long metricsTimestamp = batchMetrics.ceiling(startTime); if (metricsTimestamp != null && metricsTimestamp < endTime) { maxDatapoints = appendMetrics(metricsTimestamp, metrics, responseJson, maxDatapoints); metricsTimestamp = metricsTimestamp - metricsTimestamp % samplingPeriod + samplingPeriod; metricsTimestamp = batchMetrics.ceiling(metricsTimestamp); while (metricsTimestamp != null && metricsTimestamp < endTime) { responseJson.append(","); maxDatapoints = appendMetrics(metricsTimestamp, metrics, responseJson, maxDatapoints); metricsTimestamp = metricsTimestamp - metricsTimestamp % samplingPeriod + samplingPeriod; metricsTimestamp = batchMetrics.ceiling(metricsTimestamp); } } responseJson.append("}"); return responseJson.toString(); } private void sendResponse(HttpExchange exchange, String response, int status) throws IOException { try (OutputStream os = exchange.getResponseBody()) { exchange.sendResponseHeaders(status, response.length()); os.write(response.getBytes()); } catch (Exception e) { response = e.toString(); exchange.sendResponseHeaders(HttpURLConnection.HTTP_INTERNAL_ERROR, response.length()); } } @VisibleForTesting public String queryFromBatchMetricsShim( NavigableSet batchMetrics, List metrics, long startTime, long endTime, long samplingPeriod, int maxDatapoints) throws Exception { return queryFromBatchMetrics( batchMetrics, metrics, startTime, endTime, samplingPeriod, maxDatapoints); } @VisibleForTesting public int appendMetricsShim( Long timestamp, List metrics, StringBuilder builder, int maxDatapoints) throws Exception { return appendMetrics(timestamp, metrics, builder, maxDatapoints); } }