/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.command.client.v4;

import com.pushtechnology.diffusion.api.internal.connection.ClientType;
import com.pushtechnology.diffusion.api.internal.connection.ConnectionTypeProvider;
import com.pushtechnology.diffusion.api.internal.connection.InternalConnectionType;
import com.pushtechnology.diffusion.api.internal.connection.ServerDetails;
import com.pushtechnology.diffusion.client.features.ClusterRoutingException;
import com.pushtechnology.diffusion.client.internal.routing.TopicRouting;
import com.pushtechnology.diffusion.client.internal.services.MutableServiceRegistry;
import com.pushtechnology.diffusion.client.internal.session.AbstractInternalSession;
import com.pushtechnology.diffusion.client.internal.session.InternalSession;
import com.pushtechnology.diffusion.client.internal.session.SessionConversationSetFactory;
import com.pushtechnology.diffusion.client.session.ServerInitializingException;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionEstablishmentTransientException;
import com.pushtechnology.diffusion.client.session.SessionException;
import com.pushtechnology.diffusion.client.session.reconnect.ReconnectionStrategy;
import com.pushtechnology.diffusion.client.types.Credentials;
import com.pushtechnology.diffusion.command.client.v4.V4ServiceLocator;
import com.pushtechnology.diffusion.command.receiver.CommandService;
import com.pushtechnology.diffusion.command.sender.ServiceLocator;
import com.pushtechnology.diffusion.command.services.ServiceDefinitionRegistry;
import com.pushtechnology.diffusion.comms.connection.ConnectionCapabilities;
import com.pushtechnology.diffusion.comms.connection.ConnectionHandshakeEventListener;
import com.pushtechnology.diffusion.comms.connection.OutboundConnection;
import com.pushtechnology.diffusion.comms.connection.OutboundConnectionCallbacks;
import com.pushtechnology.diffusion.comms.connection.OutboundConnectionFactory;
import com.pushtechnology.diffusion.comms.connection.OutboundConnectionMessageHandler;
import com.pushtechnology.diffusion.comms.connection.ProtocolVersion;
import com.pushtechnology.diffusion.comms.connection.request.ConnectionRequest;
import com.pushtechnology.diffusion.comms.connection.response.ConnectionResponse;
import com.pushtechnology.diffusion.comms.connection.response.ResponseCode;
import com.pushtechnology.diffusion.connection.activity.monitor.SessionActivityMonitor;
import com.pushtechnology.diffusion.conversation.ConversationSet;
import com.pushtechnology.diffusion.flowcontrol.ConversationSetFlowMeasurement;
import com.pushtechnology.diffusion.io.bytes.IBytesInputStream;
import com.pushtechnology.diffusion.io.nio.NetworkChannel;
import com.pushtechnology.diffusion.io.serialisation.ReadSerialiser;
import com.pushtechnology.diffusion.io.serialisation.SerialisationContext;
import com.pushtechnology.diffusion.logs.i18n.I18nLogger;
import com.pushtechnology.diffusion.message.AbortNotificationMessage;
import com.pushtechnology.diffusion.message.ClientTopicMessage;
import com.pushtechnology.diffusion.message.Message;
import com.pushtechnology.diffusion.message.MessageChannelClosedReason;
import com.pushtechnology.diffusion.message.ParseMessageException;
import com.pushtechnology.diffusion.message.ServiceMessage;
import com.pushtechnology.diffusion.messagequeue.OutboundQueueConfiguration;
import com.pushtechnology.diffusion.session.impl.InternalSessionId;
import com.pushtechnology.diffusion.util.concurrent.threads.CommonThreadPools;
import com.pushtechnology.diffusion.v4.adapters.InboundServiceContext;
import com.pushtechnology.diffusion.v4.adapters.ServiceAdapter;
import java.io.EOFException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.UnresolvedAddressException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;

@ThreadSafe
final class V4InternalSession
extends AbstractInternalSession {
    private static final Logger LOG = I18nLogger.getLogger(V4InternalSession.class);
    private final OutboundConnectionFactory connectionFactory;
    private final ServiceLocator commandServiceLocator;
    private final TopicRouting topicRouting;
    private final ConnectionHandshakeEventListener connectionHandshakeEventListener;
    private final ServiceAdapter<InternalSession> serviceAdapter;
    private final OutboundQueueConfiguration queueConfiguration;
    private volatile OutboundConnection connection;
    private final List<ServerDetails> serverDetails;
    private final ReconnectionStrategy reconnectionStrategy;
    private final CommonThreadPools threadPools;
    private final int reconnectionTimeout;
    private final SessionActivityMonitor sessionActivityMonitor;
    private final int recoveryBufferSize;
    private volatile ConnectionResponse connectionResponse = null;
    private volatile Map<String, String> suppliedSessionProperties = null;

    V4InternalSession(OutboundConnectionFactory connectionFactory, CommonThreadPools threadPools, SerialisationContext serialisers, SessionConversationSetFactory conversationSetFactory, ConversationSet conversations, ConversationSetFlowMeasurement conversationFlowMeasurement, MutableServiceRegistry serviceRegistry, ServiceDefinitionRegistry serviceDefinitions, OutboundQueueConfiguration queueConfiguration, List<ServerDetails> serverDetails, ReconnectionStrategy reconnectionStrategy, int reconnectionTimeout, int recoveryBufferSize, SessionActivityMonitor sessionActivityMonitor, TopicRouting topicRouting, ConnectionHandshakeEventListener connectionHandshakeEventListner) {
        super(conversationSetFactory, conversations, conversationFlowMeasurement);
        this.connectionFactory = connectionFactory;
        this.threadPools = threadPools;
        this.queueConfiguration = queueConfiguration;
        this.serverDetails = serverDetails;
        this.reconnectionStrategy = reconnectionStrategy;
        this.reconnectionTimeout = reconnectionTimeout;
        this.recoveryBufferSize = recoveryBufferSize;
        this.sessionActivityMonitor = sessionActivityMonitor;
        this.topicRouting = topicRouting;
        this.connectionHandshakeEventListener = connectionHandshakeEventListner;
        V4ClientInboundServiceContext context = new V4ClientInboundServiceContext();
        this.serviceAdapter = new ServiceAdapter<InternalSession>(serviceRegistry, serviceDefinitions, serialisers, context);
        this.commandServiceLocator = new V4ServiceLocator(this, serialisers, this::send);
    }

    @Override
    public void connect(ProtocolVersion requestedProtocol, InternalConnectionType connectionType, String principal, Credentials credentials, Map<String, String> sessionProperties, String serverUUID) {
        this.doConnect(requestedProtocol, connectionType, principal, credentials, sessionProperties, serverUUID, null);
    }

    @Override
    public void connectReverse(NetworkChannel networkChannel, ProtocolVersion requestedProtocol, InternalConnectionType connectionType, String principal, Credentials credentials, Map<String, String> sessionProperties, String serverUUID) {
        this.doConnect(requestedProtocol, connectionType, principal, credentials, sessionProperties, serverUUID, networkChannel);
    }

    private void doConnect(ProtocolVersion requestedProtocol, InternalConnectionType connectionType, String principal, Credentials credentials, Map<String, String> sessionProperties, String serverUUID, NetworkChannel networkChannel) {
        LOG.trace("connecting {}", (Object)this);
        ConnectionRequest request = new ConnectionRequest(requestedProtocol, connectionType, ConnectionCapabilities.UNIFIED_ALL_CAPABILITIES, principal, credentials, sessionProperties, this.reconnectionTimeout, serverUUID);
        try {
            OutboundConnectionFactory.PendingConnection pendingConnection = networkChannel == null ? this.connectionFactory.connectMessageChannel(this.serverDetails, request, this.connectionHandshakeEventListener) : this.connectionFactory.connectReverseMessageChannel(this.serverDetails.get(0), request, this.connectionHandshakeEventListener, networkChannel);
            this.setPrincipal(principal);
            this.connectionResponse = pendingConnection.getResponse();
            this.connection = pendingConnection.createConnection(this.queueConfiguration, this.recoveryBufferSize, new ConnectionCallbacksImpl(), new MessageHandlerImpl(), this.topicRouting);
            this.suppliedSessionProperties = sessionProperties;
        }
        catch (SessionException e) {
            this.connectionFailed(e);
            throw e;
        }
        catch (IOException e) {
            this.connectionFailed(e);
            throw new SessionEstablishmentTransientException(e);
        }
        this.sessionActivityMonitor.onNewConnection(this.connection);
        this.addListener(new InternalSession.InternalSessionListener(){

            @Override
            public void onSessionEvent(InternalSession self, Session.State oldState, Session.State newState) {
                if (newState == Session.State.CLOSED_BY_CLIENT) {
                    V4InternalSession.this.connection.close();
                    V4InternalSession.this.removeListener(this);
                }
            }
        });
        this.setState(Session.State.CONNECTED_ACTIVE);
    }

    private void connectionFailed(Exception e) {
        LOG.debug("Failed to start session {}", (Object)this, (Object)e);
        this.setState(Session.State.CLOSED_FAILED);
    }

    private void messageFromServer(ClientTopicMessage message) {
        byte messageType = message.getMessageType();
        int topicId = message.getTopicId();
        if (messageType == 5) {
            this.topicRouting.notifyDelta(this, topicId, message.getBodyShared());
        } else {
            assert (messageType == 4) : messageType;
            this.topicRouting.notifyValue(this, topicId, message.getBody());
        }
    }

    OutboundConnection getConnection() {
        return this.connection;
    }

    @Override
    public ServiceLocator getServiceLocator() {
        return this.commandServiceLocator;
    }

    @Override
    public void onSystemPing() {
        this.sessionActivityMonitor.onSystemPing();
    }

    @Override
    public InternalSessionId getSessionId() {
        return this.connectionResponse != null ? this.connectionResponse.getSessionId() : null;
    }

    @Override
    public ProtocolVersion getProtocolVersion() {
        return this.connectionResponse != null ? this.connectionResponse.getProtocolVersion() : null;
    }

    @Override
    public Map<String, String> getProposedSessionProperties() {
        return this.suppliedSessionProperties;
    }

    @Override
    public String toString() {
        return String.format("V4InternalSession [%s, %s]", new Object[]{this.getState(), this.connection});
    }

    private void handleServiceMessage(ServiceMessage message) throws ParseMessageException {
        this.serviceAdapter.handleServiceMessage(this, this::send, message);
    }

    private void send(Message m) {
        OutboundConnection c = this.connection;
        if (c != null) {
            c.sendMessage(m);
        }
    }

    private static final class V4ClientInboundServiceContext
    implements InboundServiceContext<InternalSession> {
        private V4ClientInboundServiceContext() {
        }

        @Override
        public ProtocolVersion getProtocolVersion(InternalSession session) {
            return session.getProtocolVersion();
        }

        @Override
        public ConversationSet getConversations(InternalSession session) {
            return session.getConversations();
        }

        @Override
        public ClientType getClientType(InternalSession session) {
            return ConnectionTypeProvider.clientType();
        }

        @Override
        public <C, R> void onRequest(int serviceId, ReadSerialiser<C> commandSerialiser, ReadSerialiser<R> responseSerialiser, CommandService<C, R, ? super InternalSession> service, InternalSession session, IBytesInputStream input, CommandService.ServiceCallback<R> callback) throws IOException {
            service.onRequest(session, commandSerialiser.read(input), callback);
        }

        public String toString() {
            return "Client service context";
        }
    }

    private final class ConnectionCallbacksImpl
    implements OutboundConnectionCallbacks {
        @GuardedBy(value="this")
        private ScheduledFuture<?> scheduledReconnectTimeout;

        private ConnectionCallbacksImpl() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onLost() {
            ConnectionCallbacksImpl connectionCallbacksImpl = this;
            synchronized (connectionCallbacksImpl) {
                V4InternalSession.this.sessionActivityMonitor.onConnectionClosed();
                if (V4InternalSession.this.reconnectionTimeout <= 0 && V4InternalSession.this.setState(Session.State.CLOSED_FAILED)) {
                    LOG.info("RECONNECT_DISABLED", (Object)V4InternalSession.this.connection);
                    V4InternalSession.this.connection.close();
                    return;
                }
                if (V4InternalSession.this.setState(Session.State.RECOVERING_RECONNECT)) {
                    LOG.trace("{}: scheduling timeout", (Object)V4InternalSession.this);
                    this.scheduledReconnectTimeout = V4InternalSession.this.threadPools.getBackgroundThreadPool().schedule(this::onReconnectTimeout, (long)V4InternalSession.this.reconnectionTimeout, TimeUnit.MILLISECONDS);
                }
            }
            V4InternalSession.this.threadPools.getBackgroundThreadPool().execute(() -> V4InternalSession.this.reconnectionStrategy.performReconnection(new ReconnectionAttemptImplementation()));
        }

        private void onReconnectTimeout() {
            if (V4InternalSession.this.setState(Session.State.CLOSED_FAILED)) {
                LOG.info("RECONNECT_TIMEOUT_REACHED", (Object)V4InternalSession.this.connection);
                V4InternalSession.this.connection.close();
            }
        }

        @Override
        public void onMaximumMessageSizeExceeded() {
            V4InternalSession.this.sessionActivityMonitor.onConnectionClosed();
            if (V4InternalSession.this.setState(Session.State.CLOSED_FAILED)) {
                V4InternalSession.this.getErrorHandler().notifyError(MessageChannelClosedReason.MESSAGE_TOO_LARGE::name);
            }
        }

        @Override
        public void onClosed() {
            V4InternalSession.this.sessionActivityMonitor.onConnectionClosed();
            if (V4InternalSession.this.setState(Session.State.CLOSED_BY_SERVER)) {
                LOG.debug("{}: serverDisconnected", (Object)V4InternalSession.this);
            }
        }

        private final class ReconnectionAttemptImplementation
        implements ReconnectionStrategy.ReconnectionAttempt {
            private ReconnectionAttemptImplementation() {
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void start() {
                if (V4InternalSession.this.getState() == Session.State.RECOVERING_RECONNECT) {
                    LOG.debug("{}: attempting reconnect", (Object)V4InternalSession.this);
                    try {
                        V4InternalSession.this.connection.reconnect(this::onReconnection);
                        ConnectionCallbacksImpl connectionCallbacksImpl = ConnectionCallbacksImpl.this;
                        synchronized (connectionCallbacksImpl) {
                            V4InternalSession.this.sessionActivityMonitor.onNewConnection(V4InternalSession.this.connection);
                            if (V4InternalSession.this.setState(Session.State.CONNECTED_ACTIVE)) {
                                LOG.trace("{}: clearing timeout", (Object)V4InternalSession.this);
                                ConnectionCallbacksImpl.this.scheduledReconnectTimeout.cancel(true);
                                ConnectionCallbacksImpl.this.scheduledReconnectTimeout = null;
                            }
                        }
                    }
                    catch (SessionEstablishmentTransientException | IOException ex) {
                        this.logTransientException(ex);
                        ConnectionCallbacksImpl.this.onLost();
                    }
                    catch (SessionException ex) {
                        if (LOG.isWarnEnabled()) {
                            LOG.warn("RECONNECT_REJECTED_BY_SERVER", (Object)V4InternalSession.this.connection, (Object)ex.getLocalizedMessage());
                        }
                        this.abort();
                    }
                } else {
                    this.abort();
                }
            }

            private void logTransientException(Exception ex) {
                if (LOG.isWarnEnabled()) {
                    Throwable cause = ex.getCause();
                    if (this.isStackTraceSuppressed(cause) || this.isStackTraceSuppressed(ex)) {
                        Object message = cause != null ? ex.getLocalizedMessage() + " : " + cause.getClass().getSimpleName() : ex.getLocalizedMessage();
                        LOG.warn("RECONNECT_FAILED", (Object)V4InternalSession.this.connection, message);
                    } else {
                        LOG.warn("RECONNECT_FAILED", V4InternalSession.this.connection, ex.getLocalizedMessage(), ex);
                    }
                }
            }

            private boolean isStackTraceSuppressed(Throwable ex) {
                return ex instanceof ServerInitializingException || ex instanceof ConnectException || ex instanceof SocketTimeoutException || ex instanceof ClosedChannelException || ex instanceof UnresolvedAddressException || ex instanceof EOFException;
            }

            private void onReconnection(ConnectionResponse reconnectionResponse) {
                if (reconnectionResponse.getCode() == ResponseCode.RECONNECTED_WITH_MESSAGE_LOSS) {
                    V4InternalSession.this.replaceConversationSet(new ClusterRoutingException("Operation cancelled because session reconnected to a different server"));
                    V4InternalSession.this.topicRouting.notifyUnsubscriptionOfAllTopics();
                }
            }

            @Override
            public void abort() {
                V4InternalSession.this.setState(Session.State.CLOSED_FAILED);
                V4InternalSession.this.connection.close();
            }
        }
    }

    final class MessageHandlerImpl
    implements OutboundConnectionMessageHandler {
        MessageHandlerImpl() {
        }

        @Override
        public void handleMessage(Message message, OutboundConnection outbound) throws ParseMessageException {
            block6: {
                LOG.trace("{}: messageFromServer - {}", (Object)this, (Object)message);
                try {
                    ConversationSetFlowMeasurement.disableForThread();
                    if (message instanceof ClientTopicMessage) {
                        V4InternalSession.this.messageFromServer((ClientTopicMessage)message);
                        break block6;
                    }
                    if (message instanceof ServiceMessage) {
                        V4InternalSession.this.handleServiceMessage((ServiceMessage)message);
                        break block6;
                    }
                    if (message instanceof AbortNotificationMessage) {
                        outbound.abort();
                        break block6;
                    }
                    throw new ParseMessageException("Unknown message type: " + String.valueOf(message));
                }
                finally {
                    ConversationSetFlowMeasurement.enableForThread();
                }
            }
        }
    }
}

