/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.performanceanalyzer.metricsdb;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.BatchBindStep;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.SQLDialect;
import org.jooq.Select;
import org.jooq.TableLike;
import org.jooq.exception.DataAccessException;
import org.jooq.impl.DSL;
import org.opensearch.performanceanalyzer.DBUtils;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.config.PluginSettings;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.reader.Removable;
/**
 * On-disk database that holds a 5 second snapshot of all metrics. We create one table per metric.
 * Every row contains four aggregations and any other relevant dimensions.
 *
 * <p>For example:
 *
 * <pre>
 * CPU table
 * |sum|avg|max|min|   index|shard|role|
 * +---+---+---+---+--------+-----+----+
 * |  5|2.5|  3|  2|sonested|    1| N/A|
 *
 * RSS table
 * |sum|avg|max|min|    index|shard|role|
 * +---+---+---+---+---------+-----+----+
 * | 30| 15| 20| 10|nyc_taxis|    1| N/A|
 * </pre>
 */
@SuppressWarnings("serial")
public class MetricsDB implements Removable {
private static final Logger LOG = LogManager.getLogger(MetricsDB.class);
private static final String DB_FILE_PREFIX_PATH_DEFAULT = "/tmp/metricsdb_";
private static final String DB_FILE_PREFIX_PATH_CONF_NAME = "metrics-db-file-prefix-path";
private static final String DB_URL = "jdbc:sqlite:";
private final Connection conn;
private final DSLContext create;
public static final String SUM = "sum";
public static final String AVG = "avg";
public static final String MIN = "min";
public static final String MAX = "max";
public static final Set AGG_VALUES =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(SUM, AVG, MIN, MAX)));
private long windowStartTime;
public static String getDBFilePath(long windowStartTime) {
return getFilePrefix() + windowStartTime;
}
public String getDBFilePath() {
return getDBFilePath(windowStartTime);
}
public static String getFilePrefix() {
return PluginSettings.instance()
.getSettingValue(DB_FILE_PREFIX_PATH_CONF_NAME, DB_FILE_PREFIX_PATH_DEFAULT);
}
public MetricsDB(long windowStartTime) throws Exception {
this.windowStartTime = windowStartTime;
String url = DB_URL + getDBFilePath();
try {
conn = DriverManager.getConnection(url);
conn.setAutoCommit(false);
} catch (Exception e) {
StatsCollector.instance()
.logException(StatExceptionCode.READER_METRICSDB_ACCESS_ERRORS);
throw e;
}
create = DSL.using(conn, SQLDialect.SQLITE);
}
/**
* Returns a MetricsDB handle associated with an existing metricsdb file.
*
* @param windowStartTime the timestamp associated with an existing metricsdb file
* @return a MetricsDB handle associated with the metricsdb file
* @throws Exception if the metricsdb file does not exist or is invalid
*/
public static MetricsDB fetchExisting(long windowStartTime) throws Exception {
String filePath = getDBFilePath(windowStartTime);
if (!(new File(filePath)).exists()) {
StatsCollector.instance()
.logException(StatExceptionCode.READER_METRICSDB_ACCESS_ERRORS);
throw new FileNotFoundException(
String.format("MetricsDB file %s could not be found.", filePath));
}
return new MetricsDB(windowStartTime);
}
public void close() throws Exception {
conn.close();
}
public void createMetric(Metric> metric, List dimensions) {
if (DBUtils.checkIfTableExists(create, metric.getName())) {
return;
}
List> fields = DBUtils.getFieldsFromList(dimensions);
fields.add(DSL.field(SUM, metric.getValueType()));
fields.add(DSL.field(AVG, metric.getValueType()));
fields.add(DSL.field(MIN, metric.getValueType()));
fields.add(DSL.field(MAX, metric.getValueType()));
create.createTable(metric.getName()).columns(fields).execute();
}
public BatchBindStep startBatchPut(Metric> metric, List dimensions) {
List> dummyValues = new ArrayList<>();
for (String dim : dimensions) {
dummyValues.add(null);
}
// Finally add sum, avg, min, max
dummyValues.add(null);
dummyValues.add(null);
dummyValues.add(null);
dummyValues.add(null);
return create.batch(create.insertInto(DSL.table(metric.getName())).values(dummyValues));
}
public BatchBindStep startBatchPut(String tableName, int dimNum) {
if (dimNum < 1 || !DBUtils.checkIfTableExists(create, tableName)) {
throw new IllegalArgumentException(
String.format("Incorrect arguments %s, %d", tableName, dimNum));
}
List> dummyValues = new ArrayList<>(dimNum);
for (int i = 0; i < dimNum; i++) {
dummyValues.add(null);
}
return create.batch(create.insertInto(DSL.table(tableName)).values(dummyValues));
}
public void putMetric(Metric metric, Dimensions dimensions, long windowStartTime) {
create.insertInto(DSL.table(metric.getName()))
.set(DSL.field(SUM, Double.class), metric.getSum())
.set(DSL.field(AVG, Double.class), metric.getAvg())
.set(DSL.field(MIN, Double.class), metric.getMin())
.set(DSL.field(MAX, Double.class), metric.getMax())
.set(dimensions.getFieldMap())
.execute();
}
/**
* Drop a metric table. This is for IT framework to use only
*
* @param metricName metric table to be deleted
*/
@VisibleForTesting
public void deleteMetric(String metricName) {
if (DBUtils.checkIfTableExists(create, metricName)) {
create.dropTable(metricName).execute();
}
}
// We have a table per metric. We do a group by/aggregate on
// every dimension and return all the metric tables.
public List> getAggregatedMetricTables(
List metrics, List aggregations, List dimensions)
throws Exception {
List> tList = new ArrayList<>();
List> groupByFields = DBUtils.getFieldsFromList(dimensions);
for (int i = 0; i < metrics.size(); i++) {
String metric = metrics.get(i);
List> selectFields = DBUtils.getFieldsFromList(dimensions);
String aggType = aggregations.get(i);
if (aggType.equals(SUM)) {
Field field = DSL.field(SUM, Double.class);
selectFields.add(DSL.sum(field).as(metric));
} else if (aggType.equals(AVG)) {
Field field = DSL.field(AVG, Double.class);
selectFields.add(DSL.avg(field).as(metric));
} else if (aggType.equals(MIN)) {
Field field = DSL.field(MIN, Double.class);
selectFields.add(DSL.min(field).as(metric));
} else if (aggType.equals(MAX)) {
Field field = DSL.field(MAX, Double.class);
selectFields.add(DSL.max(field).as(metric));
} else {
throw new Exception("Unknown agg type");
}
if (!DBUtils.checkIfTableExists(create, metrics.get(i))) {
tList.add(null);
} else {
tList.add(
create.select(selectFields)
.from(DSL.table(metric))
.groupBy(groupByFields)
.asTable());
}
}
return tList;
}
/**
* query metrics from different tables and merge to one table.
*
* getAggregatedMetricTables returns tables like: +-----+---------+-----+ |shard|indexName|
* cpu| +-----+---------+-----+ |0 |sonested | 10| |1 |sonested | 20|
*
*
+-----+---------+-----+ |shard|indexName| rss| +-----+---------+-----+ |0 |sonested | 54|
* |2 |sonested | 47|
*
*
We select metrics from each table and union them: +-----+---------+-----+-----+
* |shard|indexName| cpu| rss| +-----+---------+-----+-----+ |0 |sonested | 10| null| |1
* |sonested | 20| null| |0 |sonested | null| 54| |2 |sonested | null| 47|
*
*
Then, we group by dimensions and return following table: +-----+---------+-----+-----+
* |shard|indexName| cpu| rss| +-----+---------+-----+-----+ |0 |sonested | 10| 54| |1 |sonested
* | 20| null| |2 |sonested | null| 47|
*
* @param metrics a list of metrics we want to query
* @param aggregations aggregation we want to use for each metric
* @param dimensions dimension we want to use for each metric
* @return result of query
* @throws Exception if one of the aggregations contains sth other than "sum", "avg", "min", and
* "max".
*/
public Result queryMetric(
List metrics, List aggregations, List dimensions)
throws Exception {
List> tList =
getAggregatedMetricTables(metrics, aggregations, dimensions);
// Join all the individual metric tables to generate the final table.
Select finalTable = null;
for (int i = 0; i < tList.size(); i++) {
TableLike metricTable = tList.get(i);
if (metricTable == null) {
LOG.info(
String.format(
"%s metric table does not exist. "
+ "Returning null for the metric/dimension.",
metrics.get(i)));
continue;
}
List> selectFields =
DBUtils.getSelectFieldsForMetricName(metrics.get(i), metrics, dimensions);
Select curTable = create.select(selectFields).from(metricTable);
if (finalTable == null) {
finalTable = curTable;
} else {
finalTable = finalTable.union(curTable);
}
}
List> allFields = DBUtils.getFieldsFromList(dimensions);
for (String metric : metrics) {
allFields.add(DSL.max(DSL.field(metric, Double.class)).as(metric));
}
List> groupByFields = DBUtils.getFieldsFromList(dimensions);
if (finalTable == null) {
return null;
}
return create.select(allFields).from(finalTable).groupBy(groupByFields).fetch();
}
/**
* Queries all the data associated with the given metric.
*
* @param metric the desired metric
* @return the result of the query
*/
public Result queryMetric(String metric) throws DataAccessException {
return create.select().from(DSL.table(metric)).fetch();
}
/**
* Queries all the data associated with a given metric.
*
* @param metric the desired metric
* @param dimensions the dimensions we want to return for the given metric
* @param limit the maximum number of records to return
* @return the result of the query
*/
public Result queryMetric(String metric, Collection dimensions, int limit)
throws DataAccessException {
if (!DBUtils.checkIfTableExists(create, metric)) {
return null;
}
if (limit < 0) {
throw new IllegalArgumentException("Limit must be non-negative");
}
List> fields = DBUtils.getFieldsFromList(dimensions);
fields.add(DSL.field(SUM, Double.class));
fields.add(DSL.field(AVG, Double.class));
fields.add(DSL.field(MIN, Double.class));
fields.add(DSL.field(MAX, Double.class));
return create.select(fields).from(DSL.table(metric)).limit(limit).fetch();
}
public void commit() throws Exception {
conn.commit();
}
@Override
public void remove() throws Exception {
conn.close();
}
/** Deletes the underlying metricsdb file. */
public void deleteOnDiskFile() {
MetricsDB.deleteOnDiskFile(windowStartTime);
}
/**
* Deletes the metricsdb file associated with the given timestamp if it exists.
*
* @param windowStartTime the timestamp associated with an existing metricsdb file
*/
public static void deleteOnDiskFile(long windowStartTime) {
Path dbFilePath = Paths.get(getDBFilePath(windowStartTime));
try {
Files.delete(dbFilePath);
} catch (IOException | SecurityException e) {
LOG.error(
"Failed to delete File - {} with ExceptionCode: {}",
dbFilePath,
StatExceptionCode.READER_METRICSDB_ACCESS_ERRORS,
e);
StatsCollector.instance()
.logException(StatExceptionCode.READER_METRICSDB_ACCESS_ERRORS);
}
}
/**
* Returns the timestamps associated with on-disk files.
*
* @return the timestamps associated with on-disk files
*/
public static Set listOnDiskFiles() {
String prefix = getFilePrefix();
Path parentPath = Paths.get(prefix).getParent();
Set found = new HashSet();
try (Stream paths = Files.list(parentPath)) {
PathMatcher matcher =
FileSystems.getDefault().getPathMatcher("regex:" + prefix + "\\d+");
int prefixLength = prefix.length();
paths.filter(matcher::matches)
.map(path -> path.toString())
.forEach(
s -> {
try {
found.add(
Long.parseUnsignedLong(s.substring(prefixLength), 10));
} catch (IndexOutOfBoundsException | NumberFormatException e) {
LOG.error("Unexpected file in metricsdb directory - {}", s);
}
});
} catch (IOException | SecurityException e) {
LOG.error(
"Failed to access metricsdb directory - {} with ExceptionCode: {}",
parentPath,
StatExceptionCode.READER_METRICSDB_ACCESS_ERRORS,
e);
StatsCollector.instance()
.logException(StatExceptionCode.READER_METRICSDB_ACCESS_ERRORS);
}
return found;
}
public DSLContext getDSLContext() {
return create;
}
public boolean metricExists(String metric) {
return DBUtils.checkIfTableExists(create, metric);
}
}