package org.apache.kafka.streams.processor.internals.metrics;

import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.streams.StreamsMetrics;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.3.1.jar:org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.class */
public class StreamsMetricsImpl implements StreamsMetrics {
    private final Metrics metrics;
    private final Map<Sensor, Sensor> parentSensors;
    private final Sensor skippedRecordsSensor;
    private final String threadName;
    private final Deque<String> threadLevelSensors = new LinkedList();
    private final Map<String, Deque<String>> taskLevelSensors = new HashMap();
    private final Map<String, Deque<String>> nodeLevelSensors = new HashMap();
    private final Map<String, Deque<String>> cacheLevelSensors = new HashMap();
    private final Map<String, Deque<String>> storeLevelSensors = new HashMap();
    private static final String SENSOR_PREFIX_DELIMITER = ".";
    private static final String SENSOR_NAME_DELIMITER = ".s.";
    public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics";
    public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
    public static final String EXPIRED_WINDOW_RECORD_DROP = "expired-window-record-drop";
    public static final String LATE_RECORD_DROP = "late-record-drop";

    public StreamsMetricsImpl(Metrics metrics, String str) {
        Objects.requireNonNull(metrics, "Metrics cannot be null");
        this.threadName = str;
        this.metrics = metrics;
        this.parentSensors = new HashMap();
        this.skippedRecordsSensor = threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO, new Sensor[0]);
        this.skippedRecordsSensor.add(new MetricName("skipped-records-rate", "stream-metrics", "The average per-second number of skipped records", tagMap(new String[0])), new Rate(TimeUnit.SECONDS, new Count()));
        this.skippedRecordsSensor.add(new MetricName("skipped-records-total", "stream-metrics", "The total number of skipped records", tagMap(new String[0])), new Total());
    }

    public final Sensor threadLevelSensor(String str, Sensor.RecordingLevel recordingLevel, Sensor... sensorArr) {
        Sensor sensor;
        synchronized (this.threadLevelSensors) {
            String str2 = threadSensorPrefix() + SENSOR_NAME_DELIMITER + str;
            sensor = this.metrics.sensor(str2, recordingLevel, sensorArr);
            this.threadLevelSensors.push(str2);
        }
        return sensor;
    }

    public final void removeAllThreadLevelSensors() {
        synchronized (this.threadLevelSensors) {
            while (!this.threadLevelSensors.isEmpty()) {
                this.metrics.removeSensor(this.threadLevelSensors.pop());
            }
        }
    }

    private String threadSensorPrefix() {
        return "internal." + this.threadName;
    }

    public final Sensor taskLevelSensor(String str, String str2, Sensor.RecordingLevel recordingLevel, Sensor... sensorArr) {
        Sensor sensor;
        String taskSensorPrefix = taskSensorPrefix(str);
        synchronized (this.taskLevelSensors) {
            if (!this.taskLevelSensors.containsKey(taskSensorPrefix)) {
                this.taskLevelSensors.put(taskSensorPrefix, new LinkedList());
            }
            String str3 = taskSensorPrefix + SENSOR_NAME_DELIMITER + str2;
            sensor = this.metrics.sensor(str3, recordingLevel, sensorArr);
            this.taskLevelSensors.get(taskSensorPrefix).push(str3);
        }
        return sensor;
    }

    public final void removeAllTaskLevelSensors(String str) {
        String taskSensorPrefix = taskSensorPrefix(str);
        synchronized (this.taskLevelSensors) {
            Deque<String> remove = this.taskLevelSensors.remove(taskSensorPrefix);
            while (remove != null && !remove.isEmpty()) {
                this.metrics.removeSensor(remove.pop());
            }
        }
    }

    private String taskSensorPrefix(String str) {
        return threadSensorPrefix() + ".task." + str;
    }

    public Sensor nodeLevelSensor(String str, String str2, String str3, Sensor.RecordingLevel recordingLevel, Sensor... sensorArr) {
        Sensor sensor;
        String nodeSensorPrefix = nodeSensorPrefix(str, str2);
        synchronized (this.nodeLevelSensors) {
            if (!this.nodeLevelSensors.containsKey(nodeSensorPrefix)) {
                this.nodeLevelSensors.put(nodeSensorPrefix, new LinkedList());
            }
            String str4 = nodeSensorPrefix + SENSOR_NAME_DELIMITER + str3;
            sensor = this.metrics.sensor(str4, recordingLevel, sensorArr);
            this.nodeLevelSensors.get(nodeSensorPrefix).push(str4);
        }
        return sensor;
    }

    public final void removeAllNodeLevelSensors(String str, String str2) {
        String nodeSensorPrefix = nodeSensorPrefix(str, str2);
        synchronized (this.nodeLevelSensors) {
            Deque<String> remove = this.nodeLevelSensors.remove(nodeSensorPrefix);
            while (remove != null && !remove.isEmpty()) {
                this.metrics.removeSensor(remove.pop());
            }
        }
    }

    private String nodeSensorPrefix(String str, String str2) {
        return taskSensorPrefix(str) + ".node." + str2;
    }

    public final Sensor cacheLevelSensor(String str, String str2, String str3, Sensor.RecordingLevel recordingLevel, Sensor... sensorArr) {
        Sensor sensor;
        String cacheSensorPrefix = cacheSensorPrefix(str, str2);
        synchronized (this.cacheLevelSensors) {
            if (!this.cacheLevelSensors.containsKey(cacheSensorPrefix)) {
                this.cacheLevelSensors.put(cacheSensorPrefix, new LinkedList());
            }
            String str4 = cacheSensorPrefix + SENSOR_NAME_DELIMITER + str3;
            sensor = this.metrics.sensor(str4, recordingLevel, sensorArr);
            this.cacheLevelSensors.get(cacheSensorPrefix).push(str4);
        }
        return sensor;
    }

    public final void removeAllCacheLevelSensors(String str, String str2) {
        String cacheSensorPrefix = cacheSensorPrefix(str, str2);
        synchronized (this.cacheLevelSensors) {
            Deque<String> remove = this.cacheLevelSensors.remove(cacheSensorPrefix);
            while (remove != null && !remove.isEmpty()) {
                this.metrics.removeSensor(remove.pop());
            }
        }
    }

    private String cacheSensorPrefix(String str, String str2) {
        return taskSensorPrefix(str) + ".cache." + str2;
    }

    public final Sensor storeLevelSensor(String str, String str2, String str3, Sensor.RecordingLevel recordingLevel, Sensor... sensorArr) {
        Sensor sensor;
        String storeSensorPrefix = storeSensorPrefix(str, str2);
        synchronized (this.storeLevelSensors) {
            if (!this.storeLevelSensors.containsKey(storeSensorPrefix)) {
                this.storeLevelSensors.put(storeSensorPrefix, new LinkedList());
            }
            String str4 = storeSensorPrefix + SENSOR_NAME_DELIMITER + str3;
            sensor = this.metrics.sensor(str4, recordingLevel, sensorArr);
            this.storeLevelSensors.get(storeSensorPrefix).push(str4);
        }
        return sensor;
    }

    public final void removeAllStoreLevelSensors(String str, String str2) {
        String storeSensorPrefix = storeSensorPrefix(str, str2);
        synchronized (this.storeLevelSensors) {
            Deque<String> remove = this.storeLevelSensors.remove(storeSensorPrefix);
            while (remove != null && !remove.isEmpty()) {
                this.metrics.removeSensor(remove.pop());
            }
        }
    }

    private String storeSensorPrefix(String str, String str2) {
        return taskSensorPrefix(str) + ".store." + str2;
    }

    public final Sensor skippedRecordsSensor() {
        return this.skippedRecordsSensor;
    }

    @Override // org.apache.kafka.streams.StreamsMetrics
    public Sensor addSensor(String str, Sensor.RecordingLevel recordingLevel) {
        return this.metrics.sensor(str, recordingLevel);
    }

    @Override // org.apache.kafka.streams.StreamsMetrics
    public Sensor addSensor(String str, Sensor.RecordingLevel recordingLevel, Sensor... sensorArr) {
        return this.metrics.sensor(str, recordingLevel, sensorArr);
    }

    @Override // org.apache.kafka.streams.StreamsMetrics
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(this.metrics.metrics());
    }

    @Override // org.apache.kafka.streams.StreamsMetrics
    public void recordLatency(Sensor sensor, long j, long j2) {
        sensor.record(j2 - j);
    }

    @Override // org.apache.kafka.streams.StreamsMetrics
    public void recordThroughput(Sensor sensor, long j) {
        sensor.record(j);
    }

    public final Map<String, String> tagMap(String... strArr) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("client-id", this.threadName);
        if (strArr != null) {
            if (strArr.length % 2 != 0) {
                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
            }
            for (int i = 0; i < strArr.length; i += 2) {
                linkedHashMap.put(strArr[i], strArr[i + 1]);
            }
        }
        return linkedHashMap;
    }

    private Map<String, String> constructTags(String str, String str2, String... strArr) {
        String[] strArr2 = (String[]) Arrays.copyOf(strArr, strArr.length + 2);
        strArr2[strArr.length] = str + "-id";
        strArr2[strArr.length + 1] = str2;
        return tagMap(strArr2);
    }

    @Override // org.apache.kafka.streams.StreamsMetrics
    public Sensor addLatencyAndThroughputSensor(String str, String str2, String str3, Sensor.RecordingLevel recordingLevel, String... strArr) {
        String groupNameFromScope = groupNameFromScope(str);
        Map<String, String> constructTags = constructTags(str, str2, strArr);
        Map<String, String> constructTags2 = constructTags(str, "all", strArr);
        Sensor sensor = this.metrics.sensor(externalParentSensorName(str3), recordingLevel);
        addAvgMaxLatency(sensor, groupNameFromScope, constructTags2, str3);
        addInvocationRateAndCount(sensor, groupNameFromScope, constructTags2, str3);
        Sensor sensor2 = this.metrics.sensor(externalChildSensorName(str3, str2), recordingLevel, sensor);
        addAvgMaxLatency(sensor2, groupNameFromScope, constructTags, str3);
        addInvocationRateAndCount(sensor2, groupNameFromScope, constructTags, str3);
        this.parentSensors.put(sensor2, sensor);
        return sensor2;
    }

    @Override // org.apache.kafka.streams.StreamsMetrics
    public Sensor addThroughputSensor(String str, String str2, String str3, Sensor.RecordingLevel recordingLevel, String... strArr) {
        String groupNameFromScope = groupNameFromScope(str);
        Map<String, String> constructTags = constructTags(str, str2, strArr);
        Map<String, String> constructTags2 = constructTags(str, "all", strArr);
        Sensor sensor = this.metrics.sensor(externalParentSensorName(str3), recordingLevel);
        addInvocationRateAndCount(sensor, groupNameFromScope, constructTags2, str3);
        Sensor sensor2 = this.metrics.sensor(externalChildSensorName(str3, str2), recordingLevel, sensor);
        addInvocationRateAndCount(sensor2, groupNameFromScope, constructTags, str3);
        this.parentSensors.put(sensor2, sensor);
        return sensor2;
    }

    private String externalChildSensorName(String str, String str2) {
        return "external." + this.threadName + ".entity." + str2 + SENSOR_NAME_DELIMITER + str;
    }

    private String externalParentSensorName(String str) {
        return "external." + this.threadName + SENSOR_NAME_DELIMITER + str;
    }

    public static void addAvgMaxLatency(Sensor sensor, String str, Map<String, String> map, String str2) {
        sensor.add(new MetricName(str2 + "-latency-avg", str, "The average latency of " + str2 + " operation.", map), new Avg());
        sensor.add(new MetricName(str2 + "-latency-max", str, "The max latency of " + str2 + " operation.", map), new Max());
    }

    public static void addInvocationRateAndCount(Sensor sensor, String str, Map<String, String> map, String str2) {
        sensor.add(new MetricName(str2 + "-rate", str, "The average number of occurrence of " + str2 + " operation per second.", map), new Rate(TimeUnit.SECONDS, new Count()));
        sensor.add(new MetricName(str2 + "-total", str, "The total number of occurrence of " + str2 + " operations.", map), new CumulativeCount());
    }

    @Override // org.apache.kafka.streams.StreamsMetrics
    public void removeSensor(Sensor sensor) {
        Objects.requireNonNull(sensor, "Sensor is null");
        this.metrics.removeSensor(sensor.name());
        Sensor remove = this.parentSensors.remove(sensor);
        if (remove != null) {
            this.metrics.removeSensor(remove.name());
        }
    }

    Map<Sensor, Sensor> parentSensors() {
        return Collections.unmodifiableMap(this.parentSensors);
    }

    private static String groupNameFromScope(String str) {
        return "stream-" + str + "-metrics";
    }
}
