/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.comms.http;

import com.pushtechnology.diffusion.api.internal.connection.ServerDetails;
import com.pushtechnology.diffusion.clients.token.SessionToken;
import com.pushtechnology.diffusion.comms.connection.ConnectionInfo;
import com.pushtechnology.diffusion.comms.connection.NetworkChannelFactory;
import com.pushtechnology.diffusion.comms.connection.ProtocolVersion;
import com.pushtechnology.diffusion.comms.http.ChannelCreationConfiguration;
import com.pushtechnology.diffusion.comms.http.MultiplexerChannelCreator;
import com.pushtechnology.diffusion.comms.http.NetworkChannelCreator;
import com.pushtechnology.diffusion.comms.http.PollingClientOutboundHandshake;
import com.pushtechnology.diffusion.comms.http.RequestBistable;
import com.pushtechnology.diffusion.comms.websocket.MaxMessageSizeException;
import com.pushtechnology.diffusion.content.encoding.ZlibCompression;
import com.pushtechnology.diffusion.http.HTTPConstants;
import com.pushtechnology.diffusion.http.HTTPHeaders;
import com.pushtechnology.diffusion.http.URIEncoder;
import com.pushtechnology.diffusion.io.base64.Base64OutputStream;
import com.pushtechnology.diffusion.io.bytes.IBytesOutputStream;
import com.pushtechnology.diffusion.io.bytes.IBytesOutputStreamImpl;
import com.pushtechnology.diffusion.io.http.ParseHTTPException;
import com.pushtechnology.diffusion.io.nio.MultiplexerExecutor;
import com.pushtechnology.diffusion.io.nio.NetworkChannel;
import com.pushtechnology.diffusion.io.nio.NetworkContext;
import com.pushtechnology.diffusion.io.nio.ReadChannelHandler;
import com.pushtechnology.diffusion.io.nio.ReadControlSource;
import com.pushtechnology.diffusion.logs.i18n.I18nLogger;
import com.pushtechnology.diffusion.message.AbortNotificationMessage;
import com.pushtechnology.diffusion.message.Message;
import com.pushtechnology.diffusion.message.MessageChannelClosedReason;
import com.pushtechnology.diffusion.message.MessageChannelFeeder;
import com.pushtechnology.diffusion.message.MessageChannelListener;
import com.pushtechnology.diffusion.message.MessageEncoding;
import com.pushtechnology.diffusion.message.MessageLogger;
import com.pushtechnology.diffusion.message.MessageParser;
import com.pushtechnology.diffusion.messagechannel.MessageChannelImpl;
import com.pushtechnology.diffusion.messagechannel.MessageChannelMultiplexerClient;
import com.pushtechnology.diffusion.multiplexer.MultiplexerState;
import com.pushtechnology.diffusion.threads.InboundThreadOnly;
import com.pushtechnology.diffusion.threads.MultiplexerOnly;
import com.pushtechnology.diffusion.util.concurrent.threads.ExecutionPool;
import com.pushtechnology.diffusion.utils.Base64;
import com.pushtechnology.diffusion.utils.CharsetUtils;
import com.pushtechnology.diffusion.utils.ConfigurationUtils;
import com.pushtechnology.diffusion.utils.FastEncoder;
import com.pushtechnology.diffusion.utils.bytebuffer.DirectByteBufferPool;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.zip.DataFormatException;
import net.jcip.annotations.NotThreadSafe;
import org.slf4j.Logger;

public final class PollingClientMessageChannel
extends MessageChannelImpl
implements ReadChannelHandler {
    public static final String CLIENT_HTTP_PIPELINING = "diffusion.client.http_pipelining";
    private static final Logger LOG = I18nLogger.getLogger(PollingClientMessageChannel.class);
    private static final String DIFFUSION_CONNECTION_HEADER = "diffusion-connection";
    private static final String RECONNECT_HEADER_VALUE = "reconnect";
    private final boolean httpPipelining;
    private final PollChannel pollChannel;
    private final SendChannel sendChannel;
    private final int maximumMessageSize;
    private final NetworkContext networkContext;
    private final NetworkChannelFactory networkChannelFactory;
    private final ServerDetails serverDetails;
    private final ExecutionPool inboundThreadPool;
    private final MessageParser messageParser;
    private final SessionToken token;

    public static PollingClientMessageChannel createPollingClientMessageChannel(int maximumMessageSize, NetworkContext networkContext, ConnectionInfo connectionInfo, NetworkChannel networkChannel, NetworkChannelFactory networkChannelFactory, MessageParser messageParser, SessionToken token, ServerDetails serverDetails, ExecutionPool inboundThreadPool, MessageChannelMultiplexerClient multiplexerClient, MessageChannelListener listener) {
        PollingClientMessageChannel messageChannel = new PollingClientMessageChannel(maximumMessageSize, networkContext, connectionInfo, networkChannel, networkChannelFactory, messageParser, token, serverDetails, inboundThreadPool, multiplexerClient, listener);
        messageChannel.pollChannel.poll();
        return messageChannel;
    }

    private PollingClientMessageChannel(int maximumMessageSize, NetworkContext networkContext, ConnectionInfo connectionInfo, NetworkChannel networkChannel, NetworkChannelFactory networkChannelFactory, MessageParser messageParser, SessionToken token, ServerDetails serverDetails, ExecutionPool inboundThreadPool, MessageChannelMultiplexerClient multiplexerClient, MessageChannelListener listener) {
        super(connectionInfo, serverDetails.getOutputBufferSize(), multiplexerClient, listener);
        this.token = token;
        this.networkContext = networkContext;
        this.networkChannelFactory = networkChannelFactory;
        this.serverDetails = serverDetails;
        this.inboundThreadPool = inboundThreadPool;
        this.messageParser = messageParser;
        this.maximumMessageSize = maximumMessageSize;
        this.httpPipelining = ConfigurationUtils.getBooleanSystemProperty(CLIENT_HTTP_PIPELINING);
        this.pollChannel = new PollChannel(networkChannel);
        this.sendChannel = new SendChannel();
    }

    @Override
    protected void writeBufferComplete() {
        if (this.httpPipelining) {
            this.sendChannel.setReady();
            this.setReadyToSend();
        }
    }

    @Override
    @MultiplexerOnly
    protected void doSendMessages(MultiplexerState state, Message firstMessage, MessageChannelFeeder feeder, long now, MultiplexerExecutor partialWriteExecutor) {
        this.sendChannel.doSendMessages(state, firstMessage, feeder, now, partialWriteExecutor);
    }

    @Override
    @MultiplexerOnly
    public boolean prepareToSend() {
        return super.prepareToSend() && this.sendChannel.prepareToSend();
    }

    boolean isSendChannelClosed() {
        return this.sendChannel.sendState.isClosed();
    }

    boolean isPollChannelClosed() {
        return this.pollChannel.pollState.isClosed();
    }

    @Override
    protected void forceCloseConnections() {
        this.pollChannel.fail();
        this.sendChannel.fail();
    }

    @Override
    @MultiplexerOnly
    public void closeOutbound(MessageChannelClosedReason reason, MultiplexerExecutor partialCloseExecutor, DirectByteBufferPool directByteBufferPool) {
        this.sendChannel.closeOutbound(reason, partialCloseExecutor, directByteBufferPool);
    }

    @Override
    public NetworkChannel getChannel() {
        return this.pollChannel.getChannel();
    }

    @Override
    @InboundThreadOnly
    public ReadControlSource.ReadControl handleInput(ByteBuffer buffer) throws IOException {
        return this.pollChannel.handleInput(buffer);
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "[" + this.outputStateToString() + "," + this.pollChannel.toString() + "," + this.sendChannel.toString() + "]";
    }

    @Override
    @InboundThreadOnly
    public void handleEOF() {
        LOG.trace("PollChannel.handleEOF() {}", (Object)this);
        this.pollChannel.poll();
    }

    @Override
    public boolean isConnectionHandler() {
        return false;
    }

    @Override
    public void closeTask() {
        LOG.trace("PollChannel.closeTask() {}", (Object)this);
        this.pollChannel.closeTask();
    }

    @Override
    @InboundThreadOnly
    public void closeTaskOnError(IOException cause) {
        LOG.trace("PollChannel.closeTaskOnError() {}", (Object)this, (Object)cause);
        this.pollChannel.closeTaskOnError(cause);
    }

    @Override
    public Object inboundThreadAffinityKey() {
        return this.getListener().inboundThreadAffinityKey();
    }

    private void setPollingChannelReadyToSend() {
        this.getMultiplexerClient().executeInMultiplexer(pool -> this.setReadyToSend());
    }

    PollChannel getPollChannel() {
        return this.pollChannel;
    }

    SendChannel getSendChannel() {
        return this.sendChannel;
    }

    @MultiplexerOnly
    private static void writeMessageToStream(ConnectionInfo connectionInfo, Message message, OutputStream out) throws IOException {
        message.writeWithPollingHeader(out, connectionInfo);
        MessageLogger.appendMessage(message);
    }

    @MultiplexerOnly
    private static int messageSize(ConnectionInfo connectionInfo, Message message) {
        return message.size(connectionInfo) + 5;
    }

    @MultiplexerOnly
    private void formatMessageHeaders(ByteBuffer buffer, String method, int sequence) {
        PollingClientMessageChannel.encodeHost(buffer, this.serverDetails);
        PollingClientMessageChannel.encodeHeader(buffer, HTTPConstants.VERSION_HEADER_BYTES, this.getConnectionInfo().getProtocolVersion().asByte());
        PollingClientMessageChannel.encodeSessionToken(buffer, this.token);
        PollingClientMessageChannel.encodeHeader(buffer, HTTPConstants.METHOD_HEADER_BYTES, method);
        PollingClientMessageChannel.encodeHeader(buffer, HTTPConstants.MESSAGE_SEQUENCE_BYTES, sequence);
        PollingClientMessageChannel.encodeHeader(buffer, HTTPConstants.CONNECTION_BYTES, "keep-alive");
    }

    private static void encodeRequestLine(ByteBuffer destination, String path) {
        destination.put(PollingClientOutboundHandshake.POST_BYTES);
        URIEncoder.percentEncodePath(path, destination);
        destination.put(HTTPConstants.HTTP_VERSION_CRLF_BYTES);
    }

    private static void encodeHeader(ByteBuffer destination, byte[] key, Object value) {
        destination.put(key);
        destination.put(HTTPConstants.COLON_SPACE_BYTES);
        FastEncoder.asciiEncode(value.toString(), destination);
        destination.put(HTTPConstants.CRLF);
    }

    private static void encodeHost(ByteBuffer destination, ServerDetails serverDetails) {
        destination.put(HTTPConstants.HOST_BYTES);
        URIEncoder.percentEncode(serverDetails.getHost(), destination);
        destination.put((byte)58);
        FastEncoder.asciiEncode(Integer.toString(serverDetails.getPort()), destination);
        destination.put(HTTPConstants.CRLF);
    }

    private static void encodeSessionToken(ByteBuffer destination, SessionToken sessionToken) {
        destination.put(HTTPConstants.SESSION_TOKEN_HEADER_BYTES);
        destination.put(HTTPConstants.COLON_SPACE_BYTES);
        sessionToken.toBytes().copyTo(destination);
        destination.put(HTTPConstants.CRLF);
    }

    final class PollChannel {
        private final MultiplexerChannelCreator channel;
        @MultiplexerOnly
        private int sequence = 0;
        private final RequestBistable pollState = new RequestBistable();

        PollChannel(NetworkChannel downStream) {
            LOG.trace("PollChannel({}) opened", (Object)downStream);
            this.channel = new MultiplexerChannelCreator(new ChannelCreationConfiguration(PollingClientMessageChannel.this.networkChannelFactory, PollingClientMessageChannel.this.serverDetails, PollingClientMessageChannel.this.networkContext, PollingClientMessageChannel.this.inboundThreadPool, PollingClientMessageChannel.this), PollingClientMessageChannel.this.getMultiplexerClient(), downStream);
        }

        public String toString() {
            return this.getClass().getSimpleName() + String.valueOf(this.pollState) + String.valueOf(this.channel);
        }

        private void poll() {
            this.setBusy();
            PollingClientMessageChannel.this.getMultiplexerClient().enqueueEvent(PollingClientMessageChannel.this.pollChannel::sendPollRequest);
        }

        boolean setReady() {
            return this.pollState.setReady();
        }

        private boolean setBusy() {
            return this.pollState.setBusy();
        }

        private void closeTask() {
            this.channel.close();
            NetworkChannel closeChannel = this.channel.getChannelToClose();
            if (closeChannel == null) {
                this.completeClose(null);
            } else {
                PollingClientMessageChannel.this.closeInbound(closeChannel, () -> this.completeClose(null));
            }
        }

        @InboundThreadOnly
        private void closeTaskOnError(IOException cause) {
            if (this.pollState.isBusy()) {
                if (cause instanceof MaxMessageSizeException) {
                    PollingClientMessageChannel.this.close(MessageChannelClosedReason.MESSAGE_TOO_LARGE, cause);
                } else {
                    PollingClientMessageChannel.this.close(MessageChannelClosedReason.READ_ERROR, cause);
                }
            } else {
                PollingClientMessageChannel.this.pollChannel.closeNetworkChannel();
                if (PollingClientMessageChannel.this.isSendChannelClosed() && this.pollState.close()) {
                    PollingClientMessageChannel.this.onCloseCompleted(MessageChannelClosedReason.CONNECTION_LOST);
                } else {
                    PollingClientMessageChannel.this.pollChannel.poll();
                }
            }
        }

        private void completeClose(IOException e) {
            LOG.trace("PollChannel.completeClose({}), {}", (Object)e, (Object)this);
            if (PollingClientMessageChannel.this.isSendChannelClosed() && this.pollState.close()) {
                PollingClientMessageChannel.this.onCloseCompleted(MessageChannelClosedReason.CONNECTION_LOST);
            }
            this.channel.completeClose();
        }

        private void closeNetworkChannel() {
            this.channel.shutdown();
        }

        private void fail() {
            this.channel.fail();
        }

        @InboundThreadOnly
        public ReadControlSource.ReadControl handleInput(ByteBuffer buffer) throws IOException {
            LOG.trace("PollChannel data received for {}", (Object)this);
            if (!this.pollState.isBusy()) {
                throw new IOException("Poll response without a request");
            }
            int startOfHeaders = buffer.position();
            HTTPHeaders headers = HTTPHeaders.parseBuffer(buffer);
            if (headers == null) {
                buffer.compact();
                return ReadControlSource.READ_CONTROL.partial();
            }
            String contentLengthString = headers.find("content-length");
            if (contentLengthString == null) {
                throw new ParseHTTPException("The 'content-length:' header is missing");
            }
            int contentLength = Integer.parseInt(contentLengthString, 10);
            int bytesMissing = contentLength - buffer.remaining();
            if (bytesMissing > 0) {
                buffer.position(startOfHeaders);
                buffer.compact();
                return ReadControlSource.READ_CONTROL.partial();
            }
            String diffusionConnectionString = headers.find(PollingClientMessageChannel.DIFFUSION_CONNECTION_HEADER);
            if (PollingClientMessageChannel.RECONNECT_HEADER_VALUE.equalsIgnoreCase(diffusionConnectionString)) {
                LOG.trace("Poll request received by wrong server. Attempting reconnect");
                PollingClientMessageChannel.this.close(MessageChannelClosedReason.SESSION_UNKNOWN_TO_SERVER, null);
                return ReadControlSource.READ_CONTROL.close();
            }
            buffer.limit(buffer.position() + contentLength);
            if (!this.parseAndNotifyMessages(this.getDecodedResponseBuffer(buffer, headers))) {
                LOG.trace("PollChannel.handleInput() {}, read ABORT", (Object)this);
                PollingClientMessageChannel.this.close(MessageChannelClosedReason.REMOTE_CLOSE_REQUESTED, null);
                return ReadControlSource.READ_CONTROL.complete();
            }
            if (this.channel.isConnectedOutbound()) {
                LOG.trace("Poll response received for {}", (Object)PollingClientMessageChannel.this);
                this.setReady();
                this.poll();
                return ReadControlSource.READ_CONTROL.complete();
            }
            LOG.trace("Poll response received, but no channel for {}", (Object)PollingClientMessageChannel.this);
            CompletableFuture<NetworkChannel> channelCreation = new CompletableFuture<NetworkChannel>();
            NetworkChannelCreator creator = new NetworkChannelCreator(PollingClientMessageChannel.this.networkChannelFactory, PollingClientMessageChannel.this.serverDetails, PollingClientMessageChannel.this.networkContext, channelCreation);
            channelCreation.whenComplete((nc, failure) -> {
                if (nc != null) {
                    LOG.trace("Connected poll channel for {}", (Object)PollingClientMessageChannel.this);
                    this.poll();
                } else {
                    LOG.trace("Failed to connect poll channel for {}", (Object)PollingClientMessageChannel.this);
                    PollingClientMessageChannel.this.close(MessageChannelClosedReason.CONNECTION_LOST, (Throwable)failure);
                }
            });
            PollingClientMessageChannel.this.getMultiplexerClient().dispatchInNonMultiplexerThread(creator);
            return ReadControlSource.READ_CONTROL.suspendedComplete((CompletableFuture)channelCreation);
        }

        private boolean parseAndNotifyMessages(ByteBuffer body) throws IOException {
            while (body.hasRemaining()) {
                byte messageTypeAndEncoding;
                int length;
                if (PollingClientMessageChannel.this.getConnectionInfo().getProtocolVersion().isAtLeast(ProtocolVersion.PROTOCOL_12_VERSION)) {
                    length = body.getInt();
                    messageTypeAndEncoding = body.get();
                } else {
                    messageTypeAndEncoding = body.get();
                    length = body.getInt();
                }
                if (length > PollingClientMessageChannel.this.maximumMessageSize) {
                    throw MaxMessageSizeException.logWithoutStackTrace("Received message length of " + length + " bytes exceeding the configured maximum message size of " + PollingClientMessageChannel.this.maximumMessageSize + " bytes");
                }
                body.limit(body.position() + length);
                Message message = PollingClientMessageChannel.this.messageParser.parseMessage(MessageEncoding.extractMessageType(messageTypeAndEncoding), MessageEncoding.extractMessageEncoding(messageTypeAndEncoding), body);
                PollingClientMessageChannel.this.getListener().messageReceived(message);
                body.limit(body.capacity());
                if (message != AbortNotificationMessage.INSTANCE) continue;
                return false;
            }
            return true;
        }

        private ByteBuffer getDecodedResponseBuffer(ByteBuffer buffer, HTTPHeaders headers) throws IOException {
            if (null != headers.find("content-encoding")) {
                ZlibCompression compressor = ZlibCompression.forThread();
                try {
                    byte[] compressedPayload = new byte[buffer.remaining()];
                    buffer.get(compressedPayload, 0, compressedPayload.length);
                    IBytesOutputStream decompressed = compressor.decompress(compressedPayload, 0, compressedPayload.length);
                    return this.decodeBase64(decompressed.fromUTF8());
                }
                catch (DataFormatException e) {
                    throw new IOException("Fail to decompress the response body", e);
                }
            }
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            return this.decodeBase64(CharsetUtils.asciiToString(data));
        }

        NetworkChannel getChannel() {
            return this.channel.getChannel();
        }

        private ByteBuffer decodeBase64(String data) {
            return ByteBuffer.wrap(Base64.decode(data));
        }

        @MultiplexerOnly
        private void sendPollRequest(MultiplexerState state) {
            if (!PollingClientMessageChannel.this.isOutputOpen()) {
                return;
            }
            if (!this.channel.isConnectedOutbound()) {
                if (this.channel.isConnected()) {
                    this.closeNetworkChannel();
                    if (this.setReady()) {
                        this.poll();
                    }
                } else if (this.channel.isFailed()) {
                    LOG.trace("Error sending poll request for {}", (Object)this);
                    PollingClientMessageChannel.this.close(MessageChannelClosedReason.CONNECTION_LOST, null);
                } else if (!this.channel.isConnectionPending()) {
                    this.channel.connect().whenComplete((nc, failure) -> {
                        if (failure != null) {
                            PollingClientMessageChannel.this.close(MessageChannelClosedReason.CONNECTION_LOST, (Throwable)failure);
                        } else if (this.setReady()) {
                            this.poll();
                        }
                    });
                }
            } else {
                NetworkChannel channelForSend = this.channel.getChannel();
                LOG.trace("PollChannel Sending a new poll request to server for {}", (Object)this);
                ByteBuffer buffer = channelForSend.bufferForWriting(state.getDirectByteBufferPool(), 1024);
                PollingClientMessageChannel.encodeRequestLine(buffer, PollingClientMessageChannel.this.serverDetails.getPath());
                PollingClientMessageChannel.this.formatMessageHeaders(buffer, "1", this.sequence++);
                buffer.put(HTTPConstants.ZERO_CONTENT_LENGTH_CRLF_BYTES);
                buffer.put(HTTPConstants.CRLF);
                buffer.flip();
                channelForSend.nonBlockingWrite(buffer, state.getDirectByteBufferPool(), PollingClientMessageChannel.this.getMultiplexerClient()).whenComplete((result, error) -> {
                    if (error != null) {
                        LOG.trace("Poll request failed: {}", (Object)this);
                        if (this.setReady()) {
                            this.poll();
                        }
                    }
                });
            }
        }
    }

    @NotThreadSafe
    final class SendChannel
    implements ReadChannelHandler {
        @MultiplexerOnly
        private int sequence = 0;
        private final RequestBistable sendState = new RequestBistable();
        private final MultiplexerChannelCreator channel;

        private SendChannel() {
            this.channel = new MultiplexerChannelCreator(new ChannelCreationConfiguration(PollingClientMessageChannel.this.networkChannelFactory, PollingClientMessageChannel.this.serverDetails, PollingClientMessageChannel.this.networkContext, PollingClientMessageChannel.this.inboundThreadPool, this), PollingClientMessageChannel.this.getMultiplexerClient());
        }

        @InboundThreadOnly
        private boolean isBusy() {
            return this.sendState.isBusy();
        }

        boolean setReady() {
            return this.sendState.setReady();
        }

        @MultiplexerOnly
        private boolean prepareToSend() {
            LOG.trace("SendChannel.prepareToSend(): {}", (Object)this);
            if (!PollingClientMessageChannel.this.isOutputOpen()) {
                return false;
            }
            if (PollingClientMessageChannel.this.isSendChannelClosed()) {
                return false;
            }
            if (this.channel.isConnectedOutbound()) {
                this.sendState.setBusy();
                return true;
            }
            if (!this.channel.isConnectionPending()) {
                this.channel.connect().whenComplete((nc, failure) -> {
                    if (failure != null) {
                        PollingClientMessageChannel.this.close(MessageChannelClosedReason.CONNECTION_LOST, (Throwable)failure);
                    } else {
                        this.setReady();
                        PollingClientMessageChannel.this.setPollingChannelReadyToSend();
                    }
                });
                if (this.channel.isConnectedOutbound()) {
                    this.sendState.setBusy();
                    return true;
                }
            }
            return false;
        }

        @Override
        public NetworkChannel getChannel() {
            return this.channel.getChannel();
        }

        public String toString() {
            return this.getClass().getSimpleName() + String.valueOf(this.sendState) + String.valueOf(this.channel);
        }

        @Override
        @InboundThreadOnly
        public ReadControlSource.ReadControl handleInput(ByteBuffer buffer) throws IOException {
            LOG.trace("SendChannel data received for {}", (Object)this);
            if (!buffer.hasRemaining()) {
                return ReadControlSource.READ_CONTROL.partial();
            }
            boolean readyToSend = false;
            while (buffer.hasRemaining()) {
                HTTPHeaders httpHeaders = HTTPHeaders.parseBuffer(buffer);
                if (httpHeaders == null) {
                    if (readyToSend) {
                        PollingClientMessageChannel.this.setPollingChannelReadyToSend();
                    }
                    return ReadControlSource.READ_CONTROL.partial();
                }
                if (buffer.hasRemaining() && !PollingClientMessageChannel.this.httpPipelining) {
                    throw new IOException("The server is not expected to send a body with this response");
                }
                String responseLine = httpHeaders.getFirstLine();
                if (responseLine.contains("200 OK")) {
                    String diffusionConnectionString = httpHeaders.find(PollingClientMessageChannel.DIFFUSION_CONNECTION_HEADER);
                    if (PollingClientMessageChannel.RECONNECT_HEADER_VALUE.equalsIgnoreCase(diffusionConnectionString)) {
                        LOG.trace("Send request received by wrong server. Attempting reconnect");
                        PollingClientMessageChannel.this.close(MessageChannelClosedReason.SESSION_UNKNOWN_TO_SERVER, null);
                        return ReadControlSource.READ_CONTROL.close();
                    }
                    readyToSend = true;
                    continue;
                }
                if (responseLine.contains("413 Request Entity Too Large")) {
                    throw MaxMessageSizeException.logWithoutStackTrace("The server received a message whose length exceeded the configured maximum message size");
                }
                throw new IOException("Unexpected HTTP response code: " + responseLine);
            }
            if (readyToSend) {
                this.setReady();
                PollingClientMessageChannel.this.setPollingChannelReadyToSend();
            }
            return ReadControlSource.READ_CONTROL.complete();
        }

        @Override
        @InboundThreadOnly
        public void handleEOF() {
            LOG.trace("SendChannel.handleEOF() {}", (Object)this);
        }

        @Override
        public boolean isConnectionHandler() {
            return false;
        }

        @Override
        public void closeTask() {
            LOG.trace("SendChannel.closeTask() {}", (Object)this);
            this.channel.close();
            if (this.isBusy()) {
                LOG.trace("SendChannel.closeTask() busy close {}", (Object)this);
                PollingClientMessageChannel.this.close(MessageChannelClosedReason.CONNECTION_LOST, null);
                return;
            }
            NetworkChannel closeChannel = this.channel.getChannelToClose();
            if (closeChannel == null) {
                this.channel.completeClose();
            } else {
                PollingClientMessageChannel.this.closeInbound(closeChannel, this.channel::completeClose);
            }
        }

        private void closeNetworkChannel() {
            this.channel.shutdown();
        }

        private void fail() {
            this.channel.fail();
        }

        @Override
        public void closeTaskOnError(IOException cause) {
            LOG.trace("SendChannel.closeTaskOnError() {}", (Object)this, (Object)cause);
            if (this.isBusy()) {
                PollingClientMessageChannel.this.close(MessageChannelClosedReason.CONNECTION_LOST, cause);
            } else {
                this.closeNetworkChannel();
            }
        }

        @MultiplexerOnly
        private void closeOutbound(MessageChannelClosedReason reason, MultiplexerExecutor partialCloseExecutor, DirectByteBufferPool directByteBufferPool) {
            this.channel.close();
            NetworkChannel channelToClose = this.channel.getChannelToClose();
            if (channelToClose != null) {
                channelToClose.nonBlockingCloseOutbound(directByteBufferPool, partialCloseExecutor).whenComplete((result, ex) -> this.completeCloseOutbound(reason, (Throwable)ex));
            } else {
                this.completeCloseOutbound(reason, null);
                this.channel.completeClose();
            }
        }

        private void completeCloseOutbound(MessageChannelClosedReason reason, Throwable e) {
            LOG.trace("SendChannel.completeClose({}, {}), {}", new Object[]{reason, e, this});
            this.sendState.close();
            PollingClientMessageChannel.this.onCloseOutbound(reason, e);
        }

        @Override
        public Object inboundThreadAffinityKey() {
            return this;
        }

        @MultiplexerOnly
        void doSendMessages(MultiplexerState state, Message firstMessage, MessageChannelFeeder feeder, long now, MultiplexerExecutor partialWriteExecutor) {
            MessageChannelListener.SendResult resultType;
            LOG.trace("SendChannel.doSendMessages(): {}", (Object)this);
            NetworkChannel ch = this.getChannel();
            ConnectionInfo connectionInfo = PollingClientMessageChannel.this.getConnectionInfo();
            int outputBufferSize = PollingClientMessageChannel.this.serverDetails.getOutputBufferSize();
            IBytesOutputStreamImpl messages = IBytesOutputStreamImpl.forThread();
            int numberOfMessages = 1;
            int billedMessages = firstMessage.billedCost();
            try (Base64OutputStream out = new Base64OutputStream(messages);){
                PollingClientMessageChannel.writeMessageToStream(connectionInfo, firstMessage, out);
                while (true) {
                    Message message;
                    if ((message = feeder.peekMessage()) == null) {
                        resultType = MessageChannelListener.SendResult.QUEUE_DRAINED;
                        break;
                    }
                    if (out.encodedSizeWithPendingData(PollingClientMessageChannel.messageSize(connectionInfo, message)) > outputBufferSize - this.httpOverhead() - ((IBytesOutputStream)messages).length()) {
                        resultType = MessageChannelListener.SendResult.MESSAGES_PENDING;
                        break;
                    }
                    PollingClientMessageChannel.removePolled(feeder, message);
                    PollingClientMessageChannel.writeMessageToStream(connectionInfo, message, out);
                    ++numberOfMessages;
                    billedMessages += message.billedCost();
                }
            }
            catch (IOException e) {
                throw new AssertionError((Object)e);
            }
            ByteBuffer outputBuffer = ch.bufferForWriting(state.getDirectByteBufferPool(), this.httpOverhead() + ((IBytesOutputStream)messages).length());
            this.formatForHTTP(messages, outputBuffer);
            outputBuffer.flip();
            MessageLogger.logMessages(feeder, numberOfMessages);
            PollingClientMessageChannel.this.writeBufferToChannel(ch, state, outputBuffer, feeder, resultType, numberOfMessages, billedMessages, now, partialWriteExecutor);
            LOG.trace("SendChannel.doSendMessages(): {}: sent {} messages", (Object)this, (Object)numberOfMessages);
        }

        @MultiplexerOnly
        private void formatForHTTP(IBytesOutputStream messages, ByteBuffer outputBuffer) {
            PollingClientMessageChannel.encodeRequestLine(outputBuffer, PollingClientMessageChannel.this.serverDetails.getPath());
            PollingClientMessageChannel.this.formatMessageHeaders(outputBuffer, "2", this.sequence++);
            outputBuffer.put(HTTPConstants.CONTENT_LENGTH_BYTES);
            FastEncoder.asciiEncode(Integer.toString(messages.length()), outputBuffer);
            outputBuffer.put(HTTPConstants.CRLF);
            outputBuffer.put(HTTPConstants.CRLF);
            messages.copyTo(outputBuffer);
        }

        @MultiplexerOnly
        private int httpOverhead() {
            int httpOverhead = 256;
            return 256 + PollingClientMessageChannel.this.serverDetails.getPath().length();
        }
    }
}

