/*
 * Decompiled with CFR 0.152.
 */
package com.rocoinfo.appender;

import ch.qos.logback.core.spi.ContextAware;
import com.rocoinfo.appender.FlumeAvroWorker;
import com.rocoinfo.appender.FlumeHost;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;

public class FlumeAvroMaster {
    private final ContextAware loggingContext;
    private static final int MAX_RECONNECTS = 3;
    private static final long DEFALT_MAX_IDLE_TIME = 3000L;
    private static final int DEFAULT_BATCH_SIZE = 100;
    private static final int DEFALUT_EVENT_QUEUE_SIZE = 10000;
    private static final int DEFALUT_MAX_THREAD_POOL_SIZE = 4;
    private static final int DEFALUT_WORK_QUEUE_SIZE = 100;
    private final BlockingQueue<Event> eventQueue;
    private final SchedulerThread scheduler;
    private final FlumeAvroWorker worker;

    public static FlumeAvroMaster create(List<FlumeHost> hosts, Properties additionProps, Integer batchSize, Integer eventQueueSize, Long maxIdleTime, Integer maxThreadPoolSize, Integer workQueueSize, ContextAware context) {
        if (CollectionUtils.isNotEmpty(hosts)) {
            Properties props = FlumeAvroMaster.buildFlumeProperties(hosts);
            props.putAll((Map<?, ?>)additionProps);
            return new FlumeAvroMaster(props, batchSize, eventQueueSize, maxIdleTime, maxThreadPoolSize, workQueueSize, context);
        }
        context.addError("flume agent host\u4e3a\u7a7a\uff01");
        return null;
    }

    private FlumeAvroMaster(Properties props, Integer batchSizeReq, Integer eventQueueSizeReq, Long maxIdleTimeReq, Integer maxThreadPoolSizeReq, Integer workQueueSizeReq, ContextAware context) {
        this.loggingContext = context;
        int batchSize = batchSizeReq == null ? 100 : batchSizeReq;
        long maxIdleTime = maxIdleTimeReq == null ? 3000L : maxIdleTimeReq;
        int eventQueueSize = eventQueueSizeReq == null ? 10000 : eventQueueSizeReq;
        int maxThreadPoolSize = maxThreadPoolSizeReq == null ? 4 : maxThreadPoolSizeReq;
        int workQueueSize = workQueueSizeReq == null ? 100 : workQueueSizeReq;
        this.worker = new FlumeAvroWorker(props, this.loggingContext, maxThreadPoolSize, workQueueSize);
        this.eventQueue = new ArrayBlockingQueue<Event>(eventQueueSize);
        this.scheduler = new SchedulerThread(this.eventQueue, batchSize, maxIdleTime);
        this.scheduler.start();
    }

    public void stop() {
        this.scheduler.shutdown();
    }

    public void commit(Event event) {
        if (event != null) {
            this.eventQueue.add(event);
        }
    }

    private static Properties buildFlumeProperties(List<FlumeHost> agents) {
        Properties props = new Properties();
        int i = 0;
        for (FlumeHost agent : agents) {
            props.put("hosts.h" + i++, agent.getHostname() + ':' + agent.getPort());
        }
        StringBuffer buffer = new StringBuffer(i * 4);
        for (int j = 0; j < i; ++j) {
            buffer.append("h").append(j).append(" ");
        }
        props.put("hosts", buffer.toString());
        props.put("max-attempts", Integer.toString(3 * agents.size()));
        props.put("connect-timeout", (Object)RpcClientConfigurationConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS);
        props.put("request-timeout", (Object)RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS);
        if (i > 1) {
            props.put("client.type", RpcClientFactory.ClientType.DEFAULT_LOADBALANCE);
            props.put("host-selector", "ROUND_ROBIN");
        }
        props.put("backoff", "true");
        props.put("maxBackoff", "10000");
        return props;
    }

    private class SchedulerThread
    extends Thread {
        private final BlockingQueue<Event> queue;
        private final int batchSize;
        private final long maxIdleTime;
        private volatile boolean shutdown = false;

        private SchedulerThread(BlockingQueue<Event> queue, int batchSize, long maxIdleTime) {
            this.queue = queue;
            this.batchSize = batchSize;
            this.maxIdleTime = maxIdleTime;
            this.setDaemon(true);
            this.setName(SchedulerThread.class.getSimpleName());
        }

        @Override
        public void run() {
            while (!this.shutdown) {
                Event[] batch;
                long lastPoll = System.currentTimeMillis();
                long maxTime = lastPoll + this.maxIdleTime;
                Event[] events = new Event[this.batchSize];
                int count = 0;
                try {
                    while (count < this.batchSize && System.currentTimeMillis() < maxTime) {
                        lastPoll = Math.max(System.currentTimeMillis(), lastPoll);
                        Event event = this.queue.poll(maxTime - lastPoll, TimeUnit.MILLISECONDS);
                        if (event == null) continue;
                        events[count++] = event;
                    }
                }
                catch (InterruptedException ie) {
                    FlumeAvroMaster.this.loggingContext.addWarn(ie.getLocalizedMessage(), (Throwable)ie);
                }
                if (count <= 0) continue;
                if (count == events.length) {
                    batch = events;
                } else {
                    batch = new Event[count];
                    System.arraycopy(events, 0, batch, 0, count);
                }
                FlumeAvroMaster.this.worker.send(batch);
            }
            FlumeAvroMaster.this.worker.shutdown();
        }

        void shutdown() {
            FlumeAvroMaster.this.loggingContext.addInfo("Shutting down command received");
            this.shutdown = true;
        }
    }
}

