package org.apache.rocketmq.broker.longpolling;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.store.DefaultMessageFilter;
import org.apache.rocketmq.store.MessageFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/rocketmq-broker-4.0.0-incubating.jar:org/apache/rocketmq/broker/longpolling/PullRequestHoldService.class */
/**
 * Service thread that parks pull requests which cannot be answered yet and
 * re-dispatches them to the pull message processor once either
 * <ul>
 *   <li>the queue's max offset has moved past the offset the request wants to
 *       pull from and the message filter accepts the request, or</li>
 *   <li>the request has been suspended for at least its timeout.</li>
 * </ul>
 *
 * <p>Requests are grouped per queue under the key {@code topic + "@" + queueId}.
 * The service loop re-checks every known queue periodically;
 * {@link #notifyMessageArriving(String, int, long, Long)} can additionally be
 * called directly to trigger the check for a single queue.
 */
public class PullRequestHoldService extends ServiceThread {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private static final String TOPIC_QUEUEID_SEPARATOR = "@";

    /** Wait passed to {@code waitForRunning} between two scans when long polling is enabled. */
    private static final long LONG_POLLING_CHECK_INTERVAL_MILLIS = 5000L;

    /** A scan taking longer than this is reported in the log. */
    private static final long SLOW_CHECK_THRESHOLD_MILLIS = 5000L;

    private final BrokerController brokerController;
    private final SystemClock systemClock = new SystemClock();
    private final MessageFilter messageFilter = new DefaultMessageFilter();

    /** Held requests, keyed by {@link #buildKey(String, int)}. */
    private final ConcurrentHashMap<String, ManyPullRequest> pullRequestTable = new ConcurrentHashMap<>(1024);

    /**
     * @param brokerController broker facade used to reach the broker config,
     *                         the message store and the pull message processor
     */
    public PullRequestHoldService(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Parks a pull request until it is woken up by a later check.
     *
     * @param str         topic the request pulls from
     * @param i           queue id the request pulls from
     * @param pullRequest the request to hold
     */
    public void suspendPullRequest(String str, int i, PullRequest pullRequest) {
        String key = buildKey(str, i);
        ManyPullRequest holder = this.pullRequestTable.get(key);
        if (holder == null) {
            ManyPullRequest created = new ManyPullRequest();
            // Another thread may have registered a holder in the meantime; always use the winner.
            ManyPullRequest existing = this.pullRequestTable.putIfAbsent(key, created);
            holder = existing != null ? existing : created;
        }
        holder.addPullRequest(pullRequest);
    }

    /** Builds the table key {@code topic + "@" + queueId}. */
    private String buildKey(String str, int i) {
        return str + TOPIC_QUEUEID_SEPARATOR + i;
    }

    /**
     * Service loop: waits for the polling interval, then re-checks every held
     * request. Any failure of a single iteration is logged and the loop goes on.
     */
    @Override
    public void run() {
        log.info("{} service started", getServiceName());
        while (!isStopped()) {
            try {
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    waitForRunning(LONG_POLLING_CHECK_INTERVAL_MILLIS);
                } else {
                    waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                long begin = this.systemClock.now();
                checkHoldRequest();
                long elapsed = this.systemClock.now() - begin;
                if (elapsed > SLOW_CHECK_THRESHOLD_MILLIS) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", elapsed);
                }
            } catch (Throwable e) {
                log.warn("{} service has exception. ", getServiceName(), e);
            }
        }
        log.info("{} service end", getServiceName());
    }

    @Override
    public String getServiceName() {
        return PullRequestHoldService.class.getSimpleName();
    }

    /**
     * Walks over every queue that has (or had) held requests and re-evaluates
     * them against the queue's current max offset. A failure for one queue is
     * logged and does not prevent the remaining queues from being checked.
     */
    private void checkHoldRequest() {
        for (String key : this.pullRequestTable.keySet()) {
            // The queue id is an int and can never contain the separator, so the
            // LAST separator is always the topic/queueId boundary - even if the
            // topic itself happens to contain the separator.
            int separatorIndex = key.lastIndexOf(TOPIC_QUEUEID_SEPARATOR);
            if (separatorIndex < 0) {
                continue;
            }
            String topic = key.substring(0, separatorIndex);
            String queueIdText = key.substring(separatorIndex + TOPIC_QUEUEID_SEPARATOR.length());
            try {
                int queueId = Integer.parseInt(queueIdText);
                long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
                notifyMessageArriving(topic, queueId, maxOffset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueIdText, e);
            }
        }
    }

    /**
     * Same as {@link #notifyMessageArriving(String, int, long, Long)} with a
     * {@code null} filter value.
     *
     * @param str topic
     * @param i   queue id
     * @param j   max offset of the queue known to the caller
     */
    public void notifyMessageArriving(String str, int i, long j) {
        notifyMessageArriving(str, i, j, null);
    }

    /**
     * Re-evaluates all requests held for one queue. Each request is either
     * handed back to the pull message processor (new matching data available,
     * or the request timed out) or put back on hold.
     *
     * @param str topic
     * @param i   queue id
     * @param j   max offset of the queue known to the caller; when it is not
     *            beyond a request's pull offset the store is asked again
     * @param l   value passed to the message filter together with the request's
     *            subscription data (presumably the tags code of the arrived
     *            message - confirm against callers); may be {@code null}
     */
    public void notifyMessageArriving(String str, int i, long j, Long l) {
        ManyPullRequest holder = this.pullRequestTable.get(buildKey(str, i));
        if (holder == null) {
            return;
        }
        List<PullRequest> pending = holder.cloneListAndClear();
        if (pending == null) {
            return;
        }

        List<PullRequest> stillWaiting = new ArrayList<>();
        for (PullRequest request : pending) {
            long requestedOffset = request.getPullFromThisOffset();
            long newestOffset = j;
            if (newestOffset <= requestedOffset) {
                // The caller's offset may be stale; ask the store for the current one.
                newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(str, i);
            }

            boolean matchingMessageArrived = newestOffset > requestedOffset
                && this.messageFilter.isMessageMatched(request.getSubscriptionData(), l);
            if (matchingMessageArrived || isTimedOut(request)) {
                wakeUp(request);
            } else {
                stillWaiting.add(request);
            }
        }

        if (!stillWaiting.isEmpty()) {
            holder.addPullRequest(stillWaiting);
        }
    }

    /** Returns whether the request has been suspended for at least its timeout. */
    private boolean isTimedOut(PullRequest request) {
        return System.currentTimeMillis() >= request.getSuspendTimestamp() + request.getTimeoutMillis();
    }

    /** Hands the request back to the pull message processor; failures are logged, never propagated. */
    private void wakeUp(PullRequest request) {
        try {
            this.brokerController.getPullMessageProcessor()
                .executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand());
        } catch (Throwable e) {
            log.error("execute request when wakeup failed.", e);
        }
    }
}
