package org.apache.kafka.streams.state.internals;

import java.util.Map;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.metrics.Sensors;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.3.1.jar:org/apache/kafka/streams/state/internals/MeteredWindowStore.class */
/**
 * A typed {@link WindowStore} facade over a byte-level {@code WindowStore<Bytes, byte[]>}.
 *
 * <p>Keys and values are converted to and from bytes with {@link StateSerdes}, and the
 * duration of put, fetch, flush and initialization ("restore") calls is reported through
 * sensors created in {@link #init(ProcessorContext, StateStore)}.
 *
 * <p>The store must be initialized with {@code init} before any read or write method is
 * used: the serdes, metrics and sensors are all created there.
 *
 * @param <K> key type exposed to callers
 * @param <V> value type exposed to callers
 */
public class MeteredWindowStore<K, V> extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V> implements WindowStore<K, V> {
    private final long windowSizeMs;
    private final String metricScope;
    private final Time time;
    final Serde<K> keySerde;
    final Serde<V> valueSerde;
    StateSerdes<K, V> serdes;
    private StreamsMetricsImpl metrics;
    private Sensor putTime;
    private Sensor fetchTime;
    private Sensor flushTime;
    private ProcessorContext context;
    private String taskName;

    /**
     * Creates a metered wrapper around {@code windowStore}.
     *
     * @param windowStore the byte-level store that all calls are delegated to
     * @param j           window size in milliseconds, used when decoding store keys for the flush listener
     * @param str         metric scope; used to build the metrics group name and the store id tag
     * @param time        clock used to measure operation latency
     * @param serde       key serde, or {@code null} to fall back to the context's key serde at init
     * @param serde2      value serde, or {@code null} to fall back to the context's value serde at init
     */
    public MeteredWindowStore(WindowStore<Bytes, byte[]> windowStore, long j, String str, Time time, Serde<K> serde, Serde<V> serde2) {
        super(windowStore);
        this.windowSizeMs = j;
        this.metricScope = str;
        this.time = time;
        this.keySerde = serde;
        this.valueSerde = serde2;
    }

    /**
     * Sets up serdes and sensors, then initializes the wrapped store.
     *
     * <p>The time spent in the wrapped store's {@code init} is recorded on the "restore"
     * sensor, whether initialization succeeds or throws.
     *
     * @param processorContext context supplying the metrics registry, task id and default serdes
     * @param stateStore       root store forwarded to the wrapped store's {@code init}
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        this.context = processorContext;
        initStoreSerde(processorContext);
        this.metrics = (StreamsMetricsImpl) processorContext.metrics();
        this.taskName = processorContext.taskId().toString();

        final String metricsGroup = "stream-" + this.metricScope + "-metrics";
        final String storeIdTag = this.metricScope + "-id";
        final Map<String, String> taskTags = this.metrics.tagMap("task-id", this.taskName, storeIdTag, "all");
        final Map<String, String> storeTags = this.metrics.tagMap("task-id", this.taskName, storeIdTag, name());

        this.putTime = createSensor("put", metricsGroup, taskTags, storeTags);
        this.fetchTime = createSensor("fetch", metricsGroup, taskTags, storeTags);
        this.flushTime = createSensor("flush", metricsGroup, taskTags, storeTags);
        final Sensor restoreTime = createSensor("restore", metricsGroup, taskTags, storeTags);

        final long startNs = this.time.nanoseconds();
        try {
            super.init(processorContext, stateStore);
        } finally {
            this.metrics.recordLatency(restoreTime, startNs, this.time.nanoseconds());
        }
    }

    /** Creates one DEBUG-level latency/throughput sensor for the given operation name. */
    private Sensor createSensor(String operation, String metricsGroup, Map<String, String> taskTags, Map<String, String> storeTags) {
        return Sensors.createTaskAndStoreLatencyAndThroughputSensors(
            Sensor.RecordingLevel.DEBUG, operation, this.metrics, metricsGroup, this.taskName, name(), taskTags, storeTags);
    }

    /**
     * Builds {@link #serdes} for this store's changelog topic, using the serdes passed to the
     * constructor and falling back to the context's defaults for any that were {@code null}.
     */
    void initStoreSerde(ProcessorContext processorContext) {
        final String changelogTopic = ProcessorStateManager.storeChangelogTopic(processorContext.applicationId(), name());
        final Serde<K> effectiveKeySerde;
        if (this.keySerde == null) {
            effectiveKeySerde = processorContext.keySerde();
        } else {
            effectiveKeySerde = this.keySerde;
        }
        final Serde<V> effectiveValueSerde;
        if (this.valueSerde == null) {
            effectiveValueSerde = processorContext.valueSerde();
        } else {
            effectiveValueSerde = this.valueSerde;
        }
        this.serdes = new StateSerdes<>(changelogTopic, effectiveKeySerde, effectiveValueSerde);
    }

    /**
     * Registers a typed flush listener on the wrapped store if it is a {@link CachedStateStore}.
     *
     * <p>The raw key and values delivered by the wrapped store are deserialized before being
     * passed on; {@code null} raw values stay {@code null}.
     *
     * @return the wrapped store's result, or {@code false} if the wrapped store is not a
     *         {@code CachedStateStore}
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.state.internals.CachedStateStore
    public boolean setFlushListener(CacheFlushListener<Windowed<K>, V> cacheFlushListener, boolean z) {
        final WindowStore<Bytes, byte[]> inner = wrapped();
        if (!(inner instanceof CachedStateStore)) {
            return false;
        }
        // The type arguments cannot be checked at runtime; the listener below treats the
        // store key and both values as raw bytes, so the cast is parameterized accordingly.
        @SuppressWarnings("unchecked")
        final CachedStateStore<byte[], byte[]> cached = (CachedStateStore<byte[], byte[]>) inner;
        return cached.setFlushListener((rawKey, rawNewValue, rawOldValue, timestamp) -> {
            final Windowed<K> windowedKey =
                WindowKeySchema.fromStoreKey(rawKey, this.windowSizeMs, this.serdes.keyDeserializer(), this.serdes.topic());
            final V newValue = rawNewValue != null ? this.serdes.valueFrom(rawNewValue) : null;
            final V oldValue = rawOldValue != null ? this.serdes.valueFrom(rawOldValue) : null;
            cacheFlushListener.apply(windowedKey, newValue, oldValue, timestamp);
        }, z);
    }

    /** Stores {@code v} under {@code k} using the processor context's current timestamp. */
    @Override // org.apache.kafka.streams.state.WindowStore
    public void put(K k, V v) {
        put(k, v, this.context.timestamp());
    }

    /**
     * Serializes the key and value and writes them to the wrapped store with timestamp {@code j}.
     *
     * <p>Put latency is recorded exactly once per call, on success and on failure.
     *
     * @throws ProcessorStateException if the wrapped store throws one; the wrapped exception's
     *         message is used as a format string with the key and value as arguments, and the
     *         original exception is kept as the cause
     */
    @Override // org.apache.kafka.streams.state.WindowStore
    public void put(K k, V v, long j) {
        final long startNs = this.time.nanoseconds();
        try {
            wrapped().put(keyBytes(k), this.serdes.rawValue(v), j);
        } catch (ProcessorStateException e) {
            throw new ProcessorStateException(String.format(e.getMessage(), k, v), e);
        } finally {
            this.metrics.recordLatency(this.putTime, startNs, this.time.nanoseconds());
        }
    }

    /**
     * Looks up the value stored for {@code k} at timestamp {@code j}.
     *
     * <p>Fetch latency is recorded exactly once per call, for hits, misses and failures alike.
     *
     * @return the deserialized value, or {@code null} if the wrapped store returns {@code null}
     */
    @Override // org.apache.kafka.streams.state.ReadOnlyWindowStore
    public V fetch(K k, long j) {
        final long startNs = this.time.nanoseconds();
        try {
            final byte[] rawValue = wrapped().fetch(keyBytes(k), j);
            if (rawValue == null) {
                return null;
            }
            return this.serdes.valueFrom(rawValue);
        } finally {
            // Single recording point; recording before the return as well would double-count hits.
            this.metrics.recordLatency(this.fetchTime, startNs, this.time.nanoseconds());
        }
    }

    /**
     * Fetches the entries for {@code k} between {@code j} and {@code j2} from the wrapped store.
     *
     * <p>The fetch sensor, metrics, serdes and clock are handed to the returned iterator wrapper.
     */
    @Override // org.apache.kafka.streams.state.WindowStore, org.apache.kafka.streams.state.ReadOnlyWindowStore
    public WindowStoreIterator<V> fetch(K k, long j, long j2) {
        return new MeteredWindowStoreIterator<>(wrapped().fetch(keyBytes(k), j, j2), this.fetchTime, this.metrics, this.serdes, this.time);
    }

    /**
     * Fetches the entries for keys from {@code k} to {@code k2} between {@code j} and {@code j2}
     * from the wrapped store, wrapped in an iterator that yields typed windowed keys and values.
     */
    @Override // org.apache.kafka.streams.state.WindowStore, org.apache.kafka.streams.state.ReadOnlyWindowStore
    public KeyValueIterator<Windowed<K>, V> fetch(K k, K k2, long j, long j2) {
        return new MeteredWindowedKeyValueIterator<>(wrapped().fetch(keyBytes(k), keyBytes(k2), j, j2), this.fetchTime, this.metrics, this.serdes, this.time);
    }

    /** Fetches the entries of all keys between {@code j} and {@code j2} from the wrapped store. */
    @Override // org.apache.kafka.streams.state.WindowStore, org.apache.kafka.streams.state.ReadOnlyWindowStore
    public KeyValueIterator<Windowed<K>, V> fetchAll(long j, long j2) {
        return new MeteredWindowedKeyValueIterator<>(wrapped().fetchAll(j, j2), this.fetchTime, this.metrics, this.serdes, this.time);
    }

    /** Returns a typed iterator over everything the wrapped store's {@code all()} yields. */
    @Override // org.apache.kafka.streams.state.ReadOnlyWindowStore
    public KeyValueIterator<Windowed<K>, V> all() {
        return new MeteredWindowedKeyValueIterator<>(wrapped().all(), this.fetchTime, this.metrics, this.serdes, this.time);
    }

    /** Flushes via the superclass and records the elapsed time on the flush sensor. */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void flush() {
        final long startNs = this.time.nanoseconds();
        try {
            super.flush();
        } finally {
            this.metrics.recordLatency(this.flushTime, startNs, this.time.nanoseconds());
        }
    }

    /**
     * Closes via the superclass and removes this store's sensors from the metrics registry.
     *
     * <p>The sensors are removed even when closing throws, so a failed close does not leave
     * store-level sensors registered.
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void close() {
        try {
            super.close();
        } finally {
            this.metrics.removeAllStoreLevelSensors(this.taskName, name());
        }
    }

    /** Serializes {@code k} with the store's key serde and wraps the bytes in {@link Bytes}. */
    private Bytes keyBytes(K k) {
        return Bytes.wrap(this.serdes.rawKey(k));
    }
}
