/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.multiplexer.messageclient;

import com.pushtechnology.diffusion.io.nio.MultiplexerExecutor;
import com.pushtechnology.diffusion.logs.i18n.I18nLogger;
import com.pushtechnology.diffusion.message.Message;
import com.pushtechnology.diffusion.message.MessageChannel;
import com.pushtechnology.diffusion.message.MessageChannelClosedReason;
import com.pushtechnology.diffusion.message.MessageChannelFeeder;
import com.pushtechnology.diffusion.message.MessageChannelListener;
import com.pushtechnology.diffusion.messagequeue.OutboundMessageQueue;
import com.pushtechnology.diffusion.messagequeue.RecoveryBuffer;
import com.pushtechnology.diffusion.messagequeue.RecoveryBufferImpl;
import com.pushtechnology.diffusion.multiplexer.BaseMultiplexerClient;
import com.pushtechnology.diffusion.multiplexer.Multiplexer;
import com.pushtechnology.diffusion.multiplexer.MultiplexerEvent;
import com.pushtechnology.diffusion.multiplexer.MultiplexerState;
import com.pushtechnology.diffusion.multiplexer.diagnostics.SessionFields;
import com.pushtechnology.diffusion.multiplexer.messageclient.MessageQueueMultiplexerClient;
import com.pushtechnology.diffusion.multiplexer.messageclient.MessageQueueMultiplexerClientCallbacks;
import com.pushtechnology.diffusion.multiplexer.messageclient.MessageQueueMultiplexerState;
import com.pushtechnology.diffusion.reports.Record;
import com.pushtechnology.diffusion.threads.MultiplexerOnly;
import com.pushtechnology.diffusion.utils.ConfigurationUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import net.jcip.annotations.NotThreadSafe;
import org.slf4j.Logger;

@NotThreadSafe
public abstract class AbstractMessageQueueMultiplexerClient<T extends Multiplexer, C extends MessageQueueMultiplexerClientCallbacks>
extends BaseMultiplexerClient<T>
implements MessageQueueMultiplexerClient,
MessageQueueMultiplexerState,
MessageChannelFeeder {
    /**
     * Name of the system property consulted by {@link #getNetworkPushTimeout()}; a
     * non-negative integer value there replaces the timeout derived from the channel.
     */
    public static final String NETWORK_PUSH_TIMEOUT_PROPERTY = "diffusion.outbound.push_timeout";
    /**
     * Increment added to the current time for each retry slice while recovering from a
     * full queue. Read from {@code diffusion.outbound.push_retry_delay}; the second
     * argument ({@code 1L}) is presumably the fallback when the property is unset —
     * confirm against {@code ConfigurationUtils}.
     */
    private static final Long NETWORK_PUSH_RETRY_DELAY_MS = ConfigurationUtils.getLongSystemProperty("diffusion.outbound.push_retry_delay", 1L);
    private static final Logger LOG = I18nLogger.getLogger(AbstractMessageQueueMultiplexerClient.class);
    /**
     * Second constructor argument handed to {@link RecoveryBufferImpl}. Read from
     * {@code diffusion.recoverybuffer.size}; {@code 8} is presumably the fallback — confirm.
     */
    private static final int RECOVERY_BUFFER_INDEX_SIZE = ConfigurationUtils.getIntegerSystemProperty("diffusion.recoverybuffer.size", 8);
    /** Queue of outbound messages waiting to be handed to the message channel. */
    private final OutboundMessageQueue messageQueue;
    /** Callbacks supplied at construction; also used as this client's description in logs and reports. */
    private final C clientCallbacks;
    /** Receives every message removed by {@link #pollMessage()}; replaced by the shared empty buffer when recovery is disabled. */
    @MultiplexerOnly
    private final RecoveryBuffer recoveryBuffer;
    /** Current channel, or {@code null} before {@link #setMessageChannel} is called and after unregistration. */
    @MultiplexerOnly
    private MessageChannel messageChannel;

    protected AbstractMessageQueueMultiplexerClient(T multiplexer, OutboundMessageQueue messageQueue, int recoveryBufferSize, C clientCallbacks) {
        super(multiplexer);
        this.messageQueue = messageQueue;
        this.clientCallbacks = clientCallbacks;
        this.recoveryBuffer = recoveryBufferSize > 0 && clientCallbacks.getReconnectPeriod() > 0L ? new RecoveryBufferImpl(recoveryBufferSize, RECOVERY_BUFFER_INDEX_SIZE) : RecoveryBuffer.EMPTY_RECOVERY_BUFFER;
    }

    /**
     * @return the callbacks object supplied to the constructor
     */
    protected final C getClientCallbacks() {
        return clientCallbacks;
    }

    /**
     * @return the channel currently assigned to this client, or {@code null} if none is set
     */
    @MultiplexerOnly
    protected final MessageChannel getMessageChannel() {
        return messageChannel;
    }

    /**
     * @return the outbound queue supplied to the constructor
     */
    @Override
    @MultiplexerOnly
    public OutboundMessageQueue getMessageQueue() {
        return messageQueue;
    }

    /**
     * Reports whether there is anything to send right now.
     *
     * <p>Checks, in this order and stopping at the first failure: a channel is set, the
     * queue has available messages, and the channel's {@code prepareToSend()} succeeds.
     *
     * @return {@code true} only when all three checks pass
     */
    @MultiplexerOnly
    private boolean isReadyToProcess() {
        final MessageChannel current = messageChannel;
        if (current == null) {
            return false;
        }
        if (!messageQueue.hasAvailableMessages()) {
            return false;
        }
        return current.prepareToSend();
    }

    /**
     * Forwards the runnable to the owning multiplexer's
     * {@code dispatchInNonMultiplexerThread}.
     *
     * @param runnable the task to forward
     */
    @Override
    public final void dispatchInNonMultiplexerThread(Runnable runnable) {
        getMultiplexer().dispatchInNonMultiplexerThread(runnable);
    }

    /**
     * Sends queued messages if the client is ready, and re-queues the client for another
     * pass when it is still ready afterwards.
     *
     * @param multiplexerState state of the multiplexer driving this call
     * @param now the time value forwarded to {@link #sendMessages}
     * @return {@code false} if the client was not ready and nothing was attempted,
     *         {@code true} otherwise
     */
    @Override
    @MultiplexerOnly
    public final boolean process(MultiplexerState multiplexerState, long now) {
        if (isReadyToProcess()) {
            sendMessages(multiplexerState, now);
            // Still ready after sending: ask for another turn.
            if (isReadyToProcess()) {
                multiplexerState.queueForProcessing(this);
            }
            return true;
        }
        return false;
    }

    /**
     * Asks the current channel to send messages, passing this client as both the second
     * and fourth arguments of {@code MessageChannel.sendMessages}, then records the time
     * in the recovery buffer and schedules a trim if the buffer requests one.
     *
     * <p>The channel is dereferenced without a null check, so callers must only invoke
     * this while a channel is set.
     *
     * @param multiplexerState state of the multiplexer driving this call
     * @param now the time value passed to the channel and to the recovery buffer
     */
    @MultiplexerOnly
    protected void sendMessages(MultiplexerState multiplexerState, long now) {
        final MessageChannel channel = messageChannel;
        LOG.trace("Sending messages to {}...", (Object) channel);
        channel.sendMessages(multiplexerState, this, now, this);

        final boolean trimNeeded = recoveryBuffer.markTime(now);
        if (trimNeeded) {
            multiplexerState.trimRecoveryBufferAfter(getTrimPeriod(multiplexerState), this);
        }
    }

    /**
     * Computes the trim period as the smaller of the callbacks' reconnect period and the
     * multiplexer's maximum recovery trim time, narrowed to {@code int}.
     *
     * @param multiplexerState source of the maximum recovery trim time
     * @return the smaller of the two values
     */
    private int getTrimPeriod(MultiplexerState multiplexerState) {
        final long reconnectPeriod = clientCallbacks.getReconnectPeriod();
        final long maximumTrim = (long) multiplexerState.getMaximumRecoveryTrimTime();
        return (int) Math.min(reconnectPeriod, maximumTrim);
    }

    /**
     * Queues a message for this client and requests processing.
     *
     * <p>Does nothing when the client is not registered. If the message cannot be queued,
     * even after overflow recovery, the session is closed via
     * {@link #closeSessionOnQueueOverflow}.
     *
     * @param multiplexerState state of the multiplexer driving this call
     * @param message the message to queue
     * @return {@code true} if the message was accepted and the client was queued for
     *         processing, {@code false} otherwise
     */
    @Override
    @MultiplexerOnly
    public final boolean sendMessage(MultiplexerState multiplexerState, Message message) {
        if (!isRegistered()) {
            return false;
        }

        final OutboundMessageQueue queue = getMessageQueue();
        final boolean queued = tryQueue(queue, multiplexerState, message);
        if (queued) {
            multiplexerState.queueForProcessing(this);
        } else {
            closeSessionOnQueueOverflow(multiplexerState, queue, message);
        }
        return queued;
    }

    /**
     * Handles an unrecoverable queue overflow: logs a warning (when enabled), closes the
     * outbound side of the channel if one is set, and unregisters this client.
     *
     * @param multiplexerState state used to obtain the buffer pool and to unregister
     * @param queue the queue that overflowed; supplies the summary and overflow description
     * @param message the message that could not be queued; its body size is logged
     */
    protected final void closeSessionOnQueueOverflow(MultiplexerState multiplexerState, OutboundMessageQueue queue, Message message) {
        // The summary is taken before the log-level check, exactly as callers have always seen it.
        final String summary = queue.createSummary();
        if (LOG.isWarnEnabled()) {
            final String overflowDescription = queue.describeOverflow();
            LOG.warn("QUEUES_MESSAGE_QUEUE_LIMIT_REACHED", clientCallbacks, message.bodySize(), overflowDescription, summary);
        }

        final MessageChannel channel = messageChannel;
        if (channel != null) {
            channel.closeOutbound(MessageChannelClosedReason.MESSAGE_QUEUE_LIMIT_REACHED, this, multiplexerState.getDirectByteBufferPool());
        }
        unregister(multiplexerState);
    }

    /**
     * Offers the message to the queue and maps the outcome to a boolean.
     *
     * <p>{@code ACCEPTED} yields {@code true}; {@code QUEUE_FULL} defers to
     * {@link #recoverFromOverflow}; any other outcome yields {@code false}.
     *
     * <p>NOTE(review): nothing in this class offers the message to the queue again after
     * recovery reports success. Presumably {@code queue(...)} retains the message when it
     * answers {@code QUEUE_FULL}, or a subclass re-submits it — confirm.
     *
     * @param queue the queue to offer the message to
     * @param multiplexerState state forwarded to overflow recovery
     * @param message the message to queue
     * @return whether the message is considered queued
     */
    private boolean tryQueue(OutboundMessageQueue queue, MultiplexerState multiplexerState, Message message) {
        switch (queue.queue(message)) {
            case ACCEPTED:
                return true;
            case QUEUE_FULL:
                return recoverFromOverflow(multiplexerState, queue, message);
            default:
                return false;
        }
    }

    /**
     * Tries to make room in a full queue by repeatedly pushing data to the channel until
     * the queue reports capacity for the message or the push timeout elapses.
     *
     * <p>The deadline is the multiplexer's last time plus {@link #getNetworkPushTimeout()}.
     * Between attempts, control is handed to {@code processOtherClients} for at most one
     * retry-delay slice, never past the deadline.
     *
     * @param multiplexerState state of the multiplexer driving this call
     * @param queue the full queue
     * @param message the message that needs room
     * @return {@code true} once the queue has capacity; {@code false} if the channel is
     *         missing, its output is closed, or the deadline passes first
     */
    @MultiplexerOnly
    protected boolean recoverFromOverflow(MultiplexerState multiplexerState, OutboundMessageQueue queue, Message message) {
        final long begin = multiplexerState.getLastTime();
        final long deadline = begin + (long) getNetworkPushTimeout();

        for (long now = begin; now < deadline; ) {
            final MessageChannel channel = getMessageChannel();
            final boolean usable = channel != null && channel.isOutputOpen();
            if (!usable) {
                return false;
            }
            if (recoverQueueSpaceBySendingData(multiplexerState, queue, message, channel, now)) {
                return true;
            }
            final long sliceEnd = Math.min(deadline, now + NETWORK_PUSH_RETRY_DELAY_MS);
            now = AbstractMessageQueueMultiplexerClient.processOtherClients(multiplexerState, now, sliceEnd);
        }
        return false;
    }

    /**
     * Fills the interval between two overflow-recovery attempts by letting the multiplexer
     * process clients, then parks the calling thread for whatever remains of the interval.
     *
     * <p>NOTE(review): the meaning of the value returned by {@code processClient()} is not
     * visible here; the loop simply continues while it is less than the remaining time.
     *
     * <p>NOTE(review): the returned time is sampled before parking, so it does not include
     * the time spent parked — confirm that the caller's deadline loop expects this.
     *
     * @param multiplexerState state used to process clients and read the last time
     * @param start time at which the interval begins
     * @param end time at which the interval ends
     * @return the multiplexer's last time as read just before parking
     */
    private static long processOtherClients(MultiplexerState multiplexerState, long start, long end) {
        long now = start;
        // Keep processing while processClient() reports less than the remaining time.
        while (multiplexerState.processClient() < end - now) {
            now = multiplexerState.getLastTime();
        }
        // Refresh once more: the last processClient() call above did not update 'now'.
        now = multiplexerState.getLastTime();
        // Park for the remainder of the interval (the argument may be zero or negative).
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(end - now));
        return now;
    }

    /**
     * Determines how long overflow recovery may keep pushing data.
     *
     * <p>A non-negative value of the {@link #NETWORK_PUSH_TIMEOUT_PROPERTY} system property
     * wins. Otherwise the result is the channel's output buffer size shifted right by
     * seven bits, with a floor of 1, or 0 when no channel is set.
     *
     * @return the timeout added to the multiplexer time in {@link #recoverFromOverflow}
     */
    protected int getNetworkPushTimeout() {
        final int configured = ConfigurationUtils.getIntegerSystemProperty(NETWORK_PUSH_TIMEOUT_PROPERTY, -1);
        if (configured >= 0) {
            return configured;
        }

        final MessageChannel channel = getMessageChannel();
        return channel == null ? 0 : Math.max(1, channel.getOutputBufferSize() >> 7);
    }

    /**
     * Makes one attempt to free queue space: if the channel is ready to send, or a
     * non-blocking flush succeeds, messages are sent and the queue is asked whether it now
     * has capacity for the message.
     *
     * <p>An {@link IOException} is logged at trace level and treated as a failed attempt.
     *
     * @param multiplexerState state supplying the buffer pool and forwarded to sending
     * @param queue the queue whose capacity is checked
     * @param message the message that needs room
     * @param channel the channel to prepare or flush
     * @param now the time value forwarded to {@link #sendMessages}
     * @return {@code true} only if data was sent and the queue reports capacity
     */
    private boolean recoverQueueSpaceBySendingData(MultiplexerState multiplexerState, OutboundMessageQueue queue, Message message, MessageChannel channel, long now) {
        try {
            final boolean writable = channel.prepareToSend() || channel.nonBlockingFlush(multiplexerState.getDirectByteBufferPool());
            if (!writable) {
                return false;
            }
            sendMessages(multiplexerState, now);
            return queue.hasCapacityFor(message);
        } catch (IOException flushFailure) {
            LOG.trace("nonBlockingFlush failed", flushFailure);
            return false;
        }
    }

    /**
     * Releases per-session state on unregistration: forgets the channel, then empties the
     * outbound queue and the recovery buffer.
     *
     * @param multiplexerState unused by this implementation
     */
    @Override
    @MultiplexerOnly
    protected void doUnregister(MultiplexerState multiplexerState) {
        messageChannel = null;
        messageQueue.clear();
        recoveryBuffer.clear();
    }

    /**
     * @return the result of {@code peek()} on the queue returned by {@link #getMessageQueue()}
     */
    @Override
    @MultiplexerOnly
    public final Message peekMessage() {
        final OutboundMessageQueue queue = getMessageQueue();
        return queue.peek();
    }

    /**
     * Removes the next message from the outbound queue, stores it in the recovery buffer
     * and advances the queue's sequence by one.
     *
     * <p>If the queue yields no message, nothing is recorded and the sequence is left
     * untouched, so an empty poll cannot desynchronise the sequence from the messages
     * actually handed out. This assumes {@code poll()} signals an empty queue with
     * {@code null}, following the {@code java.util.Queue} convention — confirm against
     * {@code OutboundMessageQueue}.
     *
     * @return the removed message, or {@code null} if the queue yielded none
     */
    @Override
    @MultiplexerOnly
    public final Message pollMessage() {
        final Message removed = this.messageQueue.poll();
        if (removed == null) {
            // Nothing left the queue: do not buffer a null or count a phantom message.
            return null;
        }
        this.recoveryBuffer.put(removed);
        this.messageQueue.addToSequence(1);
        return removed;
    }

    /**
     * Flushes recovery-buffer entries older than the trim period and, if the buffer asks
     * for it, schedules the next trim.
     *
     * <p>Nothing happens unless a channel is set and its output is open.
     *
     * @param state state used to compute the trim period and schedule the next trim
     * @param now the current time; the flush threshold is {@code now} minus the trim period
     */
    @Override
    @MultiplexerOnly
    public final void trimRecoveryBuffer(MultiplexerState state, long now) {
        final MessageChannel channel = messageChannel;
        if (channel == null || !channel.isOutputOpen()) {
            return;
        }

        final int trimPeriod = getTrimPeriod(state);
        if (recoveryBuffer.flush(now - (long) trimPeriod)) {
            state.trimRecoveryBufferAfter(trimPeriod, this);
        }
    }

    /**
     * Rewinds the outbound stream to the sequence the recipient last saw.
     *
     * <p>When the recovery buffer can supply the missing messages, the queue is rebuilt as
     * recovered messages followed by the messages that were already pending, the sequence
     * is set to {@code recipientLastSequence}, and the recovery buffer is cleared.
     * Otherwise a warning is logged, the channel's outbound side is closed with
     * {@code MESSAGES_LOST} (if a channel is set), and the client is unregistered.
     *
     * @param multiplexerState state supplying the buffer pool and used to unregister
     * @param recipientLastSequence the last sequence the recipient reports having received
     */
    @MultiplexerOnly
    protected final void recoverMessages(MultiplexerState multiplexerState, int recipientLastSequence) {
        final int sentSequence = messageQueue.getSequence();
        final int missing = sentSequence - recipientLastSequence;

        if (!recoveryBuffer.canRecover(missing)) {
            final int outboundLoss = sentSequence - recoveryBuffer.size() - recipientLastSequence;
            LOG.warn("CLIENTS_RECONNECT_LOST_OUTBOUND_MESSAGES", (Object) outboundLoss, (Object) this);
            final MessageChannel channel = messageChannel;
            if (channel != null) {
                channel.closeOutbound(MessageChannelClosedReason.MESSAGES_LOST, this, multiplexerState.getDirectByteBufferPool());
            }
            unregister(multiplexerState);
            return;
        }

        // Set aside what is currently queued so recovered messages can go in front of it.
        final ArrayList<Message> pending = new ArrayList<>(messageQueue.size());
        for (Message queued : messageQueue) {
            pending.add(queued);
        }
        messageQueue.clear();

        recoveryBuffer.recover(missing, messageQueue::queue);
        for (Message queued : pending) {
            messageQueue.queue(queued);
        }

        messageQueue.setSequence(recipientLastSequence);
        recoveryBuffer.clear();
    }

    /**
     * Empties the recovery buffer and then sets the outbound queue's sequence to zero.
     */
    @MultiplexerOnly
    protected final void resetOutboundSequenceAndRecoveryBuffer() {
        recoveryBuffer.clear();
        messageQueue.setSequence(0);
    }

    /**
     * @return the queue's sequence plus one, minus the number of entries held in the
     *         recovery buffer
     */
    @Override
    public final int getAvailableSequence() {
        final int nextSequence = messageQueue.getSequence() + 1;
        return nextSequence - recoveryBuffer.size();
    }

    /**
     * Enqueues an event that closes the channel (if its output is open), unregisters this
     * client and then runs the supplied action.
     *
     * @param postCloseAction action run after the client has been unregistered
     */
    @Override
    public void closeMessageChannelAndUnregister(Runnable postCloseAction) {
        final CloseChannelEvent closeEvent = new CloseChannelEvent(postCloseAction);
        enqueueEvent(closeEvent);
    }

    /**
     * Replaces the current channel and, if the client is immediately ready to send,
     * queues it for processing.
     *
     * @param state state used to queue this client for processing
     * @param newChannel the channel to use from now on
     */
    @Override
    @MultiplexerOnly
    public final void setMessageChannel(MultiplexerState state, MessageChannel newChannel) {
        messageChannel = newChannel;
        final boolean ready = isReadyToProcess();
        if (ready) {
            state.queueForProcessing(this);
        }
    }

    /**
     * Enqueues an event that runs the task with the multiplexer's direct byte buffer pool
     * and afterwards queues this client for processing if it is ready to send.
     *
     * @param task the task to execute
     */
    @Override
    public final void executeInMultiplexer(MultiplexerExecutor.Task task) {
        enqueueEvent(multiplexerState -> {
            task.execute(multiplexerState.getDirectByteBufferPool());
            if (isReadyToProcess()) {
                multiplexerState.queueForProcessing(this);
            }
        });
    }

    /**
     * Decides whether sending may be postponed.
     *
     * @param writeThresholdBytes byte threshold the queued volume is compared against
     * @return {@code true} when fewer bytes than the threshold are queued and the queue
     *         itself permits a delay
     */
    @Override
    public boolean delaySend(int writeThresholdBytes) {
        if (messageQueue.bytesQueued() >= (long) writeThresholdBytes) {
            return false;
        }
        return messageQueue.canDelaySend();
    }

    /**
     * @return {@code "MultiplexerClient[" + callbacks + "]"}, where {@code callbacks} is
     *         the string form of {@link #getClientCallbacks()}
     */
    @Override
    public final String toString() {
        return "MultiplexerClient[" + getClientCallbacks() + "]";
    }

    /**
     * Adds this client's queue, recovery buffer and channel details to the record after
     * the superclass has contributed its own fields.
     *
     * @param record the record to populate
     */
    @Override
    @MultiplexerOnly
    public void diagnosticReport(Record<SessionFields> record) {
        super.diagnosticReport(record);

        // Identity of the session.
        record.set(SessionFields.DESCRIPTION, clientCallbacks);

        // Outbound queue statistics.
        record.set(SessionFields.MQ_SIZE, messageQueue.size());
        record.set(SessionFields.MQ_LARGEST, messageQueue.getLargestSize());
        record.set(SessionFields.MQ_SEQUENCE, messageQueue.getSequence());

        // Recovery buffer and channel.
        record.set(SessionFields.RECOVERY_BUFFER_SIZE, recoveryBuffer.size());
        record.set(SessionFields.MESSAGE_CHANNEL, messageChannel);
    }

    /**
     * Event that closes the enclosing client's channel and unregisters the client.
     *
     * <p>If the channel's output is open, a listener is installed first; when the channel
     * reports closure, the listener enqueues a follow-up event that unregisters the client
     * and runs the post-close action. If there is no channel, or its output is already
     * closed, unregistration and the post-close action happen immediately.
     */
    private final class CloseChannelEvent
    implements MultiplexerEvent<MultiplexerState> {
        private final Runnable postCloseAction;

        private CloseChannelEvent(Runnable postCloseAction) {
            this.postCloseAction = postCloseAction;
        }

        @Override
        public void handleEvent(MultiplexerState state) {
            final MessageChannel channel = AbstractMessageQueueMultiplexerClient.this.getMessageChannel();

            // Nothing to close: finish straight away.
            if (channel == null || !channel.isOutputOpen()) {
                AbstractMessageQueueMultiplexerClient.this.unregister(state);
                this.postCloseAction.run();
                return;
            }

            // Defer unregistration until the channel confirms it has closed.
            channel.setListener(new MessageChannelListener.NullMessageChannelListener(){

                @Override
                public void messageChannelClosed(MessageChannelClosedReason reason, Throwable cause) {
                    AbstractMessageQueueMultiplexerClient.this.enqueueEvent(closedState -> {
                        AbstractMessageQueueMultiplexerClient.this.unregister(closedState);
                        CloseChannelEvent.this.postCloseAction.run();
                    });
                }
            });
            channel.closeOutbound(MessageChannelClosedReason.LOCAL_CLOSE_REQUESTED, AbstractMessageQueueMultiplexerClient.this, state.getDirectByteBufferPool());
        }
    }
}

