/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.servicebus;

import com.microsoft.azure.servicebus.ClientConstants;
import com.microsoft.azure.servicebus.ClientEntity;
import com.microsoft.azure.servicebus.ErrorContext;
import com.microsoft.azure.servicebus.ExceptionUtil;
import com.microsoft.azure.servicebus.IErrorContextProvider;
import com.microsoft.azure.servicebus.IteratorUtil;
import com.microsoft.azure.servicebus.MessagingFactory;
import com.microsoft.azure.servicebus.PayloadSizeExceededException;
import com.microsoft.azure.servicebus.ReplayableWorkItem;
import com.microsoft.azure.servicebus.RetryPolicy;
import com.microsoft.azure.servicebus.SenderContext;
import com.microsoft.azure.servicebus.ServiceBusException;
import com.microsoft.azure.servicebus.StringUtil;
import com.microsoft.azure.servicebus.TimeoutTracker;
import com.microsoft.azure.servicebus.Timer;
import com.microsoft.azure.servicebus.TimerType;
import com.microsoft.azure.servicebus.amqp.IAmqpSender;
import com.microsoft.azure.servicebus.amqp.SendLinkHandler;
import com.microsoft.azure.servicebus.amqp.SessionHandler;
import java.nio.BufferOverflowException;
import java.time.Duration;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;

public class MessageSender
extends ClientEntity
implements IAmqpSender,
IErrorContextProvider {
    private static final Logger TRACE_LOGGER = Logger.getLogger("servicebus.trace");
    private final MessagingFactory underlyingFactory;
    private final String sendPath;
    private final Duration operationTimeout;
    private final RetryPolicy retryPolicy;
    private final Runnable operationTimer;
    private final Duration timerTimeout;
    private ConcurrentHashMap<byte[], ReplayableWorkItem<Void>> pendingSendWaiters;
    private Sender sendLink;
    private CompletableFuture<MessageSender> linkFirstOpen;
    private AtomicLong nextTag;
    private TimeoutTracker openLinkTracker;
    private boolean linkCreateScheduled;
    private Object linkCreateLock;
    private Exception lastKnownLinkError;

    public static CompletableFuture<MessageSender> create(MessagingFactory factory, String sendLinkName, String senderPath) {
        final MessageSender msgSender = new MessageSender(factory, sendLinkName, senderPath);
        msgSender.openLinkTracker = TimeoutTracker.create(factory.getOperationTimeout());
        msgSender.initializeLinkOpen(msgSender.openLinkTracker);
        msgSender.linkCreateScheduled = true;
        Timer.schedule(new Runnable(){

            @Override
            public void run() {
                msgSender.sendLink = msgSender.createSendLink();
            }
        }, Duration.ofSeconds(0L), TimerType.OneTimeRun);
        return msgSender.linkFirstOpen;
    }

    private MessageSender(MessagingFactory factory, String sendLinkName, String senderPath) {
        super(sendLinkName);
        this.sendPath = senderPath;
        this.underlyingFactory = factory;
        this.operationTimeout = factory.getOperationTimeout();
        this.timerTimeout = this.operationTimeout.getSeconds() > 9L ? this.operationTimeout.dividedBy(3L) : Duration.ofSeconds(5L);
        this.lastKnownLinkError = null;
        this.retryPolicy = factory.getRetryPolicy();
        this.pendingSendWaiters = new ConcurrentHashMap();
        this.nextTag = new AtomicLong(0L);
        this.linkCreateLock = new Object();
        this.operationTimer = new Runnable(){

            @Override
            public void run() {
                if (MessageSender.this.pendingSendWaiters != null) {
                    Map.Entry pendingSend;
                    Iterator pendingDeliveries = MessageSender.this.pendingSendWaiters.entrySet().iterator();
                    while (pendingDeliveries.hasNext() && (pendingSend = pendingDeliveries.next()) != null) {
                        ReplayableWorkItem pendingSendWork = (ReplayableWorkItem)pendingSend.getValue();
                        if (pendingSendWork.getTimeoutTracker().remaining().compareTo(ClientConstants.TIMER_TOLERANCE) >= 0) continue;
                        pendingDeliveries.remove();
                        Exception cause = pendingSendWork.getLastKnownException() == null ? MessageSender.this.lastKnownLinkError : pendingSendWork.getLastKnownException();
                        ServiceBusException exception = new ServiceBusException(cause != null && cause instanceof ServiceBusException ? ((ServiceBusException)cause).getIsTransient() : true, String.format(Locale.US, "Send operation timed out.", MessageSender.this.getSendPath(), MessageSender.this.sendLink.getName()), cause);
                        ExceptionUtil.completeExceptionally(pendingSendWork.getWork(), exception, MessageSender.this);
                    }
                }
            }
        };
    }

    public String getSendPath() {
        return this.sendPath;
    }

    private CompletableFuture<Void> send(byte[] bytes, int arrayOffset, int messageFormat) {
        return this.send(bytes, arrayOffset, messageFormat, null, null);
    }

    private CompletableFuture<Void> send(byte[] bytes, int arrayOffset, int messageFormat, CompletableFuture<Void> onSend, TimeoutTracker tracker) {
        byte[] tag = String.valueOf(this.nextTag.incrementAndGet()).getBytes();
        if (this.sendLink.getLocalState() == EndpointState.CLOSED) {
            this.scheduleRecreate(Duration.ofMillis(1L));
        } else {
            Delivery dlv = this.sendLink.delivery(tag);
            dlv.setMessageFormat(messageFormat);
            int sentMsgSize = this.sendLink.send(bytes, 0, arrayOffset);
            assert (sentMsgSize != arrayOffset) : "Contract of the ProtonJ library for Sender.Send API changed";
            this.sendLink.advance();
        }
        CompletableFuture<Void> onSendFuture = onSend == null ? new CompletableFuture<Void>() : onSend;
        this.pendingSendWaiters.put(tag, new ReplayableWorkItem(bytes, arrayOffset, messageFormat, onSendFuture, tracker == null ? this.operationTimeout : tracker.remaining()));
        return onSendFuture;
    }

    public CompletableFuture<Void> send(Iterable<Message> messages) {
        int encodedSize;
        if (messages == null || IteratorUtil.sizeEquals(messages, 0)) {
            throw new IllegalArgumentException("Sending Empty batch of messages is not allowed.");
        }
        Message firstMessage = messages.iterator().next();
        if (IteratorUtil.sizeEquals(messages, 1)) {
            return this.send(firstMessage);
        }
        Message batchMessage = Proton.message();
        batchMessage.setMessageAnnotations(firstMessage.getMessageAnnotations());
        byte[] bytes = new byte[262144];
        int byteArrayOffset = encodedSize = batchMessage.encode(bytes, 0, 262144);
        for (Message amqpMessage : messages) {
            Message messageWrappedByData = Proton.message();
            byte[] messageBytes = new byte[262144];
            int messageSizeBytes = amqpMessage.encode(messageBytes, 0, 262144);
            messageWrappedByData.setBody((Section)new Data(new Binary(messageBytes, 0, messageSizeBytes)));
            try {
                encodedSize = messageWrappedByData.encode(bytes, byteArrayOffset, 262144 - byteArrayOffset - 1);
            }
            catch (BufferOverflowException exception) {
                CompletableFuture<Void> sendTask = new CompletableFuture<Void>();
                sendTask.completeExceptionally(new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", 256), (Throwable)exception));
                return sendTask;
            }
            byteArrayOffset += encodedSize;
        }
        return this.send(bytes, byteArrayOffset, -2147404032);
    }

    public CompletableFuture<Void> send(Message msg) {
        byte[] bytes = new byte[262144];
        int encodedSize = 0;
        try {
            encodedSize = msg.encode(bytes, 0, 262144);
        }
        catch (BufferOverflowException exception) {
            CompletableFuture<Void> sendTask = new CompletableFuture<Void>();
            sendTask.completeExceptionally(new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", 256), (Throwable)exception));
            return sendTask;
        }
        return this.send(bytes, encodedSize, 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onOpenComplete(Exception completionException) {
        if (completionException == null) {
            this.openLinkTracker = null;
            this.retryPolicy.resetRetryCount(this.getClientId());
            this.lastKnownLinkError = null;
            if (!this.linkFirstOpen.isDone()) {
                this.linkFirstOpen.complete(this);
                Timer.schedule(this.operationTimer, this.timerTimeout, TimerType.RepeatRun);
            } else if (!this.pendingSendWaiters.isEmpty()) {
                ConcurrentHashMap<byte[], ReplayableWorkItem<Void>> unacknowledgedSends = new ConcurrentHashMap<byte[], ReplayableWorkItem<Void>>();
                unacknowledgedSends.putAll(this.pendingSendWaiters);
                if (unacknowledgedSends.size() > 0) {
                    unacknowledgedSends.forEachEntry(1L, new Consumer<Map.Entry<byte[], ReplayableWorkItem<Void>>>(){

                        @Override
                        public void accept(Map.Entry<byte[], ReplayableWorkItem<Void>> sendWork) {
                            ReplayableWorkItem pendingSend = (ReplayableWorkItem)MessageSender.this.pendingSendWaiters.remove(sendWork.getKey());
                            if (pendingSend != null) {
                                MessageSender.this.send(pendingSend.getMessage(), pendingSend.getEncodedMessageSize(), pendingSend.getMessageFormat(), pendingSend.getWork(), pendingSend.getTimeoutTracker());
                            }
                        }
                    });
                }
                unacknowledgedSends.clear();
            }
        } else {
            ExceptionUtil.completeExceptionally(this.linkFirstOpen, completionException, this);
        }
        Object object = this.linkCreateLock;
        synchronized (object) {
            this.linkCreateScheduled = false;
        }
    }

    @Override
    public void onClose(ErrorCondition condition) {
        Exception completionException = condition != null ? ExceptionUtil.toException(condition) : new ServiceBusException(true, "The entity has been close due to transient failures (underlying link closed), please retry the operation.");
        this.onError(completionException);
    }

    @Override
    public void onError(Exception completionException) {
        Duration remainingTime = this.openLinkTracker == null ? this.operationTimeout : (this.openLinkTracker.elapsed().compareTo(this.operationTimeout) > 0 ? Duration.ofSeconds(0L) : this.operationTimeout.minus(this.openLinkTracker.elapsed()));
        Duration retryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), completionException, remainingTime);
        if (completionException != null) {
            this.lastKnownLinkError = completionException;
        }
        if (retryInterval != null) {
            this.scheduleRecreate(retryInterval);
            return;
        }
        this.onOpenComplete(completionException);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void scheduleRecreate(Duration runAfter) {
        Object object = this.linkCreateLock;
        synchronized (object) {
            if (this.linkCreateScheduled) {
                return;
            }
            this.linkCreateScheduled = true;
        }
        Timer.schedule(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                if (MessageSender.this.sendLink.getLocalState() != EndpointState.CLOSED) {
                    return;
                }
                Sender sender = MessageSender.this.createSendLink();
                if (sender != null) {
                    Sender oldSender = MessageSender.this.sendLink;
                    MessageSender.this.underlyingFactory.deregisterForConnectionError((Link)oldSender);
                    MessageSender.this.sendLink = sender;
                } else {
                    Object object = MessageSender.this.linkCreateLock;
                    synchronized (object) {
                        MessageSender.this.linkCreateScheduled = false;
                    }
                }
                MessageSender.this.retryPolicy.incrementRetryCount(MessageSender.this.getClientId());
            }
        }, runAfter, TimerType.OneTimeRun);
    }

    @Override
    public void onSendComplete(final byte[] deliveryTag, DeliveryState outcome) {
        TRACE_LOGGER.log(Level.FINE, String.format("linkName[%s]", this.sendLink.getName()));
        ReplayableWorkItem<Void> pendingSendWorkItem = this.pendingSendWaiters.get(deliveryTag);
        if (pendingSendWorkItem != null) {
            CompletableFuture<Object> pendingSend = pendingSendWorkItem.getWork();
            if (outcome instanceof Accepted) {
                this.retryPolicy.resetRetryCount(this.getClientId());
                this.pendingSendWaiters.remove(deliveryTag);
                pendingSend.complete(null);
            } else if (outcome instanceof Rejected) {
                Rejected rejected = (Rejected)outcome;
                ErrorCondition error = rejected.getError();
                Exception exception = ExceptionUtil.toException(error);
                Duration retryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), exception, pendingSendWorkItem.getTimeoutTracker().remaining());
                if (retryInterval == null) {
                    this.pendingSendWaiters.remove(deliveryTag);
                    ExceptionUtil.completeExceptionally(pendingSend, exception, this);
                } else {
                    pendingSendWorkItem.setLastKnownException(exception);
                    Timer.schedule(new Runnable(){

                        @Override
                        public void run() {
                            MessageSender.this.reSend(deliveryTag);
                        }
                    }, retryInterval, TimerType.OneTimeRun);
                }
            } else {
                this.pendingSendWaiters.remove(deliveryTag);
                ExceptionUtil.completeExceptionally(pendingSend, new ServiceBusException(false, outcome.toString()), this);
            }
        }
    }

    private void reSend(Object deliveryTag) {
        ReplayableWorkItem<Void> pendingSend = this.pendingSendWaiters.remove(deliveryTag);
        if (pendingSend != null) {
            byte[] tag = String.valueOf(this.nextTag.incrementAndGet()).getBytes();
            Delivery dlv = this.sendLink.delivery(tag);
            dlv.setMessageFormat(pendingSend.getMessageFormat());
            int sentMsgSize = this.sendLink.send(pendingSend.getMessage(), 0, pendingSend.getEncodedMessageSize());
            assert (sentMsgSize != pendingSend.getEncodedMessageSize()) : "Contract of the ProtonJ library for Sender.Send API changed";
            CompletableFuture onSend = new CompletableFuture();
            this.pendingSendWaiters.put(tag, new ReplayableWorkItem(pendingSend.getMessage(), pendingSend.getEncodedMessageSize(), pendingSend.getMessageFormat(), onSend, this.operationTimeout));
            this.sendLink.advance();
        }
    }

    private Sender createSendLink() {
        Connection connection = null;
        try {
            connection = this.underlyingFactory.getConnection().get(this.operationTimeout.getSeconds(), TimeUnit.SECONDS);
        }
        catch (InterruptedException | ExecutionException exception) {
            Throwable throwable = exception.getCause();
            if (throwable != null && throwable instanceof Exception) {
                this.onError((Exception)throwable);
            }
            if (exception instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            return null;
        }
        catch (TimeoutException exception) {
            this.onError(new ServiceBusException(false, "Connection creation timed out.", exception));
            return null;
        }
        if (connection == null || connection.getLocalState() == EndpointState.CLOSED) {
            return null;
        }
        Session session = connection.session();
        session.setOutgoingWindow(Integer.MAX_VALUE);
        session.open();
        BaseHandler.setHandler((Extendable)session, (Handler)new SessionHandler(this.sendPath));
        String sendLinkName = StringUtil.getRandomString();
        sendLinkName = sendLinkName.concat("_").concat(connection.getRemoteContainer());
        Sender sender = session.sender(sendLinkName);
        Target target = new Target();
        target.setAddress(this.sendPath);
        sender.setTarget((org.apache.qpid.proton.amqp.transport.Target)target);
        Source source = new Source();
        sender.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        SendLinkHandler handler = new SendLinkHandler(this);
        BaseHandler.setHandler((Extendable)sender, (Handler)handler);
        this.underlyingFactory.registerForConnectionError((Link)sender);
        sender.open();
        return sender;
    }

    private void initializeLinkOpen(TimeoutTracker timeout) {
        this.linkFirstOpen = new CompletableFuture();
        Timer.schedule(new Runnable(){

            @Override
            public void run() {
                if (!MessageSender.this.linkFirstOpen.isDone()) {
                    ServiceBusException operationTimedout = new ServiceBusException(true, String.format(Locale.US, "SendLink(%s).open() on Entity(%s) timed out", MessageSender.this.sendLink.getName(), MessageSender.this.getSendPath()));
                    if (TRACE_LOGGER.isLoggable(Level.WARNING)) {
                        TRACE_LOGGER.log(Level.WARNING, String.format(Locale.US, "message Sender(linkName: %s, path: %s) open call timedout", MessageSender.this.getClientId(), MessageSender.this.sendPath), operationTimedout);
                    }
                    ExceptionUtil.completeExceptionally(MessageSender.this.linkFirstOpen, operationTimedout, MessageSender.this);
                }
            }
        }, timeout.remaining(), TimerType.OneTimeRun);
    }

    @Override
    public CompletableFuture<Void> close() {
        if (this.sendLink != null && this.sendLink.getLocalState() != EndpointState.CLOSED) {
            this.sendLink.close();
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public ErrorContext getContext() {
        boolean isLinkOpened;
        boolean bl = isLinkOpened = this.linkFirstOpen != null && this.linkFirstOpen.isDone();
        String referenceId = this.sendLink != null && this.sendLink.getRemoteProperties() != null && this.sendLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY) ? this.sendLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString() : (this.sendLink != null ? this.sendLink.getName() : null);
        SenderContext errorContext = new SenderContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null, this.sendPath, referenceId, isLinkOpened ? Integer.valueOf(this.sendLink.getCredit()) : null);
        return errorContext;
    }
}

