/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.client.features.impl;

import com.pushtechnology.diffusion.client.callbacks.Registration;
import com.pushtechnology.diffusion.client.features.Messaging;
import com.pushtechnology.diffusion.client.features.impl.AbstractFeature;
import com.pushtechnology.diffusion.client.internal.services.FilterResponseListener;
import com.pushtechnology.diffusion.client.internal.services.RequestReceiverRegistration;
import com.pushtechnology.diffusion.client.internal.session.ClosedSessionListener;
import com.pushtechnology.diffusion.client.internal.session.InternalSession;
import com.pushtechnology.diffusion.client.internal.streams.RequestStreamRegistry;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionClosedException;
import com.pushtechnology.diffusion.client.session.SessionId;
import com.pushtechnology.diffusion.command.commands.control.client.CountOrParserErrors;
import com.pushtechnology.diffusion.command.commands.control.client.MessagingClientSendRequest;
import com.pushtechnology.diffusion.command.commands.control.client.MessagingResponse;
import com.pushtechnology.diffusion.command.commands.send.MessagingSendRequest;
import com.pushtechnology.diffusion.command.sender.ServiceReference;
import com.pushtechnology.diffusion.command.services.definition.CommonServices;
import com.pushtechnology.diffusion.datatype.DataType;
import com.pushtechnology.diffusion.datatype.DataTypes;
import com.pushtechnology.diffusion.io.bytes.IBytes;
import com.pushtechnology.diffusion.java7.Functions;
import com.pushtechnology.diffusion.session.impl.InternalSessionId;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
 * {@link Messaging} feature implementation that validates caller arguments and
 * delegates to the session's request stream registry, request receiver
 * registration, filter response listener and messaging services.
 */
public class MessagingImpl
extends AbstractFeature
implements Messaging {
    private final RequestStreamRegistry requestStreamRegistry;
    private final DataTypes dataTypes;
    private final RequestReceiverRegistration requestReceiverRegistration;
    private final FilterResponseListener filterResponseListener;
    private final ServiceReference<MessagingSendRequest, MessagingResponse> messagingSendService;
    private final ServiceReference<MessagingClientSendRequest, MessagingResponse> requestSender;

    /**
     * Creates the feature, obtains the two messaging service references from the
     * internal session's service locator, and registers a listener that discards
     * every registered request stream when the session closes.
     *
     * @param session the public session, passed to the superclass
     * @param internalSession the internal session used to locate services and to
     *        register the closed-session listener
     * @param dataTypes the data type lookup used to resolve request value types
     * @param requestStreamRegistry the registry holding request streams by path
     * @param requestReceiverRegistration the delegate used to register request handlers
     * @param filterResponseListener the delegate used to send filtered requests
     */
    public MessagingImpl(Session session, InternalSession internalSession, DataTypes dataTypes, RequestStreamRegistry requestStreamRegistry, RequestReceiverRegistration requestReceiverRegistration, FilterResponseListener filterResponseListener) {
        super(session, internalSession);
        this.requestStreamRegistry = requestStreamRegistry;
        this.dataTypes = dataTypes;
        this.requestReceiverRegistration = requestReceiverRegistration;
        this.filterResponseListener = filterResponseListener;
        this.messagingSendService = internalSession.getServiceLocator().obtainService(CommonServices.MESSAGING_SEND);
        this.requestSender = internalSession.getServiceLocator().obtainService(CommonServices.MESSAGING_RECEIVER_SERVER);
        internalSession.addListener(new ClosedSessionListener(){

            @Override
            public void onClosed() {
                // Once the session is closed the registered streams are discarded.
                MessagingImpl.this.requestStreamRegistry.discardAll();
            }
        });
    }

    /**
     * Sends a request to the given path through the messaging send service.
     *
     * @param path the message path; must not be null
     * @param request the request value, serialised with the data type resolved
     *        from {@code requestType}
     * @param requestType the class used to look up the request data type; must not be null
     * @param responseType the class the response is read as; must not be null
     * @return a future that yields the response read as {@code responseType}
     * @throws NullPointerException if {@code requestType}, {@code responseType}
     *         or {@code path} is null
     */
    @Override
    public <T, R> CompletableFuture<R> sendRequest(String path, T request, Class<T> requestType, Class<R> responseType) {
        DataType<T> requestDataType = this.dataTypes.getByClass(Objects.requireNonNull(requestType, "request type is null"));
        Objects.requireNonNull(responseType, "response type is null");
        MessagingSendRequest sendRequest = new MessagingSendRequest(
            Objects.requireNonNull(path, "path is null"),
            requestDataType,
            IBytes.toIBytes(requestDataType.toBytes(request)));
        return this.messagingSendService.sendCommand(sendRequest).thenApply(response -> MessagingImpl.decodeResponse(response, responseType));
    }

    /**
     * Registers a request stream for a path by delegating to the request stream registry.
     *
     * @param path the message path; must not be null
     * @param requestType the request class; must not be null
     * @param responseType the response class; must not be null
     * @param requestStream the stream to register; must not be null
     * @return whatever the registry's {@code add} returns (presumably the stream
     *         previously registered for the path, if any -- confirm against the registry)
     * @throws NullPointerException if any argument is null
     */
    @Override
    public <T, R> Messaging.RequestStream<?, ?> setRequestStream(String path, Class<? extends T> requestType, Class<? super R> responseType, Messaging.RequestStream<T, R> requestStream) {
        return this.requestStreamRegistry.add(
            Objects.requireNonNull(path, "path is null"),
            Objects.requireNonNull(requestType, "request type is null"),
            Objects.requireNonNull(responseType, "response type is null"),
            Objects.requireNonNull(requestStream, "stream is null"));
    }

    /**
     * Removes the request stream for a path by delegating to the request stream registry.
     *
     * @param path the message path; must not be null
     * @return whatever the registry's {@code remove} returns for the path
     * @throws NullPointerException if {@code path} is null
     */
    @Override
    public Messaging.RequestStream<?, ?> removeRequestStream(String path) {
        return this.requestStreamRegistry.remove(Objects.requireNonNull(path, "path is null"));
    }

    /**
     * Registers a request handler for a path by delegating to the request
     * receiver registration.
     *
     * @param path the message path; must not be null
     * @param requestType the request class; must not be null
     * @param responseType the response class; must not be null
     * @param handler the handler to register; must not be null
     * @param sessionProperties the session property keys passed on with the
     *        registration; the array itself must not be null
     * @return a future yielding the registration
     * @throws NullPointerException if any argument, including the
     *         {@code sessionProperties} array, is null
     */
    @Override
    public <T, R> CompletableFuture<Registration> addRequestHandler(String path, Class<? extends T> requestType, Class<? super R> responseType, Messaging.RequestHandler<T, R> handler, String ... sessionProperties) {
        return this.requestReceiverRegistration.registerRequestReceiver(
            Objects.requireNonNull(path, "path is null"),
            Objects.requireNonNull(requestType, "request type is null"),
            Objects.requireNonNull(responseType, "response type is null"),
            Objects.requireNonNull(handler, "request handler is null"),
            // An explicitly passed null array would otherwise fail inside
            // Arrays.asList with no message; report it like the other arguments.
            Arrays.asList(Objects.requireNonNull(sessionProperties, "session properties is null")))
            // NOTE(review): identity mapping kept as-is; presumably it only adapts
            // the future's type parameter to Registration -- confirm.
            .thenApply(Functions.identity());
    }

    /**
     * Sends a request to a specific session through the request sender service.
     *
     * @param sessionId the target session; must not be null and is cast to
     *        {@link InternalSessionId}
     * @param path the message path; must not be null
     * @param request the request value, serialised with the data type resolved
     *        from {@code requestType}
     * @param requestType the class used to look up the request data type; must not be null
     * @param responseType the class the response is read as; must not be null
     * @return a future that yields the response read as {@code responseType}
     * @throws NullPointerException if {@code requestType}, {@code responseType},
     *         {@code sessionId} or {@code path} is null
     * @throws ClassCastException if {@code sessionId} is not an {@link InternalSessionId}
     */
    @Override
    public <T, R> CompletableFuture<R> sendRequest(SessionId sessionId, String path, T request, Class<T> requestType, Class<R> responseType) {
        DataType<T> requestDataType = this.dataTypes.getByClass(Objects.requireNonNull(requestType, "request type is null"));
        Objects.requireNonNull(responseType, "response type is null");
        MessagingClientSendRequest sendRequest = new MessagingClientSendRequest(
            Objects.requireNonNull((InternalSessionId)sessionId, "sessionId is null"),
            Objects.requireNonNull(path, "path is null"),
            requestDataType,
            IBytes.toIBytes(requestDataType.toBytes(request)));
        return this.requestSender.sendCommand(sendRequest).thenApply(response -> MessagingImpl.decodeResponse(response, responseType));
    }

    /**
     * Sends a request to the sessions selected by a filter by delegating to the
     * filter response listener.
     *
     * @param filter the session filter expression; must not be null
     * @param path the message path; must not be null
     * @param request the request value, passed through unchecked
     * @param requestType the request class; must not be null
     * @param responseType the response class; must not be null
     * @param callback the callback passed on to the listener; must not be null
     * @return a future yielding the result of
     *         {@code CountOrParserErrors.getCountChecked} on the listener's outcome
     *         (presumably the number of sessions the request was sent to -- confirm)
     * @throws NullPointerException if {@code filter}, {@code path},
     *         {@code requestType}, {@code responseType} or {@code callback} is null
     * @throws SessionClosedException declared by the interface contract
     */
    @Override
    public <T, R> CompletableFuture<Integer> sendRequestToFilter(String filter, String path, T request, Class<T> requestType, Class<R> responseType, Messaging.FilteredRequestCallback<? super R> callback) throws SessionClosedException {
        return this.filterResponseListener.sendFilterRequest(
            Objects.requireNonNull(filter, "session filter is null"),
            Objects.requireNonNull(path, "path is null"),
            request,
            Objects.requireNonNull(requestType, "request type is null"),
            Objects.requireNonNull(responseType, "response type is null"),
            Objects.requireNonNull(callback, "callback is null"))
            .thenApply(CountOrParserErrors::getCountChecked);
    }

    /** Reads a messaging response's payload as the requested response class, using the response's own data type. */
    private static <R> R decodeResponse(MessagingResponse response, Class<R> responseType) {
        return response.getDataType().readAs(responseType, response.getResponse());
    }
}

