/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.v4.adapters;

import com.pushtechnology.diffusion.api.internal.connection.ClientType;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.command.ErrorReasonException;
import com.pushtechnology.diffusion.command.receiver.CommandService;
import com.pushtechnology.diffusion.command.receiver.NullServiceCallback;
import com.pushtechnology.diffusion.command.services.ServiceDefinition;
import com.pushtechnology.diffusion.command.services.ServiceDefinitionRegistry;
import com.pushtechnology.diffusion.command.services.ServiceRegistry;
import com.pushtechnology.diffusion.comms.connection.ProtocolVersion;
import com.pushtechnology.diffusion.conversation.ConversationId;
import com.pushtechnology.diffusion.io.bytes.IBytesInputStream;
import com.pushtechnology.diffusion.io.encoding.EncodedDataCodec;
import com.pushtechnology.diffusion.io.serialisation.SerialisationContext;
import com.pushtechnology.diffusion.io.serialisation.Serialiser;
import com.pushtechnology.diffusion.logs.i18n.I18nLogger;
import com.pushtechnology.diffusion.logs.i18n.I18nUtils;
import com.pushtechnology.diffusion.message.ParseMessageException;
import com.pushtechnology.diffusion.message.Sender;
import com.pushtechnology.diffusion.message.ServiceMessage;
import com.pushtechnology.diffusion.v4.adapters.InboundServiceContext;
import com.pushtechnology.diffusion.v4.adapters.ServiceMessageHandler;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import net.jcip.annotations.Immutable;

/**
 * Handles inbound {@link ServiceMessage}s on behalf of a session.
 *
 * <p>Requests are dispatched to the {@link CommandService} registered for the
 * service id; responses and errors are routed to the session's conversations
 * via the {@link InboundServiceContext}. Outbound responses and errors are
 * encoded according to the protocol version reported for the session.
 *
 * @param <E> the session type
 */
public final class ServiceAdapter<E>
implements ServiceMessageHandler<E> {
    private static final I18nLogger LOG = I18nLogger.getLogger(ServiceAdapter.class);
    private final SerialisationContext serialisationContext;
    private final InboundServiceContext<E> serviceContext;
    private final ServiceRegistry<? super E> serviceRegistry;
    private final ServiceDefinitionRegistry serviceDefinitionRegistry;

    /**
     * Creates an adapter.
     *
     * @param serviceRegistry looked up by service id to find the service that handles a request
     * @param serviceDefinitionRegistry looked up by service id to find the request/response serialisers
     * @param serialisers context passed to the service definition when resolving serialisers
     * @param serviceContext per-session context (protocol version, client type, conversations, request hooks)
     */
    public ServiceAdapter(ServiceRegistry<? super E> serviceRegistry, ServiceDefinitionRegistry serviceDefinitionRegistry, SerialisationContext serialisers, InboundServiceContext<E> serviceContext) {
        this.serviceRegistry = serviceRegistry;
        this.serviceDefinitionRegistry = serviceDefinitionRegistry;
        this.serialisationContext = serialisers;
        this.serviceContext = serviceContext;
    }

    /**
     * Sends an error message to the peer, choosing the wire form from the
     * session's protocol version.
     */
    private void sendError(E session, Sender sender, int serviceId, long cid, ErrorReason reason, String description) {
        LOG.debug("Sending error {}: {} to {} for service {}", reason, description, session, serviceId);
        ProtocolVersion protocolVersion = this.serviceContext.getProtocolVersion(session);
        ServiceMessage message;
        if (protocolVersion.isAtLeast(ProtocolVersion.PROTOCOL_18_VERSION)) {
            // Protocol 18+ error messages do not carry the service id.
            message = ServiceMessage.createError(cid, reason, description);
        }
        else if (protocolVersion.isAtLeast(ProtocolVersion.PROTOCOL_12_VERSION)) {
            message = ServiceMessage.createProtocol9Error(serviceId, cid, reason, description);
        }
        else {
            // Before protocol 12 the reason is first narrowed by toLegacyErrorReason.
            message = ServiceMessage.createProtocol9Error(serviceId, cid, ServiceAdapter.toLegacyErrorReason(reason), description);
        }
        sender.send(message);
    }

    /**
     * Narrows an error reason for peers older than protocol 12: access-denied
     * and topic-tree-registration-conflict reasons pass through unchanged,
     * anything else becomes {@link ErrorReason#COMMUNICATION_FAILURE}.
     */
    private static ErrorReason toLegacyErrorReason(ErrorReason reason) {
        // Compare by value, not identity: ErrorReason instances can be created
        // with `new` (handleError does so), so an equal reason is not
        // necessarily the same object as the constant.
        // NOTE(review): relies on ErrorReason.equals being value-based - confirm.
        // If equals is not overridden this is equivalent to an identity check.
        if (ErrorReason.ACCESS_DENIED.equals(reason) || ErrorReason.TOPIC_TREE_REGISTRATION_CONFLICT.equals(reason)) {
            return reason;
        }
        return ErrorReason.COMMUNICATION_FAILURE;
    }

    /**
     * Parses the header of a service message and dispatches on its message type.
     *
     * @param session the session the message was received from
     * @param sender used to send any response or error back to the peer
     * @param serviceMessage the received message
     * @throws ParseMessageException declared by this method; presumably raised
     *         by the header parsing helpers - confirm against ServiceMessage
     * @throws AssertionError if the message type is not one handled here
     */
    @Override
    public void handleServiceMessage(E session, Sender sender, ServiceMessage serviceMessage) throws ParseMessageException {
        IBytesInputStream input = serviceMessage.getBody().asInputStream();
        byte messageType = serviceMessage.getMessageType();
        switch (messageType) {
            case 0: {
                // Request: service id, then conversation id, then the request body.
                int serviceId = ServiceMessage.parseServiceId(input);
                long cid = ServiceMessage.parseConversationId(input);
                this.handleRequest(session, sender, this.serviceContext.getClientType(session), serviceId, cid, input);
                break;
            }
            case 6: {
                // Response without a service id.
                this.handleResponse(session, ServiceMessage.parseConversationId(input), input);
                break;
            }
            case 7: {
                // Error without a service id.
                this.handleError(session, ServiceMessage.parseConversationId(input), input);
                break;
            }
            case 1: {
                // Response prefixed by a service id; the id is read only to advance the stream.
                ServiceMessage.parseServiceId(input);
                this.handleResponse(session, ServiceMessage.parseConversationId(input), input);
                break;
            }
            case 2: {
                // Error prefixed by a service id; the id is read only to advance the stream.
                ServiceMessage.parseServiceId(input);
                this.handleError(session, ServiceMessage.parseConversationId(input), input);
                break;
            }
            default: {
                throw new AssertionError("Invalid service message: " + messageType);
            }
        }
    }

    /**
     * Looks up the service for {@code serviceId} and dispatches the request to
     * it, or replies with a "service unsupported" error if none is registered.
     */
    void handleRequest(E session, Sender sender, ClientType clientType, int serviceId, long cid, IBytesInputStream input) {
        CommandService<?, ?, ? super E> service = this.serviceRegistry.get(serviceId);
        if (service == null) {
            this.sendErrorServiceUnsupported(session, sender, serviceId, cid);
            return;
        }
        this.handleRequest(session, sender, clientType, serviceId, cid, input, service);
    }

    /**
     * Resolves the serialisers for the peer's client type and protocol
     * version, builds the callback and hands the request to the service context.
     */
    private <C, R> void handleRequest(E session, Sender sender, ClientType clientType, int serviceId, long cid, IBytesInputStream input, CommandService<C, R, ? super E> service) {
        ProtocolVersion protocolVersion = this.serviceContext.getProtocolVersion(session);
        ServiceDefinition serviceDefinition = this.serviceDefinitionRegistry.getService(serviceId);
        // NOTE(review): defensive guard - whether getService can return null is
        // not visible here; if it can, report "unsupported" instead of an NPE.
        if (serviceDefinition == null) {
            this.sendErrorServiceUnsupported(session, sender, serviceId, cid);
            return;
        }
        Serialiser requestSerialiser = serviceDefinition.requestSerialiserFor(this.serialisationContext, clientType, protocolVersion.asByte());
        if (requestSerialiser == null) {
            // No request serialiser for this client type / protocol version.
            this.sendErrorServiceUnsupported(session, sender, serviceId, cid);
            return;
        }
        Serialiser responseSerialiser = serviceDefinition.responseSerialiserFor(this.serialisationContext, clientType, protocolVersion.asByte());
        // From protocol 12, a request carrying the one-way conversation id gets
        // the null callback instead of one that sends a reply.
        boolean oneWay = protocolVersion.isAtLeast(ProtocolVersion.PROTOCOL_12_VERSION) && cid == ConversationId.ONEWAY_CID.getId();
        CommandService.ServiceCallback callback = oneWay
            ? NullServiceCallback.nullServiceCallback()
            : new ServiceCallbackImpl(session, sender, serviceId, cid, responseSerialiser);
        try {
            this.serviceContext.onRequest(serviceId, requestSerialiser, responseSerialiser, service, session, input, callback);
        }
        catch (IOException e) {
            LOG.debug("Failed to deserialise service request from {}", (Object)session, (Object)e);
            callback.fail(ErrorReason.COMMUNICATION_FAILURE, e.getMessage());
        }
    }

    /**
     * Passes a response body to the session's conversation identified by
     * {@code cid}, if such a conversation is present.
     */
    void handleResponse(E session, long cid, InputStream input) {
        this.serviceContext.getConversations(session).respondIfPresent(new ConversationId(cid), input);
    }

    /**
     * Reads an error (description string, then 32-bit reason code) from
     * {@code input} and discards the session's conversation {@code cid} with it.
     * If the error cannot be read, the conversation is discarded with
     * {@link ErrorReason#COMMUNICATION_FAILURE} instead.
     */
    void handleError(E session, long cid, InputStream input) {
        ErrorReason errorReason;
        ConversationId conversationId = new ConversationId(cid);
        try {
            String description = EncodedDataCodec.readString(input);
            int reasonCode = EncodedDataCodec.readInt32(input);
            errorReason = new ErrorReason(reasonCode, description);
        }
        catch (IOException e) {
            // Record the cause; the conversation still has to be discarded below.
            LOG.debug("Failed to deserialise service error from {}", (Object)session, (Object)e);
            errorReason = ErrorReason.COMMUNICATION_FAILURE;
        }
        this.serviceContext.getConversations(session).discard(conversationId, new ErrorReasonException(errorReason));
    }

    /**
     * Logs a warning and replies with a {@link ErrorReason#COMMUNICATION_FAILURE}
     * error whose description is the formatted "service unsupported" message.
     */
    private void sendErrorServiceUnsupported(E session, Sender sender, int serviceId, long cid) {
        I18nLogger.Translation translation = I18nUtils.formatMessage("COMMAND_ERROR_SERVICE_UNSUPPORTED", serviceId, session);
        LOG.warn(translation);
        this.sendError(session, sender, serviceId, cid, ErrorReason.COMMUNICATION_FAILURE, translation.getText());
    }

    /**
     * Callback handed to a service for a single request. It sends either a
     * response or an error back through the sender. The inherited
     * {@link AtomicBoolean} is the "completed" flag: the first completion flips
     * it, any later attempt throws {@link IllegalStateException}.
     *
     * @param <R> the response type
     */
    @Immutable
    private final class ServiceCallbackImpl<R>
    extends AtomicBoolean
    implements CommandService.ServiceCallback<R> {
        private final E session;
        private final Sender sender;
        private final int serviceId;
        private final long cid;
        private final Serialiser<R> responseSerialiser;

        ServiceCallbackImpl(E session, Sender sender, int serviceId, long cid, Serialiser<R> responseSerialiser) {
            this.session = session;
            this.sender = sender;
            this.serviceId = serviceId;
            this.cid = cid;
            this.responseSerialiser = responseSerialiser;
        }

        /**
         * Serialises and sends the response. If building the message throws,
         * the failure is reported to the peer through {@link #fail} instead.
         *
         * @throws IllegalStateException if this callback has already completed
         */
        @Override
        public void respond(R response) {
            ServiceMessage responseMessage;
            ProtocolVersion protocolVersion = ServiceAdapter.this.serviceContext.getProtocolVersion(this.session);
            try {
                // Protocol 18+ responses omit the service id.
                responseMessage = protocolVersion.isAtLeast(ProtocolVersion.PROTOCOL_18_VERSION)
                    ? ServiceMessage.createResponse(this.cid, this.responseSerialiser, response)
                    : ServiceMessage.createProtocol9Response(this.serviceId, this.cid, this.responseSerialiser, response);
            }
            catch (Exception e) {
                // Not yet marked done, so fail() can still complete the callback.
                this.fail(ErrorReasonException.localExceptionToErrorReason(e), e.getMessage());
                return;
            }
            this.markDone("respond", response);
            ServiceAdapter.this.serviceContext.onResponse(this.serviceId);
            this.sender.send(responseMessage);
        }

        /**
         * Sends an error to the peer. When {@code message} is null or empty the
         * reason's own description is used as the error text.
         *
         * @throws IllegalStateException if this callback has already completed
         */
        @Override
        public void fail(ErrorReason errorReason, String message) {
            this.markDone("fail", errorReason);
            String messageOrDefault = message == null || message.isEmpty() ? errorReason.getDescription() : message;
            ServiceAdapter.this.serviceContext.onError(this.serviceId);
            ServiceAdapter.this.sendError(this.session, this.sender, this.serviceId, this.cid, errorReason, messageOrDefault);
        }

        /**
         * Atomically flips the completion flag, rejecting a second completion.
         */
        private void markDone(String action, Object parameter) {
            if (!this.compareAndSet(false, true)) {
                throw new IllegalStateException("Can't " + action + " with '" + parameter + "' as callback is complete - " + this);
            }
        }

        @Override
        public String toString() {
            return "Callback for request to service=" + this.serviceId + ", cid=" + this.cid + " from " + this.session;
        }
    }
}

