package com.microsoft.azure.servicebus;

import com.microsoft.azure.servicebus.amqp.AmqpConstants;
import com.microsoft.azure.servicebus.amqp.IAmqpReceiver;
import com.microsoft.azure.servicebus.amqp.ReceiveLinkHandler;
import com.microsoft.azure.servicebus.amqp.SessionHandler;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnknownDescribedType;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;

/* loaded from: input_file:com/microsoft/azure/servicebus/MessageReceiver.class */
/**
 * AMQP receive link wrapper: opens a receive link on the factory's connection, buffers incoming
 * messages in a prefetch queue, hands them out through {@link #receive()}, and re-creates the link
 * when the retry policy allows it after an error.
 *
 * <p>NOTE(review): callbacks ({@code onOpenComplete}, {@code onReceiveComplete}, {@code onError},
 * {@code onClose}) and the {@code Timer} tasks appear to run on threads other than the caller's;
 * the class only synchronises the pieces guarded by {@code linkCreateLock}, {@code flowSync} and
 * {@code linkClose} - confirm the threading model before relying on stronger guarantees.
 */
public class MessageReceiver extends ClientEntity implements IAmqpReceiver, IErrorContextProvider {
    private static final Logger TRACE_LOGGER = Logger.getLogger(ClientConstants.SERVICEBUS_CLIENT_TRACE);

    /** When the operation timeout is at or below this value, {@link #receive()} never waits. */
    private static final Duration MINIMUM_RECEIVE_TIMER = Duration.ofSeconds(2);

    /** Receive calls waiting for a message, oldest first. */
    private final ConcurrentLinkedQueue<WorkItem<Collection<Message>>> pendingReceives;
    private final MessagingFactory underlyingFactory;
    private final ITimeoutErrorHandler stuckTransportHandler;
    private final String receivePath;
    /** Timer task that completes expired entries of {@link #pendingReceives}. */
    private final Runnable onOperationTimedout;
    private final Duration operationTimeout;
    private int prefetchCount;
    /** Messages delivered by the link that have not been handed to a caller yet. */
    private final ConcurrentLinkedQueue<Message> prefetchedMessages;
    /** Current link; {@code null} until the first link has been created successfully. */
    private Receiver receiveLink;
    private WorkItem<MessageReceiver> linkOpen;
    private final CompletableFuture<Void> linkClose;
    private boolean closeCalled;
    private final long epoch;
    private final boolean isEpochReceiver;
    /** Starting point used while no offset is known; only set when no offset was supplied. */
    private Instant dateTime;
    private boolean offsetInclusive;
    /** Offset of the last message handed out (or the initial offset); {@code null} if none yet. */
    private String lastReceivedOffset;
    /** Guarded by {@link #linkCreateLock}: true while a link (re)creation is outstanding. */
    private boolean linkCreateScheduled;
    private final Object linkCreateLock;
    private Exception lastKnownLinkError;
    /** Guarded by {@link #flowSync}: credit accumulated but not yet flowed to the link. */
    private int nextCreditToFlow;
    private final Object flowSync;

    private MessageReceiver(MessagingFactory factory, ITimeoutErrorHandler timeoutErrorHandler, String name, String recvPath, String offset, boolean offsetInclusive, Instant dateTime, int prefetchCount, Long epoch, boolean isEpochReceiver) {
        super(name);
        this.underlyingFactory = factory;
        this.stuckTransportHandler = timeoutErrorHandler;
        this.operationTimeout = factory.getOperationTimeout();
        this.receivePath = recvPath;
        this.prefetchCount = prefetchCount;
        this.epoch = epoch.longValue();
        this.isEpochReceiver = isEpochReceiver;
        this.prefetchedMessages = new ConcurrentLinkedQueue<>();
        this.linkCreateLock = new Object();
        this.linkClose = new CompletableFuture<>();
        this.lastKnownLinkError = null;
        this.flowSync = new Object();
        // An explicit offset wins over the date-time starting point.
        if (offset != null) {
            this.lastReceivedOffset = offset;
            this.offsetInclusive = offsetInclusive;
        } else {
            this.dateTime = dateTime;
        }
        this.pendingReceives = new ConcurrentLinkedQueue<>();
        this.onOperationTimedout = this::completeTimedOutReceives;
    }

    /**
     * Creates a receiver and starts opening its link.
     *
     * @param factory          messaging factory supplying connection, retry policy and timeouts; it is
     *                         also used as the timeout-error handler
     * @param name             value passed to the {@code ClientEntity} constructor
     * @param recvPath         address the receive link is attached to
     * @param offset           starting offset, or {@code null} to start from {@code dateTime}
     * @param offsetInclusive  whether the message at {@code offset} itself is included
     * @param dateTime         starting point used when {@code offset} is {@code null}
     * @param prefetchCount    prefetch size; also the maximum batch size returned by a receive
     * @param epoch            epoch value attached to the link when {@code isEpochReceiver} is true
     * @param isEpochReceiver  whether the epoch property is set on the link
     * @return future completed with the receiver once the link is open, or exceptionally if opening
     *         fails or times out
     */
    public static CompletableFuture<MessageReceiver> create(MessagingFactory factory, String name, String recvPath, String offset, boolean offsetInclusive, Instant dateTime, int prefetchCount, long epoch, boolean isEpochReceiver) {
        MessageReceiver receiver = new MessageReceiver(factory, factory, name, recvPath, offset, offsetInclusive, dateTime, prefetchCount, Long.valueOf(epoch), isEpochReceiver);
        return receiver.createLink();
    }

    /** Arms the open timeout and schedules the initial link creation on the timer. */
    private CompletableFuture<MessageReceiver> createLink() {
        this.linkOpen = new WorkItem<MessageReceiver>(new CompletableFuture<MessageReceiver>(), this.operationTimeout);
        scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker());
        // Every other write of this flag happens under linkCreateLock; do the same here.
        synchronized (this.linkCreateLock) {
            this.linkCreateScheduled = true;
        }
        Timer.schedule(() -> {
            this.receiveLink = createReceiveLink();
        }, Duration.ofSeconds(0L), TimerType.OneTimeRun);
        return this.linkOpen.getWork();
    }

    public int getPrefetchCount() {
        return this.prefetchCount;
    }

    public void setPrefetchCount(int i) {
        this.prefetchCount = i;
    }

    /**
     * Returns prefetched messages if any are available; otherwise registers a pending receive that
     * is completed when a message arrives, on error, or with {@code null} when it times out.
     *
     * @return future yielding a batch of messages, or {@code null} when nothing was received
     */
    public CompletableFuture<Collection<Message>> receive() {
        List<Message> available = receiveCore();
        if (available != null) {
            return CompletableFuture.completedFuture(available);
        }
        // Timeout too short to be worth waiting for: report "nothing received" right away.
        if (this.operationTimeout.compareTo(MINIMUM_RECEIVE_TIMER) <= 0) {
            return CompletableFuture.completedFuture(null);
        }
        // Only the first pending receive arms the timer; the timer task re-arms itself for the rest.
        if (this.pendingReceives.isEmpty()) {
            scheduleOperationTimer(TimeoutTracker.create(this.operationTimeout));
        }
        CompletableFuture<Collection<Message>> pending = new CompletableFuture<>();
        this.pendingReceives.offer(new WorkItem<>(pending, this.operationTimeout));
        return pending;
    }

    /**
     * Drains up to {@code prefetchCount} messages from the prefetch queue (always at least one when
     * the queue is non-empty).
     *
     * @return the drained messages, or {@code null} when the prefetch queue was empty
     */
    public List<Message> receiveCore() {
        LinkedList<Message> batch = null;
        Message next;
        while ((next = pollPrefetchQueue()) != null) {
            if (batch == null) {
                batch = new LinkedList<>();
            }
            batch.add(next);
            if (batch.size() >= this.prefetchCount) {
                break;
            }
        }
        return batch;
    }

    /**
     * Link-open callback. On success completes the open future, clears the recorded error and tops
     * up link credit; on failure fails the open future (if still pending) and records the error.
     * In both cases timeout-error tracking is reset and the "creation outstanding" flag is cleared.
     *
     * @param exc {@code null} when the link opened successfully, otherwise the failure
     */
    @Override // com.microsoft.azure.servicebus.amqp.IAmqpLink
    public void onOpenComplete(Exception exc) {
        if (exc == null) {
            if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
                this.linkOpen.getWork().complete(this);
            }
            this.lastKnownLinkError = null;
            // After the first successful open, later re-creations resume strictly after the offset.
            this.offsetInclusive = false;
            // NOTE(review): this resets the retry count under the factory's client id, while the
            // other retry-policy calls in this class use this receiver's getClientId() - confirm
            // that the difference is intended.
            this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId());
            if (this.receiveLink.getCredit() == 0) {
                sendFlow(this.prefetchCount - this.prefetchedMessages.size());
            }
        } else {
            if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
                ExceptionUtil.completeExceptionally(this.linkOpen.getWork(), exc, this);
            }
            this.lastKnownLinkError = exc;
        }
        this.stuckTransportHandler.resetTimeoutErrorTracking();
        synchronized (this.linkCreateLock) {
            this.linkCreateScheduled = false;
        }
    }

    /**
     * Message-arrival callback: buffers the message, resets retry and timeout tracking, and
     * completes the oldest pending receive (if any) with whatever is currently prefetched.
     */
    @Override // com.microsoft.azure.servicebus.amqp.IAmqpReceiver
    public void onReceiveComplete(Message message) {
        this.prefetchedMessages.add(message);
        this.underlyingFactory.getRetryPolicy().resetRetryCount(getClientId());
        this.stuckTransportHandler.resetTimeoutErrorTracking();
        WorkItem<Collection<Message>> oldestReceive = this.pendingReceives.poll();
        if (oldestReceive != null) {
            oldestReceive.getWork().complete(receiveCore());
        }
    }

    /** Converts the AMQP error condition to an exception and delegates to {@link #onError(Exception)}. */
    public void onError(ErrorCondition errorCondition) {
        onError(ExceptionUtil.toException(errorCondition));
    }

    /**
     * Link-error callback. Asks the retry policy for the next retry interval; when one is granted
     * the current link is closed and a re-creation is scheduled, otherwise the error is surfaced to
     * the open future and to every pending receive (transient {@code ServiceBusException}s complete
     * pending receives with {@code null} instead of failing them).
     */
    @Override // com.microsoft.azure.servicebus.amqp.IAmqpLink
    public void onError(Exception exc) {
        this.lastKnownLinkError = exc;

        // Time budget comes from the oldest pending receive, else from a still-pending open,
        // else from a fresh tracker.
        WorkItem<Collection<Message>> oldestReceive = this.pendingReceives.peek();
        TimeoutTracker tracker;
        if (oldestReceive != null) {
            tracker = oldestReceive.getTimeoutTracker();
        } else if (this.linkOpen == null || this.linkOpen.getWork() == null || this.linkOpen.getWork().isDone()) {
            tracker = new TimeoutTracker(this.operationTimeout, true);
        } else {
            tracker = this.linkOpen.getTimeoutTracker();
        }

        Duration remainingTime = Duration.ofSeconds(0L);
        if (tracker != null) {
            // Read elapsed() once so the comparison and the subtraction see the same value.
            Duration elapsed = tracker.elapsed();
            if (elapsed.compareTo(this.operationTimeout) <= 0) {
                remainingTime = this.operationTimeout.minus(elapsed);
            }
        }

        Duration retryInterval = this.underlyingFactory.getRetryPolicy().getNextRetryInterval(getClientId(), exc, remainingTime);
        if (retryInterval != null) {
            // receiveLink is still null when the very first link creation failed.
            Receiver link = this.receiveLink;
            if (link != null && link.getLocalState() != EndpointState.CLOSED) {
                link.close();
            }
            scheduleRecreate(retryInterval);
            return;
        }

        onOpenComplete(exc);
        boolean transientError = (exc instanceof ServiceBusException) && ((ServiceBusException) exc).getIsTransient();
        WorkItem<Collection<Message>> pending;
        while ((pending = this.pendingReceives.poll()) != null) {
            CompletableFuture<Collection<Message>> work = pending.getWork();
            if (transientError) {
                work.complete(null);
            } else {
                ExceptionUtil.completeExceptionally(work, exc, this);
            }
        }
    }

    /**
     * Timer task body: completes every pending receive whose timeout has less than one whole second
     * remaining with {@code null}, re-arms the timer for the first one that still has time left,
     * and reports a timeout error if anything expired.
     */
    private void completeTimedOutReceives() {
        boolean anyTimedOut = false;
        WorkItem<Collection<Message>> head;
        while ((head = this.pendingReceives.peek()) != null) {
            if (head.getTimeoutTracker().remaining().getSeconds() > 0) {
                scheduleOperationTimer(head.getTimeoutTracker());
                break;
            }
            WorkItem<Collection<Message>> expired = this.pendingReceives.poll();
            if (expired == null) {
                break;
            }
            anyTimedOut = true;
            expired.getWork().complete(null);
        }
        if (anyTimedOut) {
            // NOTE(review): flow(0) grants no credit; presumably it nudges the transport - confirm.
            Receiver link = this.receiveLink;
            if (link != null) {
                link.flow(0);
            }
            this.stuckTransportHandler.reportTimeoutError();
        }
    }

    /**
     * Schedules the receive-timeout task to run when {@code timeoutTracker} expires; no-op for
     * {@code null}. (Marked by the decompiler as originally {@code private}.)
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleOperationTimer(TimeoutTracker timeoutTracker) {
        if (timeoutTracker != null) {
            Timer.schedule(this.onOperationTimedout, timeoutTracker.remaining(), TimerType.OneTimeRun);
        }
    }

    /**
     * Waits for the factory's connection, then creates, configures and opens a new session and
     * receive link positioned at the last known offset (or the starting date-time).
     * (Marked by the decompiler as originally {@code private}.)
     *
     * @return the locally opened link, or {@code null} when the connection is missing, closed, or
     *         could not be obtained (in the latter case {@link #onError(Exception)} is invoked for
     *         execution failures and timeouts)
     */
    /* JADX INFO: Access modifiers changed from: private */
    public Receiver createReceiveLink() {
        try {
            // Millisecond precision so that sub-second timeouts are not truncated to zero.
            Connection connection = this.underlyingFactory.getConnection().get(this.operationTimeout.toMillis(), TimeUnit.MILLISECONDS);
            if (connection == null || connection.getLocalState() == EndpointState.CLOSED) {
                return null;
            }

            Source source = new Source();
            source.setAddress(this.receivePath);
            UnknownDescribedType filter = createStartingPointFilter();
            source.setFilter(Collections.singletonMap(AmqpConstants.STRING_FILTER, filter));

            Session session = connection.session();
            session.setIncomingCapacity(Integer.MAX_VALUE);
            session.open();
            BaseHandler.setHandler(session, new SessionHandler(this.receivePath));

            String linkName = StringUtil.getRandomString().concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(connection.getRemoteContainer());
            Receiver receiver = session.receiver(linkName);
            receiver.setSource(source);
            receiver.setTarget(new Target());
            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
            receiver.setReceiverSettleMode(ReceiverSettleMode.SECOND);
            if (this.isEpochReceiver) {
                receiver.setProperties(Collections.singletonMap(AmqpConstants.EPOCH, Long.valueOf(this.epoch)));
            }
            BaseHandler.setHandler(receiver, new ReceiveLinkHandler(this));
            this.underlyingFactory.registerForConnectionError(receiver);
            receiver.open();
            return receiver;
        } catch (InterruptedException | ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                onError((Exception) cause);
            }
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            return null;
        } catch (TimeoutException e) {
            onError(new ServiceBusException(false, "Connection creation timed out.", e));
            return null;
        }
    }

    /**
     * Builds the link filter: by received-at time while no offset is known, otherwise by offset.
     * In the offset case the prefetch queue is cleared first, since the new link re-delivers from
     * the last offset handed out.
     */
    private UnknownDescribedType createStartingPointFilter() {
        String expression;
        if (this.lastReceivedOffset == null) {
            long startMillis;
            try {
                startMillis = this.dateTime.toEpochMilli();
            } catch (ArithmeticException e) {
                // Instant too large to express in epoch milliseconds: clamp.
                startMillis = Long.MAX_VALUE;
                if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                    TRACE_LOGGER.log(Level.WARNING, String.format("linkname[%s], linkPath[%s], warning[starting receiver from epoch+Long.Max]", currentLinkName(), this.receivePath));
                }
            }
            expression = String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.RECEIVED_AT_ANNOTATION_NAME, StringUtil.EMPTY, Long.valueOf(startMillis));
        } else {
            this.prefetchedMessages.clear();
            if (TRACE_LOGGER.isLoggable(Level.FINE)) {
                TRACE_LOGGER.log(Level.FINE, String.format("action[recreateReceiveLink], offset[%s], offsetInclusive[%s]", this.lastReceivedOffset, Boolean.valueOf(this.offsetInclusive)));
            }
            expression = String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.OFFSET_ANNOTATION_NAME, this.offsetInclusive ? "=" : StringUtil.EMPTY, this.lastReceivedOffset);
        }
        return new UnknownDescribedType(AmqpConstants.STRING_FILTER, expression);
    }

    /** Name of the current link, or {@code null} when no link has been created yet. */
    private String currentLinkName() {
        Receiver link = this.receiveLink;
        return link != null ? link.getName() : null;
    }

    /**
     * Takes one message from the prefetch queue, records its offset as the resume point and
     * accounts one credit for it.
     *
     * @return the message, or {@code null} when the queue is empty
     */
    private Message pollPrefetchQueue() {
        Message message = this.prefetchedMessages.poll();
        if (message != null) {
            this.lastReceivedOffset = message.getMessageAnnotations().getValue().get(AmqpConstants.OFFSET).toString();
            sendFlow(1);
        }
        return message;
    }

    /**
     * Accumulates {@code credits} and flows the accumulated total to the link once it reaches
     * {@code prefetchCount}.
     */
    private void sendFlow(int credits) {
        int flowed = 0;
        synchronized (this.flowSync) {
            this.nextCreditToFlow += credits;
            if (this.nextCreditToFlow >= this.prefetchCount) {
                flowed = this.nextCreditToFlow;
                this.receiveLink.flow(flowed);
                this.nextCreditToFlow = 0;
            }
        }
        if (flowed != 0 && TRACE_LOGGER.isLoggable(Level.FINE)) {
            // Report the credit actually flowed, not just this call's increment.
            TRACE_LOGGER.log(Level.FINE, String.format("linkname[%s], updated-link-credit[%s], sentCredits[%s]", this.receiveLink.getName(), Integer.valueOf(this.receiveLink.getCredit()), Integer.valueOf(flowed)));
        }
    }

    /** Schedules a link re-creation after {@code duration} unless one is already outstanding. */
    private void scheduleRecreate(Duration duration) {
        synchronized (this.linkCreateLock) {
            if (this.linkCreateScheduled) {
                return;
            }
            this.linkCreateScheduled = true;
            Timer.schedule(this::recreateLinkIfClosed, duration, TimerType.OneTimeRun);
        }
    }

    /**
     * Timer task body for {@link #scheduleRecreate(Duration)}: replaces the current link with a new
     * one if the current link is absent or locally closed, then bumps the retry count.
     */
    private void recreateLinkIfClosed() {
        Receiver oldLink = this.receiveLink;
        if (oldLink != null && oldLink.getLocalState() != EndpointState.CLOSED) {
            return;
        }
        Receiver newLink = createReceiveLink();
        if (newLink != null) {
            if (oldLink != null) {
                this.underlyingFactory.deregisterForConnectionError(oldLink);
            }
            this.receiveLink = newLink;
        } else {
            // Creation failed outright; allow a later error to schedule another attempt.
            synchronized (this.linkCreateLock) {
                this.linkCreateScheduled = false;
            }
        }
        this.underlyingFactory.getRetryPolicy().incrementRetryCount(getClientId());
    }

    /**
     * Fails the open future with a {@code ServiceBusException} if it is still pending when
     * {@code timeoutTracker} expires. The exception is transient unless the last known link error
     * is a non-transient {@code ServiceBusException}.
     */
    private void scheduleLinkOpenTimeout(TimeoutTracker timeoutTracker) {
        Timer.schedule(() -> {
            if (this.linkOpen.getWork().isDone()) {
                return;
            }
            Exception lastError = this.lastKnownLinkError;
            boolean isTransient = !(lastError instanceof ServiceBusException) || ((ServiceBusException) lastError).getIsTransient();
            // The link may never have been created, so its name must be read null-safely.
            String linkName = currentLinkName();
            ServiceBusException timeoutError = new ServiceBusException(isTransient, String.format(Locale.US, "ReceiveLink(%s) %s() on path(%s) timed out", linkName, "Open", this.receivePath), lastError);
            if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", linkName, this.receivePath, "Open"), (Throwable) timeoutError);
            }
            ExceptionUtil.completeExceptionally(this.linkOpen.getWork(), timeoutError, this);
        }, timeoutTracker.remaining(), TimerType.OneTimeRun);
    }

    /** Fails the close future with a {@code TimeoutException} if it is still pending on expiry. */
    private void scheduleLinkCloseTimeout(TimeoutTracker timeoutTracker) {
        Timer.schedule(() -> {
            synchronized (this.linkClose) {
                if (!this.linkClose.isDone()) {
                    String linkName = currentLinkName();
                    TimeoutException timeoutError = new TimeoutException(String.format(Locale.US, "Receive Link(%s) %s() timed out", linkName, "Close"));
                    if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                        TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "message recever(linkName: %s, path: %s) %s call timedout", linkName, this.receivePath, "Close"), (Throwable) timeoutError);
                    }
                    ExceptionUtil.completeExceptionally(this.linkClose, timeoutError, this);
                }
            }
        }, timeoutTracker.remaining(), TimerType.OneTimeRun);
    }

    /**
     * Link-close callback. Completes the close future when the close was requested through
     * {@link #close()}; otherwise treats the close as a link error (transient when no error
     * condition was supplied).
     */
    @Override // com.microsoft.azure.servicebus.amqp.IAmqpLink
    public void onClose(ErrorCondition errorCondition) {
        synchronized (this.linkClose) {
            if (this.closeCalled) {
                this.linkClose.complete(null);
                this.closeCalled = false;
            } else if (errorCondition == null) {
                onError(new ServiceBusException(true, String.format(Locale.US, "Closing the link. LinkName(%s), EntityPath(%s)", currentLinkName(), this.receivePath)));
            } else {
                onError(errorCondition);
            }
        }
    }

    /**
     * Starts closing the link.
     *
     * @return future completed when the link has closed, or exceptionally if the close times out
     */
    @Override // com.microsoft.azure.servicebus.ClientEntity
    public CompletableFuture<Void> close() {
        closeInternal();
        return this.linkClose;
    }

    /** Closes the link and arms the close timeout; completes immediately if there is nothing to close. */
    private void closeInternal() {
        synchronized (this.linkClose) {
            if (!this.closeCalled) {
                if (this.receiveLink == null || this.receiveLink.getLocalState() == EndpointState.CLOSED) {
                    this.linkClose.complete(null);
                } else {
                    this.receiveLink.close();
                    scheduleLinkCloseTimeout(TimeoutTracker.create(this.operationTimeout));
                    this.closeCalled = true;
                }
            }
        }
    }

    /**
     * Builds the diagnostic context attached to errors. Link-level details (offset, prefetch count,
     * credit, prefetch queue size) are only reported once the open future is done, and each is
     * {@code null} when it is not available; this method must not throw because it is passed as
     * the context provider when futures are failed.
     */
    @Override // com.microsoft.azure.servicebus.IErrorContextProvider
    public ErrorContext getContext() {
        boolean linkOpenDone = this.linkOpen != null && this.linkOpen.getWork().isDone();
        Receiver link = this.receiveLink;

        // Prefer the service-assigned tracking id, fall back to the link name.
        String referenceId = null;
        if (link != null) {
            if (link.getRemoteProperties() != null && link.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)) {
                referenceId = link.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString();
            } else {
                referenceId = link.getName();
            }
        }

        String hostName = this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null;
        Long offset = linkOpenDone ? parseOffsetOrNull(this.lastReceivedOffset) : null;
        Integer prefetch = linkOpenDone ? Integer.valueOf(this.prefetchCount) : null;
        // The open future can be done (e.g. failed) without a link ever having been assigned.
        Integer credit = (linkOpenDone && link != null) ? Integer.valueOf(link.getCredit()) : null;
        Integer prefetchQueueSize = (linkOpenDone && this.prefetchedMessages != null) ? Integer.valueOf(this.prefetchedMessages.size()) : null;

        return new ReceiverContext(hostName, this.receivePath, referenceId, offset, prefetch, credit, prefetchQueueSize, Boolean.valueOf(this.isEpochReceiver));
    }

    /** Parses {@code offset} as a long; returns {@code null} when it is absent or not numeric. */
    private static Long parseOffsetOrNull(String offset) {
        if (offset == null) {
            return null;
        }
        try {
            return Long.valueOf(offset);
        } catch (NumberFormatException ignored) {
            // Offset is not numeric; it simply cannot be reported in the context.
            return null;
        }
    }
}
