/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.client.features.impl;

import com.pushtechnology.diffusion.client.ClientUtils;
import com.pushtechnology.diffusion.client.callbacks.Stream;
import com.pushtechnology.diffusion.client.features.ScriptException;
import com.pushtechnology.diffusion.client.features.TimeSeries;
import com.pushtechnology.diffusion.client.features.TopicCreationResult;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.UpdateConstraint;
import com.pushtechnology.diffusion.client.features.UpdateStream;
import com.pushtechnology.diffusion.client.features.control.topics.SessionTrees;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.views.TopicView;
import com.pushtechnology.diffusion.client.features.impl.AbstractFeature;
import com.pushtechnology.diffusion.client.features.impl.InternalTopics;
import com.pushtechnology.diffusion.client.features.impl.update.stream.UpdateStreamBuilder;
import com.pushtechnology.diffusion.client.internal.routing.TopicRouting;
import com.pushtechnology.diffusion.client.internal.session.InternalSession;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionClosedException;
import com.pushtechnology.diffusion.client.topics.TopicSelector;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.client.topics.impl.FetchContext;
import com.pushtechnology.diffusion.client.topics.impl.FetchRequestImpl;
import com.pushtechnology.diffusion.client.types.ErrorReport;
import com.pushtechnology.diffusion.command.commands.SetTopicDetailsLevelRequest;
import com.pushtechnology.diffusion.command.commands.TopicSelectionRequest;
import com.pushtechnology.diffusion.command.commands.UnsubscribeAllRequest;
import com.pushtechnology.diffusion.command.commands.control.topics.MissingTopicEvent;
import com.pushtechnology.diffusion.command.commands.control.topics.MissingTopicPropagationRequest;
import com.pushtechnology.diffusion.command.sender.ServiceLocator;
import com.pushtechnology.diffusion.command.sender.ServiceReference;
import com.pushtechnology.diffusion.command.services.definition.CommonServices;
import com.pushtechnology.diffusion.comms.connection.ProtocolVersion;
import com.pushtechnology.diffusion.constraints.Unconstrained;
import com.pushtechnology.diffusion.datatype.DataType;
import com.pushtechnology.diffusion.datatype.DataTypes;
import com.pushtechnology.diffusion.datatype.impl.DataTypesImpl;
import com.pushtechnology.diffusion.datatype.impl.TopicTypeToDataType;
import com.pushtechnology.diffusion.io.bytes.IBytes;
import com.pushtechnology.diffusion.java7.Functions;
import com.pushtechnology.diffusion.session.impl.SessionIdImpl;
import com.pushtechnology.diffusion.sessiontrees.SessionTreeBranchList;
import com.pushtechnology.diffusion.topics.selectors.TopicSelectorParser;
import com.pushtechnology.diffusion.topics.update.conditional.AddAndSetTopicRequest;
import com.pushtechnology.diffusion.topics.update.conditional.ApplyJSONPatchRequest;
import com.pushtechnology.diffusion.topics.update.conditional.SetTopicRequest;
import com.pushtechnology.diffusion.topics.update.update.stream.CreateUpdateStreamAndSetRequest;
import com.pushtechnology.diffusion.topics.update.update.stream.CreateUpdateStreamRequest;
import com.pushtechnology.diffusion.topics.update.update.stream.CreateUpdateStreamResponse;
import com.pushtechnology.diffusion.topics.update.update.stream.UpdateStreamAddTopicRequest;
import com.pushtechnology.diffusion.topics.update.update.stream.UpdateStreamAddTopicResponse;
import com.pushtechnology.diffusion.topics.update.update.stream.UpdateStreamId;
import com.pushtechnology.diffusion.topics.update.update.stream.UpdateStreamRequest;
import com.pushtechnology.diffusion.topics.views.CreateTopicViewResult;
import com.pushtechnology.diffusion.topics.views.GetTopicViewResult;
import com.pushtechnology.diffusion.topics.views.ListTopicViewsResult;
import com.pushtechnology.diffusion.topics.views.NamedTopicViewSpecification;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import net.jcip.annotations.Immutable;

/**
 * Implementation of {@link InternalTopics} that extends {@link AbstractFeature}.
 *
 * <p>All state is held in {@code final} fields that are assigned once in the
 * constructor. Most operations are carried out by sending a command through one
 * of the {@link ServiceReference}s below, or by delegating to the
 * {@link TopicRouting} supplied at construction time.
 */
@Immutable
public final class TopicsImpl
extends AbstractFeature
implements InternalTopics {
    // Service references; each one is obtained from the internal session's
    // ServiceLocator in the constructor.

    // Subscription services.
    private final ServiceReference<TopicSelectionRequest, Void> unsubscribeService;
    private final ServiceReference<TopicSelectionRequest, Void> subscribeService;
    // Topic update services.
    private final ServiceReference<SetTopicRequest, Void> setTopicService;
    private final ServiceReference<AddAndSetTopicRequest, TopicControl.AddTopicResult> addAndSetTopicService;
    // Topic view services.
    private final ServiceReference<NamedTopicViewSpecification, CreateTopicViewResult> createTopicViewService;
    private final ServiceReference<String, Void> removeTopicViewService;
    private final ServiceReference<Void, ListTopicViewsResult> listTopicViewsService;
    private final ServiceReference<String, GetTopicViewResult> getTopicViewService;
    // JSON patch service; the Integer response is wrapped in a JsonPatchResultImpl.
    private final ServiceReference<ApplyJSONPatchRequest, Integer> applyJsonPatchService;
    // Session tree / branch mapping table services.
    private final ServiceReference<SessionTrees.BranchMappingTable, Void> putBranchMappingTableService;
    private final ServiceReference<Void, SessionTreeBranchList> getSessionTreeBranchesWithMappingsService;
    private final ServiceReference<String, SessionTrees.BranchMappingTable> getBranchMappingTableService;
    // Update stream services; these are only handed on to UpdateStreamBuilder.
    private final ServiceReference<CreateUpdateStreamRequest, CreateUpdateStreamResponse> createUpdateStreamService;
    private final ServiceReference<CreateUpdateStreamAndSetRequest, CreateUpdateStreamResponse> createUpdateStreamAndSetService;
    private final ServiceReference<UpdateStreamAddTopicRequest, UpdateStreamAddTopicResponse> streamAddTopicService;
    private final ServiceReference<AddAndSetTopicRequest, UpdateStreamAddTopicResponse> streamAddAndSetTopicService;
    private final ServiceReference<UpdateStreamId, Void> checkUpdateStreamService;
    private final ServiceReference<UpdateStreamRequest, Void> streamSetTopicService;
    private final ServiceReference<UpdateStreamRequest, Void> streamApplyDeltaService;
    // Service used to unsubscribe a selector from all scopes.
    private final ServiceReference<UnsubscribeAllRequest, Void> unsubscribeAllService;
    // Collaborators supplied to (or built by) the constructor.
    private final TopicRouting topicRouting;
    private final TopicSelectorParser topicSelectorParser;
    private final FetchContext fetchContext;
    private final DataTypes dataTypes;
    private final TopicTypeToDataType topicTypeToDataType;
    // Chosen in the constructor according to the session's protocol version.
    private final MissingTopicPropagator missingTopicPropagator;

    /**
     * Creates the feature and resolves every service reference it needs from the
     * internal session's {@link ServiceLocator}.
     *
     * @param session the session, passed to the superclass and used to read the maximum message size
     * @param internalSession the internal session, passed to the superclass and used to obtain the
     *        service locator and protocol version
     * @param dataTypes the data types used for value class validation and lookup
     * @param topicTypeToDataType mapping from topic type to data type
     * @param topicSelectorParser parser used for selector expressions given as strings
     * @param topicRouting the routing that stream registrations are delegated to
     */
    public TopicsImpl(Session session, InternalSession internalSession, DataTypes dataTypes, TopicTypeToDataType topicTypeToDataType, TopicSelectorParser topicSelectorParser, TopicRouting topicRouting) {
        super(session, internalSession);

        // Plain collaborators.
        this.dataTypes = dataTypes;
        this.topicSelectorParser = topicSelectorParser;
        this.topicTypeToDataType = topicTypeToDataType;
        this.topicRouting = topicRouting;

        // Service references, obtained in the same order as before.
        final ServiceLocator locator = internalSession.getServiceLocator();
        this.subscribeService = locator.obtainService(CommonServices.SUBSCRIBE);
        this.unsubscribeService = locator.obtainService(CommonServices.UNSUBSCRIBE);
        this.setTopicService = locator.obtainService(CommonServices.SET_TOPIC);
        this.addAndSetTopicService = locator.obtainService(CommonServices.ADD_AND_SET_TOPIC);
        this.createTopicViewService = locator.obtainService(CommonServices.CREATE_TOPIC_VIEW);
        this.removeTopicViewService = locator.obtainService(CommonServices.REMOVE_TOPIC_VIEW);
        this.listTopicViewsService = locator.obtainService(CommonServices.LIST_TOPIC_VIEWS);
        this.getTopicViewService = locator.obtainService(CommonServices.GET_TOPIC_VIEW);
        this.applyJsonPatchService = locator.obtainService(CommonServices.APPLY_JSON_PATCH);
        this.putBranchMappingTableService = locator.obtainService(CommonServices.PUT_BRANCH_MAPPING_TABLE);
        this.getSessionTreeBranchesWithMappingsService = locator.obtainService(CommonServices.GET_SESSION_TREE_BRANCHES_WITH_MAPPINGS);
        this.getBranchMappingTableService = locator.obtainService(CommonServices.GET_BRANCH_MAPPING_TABLE);
        this.createUpdateStreamService = locator.obtainService(CommonServices.CREATE_UPDATE_STREAM);
        this.createUpdateStreamAndSetService = locator.obtainService(CommonServices.CREATE_UPDATE_STREAM_AND_SET);
        this.streamAddTopicService = locator.obtainService(CommonServices.STREAM_ADD_TOPIC);
        this.streamAddAndSetTopicService = locator.obtainService(CommonServices.STREAM_ADD_AND_SET_TOPIC);
        this.checkUpdateStreamService = locator.obtainService(CommonServices.CHECK_UPDATE_STREAM);
        this.streamSetTopicService = locator.obtainService(CommonServices.STREAM_SET_TOPIC);
        this.streamApplyDeltaService = locator.obtainService(CommonServices.STREAM_APPLY_DELTA);
        this.unsubscribeAllService = locator.obtainService(CommonServices.UNSUBSCRIBE_ALL);

        this.fetchContext = new FetchContext(
            locator.obtainService(CommonServices.FETCH_QUERY),
            topicSelectorParser,
            topicTypeToDataType,
            session.getAttributes().getMaximumMessageSize(),
            dataTypes);

        // Sessions on protocol 23 or later use the MISSING_TOPIC_PROPAGATOR service;
        // older sessions use the MISSING_TOPIC_PROPAGATION service.
        if (internalSession.getProtocolVersion().isAtLeast(ProtocolVersion.PROTOCOL_23_VERSION)) {
            this.missingTopicPropagator = new MissingTopicPropagatorImpl(locator.obtainService(CommonServices.MISSING_TOPIC_PROPAGATOR));
        } else {
            this.missingTopicPropagator = new Protocol22MissingTopicPropagator(locator.obtainService(CommonServices.MISSING_TOPIC_PROPAGATION));
        }
    }

    /**
     * Registers a topic stream with the topic routing for the given selector.
     *
     * @param topics the selector the stream is registered against
     * @param stream the stream to register
     * @throws NullPointerException if {@code topics} or {@code stream} is null
     */
    @Override
    public void addTopicStream(TopicSelector topics, InternalTopics.TopicStream stream) throws SessionClosedException {
        final TopicSelector selector = Objects.requireNonNull(topics, "topics is null");
        final InternalTopics.TopicStream target = Objects.requireNonNull(stream, "stream is null");
        topicRouting.addStream(selector, target);
    }

    /**
     * Validates the value class and then registers a value stream with the topic
     * routing for the given selector.
     *
     * @param topics the selector the stream is registered against
     * @param valueClass the class of values the stream receives
     * @param stream the stream to register
     * @throws NullPointerException if {@code topics}, {@code valueClass} or {@code stream} is null
     */
    @Override
    public <V> void addStream(TopicSelector topics, Class<? extends V> valueClass, Topics.ValueStream<V> stream) throws IllegalArgumentException, SessionClosedException {
        // Value class validation runs before any of the null checks.
        dataTypes.validateValueClass(valueClass);
        topicRouting.addStream(
            Objects.requireNonNull(topics, "topics is null"),
            Objects.requireNonNull(valueClass, "valueClass is null"),
            Objects.requireNonNull(stream, "stream is null"));
    }

    /**
     * Parses the selector expression and delegates to the {@link TopicSelector} overload.
     *
     * @param topics the selector expression
     * @param valueClass the class of values the stream receives
     * @param stream the stream to register
     * @throws NullPointerException if {@code topics} is null
     */
    @Override
    public <V> void addStream(String topics, Class<? extends V> valueClass, Topics.ValueStream<V> stream) throws IllegalArgumentException, SessionClosedException {
        final String expression = Objects.requireNonNull(topics, "topics is null");
        final TopicSelector selector = topicSelectorParser.parse(expression);
        addStream(selector, valueClass, stream);
    }

    /**
     * Validates the event value class and then registers a time series stream with
     * the topic routing for the given selector.
     *
     * @param topics the selector the stream is registered against
     * @param eventValueClass the class of the event values the stream receives
     * @param stream the stream to register
     * @throws NullPointerException if {@code topics}, {@code eventValueClass} or {@code stream} is null
     */
    @Override
    public <V> void addTimeSeriesStream(TopicSelector topics, Class<? extends V> eventValueClass, Topics.ValueStream<TimeSeries.Event<V>> stream) throws IllegalArgumentException, SessionClosedException {
        // Value class validation runs before any of the null checks.
        dataTypes.validateValueClass(eventValueClass);
        topicRouting.addTimeSeriesStream(
            Objects.requireNonNull(topics, "topics is null"),
            Objects.requireNonNull(eventValueClass, "eventValueClass is null"),
            Objects.requireNonNull(stream, "stream is null"));
    }

    /**
     * Parses the selector expression and delegates to the {@link TopicSelector} overload.
     *
     * @param topics the selector expression
     * @param valueClass the class of the event values the stream receives
     * @param stream the stream to register
     * @throws NullPointerException if {@code topics} is null
     */
    @Override
    public <V> void addTimeSeriesStream(String topics, Class<? extends V> valueClass, Topics.ValueStream<TimeSeries.Event<V>> stream) throws IllegalArgumentException, SessionClosedException {
        final String expression = Objects.requireNonNull(topics, "topics is null");
        final TopicSelector selector = topicSelectorParser.parse(expression);
        addTimeSeriesStream(selector, valueClass, stream);
    }

    /**
     * Asks the topic routing to remove a stream.
     *
     * @param stream the stream to remove
     * @throws NullPointerException if {@code stream} is null
     */
    @Override
    public void removeStream(Stream stream) throws SessionClosedException {
        final Stream target = Objects.requireNonNull(stream, "stream is null");
        topicRouting.removeStream(target);
    }

    /**
     * Validates the value class and then registers a fallback value stream with the
     * topic routing.
     *
     * @param valueClass the class of values the stream receives
     * @param stream the stream to register
     * @throws NullPointerException if {@code valueClass} or {@code stream} is null
     */
    @Override
    public <V> void addFallbackStream(Class<? extends V> valueClass, Topics.ValueStream<V> stream) throws IllegalArgumentException, SessionClosedException {
        // Value class validation runs before any of the null checks.
        dataTypes.validateValueClass(valueClass);
        topicRouting.addFallbackStream(
            Objects.requireNonNull(valueClass, "valueClass is null"),
            Objects.requireNonNull(stream, "stream is null"));
    }

    /**
     * Sends a subscription request for the selector's expression and the validated scope.
     *
     * @param topics the selector to subscribe to
     * @param scope the selection scope, checked with {@link ClientUtils#validateSelectionScope}
     * @return a future for the outcome of the subscribe command
     * @throws NullPointerException if {@code topics} is null
     */
    @Override
    public CompletableFuture<?> subscribe(TopicSelector topics, String scope) {
        final TopicSelectionRequest request = new TopicSelectionRequest(
            Objects.requireNonNull(topics).getExpression(),
            ClientUtils.validateSelectionScope(scope));
        return subscribeService.sendCommand(request).thenApply(Functions.identity());
    }

    /**
     * Parses the selector expression and delegates to the {@link TopicSelector} overload.
     *
     * @param topics the selector expression
     * @param scope the selection scope
     * @return a future for the outcome of the subscribe command
     */
    @Override
    public CompletableFuture<?> subscribe(String topics, String scope) {
        final TopicSelector selector = topicSelectorParser.parse(topics);
        return subscribe(selector, scope);
    }

    /**
     * Sends an unsubscription request for the selector's expression and the validated scope.
     *
     * @param topics the selector to unsubscribe from
     * @param scope the selection scope, checked with {@link ClientUtils#validateSelectionScope}
     * @return a future for the outcome of the unsubscribe command
     * @throws NullPointerException if {@code topics} is null
     */
    @Override
    public CompletableFuture<?> unsubscribe(TopicSelector topics, String scope) {
        final TopicSelectionRequest request = new TopicSelectionRequest(
            Objects.requireNonNull(topics).getExpression(),
            ClientUtils.validateSelectionScope(scope));
        return unsubscribeService.sendCommand(request).thenApply(Functions.identity());
    }

    /**
     * Parses the selector expression and delegates to the {@link TopicSelector} overload.
     *
     * @param topics the selector expression
     * @param scope the selection scope
     * @return a future for the outcome of the unsubscribe command
     */
    @Override
    public CompletableFuture<?> unsubscribe(String topics, String scope) {
        final TopicSelector selector = topicSelectorParser.parse(topics);
        return unsubscribe(selector, scope);
    }

    /**
     * Parses the selector expression and delegates to the {@link TopicSelector} overload.
     *
     * @param topics the selector expression
     * @return a future for the outcome of the unsubscribe-all command
     */
    @Override
    public CompletableFuture<?> unsubscribeAllScopes(String topics) {
        final TopicSelector selector = topicSelectorParser.parse(topics);
        return unsubscribeAllScopes(selector);
    }

    /**
     * Sends an {@link UnsubscribeAllRequest} carrying the selector's expression.
     *
     * @param topics the selector to unsubscribe from
     * @return a future for the outcome of the unsubscribe-all command
     * @throws NullPointerException if {@code topics} is null
     */
    @Override
    public CompletableFuture<?> unsubscribeAllScopes(TopicSelector topics) {
        final UnsubscribeAllRequest request = new UnsubscribeAllRequest(Objects.requireNonNull(topics).getExpression());
        return unsubscribeAllService.sendCommand(request).thenApply(Functions.identity());
    }

    /**
     * Sends {@link SetTopicDetailsLevelRequest#FULL} when the session's protocol
     * version is earlier than {@link ProtocolVersion#PROTOCOL_19_VERSION}.
     *
     * @return the future of the command, or an already completed future holding
     *         {@code null} when no command is sent
     */
    @Override
    public CompletableFuture<Object> setTopicDetailsLevel() {
        // Protocol 19 and later: nothing is sent.
        if (!getProtocolVersion().isEarlierThan(ProtocolVersion.PROTOCOL_19_VERSION)) {
            return CompletableFuture.completedFuture(null);
        }
        return internalSession()
            .getServiceLocator()
            .obtainService(CommonServices.SET_TOPIC_DETAILS_LEVEL)
            .sendCommand(SetTopicDetailsLevelRequest.FULL)
            .thenApply(Functions.identity());
    }

    /**
     * Delegates to {@link TopicRouting#disableValueCaching()}.
     */
    @Override
    public void disableValueCaching() {
        topicRouting.disableValueCaching();
    }

    /**
     * Reports whether the session's protocol version is at least
     * {@link ProtocolVersion#PROTOCOL_11_VERSION}.
     *
     * @return {@code true} if the protocol version is 11 or later
     */
    @Override
    public boolean supportsMissingTopicNotifications() {
        final boolean supported = getProtocolVersion().isAtLeast(ProtocolVersion.PROTOCOL_11_VERSION);
        return supported;
    }

    /**
     * Hands a missing topic notification to the propagator chosen at construction time.
     *
     * @param selector the selector that matched no topics
     * @param sessionProperties properties of the requesting session
     * @param serverNames server names passed through to the propagator
     * @param serverIds server identifiers passed through to the propagator
     * @return the future returned by the propagator
     */
    @Override
    public CompletableFuture<Void> notifyMissingTopic(TopicSelector selector, Map<String, String> sessionProperties, List<String> serverNames, List<Long> serverIds) {
        return missingTopicPropagator.propagate(selector, sessionProperties, serverNames, serverIds);
    }

    /**
     * Sets a topic value without an update constraint.
     *
     * @param path the topic path
     * @param valueClass the class of the value
     * @param value the new value
     * @return a future for the outcome of the set command
     */
    @Override
    public <T> CompletableFuture<?> set(String path, Class<T> valueClass, T value) {
        final UpdateConstraint unconstrained = Unconstrained.get();
        return set(path, valueClass, value, unconstrained);
    }

    /**
     * Serialises the value with the data type registered for {@code valueClass} and
     * sends a {@link SetTopicRequest}.
     *
     * @param path the topic path
     * @param valueClass the class of the value, used to look up the data type
     * @param value the new value; null is accepted only when {@code valueClass} is
     *        assignable to {@code Long}, {@code Double} or {@code String}
     * @param constraint the update constraint sent with the request
     * @return a future for the outcome of the set command
     * @throws NullPointerException if {@code value} is null for any other value class
     */
    @Override
    public <T> CompletableFuture<?> set(String path, Class<T> valueClass, T value, UpdateConstraint constraint) {
        final DataType<T> dataType = dataTypes.getByClass(valueClass);

        final boolean nullRejected = value == null
            && !Long.class.isAssignableFrom(valueClass)
            && !Double.class.isAssignableFrom(valueClass)
            && !String.class.isAssignableFrom(valueClass);
        if (nullRejected) {
            throw new NullPointerException("null can only be passed for int64, double or string topics");
        }

        final IBytes serialisedValue = (IBytes)dataType.toBytes(value);
        final SetTopicRequest request = new SetTopicRequest(path, DataTypesImpl.toTopicType(dataType), serialisedValue, constraint);
        return setTopicService.sendCommand(request).thenApply(Functions.identity());
    }

    /**
     * Adds a topic and sets its value without an update constraint.
     *
     * @param path the topic path
     * @param specification the specification of the topic
     * @param valueClass the class of the value
     * @param value the initial value
     * @return a future holding the topic creation result
     */
    @Override
    public <T> CompletableFuture<TopicCreationResult> addAndSet(String path, TopicSpecification specification, Class<T> valueClass, T value) {
        final UpdateConstraint unconstrained = Unconstrained.get();
        return addAndSet(path, specification, valueClass, value, unconstrained);
    }

    /**
     * Checks that the specification and value class agree, serialises the value and
     * sends an {@link AddAndSetTopicRequest}.
     *
     * @param path the topic path
     * @param specification the specification of the topic
     * @param valueClass the class of the value, used to look up the data type
     * @param value the value; null is accepted only when {@code valueClass} is
     *        assignable to {@code Long}, {@code Double} or {@code String}
     * @param constraint the update constraint sent with the request
     * @return a future holding {@link TopicCreationResult#CREATED} when the service
     *         reports {@code CREATED}, otherwise {@link TopicCreationResult#EXISTS}
     * @throws IllegalArgumentException if the data types differ and the specification
     *         is not a time series topic
     * @throws NullPointerException if {@code value} is null for any other value class
     */
    @Override
    public <T> CompletableFuture<TopicCreationResult> addAndSet(String path, TopicSpecification specification, Class<T> valueClass, T value, UpdateConstraint constraint) {
        final DataType<?> specDataType = topicTypeToDataType.get(specification.getType());
        final DataType<T> valueDataType = dataTypes.getByClass(valueClass);

        // Data types are compared by identity; time series topics are exempt from the check.
        if (specDataType != valueDataType && specification.getType() != TopicType.TIME_SERIES) {
            throw new IllegalArgumentException("The specification and value have different data types");
        }

        final boolean nullRejected = value == null
            && !Long.class.isAssignableFrom(valueClass)
            && !Double.class.isAssignableFrom(valueClass)
            && !String.class.isAssignableFrom(valueClass);
        if (nullRejected) {
            throw new NullPointerException("null can only be passed for int64, double or string topics");
        }

        final IBytes serialisedValue = (IBytes)valueDataType.toBytes(value);
        final AddAndSetTopicRequest request = new AddAndSetTopicRequest(path, specification, serialisedValue, constraint);
        return addAndSetTopicService.sendCommand(request).thenApply(addResult -> {
            if (addResult == TopicControl.AddTopicResult.CREATED) {
                return TopicCreationResult.CREATED;
            }
            return TopicCreationResult.EXISTS;
        });
    }

    /**
     * Creates an update stream without an update constraint.
     *
     * @param path the topic path
     * @param valueClass the class of values the stream accepts
     * @return the new update stream
     */
    @Override
    public <T> UpdateStream<T> createUpdateStream(String path, Class<T> valueClass) {
        final UpdateConstraint unconstrained = Unconstrained.get();
        return createUpdateStream(path, valueClass, unconstrained);
    }

    /**
     * Creates an update stream from a fresh builder configured with the constraint.
     *
     * @param path the topic path
     * @param valueClass the class of values the stream accepts
     * @param constraint the update constraint given to the builder
     * @return the new update stream
     */
    @Override
    public <T> UpdateStream<T> createUpdateStream(String path, Class<T> valueClass, UpdateConstraint constraint) {
        final UpdateStream.Builder builder = newUpdateStreamBuilder();
        return builder.constraint(constraint).build(path, valueClass);
    }

    /**
     * Creates an update stream for a topic specification without an update constraint.
     *
     * @param path the topic path
     * @param specification the topic specification given to the builder
     * @param valueClass the class of values the stream accepts
     * @return the new update stream
     */
    @Override
    public <T> UpdateStream<T> createUpdateStream(String path, TopicSpecification specification, Class<T> valueClass) {
        final UpdateConstraint unconstrained = Unconstrained.get();
        return createUpdateStream(path, specification, valueClass, unconstrained);
    }

    /**
     * Creates an update stream from a fresh builder configured with the constraint
     * and then the specification.
     *
     * @param path the topic path
     * @param specification the topic specification given to the builder
     * @param valueClass the class of values the stream accepts
     * @param constraint the update constraint given to the builder
     * @return the new update stream
     */
    @Override
    public <T> UpdateStream<T> createUpdateStream(String path, TopicSpecification specification, Class<T> valueClass, UpdateConstraint constraint) {
        final UpdateStream.Builder builder = newUpdateStreamBuilder();
        return builder
            .constraint(constraint)
            .specification(specification)
            .build(path, valueClass);
    }

    /**
     * Creates a new {@link UpdateStreamBuilder} wired to this feature's update
     * stream services and data type lookups.
     *
     * @return a new builder
     */
    @Override
    public UpdateStream.Builder newUpdateStreamBuilder() {
        return new UpdateStreamBuilder(
            createUpdateStreamService,
            createUpdateStreamAndSetService,
            streamAddTopicService,
            streamAddAndSetTopicService,
            checkUpdateStreamService,
            streamSetTopicService,
            streamApplyDeltaService,
            topicTypeToDataType,
            dataTypes);
    }

    /**
     * Applies a JSON patch without an update constraint.
     *
     * @param path the topic path
     * @param patch the patch text
     * @return a future holding the patch result
     */
    @Override
    public CompletableFuture<TopicUpdate.JsonPatchResult> applyJsonPatch(String path, String patch) {
        final UpdateConstraint unconstrained = Unconstrained.get();
        return applyJsonPatch(path, patch, unconstrained);
    }

    /**
     * Sends an {@link ApplyJSONPatchRequest} and wraps the integer response in a
     * {@code JsonPatchResultImpl}.
     *
     * @param path the topic path
     * @param patch the patch text
     * @param constraint the update constraint sent with the request
     * @return a future holding the patch result
     * @throws NullPointerException if {@code path}, {@code patch} or {@code constraint} is null
     */
    @Override
    public CompletableFuture<TopicUpdate.JsonPatchResult> applyJsonPatch(String path, String patch, UpdateConstraint constraint) {
        final ApplyJSONPatchRequest request = new ApplyJSONPatchRequest(
            Objects.requireNonNull(path, "path is null"),
            Objects.requireNonNull(patch, "patch is null"),
            Objects.requireNonNull(constraint, "constraint is null"));
        return applyJsonPatchService.sendCommand(request).thenApply(JsonPatchResultImpl::new);
    }

    /**
     * Sends a {@link NamedTopicViewSpecification} to the create-topic-view service.
     *
     * @param name the view name; must not be null or empty
     * @param specification the view specification text; must not be null
     * @return a future that completes with the created view when the result reports
     *         success, or completes exceptionally with a {@link ScriptException}
     *         carrying the reported errors otherwise
     * @throws NullPointerException if {@code name} or {@code specification} is null
     * @throws IllegalArgumentException if {@code name} is empty
     */
    @Override
    public CompletableFuture<TopicView> createTopicView(String name, String specification) throws IllegalArgumentException {
        if (Objects.requireNonNull(name, "name is null").isEmpty()) {
            throw new IllegalArgumentException("Name cannot be an empty string.");
        }
        return this.createTopicViewService.sendCommand(new NamedTopicViewSpecification(name, Objects.requireNonNull(specification, "specification is null"))).thenCompose(result -> {
            if (result.isSuccess()) {
                return CompletableFuture.completedFuture(result.getView());
            }
            // Parameterised rather than raw, so the failure path is type-checked
            // against the declared CompletableFuture<TopicView> return type.
            final CompletableFuture<TopicView> failed = new CompletableFuture<>();
            failed.completeExceptionally(new ScriptException("Failed to create topic view '" + name + "'", result.getErrors()));
            return failed;
        });
    }

    /**
     * Sends the list-topic-views command with a null request.
     *
     * @return a future holding the views taken from the {@link ListTopicViewsResult}
     */
    @Override
    public CompletableFuture<List<TopicView>> listTopicViews() {
        return listTopicViewsService
            .sendCommand(null)
            .thenApply(result -> result.getViews());
    }

    /**
     * Sends the view name to the remove-topic-view service.
     *
     * @param name the name of the view to remove
     * @return a future for the outcome of the remove command
     * @throws NullPointerException if {@code name} is null
     */
    @Override
    public CompletableFuture<?> removeTopicView(String name) {
        final String viewName = Objects.requireNonNull(name, "name is null.");
        return removeTopicViewService.sendCommand(viewName).thenApply(Functions.identity());
    }

    /**
     * Sends the view name to the get-topic-view service.
     *
     * @param name the name of the view to look up
     * @return a future holding the view taken from the {@link GetTopicViewResult}
     * @throws NullPointerException if {@code name} is null
     */
    @Override
    public CompletableFuture<TopicView> getTopicView(String name) {
        final String viewName = Objects.requireNonNull(name, "name is null.");
        return getTopicViewService.sendCommand(viewName).thenApply(result -> result.getView());
    }

    /**
     * Creates a new fetch request bound to this feature's {@link FetchContext}.
     *
     * @return a new {@link FetchRequestImpl}
     */
    @Override
    public Topics.FetchRequest<Void> fetchRequest() {
        final Topics.FetchRequest<Void> request = new FetchRequestImpl<Void>(fetchContext);
        return request;
    }

    /**
     * Sends a branch mapping table to the put-branch-mapping-table service.
     *
     * @param branchMappingTable the table to send
     * @return a future for the outcome of the command
     * @throws NullPointerException if {@code branchMappingTable} is null
     */
    public CompletableFuture<Void> putBranchMappingTable(SessionTrees.BranchMappingTable branchMappingTable) {
        final SessionTrees.BranchMappingTable table = Objects.requireNonNull(branchMappingTable, "branchMappingTable is null");
        return putBranchMappingTableService.sendCommand(table).thenApply(Functions.identity());
    }

    /**
     * Sends the get-session-tree-branches-with-mappings command with a null request.
     *
     * @return a future holding the service's {@link SessionTreeBranchList} response,
     *         exposed as a list of strings
     */
    @Override
    public CompletableFuture<List<String>> listSessionTreeBranchesWithMappings() {
        return getSessionTreeBranchesWithMappingsService
            .sendCommand(null)
            .thenApply(Functions.identity());
    }

    /**
     * Sends a session tree branch to the get-branch-mapping-table service.
     *
     * @param sessionTreeBranch the session tree branch to look up
     * @return a future holding the branch mapping table returned by the service
     * @throws NullPointerException if {@code sessionTreeBranch} is null
     */
    @Override
    public CompletableFuture<SessionTrees.BranchMappingTable> getBranchMappingTable(String sessionTreeBranch) {
        final String branch = Objects.requireNonNull(sessionTreeBranch, "sessionTreeBranch is null");
        return getBranchMappingTableService.sendCommand(branch).thenApply(Functions.identity());
    }

    /**
     * Returns the topic routing supplied at construction time.
     *
     * @return the topic routing
     */
    @Override
    public TopicRouting getTopicRouting() {
        return topicRouting;
    }

    /**
     * Propagator that sends a {@link MissingTopicEvent} built from all four
     * arguments. The constructor selects it for sessions on protocol 23 or later.
     */
    private static final class MissingTopicPropagatorImpl implements MissingTopicPropagator {
        private final ServiceReference<MissingTopicEvent, Void> service;

        private MissingTopicPropagatorImpl(ServiceReference<MissingTopicEvent, Void> service) {
            this.service = service;
        }

        /**
         * Sends a {@link MissingTopicEvent} for the selector's expression.
         */
        @Override
        public CompletableFuture<Void> propagate(TopicSelector selector, Map<String, String> sessionProperties, List<String> serverNames, List<Long> serverIds) {
            final MissingTopicEvent event = new MissingTopicEvent(selector.getExpression(), sessionProperties, serverNames, serverIds);
            return service.sendCommand(event).thenApply(Functions.identity());
        }
    }

    private static interface MissingTopicPropagator {
        public CompletableFuture<Void> propagate(TopicSelector var1, Map<String, String> var2, List<String> var3, List<Long> var4);
    }

    /**
     * Propagator that sends a {@link MissingTopicPropagationRequest}. The
     * constructor selects it for sessions on a protocol earlier than 23.
     */
    private static final class Protocol22MissingTopicPropagator implements MissingTopicPropagator {
        private final ServiceReference<MissingTopicPropagationRequest, Boolean> service;

        private Protocol22MissingTopicPropagator(ServiceReference<MissingTopicPropagationRequest, Boolean> service) {
            this.service = service;
        }

        /**
         * Sends a request built from the {@code $SessionId} session property and the
         * selector's expression. The server lists are not used, and the Boolean
         * response is discarded.
         */
        @Override
        public CompletableFuture<Void> propagate(TopicSelector selector, Map<String, String> sessionProperties, List<String> serverNames, List<Long> serverIds) {
            final String sessionId = sessionProperties.get("$SessionId");
            final MissingTopicPropagationRequest request = new MissingTopicPropagationRequest(SessionIdImpl.parseString(sessionId), selector.getExpression());
            return service.sendCommand(request).thenApply(ignored -> null);
        }
    }

    /**
     * Immutable {@link TopicUpdate.JsonPatchResult} backed by the integer returned
     * by the apply-JSON-patch service. A negative value means no operation failed.
     */
    private static final class JsonPatchResultImpl
    implements TopicUpdate.JsonPatchResult {
        private final int failedOperation;

        private JsonPatchResultImpl(int failedOperation) {
            this.failedOperation = failedOperation;
        }

        /**
         * @return the failed operation value, or empty when the stored value is negative
         */
        @Override
        public Optional<Integer> failedOperation() {
            return this.failedOperation < 0 ? Optional.empty() : Optional.of(this.failedOperation);
        }

        /**
         * @return a human-readable summary of the patch outcome
         */
        @Override
        public String toString() {
            // Spelling corrected from "succesfully".
            return this.failedOperation < 0 ? "Patch applied successfully." : "Patch failed on operation <" + this.failedOperation + ">.";
        }
    }
}

