/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.client.internal.services;

import com.pushtechnology.diffusion.client.callbacks.Registration;
import com.pushtechnology.diffusion.client.features.control.clients.ClientControl;
import com.pushtechnology.diffusion.client.internal.services.AbstractRegistration;
import com.pushtechnology.diffusion.client.internal.services.MutableServiceRegistry;
import com.pushtechnology.diffusion.client.internal.services.QueueEventHandlerRegistration;
import com.pushtechnology.diffusion.client.internal.session.InternalSession;
import com.pushtechnology.diffusion.command.ErrorReasonException;
import com.pushtechnology.diffusion.command.commands.control.client.QueueEventRequest;
import com.pushtechnology.diffusion.command.receiver.AbstractCommandService;
import com.pushtechnology.diffusion.command.receiver.CommandService;
import com.pushtechnology.diffusion.command.services.definition.CommonServices;
import com.pushtechnology.diffusion.control.ControlGroup;
import com.pushtechnology.diffusion.control.registration.ControlRegistrationParameters;
import com.pushtechnology.diffusion.conversation.NoSuchConversationException;
import com.pushtechnology.diffusion.util.concurrent.threads.WaitProtectedCompletableFuture;
import java.util.concurrent.CompletableFuture;
import net.jcip.annotations.ThreadSafe;

/**
 * {@link QueueEventHandlerRegistration} implementation.
 *
 * <p>On construction it installs a command service for
 * {@link CommonServices#QUEUE_EVENT_HANDLER} in the supplied registry. Each
 * call to {@link #registerQueueEventHandler} wraps the caller's
 * {@link ClientControl.QueueEventStream} in a handler adapter and passes it to
 * {@code registerServerControlHandler}.
 */
@ThreadSafe
public final class QueueEventHandlerRegistrationImpl
extends AbstractRegistration
implements QueueEventHandlerRegistration {
    /**
     * Creates the registration and adds a {@link QueueEventHandlerService} to
     * the registry under {@link CommonServices#QUEUE_EVENT_HANDLER}.
     *
     * @param internalSession the session passed to the superclass constructor
     * @param serviceRegistry the registry that receives the queue event service
     */
    public QueueEventHandlerRegistrationImpl(InternalSession internalSession, MutableServiceRegistry serviceRegistry) {
        super(internalSession);
        serviceRegistry.add(CommonServices.QUEUE_EVENT_HANDLER, new QueueEventHandlerService());
    }

    /**
     * Registers {@code handler} as a queue event handler for
     * {@link ControlGroup#DEFAULT}.
     *
     * @param handler the stream that receives threshold, error and close
     *        notifications
     * @return the future handed to the handler adapter; presumably completed
     *         by the adapter with the resulting {@link Registration} (the
     *         completion logic is not in this class)
     */
    @Override
    public CompletableFuture<Registration> registerQueueEventHandler(final ClientControl.QueueEventStream handler) {
        WaitProtectedCompletableFuture<Registration> registrationFuture = new WaitProtectedCompletableFuture<>();
        ControlRegistrationParameters registrationParameters =
            new ControlRegistrationParameters(CommonServices.QUEUE_EVENT_HANDLER, ControlGroup.DEFAULT);

        this.registerServerControlHandler(
            registrationParameters,
            new AbstractRegistration.AbstractHandlerAdapter<QueueEventRequest>(registrationFuture) {

                /** Forwards a queue event to the matching threshold callback. */
                @Override
                public void respondToHandler(QueueEventRequest queueEvent) {
                    QueueEventHandlerRegistrationImpl.dispatchQueueEvent(handler, queueEvent);
                }

                /** Converts the failure to an error reason and reports it to the stream. */
                @Override
                protected void reportPostRegistrationError(Throwable cause) {
                    handler.onError(ErrorReasonException.localExceptionToErrorReason(cause));
                }

                /** Notifies the stream that it has been closed. */
                @Override
                public void closeHandler() {
                    handler.onClose();
                }

                /** Delegates to the wrapped stream's own string form. */
                @Override
                public String toString() {
                    return handler.toString();
                }
            });

        return registrationFuture;
    }

    /**
     * Routes a queue event to the appropriate callback of {@code stream}.
     *
     * <p>{@code LOWER_THRESHOLD_CROSSED} is delivered to
     * {@code onLowerThresholdCrossed}; every other event value is delivered to
     * {@code onUpperThresholdCrossed}.
     *
     * @param stream the stream to notify
     * @param queueEvent the event supplying the event type, session id and
     *        message queue policy
     */
    private static void dispatchQueueEvent(ClientControl.QueueEventStream stream, QueueEventRequest queueEvent) {
        switch (queueEvent.getQueueEvent()) {
            case LOWER_THRESHOLD_CROSSED:
                stream.onLowerThresholdCrossed(queueEvent.getSessionId(), queueEvent.getMessageQueuePolicy());
                return;
            default:
                stream.onUpperThresholdCrossed(queueEvent.getSessionId(), queueEvent.getMessageQueuePolicy());
        }
    }

    /**
     * Command service for {@link QueueEventRequest}s: hands each request to the
     * session's conversations, then answers the service callback.
     */
    private static final class QueueEventHandlerService
    extends AbstractCommandService<QueueEventRequest, Void, InternalSession> {
        /** Instantiated only by the enclosing class. */
        private QueueEventHandlerService() {
        }

        /**
         * Passes {@code request} to the conversation identified by its context
         * and then responds to {@code callback} with {@code null}.
         *
         * @param internalSession the session whose conversations receive the request
         * @param request the incoming queue event
         * @param callback the callback answered once the request has been handed over
         * @throws NoSuchConversationException declared for the conversation
         *         hand-off; if it is thrown, the callback is not answered here
         */
        @Override
        public void safeOnRequest(InternalSession internalSession, QueueEventRequest request, CommandService.ServiceCallback<Void> callback) throws NoSuchConversationException {
            // Hand the event to the conversation first; only acknowledge if that succeeded.
            internalSession.getConversations().respond(request.getContext(), request);
            callback.respond(null);
        }
    }
}

