/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.eventhubs;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventDataUtil;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
import com.microsoft.azure.servicebus.ClientEntity;
import com.microsoft.azure.servicebus.MessageReceiver;
import com.microsoft.azure.servicebus.MessagingFactory;
import com.microsoft.azure.servicebus.ServiceBusException;
import com.microsoft.azure.servicebus.StringUtil;
import java.time.Instant;
import java.util.Collection;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.qpid.proton.message.Message;

/**
 * Receives {@link EventData} from one partition of an event hub on behalf of a consumer group.
 *
 * <p>Instances are obtained asynchronously through the package-private {@code create} factory,
 * which opens the underlying {@link MessageReceiver} before completing. Events can be pulled
 * with {@link #receive()} / {@link #receiveSync()}, or pushed to a
 * {@link PartitionReceiveHandler} by a background pump thread started from
 * {@link #setReceiveHandler(PartitionReceiveHandler)}.
 */
public final class PartitionReceiver extends ClientEntity {
    private static final int MINIMUM_PREFETCH_COUNT = 10;
    private static final int MAXIMUM_PREFETCH_COUNT = 999;
    static final int DEFAULT_PREFETCH_COUNT = 300;
    static final long NULL_EPOCH = 0L;
    public static final String START_OF_STREAM = "-1";

    private final String partitionId;
    private final MessagingFactory underlyingFactory;
    private final String eventHubName;
    private final String consumerGroupName;
    /** Guards handler registration and the state transitions of the receive pump. */
    private final Object receiveHandlerSync;
    private final String startingOffset;
    private final boolean offsetInclusive;
    private final Instant startingDateTime;
    private final Long epoch;
    private final boolean isEpochReceiver;

    /** Assigned once the underlying link has been created (see {@link #createInternalReceiver()}). */
    private MessageReceiver internalReceiver;
    private PartitionReceiveHandler onReceiveHandler;
    /**
     * Loop condition of the pump thread. Volatile because {@link #close()} clears it without
     * holding {@link #receiveHandlerSync} and the pump loop reads it without locking.
     */
    private volatile boolean isOnReceivePumpRunning;
    private Thread onReceivePumpThread;

    private PartitionReceiver(MessagingFactory factory, String eventHubName, String consumerGroupName, String partitionId, String startingOffset, boolean offsetInclusive, Instant dateTime, Long epoch, boolean isEpochReceiver) throws ServiceBusException {
        super(null);
        this.underlyingFactory = factory;
        this.eventHubName = eventHubName;
        this.consumerGroupName = consumerGroupName;
        this.partitionId = partitionId;
        this.startingOffset = startingOffset;
        this.offsetInclusive = offsetInclusive;
        this.startingDateTime = dateTime;
        this.epoch = epoch;
        this.isEpochReceiver = isEpochReceiver;
        this.receiveHandlerSync = new Object();
        this.isOnReceivePumpRunning = false;
    }

    /**
     * Builds a receiver and asynchronously opens its underlying {@link MessageReceiver}.
     *
     * @param factory           messaging factory used to create the underlying receiver
     * @param eventHubName      name of the event hub
     * @param consumerGroupName consumer group to receive for; must not be null or whitespace
     * @param partitionId       partition to receive from
     * @param startingOffset    offset passed through to the underlying receiver
     * @param offsetInclusive   whether {@code startingOffset} itself is included
     * @param dateTime          starting instant passed through to the underlying receiver
     * @param epoch             epoch value; must be zero or positive
     * @param isEpochReceiver   whether the receiver is created as an epoch receiver
     * @return a future that completes with the receiver once the underlying receiver exists
     * @throws IllegalArgumentException if {@code epoch} is negative or {@code consumerGroupName} is blank
     * @throws ServiceBusException      if creating the underlying receiver fails synchronously
     */
    static CompletableFuture<PartitionReceiver> create(MessagingFactory factory, String eventHubName, String consumerGroupName, String partitionId, String startingOffset, boolean offsetInclusive, Instant dateTime, long epoch, boolean isEpochReceiver) throws ServiceBusException {
        if (epoch < 0L) {
            throw new IllegalArgumentException("epoch cannot be a negative value. Please specify a zero or positive long value.");
        }
        if (StringUtil.isNullOrWhiteSpace(consumerGroupName)) {
            throw new IllegalArgumentException("specify valid string for argument - 'consumerGroupName'");
        }
        final PartitionReceiver receiver = new PartitionReceiver(factory, eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, dateTime, epoch, isEpochReceiver);
        return receiver.createInternalReceiver().thenApplyAsync((Void ignored) -> receiver);
    }

    /**
     * Creates the underlying {@link MessageReceiver} for this partition's address and stores it
     * in {@link #internalReceiver} when creation completes.
     */
    private CompletableFuture<Void> createInternalReceiver() throws ServiceBusException {
        String receivePath = String.format("%s/ConsumerGroups/%s/Partitions/%s", this.eventHubName, this.consumerGroupName, this.partitionId);
        return MessageReceiver.create(this.underlyingFactory, StringUtil.getRandomString(), receivePath, this.startingOffset, this.offsetInclusive, this.startingDateTime, DEFAULT_PREFETCH_COUNT, this.epoch, this.isEpochReceiver)
                .thenAcceptAsync((MessageReceiver created) -> {
                    this.internalReceiver = created;
                });
    }

    /** @return the starting offset this receiver was created with */
    final String getStartingOffset() {
        return this.startingOffset;
    }

    /** @return whether the starting offset is inclusive */
    final boolean getOffsetInclusive() {
        return this.offsetInclusive;
    }

    /** @return the id of the partition this receiver reads from */
    public final String getPartitionId() {
        return this.partitionId;
    }

    /** @return the prefetch count currently configured on the underlying receiver */
    public final int getPrefetchCount() {
        return this.internalReceiver.getPrefetchCount();
    }

    /**
     * Sets the prefetch count on the underlying receiver.
     *
     * @param prefetchCount new prefetch count, between {@value #MINIMUM_PREFETCH_COUNT} and
     *                      {@value #MAXIMUM_PREFETCH_COUNT} inclusive
     * @throws IllegalArgumentException if {@code prefetchCount} is outside that range
     */
    public final void setPrefetchCount(int prefetchCount) {
        // Out of range means below the minimum OR above the maximum; the previous '&&' could never be true.
        if (prefetchCount < MINIMUM_PREFETCH_COUNT || prefetchCount > MAXIMUM_PREFETCH_COUNT) {
            throw new IllegalArgumentException(String.format(Locale.US, "PrefetchCount has to be between %s and %s", MINIMUM_PREFETCH_COUNT, MAXIMUM_PREFETCH_COUNT));
        }
        this.internalReceiver.setPrefetchCount(prefetchCount);
    }

    /** @return the epoch value this receiver was created with */
    public final long getEpoch() {
        return this.epoch;
    }

    /**
     * Blocking variant of {@link #receive()}.
     *
     * @return the received events, or {@code null} when the wait fails without an underlying
     *         cause (for example when the calling thread is interrupted; the interrupt flag is
     *         restored in that case)
     * @throws ServiceBusException if the receive failed; a cause that is neither a
     *                             {@link RuntimeException} nor a {@link ServiceBusException}
     *                             is wrapped in a new {@link ServiceBusException}
     */
    public final Iterable<EventData> receiveSync() throws ServiceBusException {
        try {
            return this.receive().get();
        }
        catch (InterruptedException | ExecutionException exception) {
            if (exception instanceof InterruptedException) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            Throwable cause = exception.getCause();
            if (cause == null) {
                return null;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException)cause;
            }
            if (cause instanceof ServiceBusException) {
                throw (ServiceBusException)cause;
            }
            throw new ServiceBusException(true, cause);
        }
    }

    /**
     * Asynchronously receives a batch of messages from the underlying receiver and converts
     * them to {@link EventData}.
     *
     * @return a future completing with the converted events
     */
    public CompletableFuture<Iterable<EventData>> receive() {
        return this.internalReceiver.receive().thenApply((Collection<Message> amqpMessages) -> EventDataUtil.toEventDataCollection(amqpMessages));
    }

    /**
     * Registers or removes the push-style receive handler.
     *
     * <p>A non-null handler is stored and a new pump thread is started on every call. Passing
     * {@code null} after a handler has been registered requests the pump to stop and interrupts
     * its thread; passing {@code null} when no handler was ever registered does nothing.
     *
     * @param receiveHandler handler to deliver events to, or {@code null} to stop the pump
     */
    public void setReceiveHandler(PartitionReceiveHandler receiveHandler) {
        synchronized (this.receiveHandlerSync) {
            if (receiveHandler != null) {
                this.onReceiveHandler = receiveHandler;
                this.startOnReceivePump();
                return;
            }
            if (this.onReceiveHandler != null) {
                this.isOnReceivePumpRunning = false;
                if (this.onReceivePumpThread != null) {
                    this.onReceivePumpThread.interrupt();
                }
            }
        }
    }

    /**
     * Requests the pump to stop, interrupts its thread if one exists, and closes the
     * underlying receiver.
     *
     * @return the underlying receiver's close future, or an already-completed future when no
     *         underlying receiver was created
     */
    @Override
    public CompletableFuture<Void> close() {
        this.isOnReceivePumpRunning = false;
        Thread pumpThread = this.onReceivePumpThread;
        if (pumpThread != null && !pumpThread.isInterrupted()) {
            pumpThread.interrupt();
        }
        if (this.internalReceiver != null) {
            return this.internalReceiver.close();
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Creates and starts the thread that runs {@link #runReceivePump()}. */
    private void startOnReceivePump() {
        this.onReceivePumpThread = new Thread(this::runReceivePump);
        this.onReceivePumpThread.start();
    }

    /**
     * Pump loop: repeatedly receives with the factory's operation timeout and hands non-empty
     * batches to the handler. Timeouts are retried silently. The loop ends when the running
     * flag is cleared, when a receive failure is not reported through {@code onError}, or when
     * user handler code throws.
     */
    private void runReceivePump() {
        // Taking the lock here makes the pump wait until setReceiveHandler has finished.
        synchronized (this.receiveHandlerSync) {
            this.isOnReceivePumpRunning = true;
        }
        while (this.isOnReceivePumpRunning) {
            Iterable<EventData> receivedEvents;
            try {
                receivedEvents = this.receive().get(this.underlyingFactory.getOperationTimeout().getSeconds(), TimeUnit.SECONDS);
            }
            catch (TimeoutException timedOut) {
                // Nothing arrived within the operation timeout; just poll again.
                continue;
            }
            catch (InterruptedException | ExecutionException receiveFailure) {
                if (this.reportReceiveFailure(receiveFailure.getCause())) {
                    continue;
                }
                if (receiveFailure instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                return;
            }
            if (receivedEvents == null || !receivedEvents.iterator().hasNext()) {
                continue;
            }
            try {
                this.onReceiveHandler.onReceive(receivedEvents);
            }
            catch (Throwable userCodeError) {
                this.stopPumpAndNotifyClose(userCodeError);
                if (userCodeError instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                return;
            }
        }
    }

    /**
     * Routes a receive failure to the handler.
     *
     * <p>A cause that is a transient {@link ServiceBusException}, or that is not a
     * {@link RuntimeException}, is reported via {@code onError}. Any other cause (including
     * {@code null}), or an {@code onError} call that itself throws, stops the pump and is
     * reported via {@code onClose}.
     *
     * @param cause cause of the failed receive; may be {@code null}
     * @return {@code true} if the pump should keep running, {@code false} if it was stopped
     */
    private boolean reportReceiveFailure(Throwable cause) {
        boolean reportAsError = cause != null
                && ((cause instanceof ServiceBusException && ((ServiceBusException)cause).getIsTransient())
                        || !(cause instanceof RuntimeException));
        if (!reportAsError) {
            this.stopPumpAndNotifyClose(cause);
            return false;
        }
        try {
            this.onReceiveHandler.onError(cause);
            return true;
        }
        catch (Throwable userCodeError) {
            this.stopPumpAndNotifyClose(userCodeError);
            return false;
        }
    }

    /** Clears the running flag under the handler lock, then calls the handler's {@code onClose}. */
    private void stopPumpAndNotifyClose(Throwable reason) {
        synchronized (this.receiveHandlerSync) {
            this.isOnReceivePumpRunning = false;
        }
        this.onReceiveHandler.onClose(reason);
    }
}

