/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.performanceanalyzer.reader;

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.closeTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.jooq.BatchBindStep;
import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.junit.Assert;
import org.junit.Test;
import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.config.TroubleshootingConfig;
import org.opensearch.performanceanalyzer.metricsdb.Dimensions;
import org.opensearch.performanceanalyzer.metricsdb.MetricsDB;
import org.powermock.api.mockito.PowerMockito;

// @PowerMockIgnore({ "org.apache.logging.log4j.*" })
// @RunWith(PowerMockRunner.class)
// @PrepareForTest({ TroubleshootingConfig.class })
public class MetricsEmitterTests extends AbstractReaderTests {
    public MetricsEmitterTests() throws SQLException, ClassNotFoundException {
        super();
        // TODO Auto-generated constructor stub
    }

    private static final String DB_URL = "jdbc:sqlite:";

    @Test
    public void testMetricsEmitter() throws Exception {
        //
        Connection conn = DriverManager.getConnection(DB_URL);
        ShardRequestMetricsSnapshot rqMetricsSnap =
                new ShardRequestMetricsSnapshot(conn, 1535065195000L);
        Map<String, String> dimensions = new HashMap<>();
        dimensions.put(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString(), "ac-test");
        dimensions.put(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString(), "1");
        dimensions.put(ShardRequestMetricsSnapshot.Fields.OPERATION.toString(), "shardBulk");
        dimensions.put(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString(), "primary");
        dimensions.put("tid", "1");
        dimensions.put("rid", "1");
        rqMetricsSnap.putStartMetric(1535065196120L, dimensions);
        rqMetricsSnap.putEndMetric(1535065196323L, dimensions);
        dimensions.put("rid", "2");
        dimensions.put(ShardRequestMetricsSnapshot.Fields.OPERATION.toString(), "shardSearch");
        rqMetricsSnap.putStartMetric(1535065197323L, dimensions);
        dimensions.put("rid", "3");
        dimensions.put("tid", "2");
        rqMetricsSnap.putStartMetric(1535065198323L, dimensions);
        rqMetricsSnap.putEndMetric(1535065199923L, dimensions);
        Result<Record> res = rqMetricsSnap.fetchThreadUtilizationRatio();
        Float tUtil = Float.parseFloat(res.get(0).get("tUtil").toString());
        assertEquals(0.07048611f, tUtil.floatValue(), 0);

        OSMetricsSnapshot osMetricsSnap = new OSMetricsSnapshot(conn, 1L);
        // Create OSMetricsSnapshot
        Map<String, Double> metrics = new HashMap<>();
        Map<String, String> osDim = new HashMap<>();
        osDim.put("tid", "1");
        osDim.put("tName", "opensearch[E-C7clp][search][T#1]");
        metrics.put(AllMetrics.OSMetrics.CPU_UTILIZATION.toString(), 2.3333d);
        metrics.put(AllMetrics.OSMetrics.PAGING_RSS.toString(), 3.63d);
        osMetricsSnap.putMetric(metrics, osDim, 1L);
        osDim.put("tid", "2");
        osDim.put("tName", "opensearch[E-C7clp][bulk][T#2]");
        metrics.put(AllMetrics.OSMetrics.CPU_UTILIZATION.toString(), 3.3333d);
        metrics.put(AllMetrics.OSMetrics.PAGING_RSS.toString(), 1.63d);
        osMetricsSnap.putMetric(metrics, osDim, 1L);
        osDim.put("tid", "3");
        osDim.put("tName", "GC");
        metrics.put(AllMetrics.OSMetrics.CPU_UTILIZATION.toString(), 3.3333d);
        metrics.put(AllMetrics.OSMetrics.PAGING_RSS.toString(), 1.63d);
        osMetricsSnap.putMetric(metrics, osDim, 1L);

        DSLContext create = DSL.using(conn, SQLDialect.SQLITE);
        MetricsDB db = new MetricsDB(1553713402);
        MetricsEmitter.emitAggregatedOSMetrics(create, db, osMetricsSnap, rqMetricsSnap);
        res =
                db.queryMetric(
                        Arrays.asList(
                                AllMetrics.OSMetrics.PAGING_RSS.toString(),
                                AllMetrics.OSMetrics.CPU_UTILIZATION.toString()),
                        Arrays.asList("sum", "sum"),
                        Arrays.asList(
                                ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString(),
                                ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString(),
                                ShardRequestMetricsSnapshot.Fields.OPERATION.toString()));

        Double cpu =
                Double.parseDouble(
                        res.get(0).get(AllMetrics.OSMetrics.CPU_UTILIZATION.toString()).toString());
        db.remove();
        assertEquals(0.164465243055556d, cpu.doubleValue(), 0);
    }

    @Test(expected = Exception.class)
    public void testMetricsEmitterInvalidData() throws Exception {
        //
        PowerMockito.mockStatic(TroubleshootingConfig.class);
        PowerMockito.when(TroubleshootingConfig.getEnableDevAssert()).thenReturn(true);

        Connection conn = DriverManager.getConnection(DB_URL);
        ShardRequestMetricsSnapshot rqMetricsSnap =
                new ShardRequestMetricsSnapshot(conn, 1535065195000L);
        Map<String, String> dimensions = new HashMap<>();
        dimensions.put(ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString(), "ac-test");
        dimensions.put(ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString(), "1");
        dimensions.put(ShardRequestMetricsSnapshot.Fields.OPERATION.toString(), "shardBulk");
        dimensions.put(ShardRequestMetricsSnapshot.Fields.SHARD_ROLE.toString(), "primary");
        dimensions.put("tid", "1");
        dimensions.put("rid", "1");
        rqMetricsSnap.putStartMetric(1535065196120L, dimensions);
        rqMetricsSnap.putEndMetric(1535065196323L, dimensions);
        dimensions.put("rid", "2");
        dimensions.put(ShardRequestMetricsSnapshot.Fields.OPERATION.toString(), "shardSearch");
        rqMetricsSnap.putStartMetric(1535065197323L, dimensions);
        dimensions.put("rid", "3");
        dimensions.put("tid", "2");
        rqMetricsSnap.putStartMetric(1535065198323L, dimensions);
        rqMetricsSnap.putEndMetric(1535065199923L, dimensions);
        Result<Record> res = rqMetricsSnap.fetchThreadUtilizationRatio();
        Float tUtil = Float.parseFloat(res.get(0).get("tUtil").toString());
        assertEquals(0.07048611f, tUtil.floatValue(), 0);

        OSMetricsSnapshot osMetricsSnap = new OSMetricsSnapshot(conn, 1L);
        // Create OSMetricsSnapshot
        Map<String, Double> metrics = new HashMap<>();
        Map<String, String> osDim = new HashMap<>();
        osDim.put("tid", "1");
        osDim.put("tName", "opensearch[E-C7clp][search][T#1]");
        metrics.put(AllMetrics.OSMetrics.CPU_UTILIZATION.toString(), 2.3333d);
        metrics.put(AllMetrics.OSMetrics.PAGING_RSS.toString(), 3.63d);
        osMetricsSnap.putMetric(metrics, osDim, 1L);
        osDim.put("tid", "2");
        osDim.put("tName", "GC thread");
        metrics.put(AllMetrics.OSMetrics.CPU_UTILIZATION.toString(), 3.3333d);
        metrics.put(AllMetrics.OSMetrics.PAGING_RSS.toString(), 1.63d);
        osMetricsSnap.putMetric(metrics, osDim, 1L);
        osDim.put("tid", "3");
        osDim.put("tName", "GC");
        metrics.put(AllMetrics.OSMetrics.CPU_UTILIZATION.toString(), 3.3333d);
        metrics.put(AllMetrics.OSMetrics.PAGING_RSS.toString(), 1.63d);
        osMetricsSnap.putMetric(metrics, osDim, 1L);

        DSLContext create = DSL.using(conn, SQLDialect.SQLITE);
        MetricsDB db = new MetricsDB(1553713410);
        MetricsEmitter.emitAggregatedOSMetrics(create, db, osMetricsSnap, rqMetricsSnap);
        res =
                db.queryMetric(
                        Arrays.asList(
                                AllMetrics.OSMetrics.PAGING_RSS.toString(),
                                AllMetrics.OSMetrics.CPU_UTILIZATION.toString()),
                        Arrays.asList("sum", "sum"),
                        Arrays.asList(
                                ShardRequestMetricsSnapshot.Fields.SHARD_ID.toString(),
                                ShardRequestMetricsSnapshot.Fields.INDEX_NAME.toString(),
                                ShardRequestMetricsSnapshot.Fields.OPERATION.toString()));
        db.remove();
    }

    @Test
    public void testHttpMetricsEmitter() throws Exception {
        Connection conn = DriverManager.getConnection(DB_URL);
        HttpRequestMetricsSnapshot rqMetricsSnap = new HttpRequestMetricsSnapshot(conn, 1L);
        Map<String, String> dimensions = new HashMap<>();
        dimensions.put(HttpRequestMetricsSnapshot.Fields.OPERATION.toString(), "search");
        dimensions.put(HttpRequestMetricsSnapshot.Fields.HTTP_RESP_CODE.toString(), "200");
        dimensions.put(HttpRequestMetricsSnapshot.Fields.INDICES.toString(), "");
        dimensions.put(HttpRequestMetricsSnapshot.Fields.EXCEPTION.toString(), "");
        dimensions.put("rid", "1");
        rqMetricsSnap.putStartMetric(12345L, 0L, dimensions);
        rqMetricsSnap.putEndMetric(33325L, dimensions);
        dimensions.put("rid", "2");
        dimensions.put(HttpRequestMetricsSnapshot.Fields.OPERATION.toString(), "search");
        rqMetricsSnap.putStartMetric(22245L, 0L, dimensions);
        dimensions.put("rid", "3");
        rqMetricsSnap.putStartMetric(10000L, 0L, dimensions);
        rqMetricsSnap.putEndMetric(30000L, dimensions);

        DSLContext create = DSL.using(conn, SQLDialect.SQLITE);
        MetricsDB db = new MetricsDB(1553713438);
        MetricsEmitter.emitHttpMetrics(create, db, rqMetricsSnap);
        Result<Record> res =
                db.queryMetric(
                        Arrays.asList(
                                AllMetrics.CommonMetric.LATENCY.toString(),
                                AllMetrics.HttpMetric.HTTP_TOTAL_REQUESTS.toString()),
                        Arrays.asList("avg", "sum"),
                        Arrays.asList(HttpRequestMetricsSnapshot.Fields.OPERATION.toString()));

        Float latency =
                Float.parseFloat(
                        res.get(0).get(AllMetrics.CommonMetric.LATENCY.toString()).toString());
        db.remove();
        assertEquals(20490.0f, latency.floatValue(), 0);
    }

    @Test
    public void testWorkloadMetricsEmitter() throws Exception {
        Connection conn = DriverManager.getConnection(DB_URL);
        ShardRequestMetricsSnapshot rqMetricsSnap =
                new ShardRequestMetricsSnapshot(conn, 1535065195000L);
        BatchBindStep handle = rqMetricsSnap.startBatchPut();
        handle.bind(
                "shardId",
                "indexName",
                "1",
                "threadId",
                "operation",
                "primary",
                1535065195000L,
                null,
                10);
        handle.bind(
                "shardId",
                "indexName",
                "1",
                "threadId",
                "operation",
                "primary",
                null,
                1535065196000L,
                null);
        handle.bind(
                "shardId",
                "indexName",
                "2",
                "threadId",
                "operation",
                "primary",
                1535065197000L,
                null,
                10);
        handle.bind(
                "shardId",
                "indexName",
                "2",
                "threadId",
                "operation",
                "primary",
                null,
                1535065198000L,
                null);
        handle.execute();

        System.out.println(rqMetricsSnap.fetchAll());
        System.out.println(rqMetricsSnap.fetchLatencyByOp());

        DSLContext create = DSL.using(conn, SQLDialect.SQLITE);
        MetricsDB db = new MetricsDB(1553713445);
        MetricsEmitter.emitWorkloadMetrics(create, db, rqMetricsSnap);
        Result<Record> res =
                db.queryMetric(
                        Arrays.asList(
                                AllMetrics.ShardBulkMetric.DOC_COUNT.toString(),
                                AllMetrics.ShardOperationMetric.SHARD_OP_COUNT.toString()),
                        Arrays.asList("sum", "sum"),
                        Arrays.asList(HttpRequestMetricsSnapshot.Fields.OPERATION.toString()));

        Double bulkDocs =
                Double.parseDouble(
                        res.get(0).get(AllMetrics.ShardBulkMetric.DOC_COUNT.toString()).toString());
        Double shardOps =
                Double.parseDouble(
                        res.get(0)
                                .get(AllMetrics.ShardOperationMetric.SHARD_OP_COUNT.toString())
                                .toString());
        db.remove();
        assertEquals(20.0d, bulkDocs.doubleValue(), 0);
        assertEquals(2d, shardOps.doubleValue(), 0);
    }

    @Test
    public void testWorkloadMetricsEmitterDoNothing() throws Exception {
        Connection conn = DriverManager.getConnection(DB_URL);
        ShardRequestMetricsSnapshot rqMetricsSnap =
                new ShardRequestMetricsSnapshot(conn, 1535065195000L);
        DSLContext create = DSL.using(conn, SQLDialect.SQLITE);
        MetricsDB db = new MetricsDB(1553713492);
        MetricsEmitter.emitWorkloadMetrics(create, db, rqMetricsSnap);
        System.out.println(rqMetricsSnap.fetchAll());
        db.remove();
        assertEquals(0, rqMetricsSnap.fetchAll().size());
    }

    @Test
    public void testExtractor() {
        String check = "abc: 2\nbbc:\ncbc:21\n";
        Assert.assertEquals(" 2", PerformanceAnalyzerMetrics.extractMetricValue(check, "abc"));
        assertEquals("", PerformanceAnalyzerMetrics.extractMetricValue(check, "bbc"));
        assertEquals("21", PerformanceAnalyzerMetrics.extractMetricValue(check, "cbc"));
    }

    @Test
    public void testThreadNameCategorization() {
        Dimensions dimensions = new Dimensions();
        assertEquals(
                "GC",
                MetricsEmitter.categorizeThreadName(
                        "Gang worker#0 (Parallel GC Threads)", dimensions));
        assertEquals(
                "search",
                MetricsEmitter.categorizeThreadName(
                        "opensearch[I9AByra][search][T#4]", dimensions));
        assertEquals(
                "refresh",
                MetricsEmitter.categorizeThreadName(
                        "opensearch[I9AByra][refresh][T#1]", dimensions));
        assertEquals(
                "merge",
                MetricsEmitter.categorizeThreadName(
                        "opensearch[I9AByra][[nyc_taxis][1]: Lucene Merge", dimensions));
        assertEquals(
                "management",
                MetricsEmitter.categorizeThreadName("opensearch[I9AByra][management]", dimensions));
        assertEquals(
                "search",
                MetricsEmitter.categorizeThreadName("opensearch[I9AByra][search]", dimensions));
        assertEquals(
                "write",
                MetricsEmitter.categorizeThreadName("opensearch[I9AByra][bulk]", dimensions));
        assertEquals("other", MetricsEmitter.categorizeThreadName("Top thread random", dimensions));
    }

    @Test
    public void testEmitNodeMetrics() throws Exception {
        Connection conn = DriverManager.getConnection(DB_URL);

        MemoryDBSnapshot tcpSnap =
                new MemoryDBSnapshot(conn, AllMetrics.MetricName.TCP_METRICS, 5001L);

        long lastUpdatedTime = 2000L;
        tcpSnap.setLastUpdatedTime(lastUpdatedTime);

        Object[][] values = {
            {"0000000000000000FFFF0000E03DD40A", 24, 0, 0, 0, 7, 1},
            {"0000000000000000FFFF00006733D40A", 23, 0, 0, 0, 6, 1},
            {"0000000000000000FFFF00000100007F", 24, 0, 0, 0, 10, -1},
            {"0000000000000000FFFF00005432D40A", 23, 0, 0, 0, 8, 5},
            {"00000000000000000000000000000000", 4, 0, 0, 0, 10, 0},
            {"0000000000000000FFFF0000F134D40A", 23, 0, 0, 0, 8, 0}
        };

        tcpSnap.insertMultiRows(values);

        DSLContext create = DSL.using(conn, SQLDialect.SQLITE);
        MetricsDB db = new MetricsDB(1553713499);
        MetricsEmitter.emitNodeMetrics(create, db, tcpSnap);
        Result<Record> res =
                db.queryMetric(
                        Arrays.asList(
                                AllMetrics.TCPValue.Net_TCP_NUM_FLOWS.toString(),
                                AllMetrics.TCPValue.Net_TCP_SSTHRESH.toString()),
                        Arrays.asList("sum", "avg"),
                        Arrays.asList(AllMetrics.TCPDimension.DEST_ADDR.toString()));

        assertTrue(6 == res.size());

        for (int i = 0; i < 6; i++) {
            Record record0 = res.get(i);
            Double numFlows =
                    Double.parseDouble(
                            record0.get(AllMetrics.TCPValue.Net_TCP_NUM_FLOWS.toString())
                                    .toString());

            assertThat(
                    numFlows.doubleValue(),
                    anyOf(closeTo(24, 0.001), closeTo(23, 0.001), closeTo(4, 0.001)));

            Double ssThresh =
                    Double.parseDouble(
                            record0.get(AllMetrics.TCPValue.Net_TCP_SSTHRESH.toString())
                                    .toString());

            assertThat(
                    ssThresh.doubleValue(),
                    anyOf(
                            closeTo(1, 0.001),
                            closeTo(-1, 0.001),
                            closeTo(5, 0.001),
                            closeTo(0, 0.001)));
        }
        db.remove();
    }

    @Test
    public void testFaultDetectionMetricsEmitter() throws Exception {
        Connection conn = DriverManager.getConnection(DB_URL);
        FaultDetectionMetricsSnapshot faultDetectionMetricsSnapshot =
                new FaultDetectionMetricsSnapshot(conn, 1L);
        Map<String, String> dimensions = new HashMap<>();
        dimensions.put(
                AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString(), "sourceNodeId");
        dimensions.put(
                AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString(), "targetNodeId");
        dimensions.put(
                FaultDetectionMetricsSnapshot.Fields.FAULT_DETECTION_TYPE.toString(),
                "follower_check");
        dimensions.put(FaultDetectionMetricsSnapshot.Fields.RID.toString(), "1");
        faultDetectionMetricsSnapshot.putStartMetric(12345L, dimensions);
        faultDetectionMetricsSnapshot.putEndMetric(33325L, 0, dimensions);

        dimensions.put(FaultDetectionMetricsSnapshot.Fields.RID.toString(), "2");
        faultDetectionMetricsSnapshot.putStartMetric(22245L, dimensions);

        dimensions.put(FaultDetectionMetricsSnapshot.Fields.RID.toString(), "3");
        faultDetectionMetricsSnapshot.putStartMetric(10000L, dimensions);
        faultDetectionMetricsSnapshot.putEndMetric(30000L, 1, dimensions);

        MetricsDB db = new MetricsDB(1553713438);
        MetricsEmitter.emitFaultDetectionMetrics(db, faultDetectionMetricsSnapshot);
        Result<Record> res =
                db.queryMetric(
                        Arrays.asList(
                                AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_LATENCY.toString(),
                                AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_FAILURE.toString()),
                        Arrays.asList("avg", "sum"),
                        Arrays.asList(
                                AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()));

        Float latency =
                Float.parseFloat(
                        res.get(0)
                                .get(
                                        AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_LATENCY
                                                .toString())
                                .toString());
        db.remove();
        assertEquals(20490.0f, latency.floatValue(), 0);
    }

    public void testShardStateMetricsEmitter() throws Exception {
        Connection conn = DriverManager.getConnection(DB_URL);
        ShardStateMetricsSnapshot shardStateMetricsSnapshot =
                new ShardStateMetricsSnapshot(conn, 1L);
        Map<String, String> dimensions = new HashMap<>();
        dimensions.put(AllMetrics.ShardStateDimension.INDEX_NAME.toString(), "indexName");
        dimensions.put(AllMetrics.ShardStateDimension.SHARD_ID.toString(), "shardId");
        dimensions.put(AllMetrics.ShardStateDimension.SHARD_TYPE.toString(), "p");
        dimensions.put(AllMetrics.ShardStateDimension.NODE_NAME.toString(), "nodeName");

        shardStateMetricsSnapshot.putMetrics("Unassigned", dimensions);
        MetricsDB db = new MetricsDB(1553713438);
        MetricsEmitter.emitShardStateMetric(db, shardStateMetricsSnapshot);

        Result<Record> res = db.queryMetric(AllMetrics.ShardStateValue.SHARD_STATE.toString());

        String shard_state =
                res.get(0).get(AllMetrics.ShardStateValue.SHARD_STATE.toString()).toString();
        db.remove();
        assertEquals("Unassigned", shard_state);
    }

    @Test
    public void testEmitGCTypeMetric() throws Exception {
        Connection conn = DriverManager.getConnection(DB_URL);
        final String memPool = "testMemPool";
        final String collectorName = "testCollectorName";
        long currTime = System.currentTimeMillis();

        GarbageCollectorInfoSnapshot gcSnap = new GarbageCollectorInfoSnapshot(conn, currTime);
        BatchBindStep handle = gcSnap.startBatchPut();
        Object[] bindVals = new Object[2];
        bindVals[0] = memPool;
        bindVals[1] = collectorName;
        handle.bind(bindVals).execute();

        MetricsDB metricsDB = new MetricsDB(currTime);
        MetricsEmitter.emitGarbageCollectionInfo(metricsDB, gcSnap);

        Result<Record> result =
                metricsDB.queryMetric(AllMetrics.GCInfoValue.GARBAGE_COLLECTOR_TYPE.toString());

        assertEquals(1, result.size());
        Assert.assertEquals(
                memPool, result.get(0).get(AllMetrics.GCInfoDimension.MEMORY_POOL.getField()));
        Assert.assertEquals(
                collectorName,
                result.get(0).get(AllMetrics.GCInfoDimension.COLLECTOR_NAME.getField()));
    }

    @Test
    public void testEmitAdmissionControlMetric() throws Exception {
        Connection connection = DriverManager.getConnection(DB_URL);
        String testController = "testController";
        String testRejectionCount = "1";
        long currentTimeMillis = System.currentTimeMillis();

        AdmissionControlSnapshot snapshot =
                new AdmissionControlSnapshot(connection, currentTimeMillis);
        BatchBindStep handle = snapshot.startBatchPut();
        handle.bind(testController, Long.parseLong(testRejectionCount)).execute();

        MetricsDB metricsDB = new MetricsDB(currentTimeMillis);
        MetricsEmitter.emitAdmissionControlMetrics(metricsDB, snapshot);

        Result<Record> result =
                metricsDB.queryMetric(AllMetrics.AdmissionControlValue.REJECTION_COUNT.toString());
        assertEquals(1, result.size());
    }

    @Test
    public void testClusterManagerThrottlingMetricsEmitter() throws Exception {
        Connection conn = DriverManager.getConnection(DB_URL);
        ClusterManagerThrottlingMetricsSnapshot clusterManagerThrottlingMetricsSnapshot =
                new ClusterManagerThrottlingMetricsSnapshot(conn, 1L);
        Map<String, String> dimensions = new HashMap<>();

        clusterManagerThrottlingMetricsSnapshot.putMetrics(1, dimensions);
        MetricsDB db = new MetricsDB(1553713438);
        MetricsEmitter.emitClusterManagerThrottledTaskMetric(
                db, clusterManagerThrottlingMetricsSnapshot);

        Result<Record> res =
                db.queryMetric(
                        Arrays.asList(
                                AllMetrics.ClusterManagerThrottlingValue.DATA_RETRYING_TASK_COUNT
                                        .toString()),
                        Arrays.asList("sum"),
                        new ArrayList<>());

        Double retrying_task =
                Double.parseDouble(
                        res.get(0)
                                .get(
                                        AllMetrics.ClusterManagerThrottlingValue
                                                .DATA_RETRYING_TASK_COUNT
                                                .toString())
                                .toString());
        db.remove();
        assertEquals(1.0, retrying_task.doubleValue(), 0);
    }
}