package org.zbus.mq.server;

import java.io.Closeable;
import java.io.IOException;
import java.util.AbstractQueue;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.zbus.kit.FileKit;
import org.zbus.kit.JsonKit;
import org.zbus.kit.log.Logger;
import org.zbus.mq.Protocol;
import org.zbus.mq.server.support.DiskQueuePool;
import org.zbus.mq.server.support.MessageDiskQueue;
import org.zbus.mq.server.support.MessageMemoryQueue;
import org.zbus.net.core.IoAdaptor;
import org.zbus.net.core.Session;
import org.zbus.net.http.Message;
import org.zbus.net.http.MessageCodec;

/* loaded from: input_file:org/zbus/mq/server/MqAdaptor.class */
/**
 * I/O adaptor that dispatches incoming {@link Message}s to per-command handlers.
 *
 * <p>Handlers are looked up in {@link #handlerMap} by the message's command. When a message
 * carries no command, one is derived from its request string (see
 * {@link #handleUrlMessage(Message)}). Messages whose command has no registered handler are
 * answered with a 400 response.
 *
 * <p>The queue table and the session table are obtained from the owning {@link MqServer};
 * this class reads and updates them but does not create them.
 */
public class MqAdaptor extends IoAdaptor implements Closeable {
    private static final Logger log = Logger.getLogger((Class<?>) MqAdaptor.class);

    /** Queue table obtained from the server, keyed by queue name. */
    private final Map<String, AbstractMQ> mqTable;
    /** Session table obtained from the server, keyed by session id. */
    private final Map<String, Session> sessionTable;
    private final MqServer mqServer;
    /** Token a create-MQ request must present; never null (normalised in the constructor). */
    private final String registerToken;
    /** Command name to handler. */
    private final Map<String, Message.MessageHandler> handlerMap =
            new ConcurrentHashMap<String, Message.MessageHandler>();
    private boolean verbose = false;

    /**
     * Produce: authenticates against the target queue, strips the command and ack headers,
     * hands the message to the queue and, when the request asked for an ack, replies 200.
     */
    private final Message.MessageHandler produceHandler = new Message.MessageHandler() {
        @Override
        public void handle(Message message, Session session) throws IOException {
            AbstractMQ mq = findMQ(message, session);
            if (mq == null) {
                return; // findMQ has already replied 404
            }
            if (!auth(mq, message)) {
                ReplyKit.reply403(message, session);
                return;
            }
            // Read the ack flag before the ACK header is removed below.
            boolean ackRequested = message.isAck();
            message.removeHead(Message.CMD);
            message.removeHead(Message.ACK);
            mq.produce(message, session);
            mq.lastUpdateTime = System.currentTimeMillis();
            if (ackRequested) {
                ReplyKit.reply200(message, session);
            }
        }
    };

    /**
     * Consume: authenticates against the target queue and forwards the request to it. When the
     * session is not yet tagged with this queue name, the session attribute is set and an entry
     * update is published.
     */
    private final Message.MessageHandler consumeHandler = new Message.MessageHandler() {
        @Override
        public void handle(Message message, Session session) throws IOException {
            AbstractMQ mq = findMQ(message, session);
            if (mq == null) {
                return; // findMQ has already replied 404
            }
            if (!auth(mq, message)) {
                ReplyKit.reply403(message, session);
                return;
            }
            mq.consume(message, session);
            String boundMq = (String) session.attr(Message.MQ);
            if (message.getMq().equals(boundMq)) {
                return; // session already associated with this queue
            }
            session.attr(Message.MQ, mq.getName());
            MqAdaptor.this.mqServer.pubEntryUpdate(mq);
        }
    };

    /**
     * Route: writes the message to the session named by its receiver header, after removing the
     * ack, receiver and command headers. Messages without a receiver are dropped silently;
     * unknown receivers and write failures are logged and otherwise ignored.
     */
    private final Message.MessageHandler routeHandler = new Message.MessageHandler() {
        @Override
        public void handle(Message message, Session session) throws IOException {
            String target = message.getRecver();
            if (target == null) {
                return;
            }
            Session peer = MqAdaptor.this.sessionTable.get(target);
            if (peer == null) {
                log.warn("Missing target %s", target);
                return;
            }
            message.removeHead(Message.ACK);
            message.removeHead(Message.RECVER);
            message.removeHead(Message.CMD);
            try {
                peer.write(message);
            } catch (Exception e) {
                // Best effort: a failed delivery to the target must not fail the sender.
                log.warn("Target(%s) write failed, Ignore", target);
            }
        }
    };

    /**
     * Create MQ: validates the register token, queue name and mode headers, then creates the
     * queue if no queue with that name exists. An existing queue is answered with 200 and left
     * untouched.
     */
    private final Message.MessageHandler createMqHandler = new Message.MessageHandler() {
        @Override
        public void handle(Message message, Session session) throws IOException {
            if (!MqAdaptor.this.registerToken.equals(message.getHead("register_token", ""))) {
                message.setBody("registerToken unmatched");
                ReplyKit.reply403(message, session);
                return;
            }
            String mqName = message.getHead("mq_name", "").trim();
            if (mqName.isEmpty()) {
                message.setBody("Missing mq_name");
                ReplyKit.reply400(message, session);
                return;
            }
            String modeText = message.getHead("mq_mode", "").trim();
            if (modeText.isEmpty()) {
                message.setBody("Missing mq_mode");
                ReplyKit.reply400(message, session);
                return;
            }

            // Only a parse failure is reported as an invalid mode; failures while building the
            // queue are handled separately below so they are not misreported.
            final int mode;
            try {
                mode = Integer.parseInt(modeText);
            } catch (NumberFormatException e) {
                message.setBody("mq_mode invalid");
                ReplyKit.reply400(message, session);
                return;
            }

            String accessToken = message.getHead("access_token", "");
            try {
                // Check-then-create must be atomic with respect to other create requests.
                synchronized (MqAdaptor.this.mqTable) {
                    if (MqAdaptor.this.mqTable.get(mqName) != null) {
                        ReplyKit.reply200(message, session);
                        return;
                    }
                    AbstractQueue<Message> store;
                    if (Protocol.MqMode.isEnabled(mode, Protocol.MqMode.Memory)
                            || Protocol.MqMode.isEnabled(mode, Protocol.MqMode.RPC)) {
                        store = new MessageMemoryQueue();
                    } else {
                        store = new MessageDiskQueue(mqName, mode);
                    }
                    // Typed as AbstractMQ, as in loadMQ, since either a PubSub or an MQ is built.
                    AbstractMQ created;
                    if (Protocol.MqMode.isEnabled(mode, Protocol.MqMode.PubSub)) {
                        created = new PubSub(mqName, store);
                    } else {
                        created = new MQ(mqName, store);
                    }
                    created.setMode(mode);
                    created.creator = session.getRemoteAddress();
                    created.setAccessToken(accessToken);
                    log.info("MQ Created: %s", created);
                    MqAdaptor.this.mqTable.put(mqName, created);
                    ReplyKit.reply200(message, session);
                    MqAdaptor.this.mqServer.pubEntryUpdate(created);
                }
            } catch (Exception e) {
                // Report what actually failed instead of blaming the mode header.
                log.warn("MQ(%s) create failed: %s", mqName, e);
                message.setBody("MQ create failed: " + e.getMessage());
                ReplyKit.reply400(message, session);
            }
        }
    };

    /** Test: replies 200 with body "OK", echoing the request id. */
    private final Message.MessageHandler testHandler = new Message.MessageHandler() {
        @Override
        public void handle(Message message, Session session) throws IOException {
            Message reply = new Message();
            reply.setResponseStatus(200);
            reply.setId(message.getId());
            reply.setBody("OK");
            session.write(reply);
        }
    };

    /** Home: serves the content of {@code zbus.htm} as HTML, or a placeholder when it is empty. */
    private final Message.MessageHandler homeHandler = new Message.MessageHandler() {
        @Override
        public void handle(Message message, Session session) throws IOException {
            Message reply = new Message();
            reply.setResponseStatus("200");
            reply.setHead(Message.CONTENT_TYPE, "text/html");
            String page = FileKit.loadFileContent("zbus.htm");
            // NOTE(review): loadFileContent is assumed to return "" for a missing file; null is
            // treated the same way in case it does not.
            if (page == null || page.isEmpty()) {
                page = "<strong>zbus.htm file missing</strong>";
            }
            reply.setBody(page);
            session.write(reply);
        }
    };

    /** Jquery: serves the content of {@code jquery.js} as JavaScript. */
    private final Message.MessageHandler jqueryHandler = new Message.MessageHandler() {
        @Override
        public void handle(Message message, Session session) throws IOException {
            Message reply = new Message();
            reply.setResponseStatus("200");
            reply.setHead(Message.CONTENT_TYPE, "application/javascript");
            reply.setBody(FileKit.loadFileContent("jquery.js"));
            session.write(reply);
        }
    };

    /** Data: replies with the broker statistics from {@link #getStatInfo()} as JSON. */
    private final Message.MessageHandler dataHandler = new Message.MessageHandler() {
        @Override
        public void handle(Message message, Session session) throws IOException {
            Protocol.BrokerInfo info = getStatInfo();
            Message reply = new Message();
            reply.setResponseStatus("200");
            reply.setId(message.getId());
            reply.setHead(Message.CONTENT_TYPE, "application/json");
            reply.setBody(JsonKit.toJson(info));
            session.write(reply);
        }
    };

    /**
     * Query: without a queue name replies with the broker statistics; with a queue name replies
     * with that queue's info (or 404 via {@link #findMQ(Message, Session)}).
     */
    private final Message.MessageHandler queryHandler = new Message.MessageHandler() {
        @Override
        public void handle(Message message, Session session) throws IOException {
            String json;
            if (message.getMq() == null) {
                json = JsonKit.toJson(getStatInfo());
            } else {
                AbstractMQ mq = findMQ(message, session);
                if (mq == null) {
                    return; // findMQ has already replied 404
                }
                json = JsonKit.toJson(mq.getMqInfo());
            }
            Message reply = new Message();
            reply.setResponseStatus("200");
            reply.setId(message.getId());
            reply.setHead(Message.CONTENT_TYPE, "application/json");
            reply.setBody(json);
            session.write(reply);
        }
    };

    /** Heartbeat: accepted and ignored; no reply is written. */
    private final Message.MessageHandler heartbeatHandler = new Message.MessageHandler() {
        @Override
        public void handle(Message message, Session session) throws IOException {
            // intentionally empty
        }
    };

    /**
     * Creates the adaptor, wiring it to the server's queue and session tables and registering
     * the built-in command handlers.
     *
     * @param mqServer owning server; supplies the tables, register token and server address
     */
    public MqAdaptor(MqServer mqServer) {
        codec(new MessageCodec());
        this.mqServer = mqServer;
        this.mqTable = mqServer.getMqTable();
        this.sessionTable = mqServer.getSessionTable();
        // Normalise a missing token to "" so token comparison can never dereference null.
        String token = mqServer.getRegisterToken();
        this.registerToken = (token == null) ? "" : token;

        registerHandler(Protocol.Produce, this.produceHandler);
        registerHandler(Protocol.Consume, this.consumeHandler);
        registerHandler(Protocol.Route, this.routeHandler);
        registerHandler(Protocol.CreateMQ, this.createMqHandler);
        registerHandler(Protocol.Test, this.testHandler);
        registerHandler(Protocol.Query, this.queryHandler);
        registerHandler("", this.homeHandler);
        registerHandler(Protocol.Data, this.dataHandler);
        registerHandler(Protocol.Jquery, this.jqueryHandler);
        registerHandler(Message.HEARTBEAT, this.heartbeatHandler);
    }

    /**
     * Derives command, queue name and (for RPC-mode queues) a JSON body from the request string
     * of a message that carries no command header.
     *
     * <ul>
     *   <li>An empty URL sets the command to {@code ""}.</li>
     *   <li>A URL naming a queue sets the message's queue name if it has none.</li>
     *   <li>If that queue exists with the RPC mode flag, and the URL has a method or has no
     *       command, the message becomes a non-ack produce request whose body is a JSON object
     *       with {@code module}, {@code method} and optional {@code params}.</li>
     *   <li>Otherwise a command present in the URL is applied when the message still has
     *       none.</li>
     * </ul>
     *
     * @param message message to rewrite in place
     * @return the same message instance
     */
    private Message handleUrlMessage(Message message) {
        UrlInfo url = new UrlInfo(message.getRequestString());
        if (url.empty) {
            message.setCmd("");
            return message;
        }
        if (url.mq != null) {
            if (message.getMq() == null) {
                message.setMq(url.mq);
            }
            if (url.method != null || url.cmd == null) {
                AbstractMQ mq = this.mqTable.get(url.mq);
                if (mq != null && Protocol.MqMode.isEnabled(mq.getMode(), Protocol.MqMode.RPC)) {
                    message.setMq(url.mq);
                    message.setAck(false);
                    message.setCmd(Protocol.Produce);
                    message.setJsonBody(buildRpcBody(url));
                }
            }
        }
        if (url.cmd != null && message.getCmd() == null) {
            message.setCmd(url.cmd);
        }
        return message;
    }

    /**
     * Builds the JSON request body for a URL-style RPC call. Module and method come from the
     * request URL and are escaped; params are inserted verbatim as the array content.
     */
    private static String buildRpcBody(UrlInfo url) {
        String module = (url.module == null) ? "" : url.module;
        String method = (url.method == null) ? "" : url.method;
        StringBuilder json = new StringBuilder();
        json.append("{\"module\": \"").append(escapeJson(module)).append("\"");
        json.append(", \"method\": \"").append(escapeJson(method)).append("\"");
        if (url.params != null) {
            // NOTE(review): params looks like pre-formatted JSON array content, so it is not
            // escaped here — confirm against UrlInfo.
            json.append(", \"params\": [").append(url.params).append("]");
        }
        return json.append("}").toString();
    }

    /** Escapes quotes, backslashes and control characters for use inside a JSON string. */
    private static String escapeJson(String text) {
        StringBuilder out = new StringBuilder(text.length() + 8);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    /**
     * Stamps the message with sender, server and remote address, resolves its command and
     * dispatches it to the registered handler. Replies 400 when no handler matches.
     *
     * @param obj the decoded message; cast to {@link Message}
     * @param session session the message arrived on
     * @throws IOException if the handler or the error reply fails to write
     */
    @Override
    public void onMessage(Object obj, Session session) throws IOException {
        Message message = (Message) obj;
        message.setSender(session.id());
        message.setServer(this.mqServer.getServerAddr());
        message.setRemoteAddr(session.getRemoteAddress());
        if (this.verbose) {
            log.info("\n%s", message);
        }

        String cmd = message.getCmd();
        if (cmd == null) {
            // No explicit command: try to derive one from the request string.
            message = handleUrlMessage(message);
            cmd = message.getCmd();
        }

        Message.MessageHandler handler = (cmd == null) ? null : this.handlerMap.get(cmd);
        if (handler != null) {
            handler.handle(message, session);
            return;
        }

        Message reply = new Message();
        reply.setId(message.getId());
        reply.setResponseStatus(400);
        reply.setBody(String.format("Bad format: command(%s) not support", cmd));
        session.write(reply);
    }

    /**
     * Looks up the queue named by the message. Replies 404 on the session when the message has
     * no queue name or the queue is unknown.
     *
     * @param message message whose queue name is used for the lookup
     * @param session session to send the 404 reply on
     * @return the queue, or {@code null} if a 404 was sent
     * @throws IOException if the 404 reply fails to write
     */
    public AbstractMQ findMQ(Message message, Session session) throws IOException {
        String mqName = message.getMq();
        // Guard the lookup: the table's implementation is not visible here, and maps such as
        // ConcurrentHashMap throw on a null key.
        AbstractMQ mq = (mqName == null) ? null : this.mqTable.get(mqName);
        if (mq == null) {
            ReplyKit.reply404(message, session);
        }
        return mq;
    }

    /**
     * Registers (or replaces) the handler for a command.
     *
     * @param str command name
     * @param messageHandler handler invoked for messages carrying that command
     */
    public void registerHandler(String str, Message.MessageHandler messageHandler) {
        this.handlerMap.put(str, messageHandler);
    }

    /**
     * Removes the session from the session table and, when the session is tagged with a known
     * queue name, lets that queue clean the session and publishes an entry update.
     */
    private void cleanSession(Session session) {
        log.info("Clean: %s", session);
        this.sessionTable.remove(session.id());
        String mqName = (String) session.attr(Message.MQ);
        if (mqName == null) {
            return;
        }
        AbstractMQ mq = this.mqTable.get(mqName);
        if (mq == null) {
            return;
        }
        mq.cleanSession(session);
        this.mqServer.pubEntryUpdate(mq);
    }

    /** Records the new session in the session table before delegating to the superclass. */
    @Override
    public void onSessionAccepted(Session session) throws IOException {
        this.sessionTable.put(session.id(), session);
        super.onSessionAccepted(session);
    }

    /** Cleans up the failing session before delegating to the superclass. */
    @Override
    public void onException(Throwable th, Session session) throws IOException {
        cleanSession(session);
        super.onException(th, session);
    }

    /** Cleans up the closing session before delegating to the superclass. */
    @Override
    public void onSessionToDestroy(Session session) throws IOException {
        cleanSession(session);
        super.onSessionToDestroy(session);
    }

    /**
     * Asks the queue to authenticate the message's {@code appid} and {@code token} headers
     * (each defaulting to {@code ""}).
     *
     * @return the queue's verdict
     */
    public boolean auth(AbstractMQ abstractMQ, Message message) {
        String appId = message.getHead("appid", "");
        String token = message.getHead("token", "");
        return abstractMQ.auth(appId, token);
    }

    /**
     * Enables or disables logging of every incoming message.
     *
     * @param z {@code true} to log each message in {@link #onMessage(Object, Session)}
     */
    public void setVerbose(boolean z) {
        this.verbose = z;
    }

    /**
     * Builds a broker snapshot: the server address plus one info entry per queue, with each
     * entry's consumer list cleared.
     *
     * @return the populated broker info
     */
    public Protocol.BrokerInfo getStatInfo() {
        HashMap<String, Protocol.MqInfo> infoByName = new HashMap<String, Protocol.MqInfo>();
        for (Map.Entry<String, AbstractMQ> entry : this.mqTable.entrySet()) {
            Protocol.MqInfo info = entry.getValue().getMqInfo();
            info.consumerInfoList.clear();
            infoByName.put(entry.getKey(), info);
        }
        Protocol.BrokerInfo broker = new Protocol.BrokerInfo();
        broker.broker = this.mqServer.getServerAddr();
        broker.mqTable = infoByName;
        return broker;
    }

    /**
     * Clears the queue table and repopulates it from the disk queue pool initialised with the
     * given argument, publishing an entry update for every queue loaded.
     *
     * @param str argument passed to {@code DiskQueuePool.init}
     */
    public void loadMQ(String str) {
        log.info("Loading DiskQueues...");
        this.mqTable.clear();
        DiskQueuePool.init(str);
        for (Map.Entry<String, DiskQueuePool.DiskQueue> entry : DiskQueuePool.getQueryMap().entrySet()) {
            String name = entry.getKey();
            DiskQueuePool.DiskQueue diskQueue = entry.getValue();
            int mode = diskQueue.getFlag();
            MessageDiskQueue store = new MessageDiskQueue(name, diskQueue);
            AbstractMQ mq;
            if (Protocol.MqMode.isEnabled(mode, Protocol.MqMode.PubSub)) {
                mq = new PubSub(name, store);
            } else {
                mq = new MQ(name, store);
            }
            mq.setMode(mode);
            mq.lastUpdateTime = System.currentTimeMillis();
            mq.creator = "System";
            this.mqTable.put(name, mq);
            this.mqServer.pubEntryUpdate(mq);
        }
    }

    /** Shuts down the disk queue pool. */
    @Override
    public void close() throws IOException {
        DiskQueuePool.destory();
    }
}
