package backtype.storm.messaging.netty;

import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.Utils;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:backtype/storm/messaging/netty/Client.class */
/**
 * Netty-based client side of a Storm messaging connection.
 *
 * <p>Outgoing messages are buffered in an in-memory queue by {@link #send(int, byte[])} and
 * handed out in batches by {@link #takeMessages()}. The constructor starts an asynchronous
 * connect to the remote address; {@link #reconnect()} retries it with a randomized exponential
 * back-off until {@code max_retries} is exceeded, at which point the client closes itself.
 *
 * <p>NOTE(review): the decompiler reports that this class and several of its members were
 * package-private in the original source; they are presumably meant to be used only by the
 * sibling pipeline/handler classes in this package.
 */
public class Client implements IConnection {
    private static final Logger LOG = LoggerFactory.getLogger(Client.class);

    /** Hard upper bound on retries; also keeps {@code 1 << retries} a positive int. */
    private static final int MAX_RETRY_LIMIT = 30;

    private final int max_retries;
    private final int base_sleep_ms;
    private final int max_sleep_ms;
    private final ClientBootstrap bootstrap;
    private final InetSocketAddress remote_addr;
    private final ChannelFactory factory;
    private final int buffer_size;
    private final Random random = new Random();
    /** Holds {@link TaskMessage}s and, at most once, {@link ControlMessage#CLOSE_MESSAGE}. */
    private final LinkedBlockingQueue<Object> message_queue = new LinkedBlockingQueue<>();
    /** Number of reconnect attempts since the last successful {@link #setChannel(Channel)}. */
    private final AtomicInteger retries = new AtomicInteger(0);
    private final AtomicReference<Channel> channelRef = new AtomicReference<>(null);
    private final AtomicBoolean being_closed = new AtomicBoolean(false);

    /**
     * Creates the client and immediately starts an asynchronous connect to the remote endpoint.
     *
     * @param map Storm configuration; the {@code STORM_MESSAGING_NETTY_*} entries for buffer
     *            size, max retries, min/max sleep and client worker threads are read from it
     * @param str host name of the remote endpoint
     * @param i   port of the remote endpoint
     */
    public Client(Map map, String str, int i) {
        this.buffer_size = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)).intValue();
        this.max_retries = Math.min(MAX_RETRY_LIMIT, Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)).intValue());
        this.base_sleep_ms = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS)).intValue();
        this.max_sleep_ms = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS)).intValue();
        int workerThreads = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS)).intValue();

        // A non-positive worker count means "let Netty pick its default".
        this.factory = workerThreads > 0
                ? new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), workerThreads)
                : new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());

        this.bootstrap = new ClientBootstrap(this.factory);
        this.bootstrap.setOption("tcpNoDelay", true);
        this.bootstrap.setOption("sendBufferSize", Integer.valueOf(this.buffer_size));
        this.bootstrap.setOption("keepAlive", true);
        this.bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));

        this.remote_addr = new InetSocketAddress(str, i);
        this.bootstrap.connect(this.remote_addr);
    }

    /**
     * Attempts another connect to the remote address after a back-off sleep.
     *
     * <p>Once the retry counter exceeds {@code max_retries} the client gives up and calls
     * {@link #close()}. If the calling thread is interrupted while sleeping, the attempt is
     * abandoned and the thread's interrupt status is restored.
     */
    public void reconnect() {
        try {
            int attempt = this.retries.incrementAndGet();
            if (attempt > this.max_retries) {
                LOG.warn("Remote address is not reachable. We will close this client.");
                close();
                return;
            }
            Thread.sleep(getSleepTimeMs());
            LOG.info("Reconnect ... [{}]", attempt);
            this.bootstrap.connect(this.remote_addr);
            LOG.debug("connection started...");
        } catch (InterruptedException e) {
            LOG.warn("connection failed", e);
            // Do not swallow the interrupt; let the owner of this thread observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Computes the back-off before the next reconnect attempt:
     * {@code base_sleep_ms * max(1, random(2^retries))}, clamped to {@code [0, max_sleep_ms]}.
     *
     * @return sleep time in milliseconds, never negative
     */
    private int getSleepTimeMs() {
        // Keep the shift within [0, 30] so the bound handed to nextInt() is always positive,
        // even if concurrent reconnects push the counter past max_retries.
        int exponent = Math.max(0, Math.min(MAX_RETRY_LIMIT, this.retries.get()));
        int multiplier = Math.max(1, this.random.nextInt(1 << exponent));
        // Multiply in long: the int product can overflow to a negative value, which would
        // bypass the max_sleep_ms cap and make Thread.sleep() throw.
        long sleepMs = (long) this.base_sleep_ms * multiplier;
        return (int) Math.max(0L, Math.min(sleepMs, (long) this.max_sleep_ms));
    }

    /**
     * Queues a message for delivery to the given task.
     *
     * @param i    id of the destination task
     * @param bArr serialized message payload
     * @throws RuntimeException if the client is being closed, or if the calling thread is
     *                          interrupted while enqueueing (interrupt status is preserved)
     */
    @Override // backtype.storm.messaging.IConnection
    public void send(int i, byte[] bArr) {
        if (this.being_closed.get()) {
            throw new RuntimeException("Client is being closed, and does not take requests any more");
        }
        try {
            this.message_queue.put(new TaskMessage(i, bArr));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Blocks until at least one queued item is available, then drains as many further items
     * as fit into a single batch.
     *
     * <p>Draining stops when the batch is full, the queue is empty, the next message does not
     * fit, or {@link ControlMessage#CLOSE_MESSAGE} has been added to the batch.
     *
     * @return a batch containing at least one item
     * @throws InterruptedException if interrupted while waiting for a queued item
     */
    public MessageBatch takeMessages() throws InterruptedException {
        MessageBatch batch = new MessageBatch(this.buffer_size);
        Object first = this.message_queue.take();
        batch.add(first);
        if (first == ControlMessage.CLOSE_MESSAGE) {
            return batch;
        }
        while (!batch.isFull()) {
            // Peek first so a message that does not fit stays at the head of the queue.
            Object next = this.message_queue.peek();
            if (next == null) {
                break;
            }
            if (next == ControlMessage.CLOSE_MESSAGE) {
                this.message_queue.take();
                batch.add(next);
                break;
            }
            if (!batch.tryAdd((TaskMessage) next)) {
                break;
            }
            this.message_queue.take();
        }
        return batch;
    }

    /**
     * Marks the client as closing and enqueues {@link ControlMessage#CLOSE_MESSAGE} behind any
     * pending messages. Calling it again has no effect.
     *
     * <p>The closing flag is set <em>before</em> the close marker is queued so that
     * {@link #send(int, byte[])} cannot slip a message in behind the marker, and so that the
     * client is still marked closed if enqueueing is interrupted. In that case resources are
     * released directly via {@link #close_n_release()}.
     *
     * <p>NOTE(review): presumably the consumer of {@link #takeMessages()} invokes
     * {@link #close_n_release()} when it sees the close marker; confirm against the handler.
     */
    @Override // backtype.storm.messaging.IConnection
    public synchronized void close() {
        if (!this.being_closed.compareAndSet(false, true)) {
            return;
        }
        try {
            this.message_queue.put(ControlMessage.CLOSE_MESSAGE);
        } catch (InterruptedException e) {
            try {
                close_n_release();
            } finally {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Closes the current channel (if any), waiting for the close to complete, and then
     * releases the channel factory's external resources on a separate thread.
     */
    public void close_n_release() {
        // Read the reference once; a concurrent setChannel(null) must not cause an NPE.
        Channel channel = this.channelRef.get();
        if (channel != null) {
            channel.close().awaitUninterruptibly();
        }
        new Thread(new Runnable() { // from class: backtype.storm.messaging.netty.Client.1
            @Override // java.lang.Runnable
            public void run() {
                Client.this.factory.releaseExternalResources();
            }
        }).start();
    }

    /**
     * Not supported on the client side.
     *
     * @throws RuntimeException always
     */
    @Override // backtype.storm.messaging.IConnection
    public TaskMessage recv(int i) {
        throw new RuntimeException("Client connection should not receive any messages");
    }

    /**
     * Records the channel currently in use. A non-null channel resets the retry counter.
     *
     * @param channel the new channel, or {@code null} to clear the reference
     */
    public void setChannel(Channel channel) {
        this.channelRef.set(channel);
        if (channel != null) {
            this.retries.set(0);
        }
    }
}
