/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.client.internal.services;

import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.IncompatibleDatatypeException;
import com.pushtechnology.diffusion.client.features.Messaging;
import com.pushtechnology.diffusion.client.internal.services.AbstractRegistration;
import com.pushtechnology.diffusion.client.internal.services.FilterResponseListener;
import com.pushtechnology.diffusion.client.internal.services.MutableServiceRegistry;
import com.pushtechnology.diffusion.client.internal.session.InternalSession;
import com.pushtechnology.diffusion.command.ErrorReasonException;
import com.pushtechnology.diffusion.command.commands.control.client.CountOrParserErrors;
import com.pushtechnology.diffusion.command.commands.control.client.FilterResponse;
import com.pushtechnology.diffusion.command.commands.control.client.MessagingClientFilterSendRequest;
import com.pushtechnology.diffusion.command.commands.control.client.MessagingResponse;
import com.pushtechnology.diffusion.command.receiver.AbstractCommandService;
import com.pushtechnology.diffusion.command.receiver.CommandService;
import com.pushtechnology.diffusion.command.sender.ServiceReference;
import com.pushtechnology.diffusion.command.services.definition.CommonServices;
import com.pushtechnology.diffusion.conversation.ConversationId;
import com.pushtechnology.diffusion.conversation.ConversationSet;
import com.pushtechnology.diffusion.conversation.NoSuchConversationException;
import com.pushtechnology.diffusion.conversation.ResponseHandler;
import com.pushtechnology.diffusion.datatype.DataType;
import com.pushtechnology.diffusion.datatype.DataTypes;
import com.pushtechnology.diffusion.datatype.InvalidDataException;
import com.pushtechnology.diffusion.io.bytes.IBytes;
import com.pushtechnology.diffusion.logs.i18n.I18nLogger;
import com.pushtechnology.diffusion.session.impl.InternalSessionId;
import java.util.concurrent.CompletableFuture;
import net.jcip.annotations.Immutable;
import net.jcip.annotations.NotThreadSafe;
import org.slf4j.Logger;

@Immutable
public final class FilterResponseListenerImpl
extends AbstractRegistration
implements FilterResponseListener {
    private final InternalSession session;
    private final DataTypes dataTypes;
    private static final Logger LOG = I18nLogger.getLogger(FilterResponseListenerImpl.class);

    public FilterResponseListenerImpl(InternalSession session, DataTypes dataTypes, MutableServiceRegistry services) {
        super(session);
        services.add(CommonServices.FILTER_RESPONSE, new FilterResponseResultReceiver());
        this.session = session;
        this.dataTypes = dataTypes;
    }

    @Override
    public <T, R> CompletableFuture<CountOrParserErrors> sendFilterRequest(String filter, String path, T request, Class<T> requestType, Class<R> responseType, Messaging.FilteredRequestCallback<? super R> callback) {
        DataType<T> requestDataType = this.dataTypes.getByClass(requestType);
        ConversationSet conversationSet = this.session.getConversations();
        ServiceReference<MessagingClientFilterSendRequest, CountOrParserErrors> sender = this.session.getServiceLocator().obtainService(CommonServices.MESSAGING_FILTER_SENDER);
        ConversationId cid = conversationSet.newConversation(new FilterResponseHandler<R>(callback, responseType));
        MessagingClientFilterSendRequest filterRequest = new MessagingClientFilterSendRequest(cid, filter, path, requestDataType, IBytes.toIBytes(requestDataType.toBytes(request)));
        return sender.sendCommand(filterRequest).whenComplete((result, ex) -> conversationSet.respondIfPresent(cid, ex == null ? result : null));
    }

    private static final class FilterResponseResultReceiver
    extends AbstractCommandService<FilterResponse, Void, InternalSession> {
        private FilterResponseResultReceiver() {
        }

        @Override
        protected void safeOnRequest(InternalSession session, FilterResponse response, CommandService.ServiceCallback<Void> callback) throws NoSuchConversationException {
            session.getConversations().respond(response.getContext(), response);
        }
    }

    @NotThreadSafe
    private static final class FilterResponseHandler<R>
    implements ResponseHandler {
        private final Messaging.FilteredRequestCallback<? super R> callback;
        private final Class<R> responseType;
        private int expected = -1;
        private int counter;

        private FilterResponseHandler(Messaging.FilteredRequestCallback<? super R> callback, Class<R> responseType) {
            this.callback = callback;
            this.responseType = responseType;
        }

        @Override
        public boolean onResponse(ConversationId id, Object response) {
            if (response == null) {
                return true;
            }
            if (response instanceof FilterResponse) {
                ++this.counter;
                return this.onFilterResponse((FilterResponse)response);
            }
            assert (response instanceof CountOrParserErrors);
            return this.onCountOrParserErrorsResponse((CountOrParserErrors)response);
        }

        @Override
        public void onDiscard(ConversationId id, Throwable reason) {
        }

        private boolean onCountOrParserErrorsResponse(CountOrParserErrors response) {
            if (!response.getErrors().isEmpty()) {
                return true;
            }
            this.expected = response.getCount();
            return this.isDone();
        }

        private boolean onFilterResponse(FilterResponse response) {
            InternalSessionId sessionId = response.getSessionId();
            MessagingResponse messagingResponse = response.getResponse();
            ErrorReason errorReason = response.getErrorReason();
            if (errorReason != null) {
                this.callback.onResponseError(sessionId, ErrorReasonException.toApiException(errorReason, errorReason.getDescription()));
            } else {
                DataType<?> responseDataType = messagingResponse.getDataType();
                if (responseDataType.canReadAs(this.responseType)) {
                    R responseData;
                    try {
                        responseData = responseDataType.readAs(this.responseType, messagingResponse.getResponse());
                    }
                    catch (InvalidDataException ex) {
                        this.callback.onResponseError(sessionId, ex);
                        return this.isDone();
                    }
                    try {
                        this.callback.onResponse(sessionId, responseData);
                    }
                    catch (Exception ex) {
                        LOG.error("MESSAGING_FILTER_EXCEPTION", (Object)this.callback, (Object)ex);
                    }
                } else {
                    this.callback.onResponseError(sessionId, new IncompatibleDatatypeException("The server sent a " + String.valueOf(responseDataType) + " which cannot be read as a " + String.valueOf(this.responseType)));
                }
            }
            return this.isDone();
        }

        private boolean isDone() {
            return this.counter >= this.expected && this.expected != -1;
        }
    }
}

