package org.zbus.mq.server;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.zbus.broker.ha.ServerEntry;
import org.zbus.broker.ha.TrackPub;
import org.zbus.kit.ConfigKit;
import org.zbus.kit.log.Logger;
import org.zbus.mq.Protocol;
import org.zbus.net.Client;
import org.zbus.net.Server;
import org.zbus.net.core.Dispatcher;
import org.zbus.net.core.Session;
import org.zbus.proxy.thrift.ThriftCodec;

/* loaded from: input_file:org/zbus/mq/server/MqServer.class */
public class MqServer extends Server {
    private static final Logger log = Logger.getLogger((Class<?>) MqServer.class);
    private final Map<String, Session> sessionTable = new ConcurrentHashMap();
    private final Map<String, AbstractMQ> mqTable = new ConcurrentHashMap();
    private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    private long cleanInterval = 3000;
    private long trackInterval = 5000;
    private MqServerConfig config;
    private String registerToken;
    private MqAdaptor defaultMqAdaptor;
    private TrackPub trackPub;

    public MqServer(MqServerConfig mqServerConfig) {
        this.registerToken = "";
        this.config = mqServerConfig;
        this.serverName = "MqServer";
        this.registerToken = mqServerConfig.registerToken;
        this.serverMainIpOrder = mqServerConfig.serverMainIpOrder;
        this.dispatcher = new Dispatcher();
        this.dispatcher.selectorCount(mqServerConfig.selectorCount);
        this.dispatcher.executorCount(mqServerConfig.executorCount);
        this.scheduledExecutor.scheduleAtFixedRate(new Runnable() { // from class: org.zbus.mq.server.MqServer.1
            @Override // java.lang.Runnable
            public void run() {
                Iterator it = MqServer.this.mqTable.entrySet().iterator();
                while (it.hasNext()) {
                    ((AbstractMQ) ((Map.Entry) it.next()).getValue()).cleanSession();
                }
            }
        }, 1000L, this.cleanInterval, TimeUnit.MILLISECONDS);
        registerDefaultMqAdaptor();
    }

    private void registerDefaultMqAdaptor() {
        if (this.defaultMqAdaptor != null) {
            return;
        }
        this.defaultMqAdaptor = new MqAdaptor(this);
        this.defaultMqAdaptor.setVerbose(this.config.verbose);
        this.defaultMqAdaptor.loadMQ(this.config.storePath);
        registerAdaptor(this.config.getServerAddress(), this.defaultMqAdaptor, "HttpExt");
    }

    @Override // org.zbus.net.Server
    public void start() throws IOException {
        log.info("MqServer starting ...");
        super.start();
        if (this.config.trackServerList != null) {
            log.info("Running at HA mode, connect to TrackServers");
            setupTracker(this.config.trackServerList, this.dispatcher);
        }
        log.info("MqServer started successfully");
    }

    @Override // org.zbus.net.Server, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.defaultMqAdaptor != null) {
            this.defaultMqAdaptor.close();
        }
        if (this.trackPub != null) {
            this.trackPub.close();
        }
        this.scheduledExecutor.shutdown();
        super.close();
        if (this.dispatcher != null) {
            this.dispatcher.close();
        }
    }

    public void setupTracker(String str, Dispatcher dispatcher) {
        this.trackPub = new TrackPub(str, dispatcher);
        this.trackPub.onConnected(new Client.ConnectedHandler() { // from class: org.zbus.mq.server.MqServer.2
            @Override // org.zbus.net.Client.ConnectedHandler
            public void onConnected(Session session) throws IOException {
                MqServer.this.trackPub.pubServerJoin(MqServer.this.serverAddr);
                Iterator it = MqServer.this.mqTable.values().iterator();
                while (it.hasNext()) {
                    MqServer.this.pubEntryUpdate((AbstractMQ) it.next());
                }
            }
        });
        this.trackPub.start();
        this.scheduledExecutor.scheduleAtFixedRate(new Runnable() { // from class: org.zbus.mq.server.MqServer.3
            @Override // java.lang.Runnable
            public void run() {
                Iterator it = MqServer.this.mqTable.values().iterator();
                while (it.hasNext()) {
                    MqServer.this.pubEntryUpdate((AbstractMQ) it.next());
                }
            }
        }, 2000L, this.trackInterval, TimeUnit.MILLISECONDS);
    }

    public void pubEntryUpdate(AbstractMQ abstractMQ) {
        if (this.trackPub == null) {
            return;
        }
        Protocol.MqInfo mqInfo = abstractMQ.getMqInfo();
        ServerEntry serverEntry = new ServerEntry();
        serverEntry.entryId = mqInfo.name;
        serverEntry.serverAddr = this.serverAddr;
        serverEntry.consumerCount = mqInfo.consumerCount;
        serverEntry.mode = mqInfo.mode;
        serverEntry.unconsumedMsgCount = mqInfo.unconsumedMsgCount;
        serverEntry.lastUpdateTime = abstractMQ.lastUpdateTime;
        this.trackPub.pubEntryUpdate(serverEntry);
    }

    public String getRegisterToken() {
        return this.registerToken;
    }

    public Map<String, AbstractMQ> getMqTable() {
        return this.mqTable;
    }

    public Map<String, Session> getSessionTable() {
        return this.sessionTable;
    }

    public static void main(String[] strArr) throws Exception {
        MqServerConfig mqServerConfig = new MqServerConfig();
        mqServerConfig.serverHost = ConfigKit.option(strArr, "-h", "0.0.0.0");
        mqServerConfig.serverPort = ConfigKit.option(strArr, "-p", 15555);
        mqServerConfig.selectorCount = ConfigKit.option(strArr, "-selector", 0);
        mqServerConfig.executorCount = ConfigKit.option(strArr, "-executor", 64);
        mqServerConfig.verbose = ConfigKit.option(strArr, "-verbose", false);
        mqServerConfig.storePath = ConfigKit.option(strArr, "-store", "store");
        mqServerConfig.trackServerList = ConfigKit.option(strArr, "-track", (String) null);
        mqServerConfig.thriftServer = ConfigKit.option(strArr, "-thrift", (String) null);
        mqServerConfig.serverMainIpOrder = ConfigKit.option(strArr, "-ipOrder", (String) null);
        MqServer mqServer = new MqServer(mqServerConfig);
        if (mqServerConfig.thriftServer != null) {
            MqAdaptor mqAdaptor = new MqAdaptor(mqServer);
            mqAdaptor.codec(new ThriftCodec());
            mqServer.registerAdaptor(mqServerConfig.thriftServer, mqAdaptor, "Thrift");
        }
        mqServer.start();
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: org.zbus.mq.server.MqServer.4
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    MqServer.this.close();
                    MqServer.log.info("MqServer shutdown completed");
                } catch (IOException e) {
                    MqServer.log.error(e.getMessage(), e);
                }
            }
        });
    }
}
