package backtype.storm.utils;

import backtype.storm.metric.api.IStatefulObject;
import com.lmax.disruptor.AlertException;
import com.lmax.disruptor.ClaimStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.SingleThreadedClaimStrategy;
import com.lmax.disruptor.WaitStrategy;
import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/* loaded from: input_file:backtype/storm/utils/DisruptorQueue.class */
public class DisruptorQueue implements IStatefulObject {
    /** Ring buffer of {@link MutableObject} slots; created in the constructor. */
    RingBuffer<MutableObject> _buffer;
    /** Barrier obtained from {@link #_buffer}; used to read the cursor and to wait for sequences. */
    SequenceBarrier _barrier;
    /**
     * While {@code false}, {@link #publish(Object, boolean)} parks objects in {@link #_cache}
     * instead of writing them to the ring buffer.
     */
    volatile boolean consumerStartedFlag;
    /** {@link #PREFIX} followed by the name passed to the constructor. */
    private String _queueName;
    /** Sentinel event: when consumed, the contents of {@link #_cache} are drained to the handler. */
    static final Object FLUSH_CACHE = new Object();
    /** Sentinel event published by {@link #haltWithInterrupt()} to abort batch consumption. */
    static final Object INTERRUPT = new Object();
    /** Prefix prepended to every queue name. */
    private static final String PREFIX = "disruptor-";
    /** Objects published before the consumer was marked as started. */
    ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<>();
    // The read lock is held by publishers while they add to _cache; the write lock is taken
    // (and immediately released) by consumerStarted().
    private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.cacheLock.readLock();
    private final Lock writeLock = this.cacheLock.writeLock();
    /** Consumer position; registered as the ring buffer's gating sequence in the constructor. */
    Sequence _consumer = new Sequence();

    /* loaded from: input_file:backtype/storm/utils/DisruptorQueue$ObjectEventFactory.class */
    public static class ObjectEventFactory implements EventFactory<MutableObject> {
        /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
        public MutableObject m2287newInstance() {
            return new MutableObject();
        }
    }

    public DisruptorQueue(String str, ClaimStrategy claimStrategy, WaitStrategy waitStrategy) {
        this.consumerStartedFlag = false;
        this._queueName = "";
        this._queueName = PREFIX + str;
        this._buffer = new RingBuffer<>(new ObjectEventFactory(), claimStrategy, waitStrategy);
        this._barrier = this._buffer.newBarrier(new Sequence[0]);
        this._buffer.setGatingSequences(new Sequence[]{this._consumer});
        if (claimStrategy instanceof SingleThreadedClaimStrategy) {
            this.consumerStartedFlag = true;
            return;
        }
        try {
            publishDirect(FLUSH_CACHE, true);
        } catch (InsufficientCapacityException e) {
            throw new RuntimeException("This code should be unreachable!", e);
        }
    }

    /**
     * Returns the queue name.
     *
     * @return the name assigned in the constructor ({@code "disruptor-"} plus the given name)
     */
    public String getName() {
        return _queueName;
    }

    /**
     * Consumes every slot up to the barrier's current cursor without waiting.
     *
     * @param eventHandler handler that receives each consumed object
     */
    public void consumeBatch(EventHandler<Object> eventHandler) {
        final long cursor = _barrier.getCursor();
        consumeBatchToCursor(cursor, eventHandler);
    }

    /**
     * Publishes the {@link #INTERRUPT} sentinel through the regular {@link #publish(Object)} path.
     */
    public void haltWithInterrupt() {
        final Object sentinel = INTERRUPT;
        publish(sentinel);
    }

    /**
     * Waits up to 10 milliseconds for the next sequence after the consumer position and, if
     * the barrier reports a sequence at or beyond it, consumes everything up to that sequence.
     *
     * @param eventHandler handler that receives each consumed object
     * @throws RuntimeException wrapping an {@link AlertException} or
     *                          {@link InterruptedException} raised while waiting
     */
    public void consumeBatchWhenAvailable(EventHandler<Object> eventHandler) {
        try {
            final long nextSequence = _consumer.get() + 1;
            final long availableSequence = _barrier.waitFor(nextSequence, 10L, TimeUnit.MILLISECONDS);
            if (availableSequence < nextSequence) {
                // Nothing new arrived within the wait window.
                return;
            }
            consumeBatchToCursor(availableSequence, eventHandler);
        } catch (AlertException | InterruptedException failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Hands every slot from the one after the consumer position up to and including
     * {@code cursor} to {@code handler}, then advances the consumer sequence to {@code cursor}.
     *
     * <p>Each slot's payload is read and the slot is cleared (set to {@code null}) before
     * dispatch. Two sentinel payloads are handled specially:
     * <ul>
     *   <li>{@link #FLUSH_CACHE}: every object currently in {@link #_cache} is polled and passed
     *       to the handler with the sentinel's sequence and {@code true} as the third
     *       argument;</li>
     *   <li>{@link #INTERRUPT}: an {@link InterruptedException} is raised, which (like any other
     *       exception) surfaces wrapped in a {@link RuntimeException}.</li>
     * </ul>
     * Any other payload is passed to the handler with its sequence; the third argument is
     * {@code true} only for the slot at {@code cursor}.
     *
     * @param cursor  last sequence (inclusive) to consume
     * @param handler handler that receives each consumed object
     * @throws RuntimeException wrapping any exception thrown while processing a slot; in that
     *                          case the consumer sequence is not advanced
     */
    private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
        for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
            try {
                MutableObject slot = _buffer.get(curr);
                Object payload = slot.o;
                // Clear the slot so the ring buffer does not keep the payload reachable.
                slot.setObject(null);
                if (payload == FLUSH_CACHE) {
                    Object cached;
                    while ((cached = _cache.poll()) != null) {
                        handler.onEvent(cached, curr, true);
                    }
                } else if (payload == INTERRUPT) {
                    throw new InterruptedException("Disruptor processing interrupted");
                } else {
                    handler.onEvent(payload, curr, curr == cursor);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        _consumer.set(cursor);
    }

    /**
     * Publishes {@code obj} by delegating to {@link #publish(Object, boolean)} with
     * {@code true} as the second argument.
     *
     * @param obj object to publish
     * @throws RuntimeException if the delegate unexpectedly reports
     *                          {@link InsufficientCapacityException}; the original exception
     *                          is attached as the cause
     */
    public void publish(Object obj) {
        try {
            publish(obj, true);
        } catch (InsufficientCapacityException e) {
            // Keep the cause so an "impossible" failure can still be diagnosed.
            throw new RuntimeException("This code should be unreachable!", e);
        }
    }

    /**
     * Publishes {@code obj} by delegating to {@link #publish(Object, boolean)} with
     * {@code false} as the second argument.
     *
     * @param obj object to publish
     * @throws InsufficientCapacityException propagated from the delegate
     */
    public void tryPublish(Object obj) throws InsufficientCapacityException {
        final boolean block = false;
        publish(obj, block);
    }

    /**
     * Publishes {@code obj} to the ring buffer, or adds it to {@link #_cache} if the consumer
     * has not been marked as started yet.
     *
     * @param obj object to publish
     * @param z   passed through to {@link #publishDirect(Object, boolean)}: {@code true} claims
     *            the slot with {@code next()}, {@code false} with {@code tryNext(1)}
     * @throws InsufficientCapacityException propagated from {@link #publishDirect(Object, boolean)}
     */
    public void publish(Object obj, boolean z) throws InsufficientCapacityException {
        // Fast path: consumer already started, no locking required.
        if (consumerStartedFlag) {
            publishDirect(obj, z);
            return;
        }

        final boolean startedMeanwhile;
        readLock.lock();
        try {
            // Re-read under the read lock; the flag may have flipped since the first check.
            startedMeanwhile = consumerStartedFlag;
            if (!startedMeanwhile) {
                _cache.add(obj);
            }
        } finally {
            readLock.unlock();
        }

        if (startedMeanwhile) {
            publishDirect(obj, z);
        }
    }

    /**
     * Claims a ring-buffer slot, stores {@code obj} in it and publishes the slot.
     *
     * @param obj object to store in the claimed slot
     * @param z   {@code true} to claim with {@code next()}, {@code false} to claim with
     *            {@code tryNext(1)}
     * @throws InsufficientCapacityException propagated from the ring buffer's claim call
     */
    private void publishDirect(Object obj, boolean z) throws InsufficientCapacityException {
        final long sequence;
        if (z) {
            sequence = _buffer.next();
        } else {
            sequence = _buffer.tryNext(1);
        }
        MutableObject slot = _buffer.get(sequence);
        slot.setObject(obj);
        _buffer.publish(sequence);
    }

    /**
     * Marks the consumer as started, so that later calls to publish(Object, boolean) write to
     * the ring buffer instead of the cache.
     */
    public void consumerStarted() {
        // Flip the flag first so publishers that check it from now on skip the cache.
        this.consumerStartedFlag = true;
        // Publishers add to _cache while holding readLock. Acquiring the write lock of the same
        // ReentrantReadWriteLock cannot succeed until those read locks are released, so this
        // lock/unlock pair waits for in-flight cache additions to finish.
        this.writeLock.lock();
        this.writeLock.unlock();
    }

    /**
     * Returns the difference between the write position and the read position.
     *
     * @return {@code writePos() - readPos()}
     */
    public long population() {
        final long written = writePos();
        final long read = readPos();
        return written - read;
    }

    /**
     * Returns the size of the underlying ring buffer.
     *
     * @return the ring buffer's buffer size, widened to {@code long}
     */
    public long capacity() {
        final long size = _buffer.getBufferSize();
        return size;
    }

    /**
     * Returns the ring buffer's current cursor.
     *
     * @return the value of {@code _buffer.getCursor()}
     */
    public long writePos() {
        final long cursor = _buffer.getCursor();
        return cursor;
    }

    /**
     * Returns the current value of the consumer sequence.
     *
     * @return the value of {@code _consumer.get()}
     */
    public long readPos() {
        final long consumed = _consumer.get();
        return consumed;
    }

    /**
     * Returns the population divided by the capacity, computed in {@code float} arithmetic.
     *
     * @return {@code population() / capacity()} as a {@code float}
     */
    public float pctFull() {
        final float used = (float) population();
        final float total = (float) capacity();
        return (1.0f * used) / total;
    }

    /**
     * Returns a snapshot of the queue's counters.
     *
     * <p>The read position is sampled before the write position, and {@code "population"} is
     * the difference between those two samples.
     *
     * @return a {@code HashMap<String, Object>} with the keys {@code "capacity"},
     *         {@code "population"}, {@code "write_pos"} and {@code "read_pos"}, each mapped
     *         to a {@link Long}
     */
    @Override
    public Object getState() {
        HashMap<String, Object> state = new HashMap<String, Object>();
        // Sample the reader first, then the writer; do not reorder these two reads.
        final long readPos = readPos();
        final long writePos = writePos();
        state.put("capacity", Long.valueOf(capacity()));
        state.put("population", Long.valueOf(writePos - readPos));
        state.put("write_pos", Long.valueOf(writePos));
        state.put("read_pos", Long.valueOf(readPos));
        return state;
    }
}
