package org.apache.rocketmq.console.service.impl;

import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.console.aspect.admin.annotation.MultiMQAdminCmdMethod;
import org.apache.rocketmq.console.model.ConsumerGroupRollBackStat;
import org.apache.rocketmq.console.model.GroupConsumeInfo;
import org.apache.rocketmq.console.model.QueueStatInfo;
import org.apache.rocketmq.console.model.TopicConsumerInfo;
import org.apache.rocketmq.console.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.console.model.request.DeleteSubGroupRequest;
import org.apache.rocketmq.console.model.request.ResetOffsetRequest;
import org.apache.rocketmq.console.service.AbstractCommonService;
import org.apache.rocketmq.console.service.ConsumerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * {@link ConsumerService} implementation that answers consumer-group queries and performs
 * consumer-group maintenance by delegating to the {@code mqAdminExt} admin client inherited
 * from {@link AbstractCommonService}.
 *
 * <p>Unless stated otherwise, any exception raised by the admin client is rethrown through
 * {@link Throwables#propagate(Throwable)}, i.e. callers only ever see unchecked exceptions.
 */
@Service
public class ConsumerServiceImpl extends AbstractCommonService implements ConsumerService {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerServiceImpl.class);

    /** Timeout, in milliseconds, used when asking a broker for all of its subscription groups. */
    private static final long SUBSCRIPTION_GROUP_QUERY_TIMEOUT_MILLIS = 3000L;

    /**
     * Response code that triggers the legacy reset-offset fallback in {@link #resetOffset}.
     * NOTE(review): presumably RocketMQ's "consumer not online" code (the fallback's log label is
     * "resetOffset_which_not_online_error") -- confirm against the broker's ResponseCode table.
     */
    private static final int CONSUMER_NOT_ONLINE = 206;

    /**
     * Collects the subscription-group names known to every broker in the cluster and returns
     * one {@link GroupConsumeInfo} per distinct group, sorted by the natural ordering of
     * {@link GroupConsumeInfo}.
     *
     * @return sorted summary of every consumer group found on any broker
     */
    @Override
    @MultiMQAdminCmdMethod
    public List<GroupConsumeInfo> queryGroupList() {
        Set<String> groupNames = Sets.newHashSet();
        try {
            ClusterInfo clusterInfo = this.mqAdminExt.examineBrokerClusterInfo();
            for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
                groupNames.addAll(this.mqAdminExt
                    .getAllSubscriptionGroup(brokerData.selectBrokerAddr(), SUBSCRIPTION_GROUP_QUERY_TIMEOUT_MILLIS)
                    .getSubscriptionGroupTable()
                    .keySet());
            }
            List<GroupConsumeInfo> groupInfos = Lists.newArrayList();
            for (String groupName : groupNames) {
                groupInfos.add(queryGroup(groupName));
            }
            Collections.sort(groupInfos);
            return groupInfos;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Builds a summary for a single consumer group.
     *
     * <p>This is best-effort: a failure to fetch either the consume stats or the connection
     * info is logged as a warning and the corresponding fields are simply left unset, so the
     * method never throws because of an admin-client error.
     *
     * @param consumerGroup name of the consumer group
     * @return summary with the group name always set; the remaining fields are filled only
     *         when the matching admin query succeeded
     */
    @Override
    @MultiMQAdminCmdMethod
    public GroupConsumeInfo queryGroup(String consumerGroup) {
        GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
        try {
            ConsumeStats consumeStats = null;
            try {
                consumeStats = this.mqAdminExt.examineConsumeStats(consumerGroup);
            } catch (Exception e) {
                LOGGER.warn("examineConsumeStats exception, " + consumerGroup, e);
            }

            ConsumerConnection consumerConnection = null;
            try {
                consumerConnection = this.mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
            } catch (Exception e) {
                LOGGER.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e);
            }

            groupConsumeInfo.setGroup(consumerGroup);
            if (consumeStats != null) {
                groupConsumeInfo.setConsumeTps((int) consumeStats.getConsumeTps());
                groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
            }
            if (consumerConnection != null) {
                groupConsumeInfo.setCount(consumerConnection.getConnectionSet().size());
                groupConsumeInfo.setMessageModel(consumerConnection.getMessageModel());
                groupConsumeInfo.setConsumeType(consumerConnection.getConsumeType());
                groupConsumeInfo.setVersion(MQVersion.getVersionDesc(consumerConnection.computeMinVersion()));
            }
        } catch (Exception e) {
            // Anything thrown while copying values into the summary is also treated as non-fatal.
            LOGGER.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " + consumerGroup, e);
        }
        return groupConsumeInfo;
    }

    /**
     * Returns the per-topic consume statistics of a group across all of its topics.
     *
     * @param groupName name of the consumer group
     * @return same result as {@link #queryConsumeStatsList(String, String)} with a {@code null} topic
     */
    @Override
    public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName) {
        return queryConsumeStatsList(null, groupName);
    }

    /**
     * Returns the consume statistics of a group, grouped by topic.
     *
     * @param topic     topic to restrict the result to; a blank or {@code null} value keeps every topic
     * @param groupName name of the consumer group
     * @return one {@link TopicConsumerInfo} per topic, each holding the stats of its queues
     */
    @Override
    @MultiMQAdminCmdMethod
    public List<TopicConsumerInfo> queryConsumeStatsList(final String topic, String groupName) {
        try {
            ConsumeStats consumeStats = this.mqAdminExt.examineConsumeStats(groupName, topic);
            List<MessageQueue> queues = Lists.newArrayList(
                Iterables.filter(consumeStats.getOffsetTable().keySet(), new Predicate<MessageQueue>() {
                    @Override
                    public boolean apply(MessageQueue messageQueue) {
                        return StringUtils.isBlank(topic) || messageQueue.getTopic().equals(topic);
                    }
                }));
            // The loop below starts a new TopicConsumerInfo whenever the topic changes, so it relies
            // on this sort placing queues of the same topic next to each other.
            Collections.sort(queues);

            List<TopicConsumerInfo> topicInfos = Lists.newArrayList();
            TopicConsumerInfo current = null;
            Map<MessageQueue, String> queueToClientId = getClientConnection(groupName);
            for (MessageQueue queue : queues) {
                if (current == null || !StringUtils.equals(queue.getTopic(), current.getTopic())) {
                    current = new TopicConsumerInfo(queue.getTopic());
                    topicInfos.add(current);
                }
                QueueStatInfo queueStat =
                    QueueStatInfo.fromOffsetTableEntry(queue, consumeStats.getOffsetTable().get(queue));
                queueStat.setClientInfo(queueToClientId.get(queue));
                current.appendQueueStatInfo(queueStat);
            }
            return topicInfos;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Maps every message queue currently held by a client of the group to that client's id.
     *
     * <p>Best-effort: on any admin-client failure the error is logged and whatever was collected
     * up to that point (possibly nothing) is returned.
     *
     * @param groupName name of the consumer group
     * @return queue-to-client-id mapping; never {@code null}
     */
    private Map<MessageQueue, String> getClientConnection(String groupName) {
        Map<MessageQueue, String> queueToClientId = Maps.newHashMap();
        try {
            ConsumerConnection consumerConnection = this.mqAdminExt.examineConsumerConnectionInfo(groupName);
            for (Connection connection : consumerConnection.getConnectionSet()) {
                String clientId = connection.getClientId();
                ConsumerRunningInfo runningInfo = this.mqAdminExt.getConsumerRunningInfo(groupName, clientId, false);
                for (MessageQueue queue : runningInfo.getMqTable().keySet()) {
                    queueToClientId.put(queue, clientId);
                }
            }
        } catch (Exception e) {
            LOGGER.error("op=getClientConnection_error", e);
        }
        return queueToClientId;
    }

    /**
     * Returns, for every group that consumes the given topic, that group's statistics on the topic.
     *
     * <p>A group whose statistics cannot be fetched (or are empty) is still present in the result,
     * mapped to an empty {@link TopicConsumerInfo} for the topic; the failure is logged.
     *
     * @param topic topic name
     * @return consumer group name mapped to its statistics for {@code topic}
     */
    @Override
    @MultiMQAdminCmdMethod
    public Map<String, TopicConsumerInfo> queryConsumeStatsListByTopicName(String topic) {
        Map<String, TopicConsumerInfo> statsByGroup = Maps.newHashMap();
        try {
            for (String group : this.mqAdminExt.queryTopicConsumeByWho(topic).getGroupList()) {
                List<TopicConsumerInfo> topicInfos = null;
                try {
                    topicInfos = queryConsumeStatsList(topic, group);
                } catch (Exception e) {
                    // Deliberately non-fatal: one broken group must not hide the others, but the
                    // failure should still be visible in the logs.
                    LOGGER.warn("queryConsumeStatsList exception, topic=" + topic + ", group=" + group, e);
                }
                statsByGroup.put(group,
                    CollectionUtils.isEmpty(topicInfos) ? new TopicConsumerInfo(topic) : topicInfos.get(0));
            }
            return statsByGroup;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Resets the consume offset of each requested group on the requested topic to the requested
     * timestamp.
     *
     * <p>Groups are processed independently. When the primary reset fails with response code
     * {@link #CONSUMER_NOT_ONLINE}, the legacy {@code resetOffsetByTimestampOld} call is tried;
     * if that succeeds the group is reported as successful. Any other failure is logged and
     * reported as a failed {@link ConsumerGroupRollBackStat} carrying the error message.
     *
     * @param resetOffsetRequest topic, reset time, force flag and the groups to reset
     * @return consumer group name mapped to the outcome of its reset
     */
    @Override
    @MultiMQAdminCmdMethod
    public Map<String, ConsumerGroupRollBackStat> resetOffset(ResetOffsetRequest resetOffsetRequest) {
        Map<String, ConsumerGroupRollBackStat> resultByGroup = Maps.newHashMap();
        for (String consumerGroup : resetOffsetRequest.getConsumerGroupList()) {
            try {
                Map<MessageQueue, Long> rollbackOffsets = this.mqAdminExt.resetOffsetByTimestamp(
                    resetOffsetRequest.getTopic(), consumerGroup, resetOffsetRequest.getResetTime(),
                    resetOffsetRequest.isForce());
                ConsumerGroupRollBackStat rollBackStat = new ConsumerGroupRollBackStat(true);
                List<RollbackStats> rollbackStatsList = rollBackStat.getRollbackStatsList();
                for (Map.Entry<MessageQueue, Long> entry : rollbackOffsets.entrySet()) {
                    MessageQueue queue = entry.getKey();
                    RollbackStats rollbackStats = new RollbackStats();
                    rollbackStats.setRollbackOffset(entry.getValue().longValue());
                    rollbackStats.setQueueId(queue.getQueueId());
                    rollbackStats.setBrokerName(queue.getBrokerName());
                    rollbackStatsList.add(rollbackStats);
                }
                resultByGroup.put(consumerGroup, rollBackStat);
            } catch (MQClientException e) {
                if (CONSUMER_NOT_ONLINE == e.getResponseCode()) {
                    try {
                        ConsumerGroupRollBackStat rollBackStat = new ConsumerGroupRollBackStat(true);
                        rollBackStat.setRollbackStatsList(this.mqAdminExt.resetOffsetByTimestampOld(
                            consumerGroup, resetOffsetRequest.getTopic(), resetOffsetRequest.getResetTime(), true));
                        resultByGroup.put(consumerGroup, rollBackStat);
                        // The fallback worked: keep the success entry instead of falling through to
                        // the failure entry recorded below.
                        continue;
                    } catch (Exception fallbackError) {
                        LOGGER.error("op=resetOffset_which_not_online_error", fallbackError);
                    }
                } else {
                    LOGGER.error("op=resetOffset_error", e);
                }
                resultByGroup.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage()));
            } catch (Exception e) {
                LOGGER.error("op=resetOffset_error", e);
                resultByGroup.put(consumerGroup, new ConsumerGroupRollBackStat(false, e.getMessage()));
            }
        }
        return resultByGroup;
    }

    /**
     * Looks up the subscription-group configuration of a group on every broker of the cluster.
     *
     * @param group name of the subscription (consumer) group
     * @return one entry per broker that returned a non-null configuration, each tagged with
     *         that broker's name
     */
    @Override
    @MultiMQAdminCmdMethod
    public List<ConsumerConfigInfo> examineSubscriptionGroupConfig(String group) {
        List<ConsumerConfigInfo> configInfos = Lists.newArrayList();
        try {
            ClusterInfo clusterInfo = this.mqAdminExt.examineBrokerClusterInfo();
            for (Map.Entry<String, BrokerData> broker : clusterInfo.getBrokerAddrTable().entrySet()) {
                SubscriptionGroupConfig groupConfig = this.mqAdminExt.examineSubscriptionGroupConfig(
                    broker.getValue().selectBrokerAddr(), group);
                if (groupConfig != null) {
                    configInfos.add(new ConsumerConfigInfo(Lists.newArrayList(broker.getKey()), groupConfig));
                }
            }
            return configInfos;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Deletes a subscription group from each of the named brokers.
     *
     * @param deleteSubGroupRequest group name and the names of the brokers to delete it from
     * @return {@code true} when every deletion call returned normally
     */
    @Override
    @MultiMQAdminCmdMethod
    public boolean deleteSubGroup(DeleteSubGroupRequest deleteSubGroupRequest) {
        try {
            ClusterInfo clusterInfo = this.mqAdminExt.examineBrokerClusterInfo();
            for (String brokerName : deleteSubGroupRequest.getBrokerNameList()) {
                String brokerAddr = clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr();
                LOGGER.info("addr={} groupName={}", brokerAddr, deleteSubGroupRequest.getGroupName());
                this.mqAdminExt.deleteSubscriptionGroup(brokerAddr, deleteSubGroupRequest.getGroupName());
            }
            return true;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Creates or updates a subscription-group configuration on every broker selected by the
     * request's cluster-name and broker-name lists (resolved through {@code changeToBrokerNameSet}).
     *
     * @param consumerConfigInfo target clusters/brokers and the configuration to apply
     * @return {@code true} when every create/update call returned normally
     */
    @Override
    public boolean createAndUpdateSubscriptionGroupConfig(ConsumerConfigInfo consumerConfigInfo) {
        try {
            ClusterInfo clusterInfo = this.mqAdminExt.examineBrokerClusterInfo();
            for (String brokerName : changeToBrokerNameSet(clusterInfo.getClusterAddrTable(),
                consumerConfigInfo.getClusterNameList(), consumerConfigInfo.getBrokerNameList())) {
                this.mqAdminExt.createAndUpdateSubscriptionGroupConfig(
                    clusterInfo.getBrokerAddrTable().get(brokerName).selectBrokerAddr(),
                    consumerConfigInfo.getSubscriptionGroupConfig());
            }
            return true;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Returns the names of all brokers that hold a configuration for the given subscription group.
     *
     * @param group name of the subscription (consumer) group
     * @return union of the broker names reported by {@link #examineSubscriptionGroupConfig(String)}
     */
    @Override
    @MultiMQAdminCmdMethod
    public Set<String> fetchBrokerNameSetBySubscriptionGroup(String group) {
        Set<String> brokerNames = Sets.newHashSet();
        try {
            for (ConsumerConfigInfo configInfo : examineSubscriptionGroupConfig(group)) {
                brokerNames.addAll(configInfo.getBrokerNameList());
            }
            return brokerNames;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Fetches the connection information of a consumer group.
     *
     * @param consumerGroup name of the consumer group
     * @return connection information as returned by the admin client
     */
    @Override
    public ConsumerConnection getConsumerConnection(String consumerGroup) {
        try {
            return this.mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Fetches the running information of one client of a consumer group.
     *
     * @param consumerGroup name of the consumer group
     * @param clientId      id of the client within the group
     * @param jstack        forwarded unchanged as the third argument of the admin client's
     *                      {@code getConsumerRunningInfo} (presumably "include thread dump" -- confirm)
     * @return running information as returned by the admin client
     */
    @Override
    public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) {
        try {
            return this.mqAdminExt.getConsumerRunningInfo(consumerGroup, clientId, jstack);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }
}
