/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.client.internal.routing;

import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.callbacks.Stream;
import com.pushtechnology.diffusion.client.features.TimeSeries;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.impl.InternalTopics;
import com.pushtechnology.diffusion.client.internal.routing.DataTypeTopicCacheEntry;
import com.pushtechnology.diffusion.client.internal.routing.InternedTopicSpecifications;
import com.pushtechnology.diffusion.client.internal.routing.NoConversionTopicCacheEntry;
import com.pushtechnology.diffusion.client.internal.routing.StreamProxy;
import com.pushtechnology.diffusion.client.internal.routing.TimeSeriesEventStreamProxy;
import com.pushtechnology.diffusion.client.internal.routing.TimeSeriesTopicCacheEntry;
import com.pushtechnology.diffusion.client.internal.routing.TopicCacheEntry;
import com.pushtechnology.diffusion.client.internal.routing.TopicCacheEntryImpl;
import com.pushtechnology.diffusion.client.internal.routing.TopicRouting;
import com.pushtechnology.diffusion.client.internal.routing.TopicStreamProxy;
import com.pushtechnology.diffusion.client.internal.routing.ValueStreamProxy;
import com.pushtechnology.diffusion.client.internal.services.MutableServiceRegistry;
import com.pushtechnology.diffusion.client.internal.session.InternalSession;
import com.pushtechnology.diffusion.client.internal.session.SessionErrorImpl;
import com.pushtechnology.diffusion.client.topics.TopicSelector;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.collections.ImmutableSet;
import com.pushtechnology.diffusion.collections.InternSet;
import com.pushtechnology.diffusion.collections.WeakInternSet;
import com.pushtechnology.diffusion.command.sender.ServiceLocator;
import com.pushtechnology.diffusion.datatype.DataType;
import com.pushtechnology.diffusion.datatype.DataTypes;
import com.pushtechnology.diffusion.datatype.impl.TopicTypeToDataType;
import com.pushtechnology.diffusion.io.bytes.IBytes;
import com.pushtechnology.diffusion.logs.i18n.I18nLogger;
import com.pushtechnology.diffusion.threads.InboundThreadOnly;
import com.pushtechnology.diffusion.util.concurrent.threads.CommonThreadPools;
import com.pushtechnology.diffusion.util.concurrent.threads.ExecutionPool;
import com.pushtechnology.diffusion.utils.ConfigurationUtils;
import com.pushtechnology.repackaged.picocontainer.Disposable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;

@ThreadSafe
public class TopicRoutingImpl
implements TopicRouting,
Disposable {
    private static final Logger LOG = I18nLogger.getLogger(TopicRoutingImpl.class);
    protected static final int INITIAL_CAPACITY = ConfigurationUtils.getIntegerSystemProperty("diffusion.topic_routing.initial_capacity", 1024);
    private final InternedTopicSpecifications internedSpecifications;
    private final DataTypes dataTypes;
    private final TopicTypeToDataType topicTypeToDataType;
    private final ExecutionPool inboundThreadPool;
    @InboundThreadOnly
    private final Map<Integer, TopicCacheEntry> byId = new HashMap<Integer, TopicCacheEntry>(INITIAL_CAPACITY, 0.75f);
    @InboundThreadOnly
    private final Map<TopicSelector, ImmutableSet<StreamProxy>> streamProxies = new HashMap<TopicSelector, ImmutableSet<StreamProxy>>();
    private final InternSet<ImmutableSet<StreamProxy>> internedStreamProxies = new WeakInternSet<ImmutableSet<StreamProxy>>();
    @InboundThreadOnly
    private ImmutableSet<StreamProxy> fallbackProxies = ImmutableSet.empty();
    @InboundThreadOnly
    private final Set<StreamProxy> temporaryHashSet = new HashSet<StreamProxy>();
    private volatile boolean cacheDisabled = ConfigurationUtils.getBooleanSystemProperty("diffusion.disabletopicvaluecache");
    private volatile boolean closed = false;

    public TopicRoutingImpl(DataTypes dataTypes, TopicTypeToDataType topicTypeToDataType, CommonThreadPools threadPools, InternedTopicSpecifications internedSpecifications) {
        this.dataTypes = dataTypes;
        this.topicTypeToDataType = topicTypeToDataType;
        this.internedSpecifications = internedSpecifications;
        this.inboundThreadPool = threadPools.getDefaultInboundThreadPool();
    }

    @Override
    public void wireSesssionServices(MutableServiceRegistry localServices, ServiceLocator remoteServices) {
    }

    @Override
    @InboundThreadOnly
    public final void notifySubscription(int topicId, String topicPath, TopicSpecification specification) {
        TopicCacheEntryImpl entry;
        TopicSpecification internedSpecification = this.internedSpecifications.intern(specification);
        ImmutableSet<StreamProxy> streamsForTopicPath = this.streamsForTopicPath(topicPath);
        TopicType topicType = internedSpecification.getType();
        if (this.cacheDisabled || topicType == TopicType.UNKNOWN_TOPIC_TYPE) {
            entry = new NoConversionTopicCacheEntry(topicPath, internedSpecification, streamsForTopicPath);
        } else if (topicType == TopicType.TIME_SERIES) {
            String eventValueTypeName = internedSpecification.getProperties().get("TIME_SERIES_EVENT_VALUE_TYPE");
            DataType<?> eventValueType = this.dataTypes.getByName(eventValueTypeName);
            entry = new TimeSeriesTopicCacheEntry(topicPath, internedSpecification, eventValueType, streamsForTopicPath);
        } else {
            entry = new DataTypeTopicCacheEntry(topicPath, internedSpecification, this.topicTypeToDataType.get(topicType), streamsForTopicPath);
        }
        TopicCacheEntry oldById = this.putCached(topicId, entry);
        if (oldById != null) {
            throw new IllegalStateException("Existing entry " + String.valueOf(oldById) + " found for id " + topicId + " when adding " + String.valueOf(entry));
        }
        entry.notifyInitialStreamsOfSubscription(this.fallbackProxies);
    }

    @Override
    public final void notifyUnsubscriptionOfAllTopics() {
        this.runIfNotClosed(() -> {
            for (TopicCacheEntry removed : this.byId.values()) {
                this.notifyStreamsOfUnsubscription(removed, Topics.UnsubscribeReason.SUBSCRIPTION_REFRESH);
            }
            this.byId.clear();
        });
    }

    @Override
    @InboundThreadOnly
    public final void notifyValue(InternalSession session, int topicId, IBytes data) {
        if (this.closed) {
            return;
        }
        TopicCacheEntry entry = this.getCached(topicId);
        if (entry == null) {
            session.getErrorHandler().notifyError(new SessionErrorImpl("Data loss on topic with ID " + topicId + " : session closing", null));
            session.close();
            return;
        }
        entry.handleValue(this.dataTypes, data, this.fallbackProxies);
    }

    @InboundThreadOnly
    protected final TopicCacheEntry getCached(int topicId) {
        return this.byId.get(topicId);
    }

    @InboundThreadOnly
    protected TopicCacheEntry putCached(int topicId, TopicCacheEntry entry) {
        return this.byId.put(topicId, entry);
    }

    @InboundThreadOnly
    protected TopicCacheEntry removeCached(int topicId) {
        return this.byId.remove(topicId);
    }

    @Override
    @InboundThreadOnly
    public final void notifyDelta(InternalSession session, int topicId, IBytes data) {
        if (this.closed) {
            return;
        }
        TopicCacheEntry entry = this.getCached(topicId);
        if (entry == null) {
            session.getErrorHandler().notifyError(new SessionErrorImpl("Data loss on topic with ID " + topicId + " - possibly due to reconnection : session closing", null));
            session.close();
            return;
        }
        this.handleDelta(data, entry);
    }

    private void handleDelta(IBytes data, TopicCacheEntry entry) {
        entry.handleDelta(this.dataTypes, data, this.fallbackProxies);
    }

    @Override
    @InboundThreadOnly
    public final void notifyUnsubscription(int topicId, Topics.UnsubscribeReason reason) {
        TopicCacheEntry removed = this.removeCached(topicId);
        if (removed != null) {
            this.notifyStreamsOfUnsubscription(removed, reason);
        }
    }

    private void notifyStreamsOfUnsubscription(TopicCacheEntry removed, Topics.UnsubscribeReason reason) {
        removed.notifyStreamsOfUnsubscription(reason, this.fallbackProxies);
    }

    @Override
    public final <V> void addStream(TopicSelector topics, Class<? extends V> valueClass, Topics.ValueStream<V> stream) {
        this.addStreamProxy(topics, () -> new ValueStreamProxy(valueClass, stream));
    }

    @Override
    public final void addStream(TopicSelector topics, InternalTopics.TopicStream stream) {
        this.addStreamProxy(topics, () -> new TopicStreamProxy(stream));
    }

    private void addStreamProxy(TopicSelector selector, Supplier<StreamProxy> proxyFactory) {
        this.runIfNotClosed(() -> {
            ImmutableSet<StreamProxy> newProxies;
            ImmutableSet<StreamProxy> previous = this.streamProxies.get(selector);
            StreamProxy proxy = (StreamProxy)proxyFactory.get();
            if (previous == null) {
                newProxies = ImmutableSet.of(new StreamProxy[]{proxy});
            } else {
                newProxies = previous.with(proxy);
                if (newProxies == previous) {
                    return;
                }
            }
            this.streamProxies.put(selector, this.internedStreamProxies.intern(newProxies));
            for (TopicCacheEntry cacheEntry : this.byId.values()) {
                if (!selector.selects(cacheEntry.getTopicPath())) continue;
                cacheEntry.addStream(this.dataTypes, proxy, this.fallbackProxies, this.internedStreamProxies);
            }
        });
    }

    @Override
    public final <V> void addFallbackStream(Class<? extends V> valueClass, Topics.ValueStream<V> stream) {
        this.addFallbackStreamProxy(() -> new ValueStreamProxy(valueClass, stream));
    }

    private void addFallbackStreamProxy(Supplier<StreamProxy> proxyFactory) {
        this.runIfNotClosed(() -> {
            ImmutableSet<StreamProxy> previous = this.fallbackProxies;
            StreamProxy proxy = (StreamProxy)proxyFactory.get();
            ImmutableSet<StreamProxy> newProxies = previous.with(proxy);
            if (newProxies == previous) {
                return;
            }
            this.fallbackProxies = newProxies;
            for (TopicCacheEntry cacheEntry : this.byId.values()) {
                cacheEntry.notifyFallbackSubscription(this.dataTypes, proxy);
            }
        });
    }

    @Override
    public final <V> void addTimeSeriesStream(TopicSelector topics, Class<? extends V> eventValueClass, Topics.ValueStream<TimeSeries.Event<V>> stream) {
        this.addStreamProxy(topics, () -> new TimeSeriesEventStreamProxy(eventValueClass, stream));
    }

    @Override
    public final void removeStream(Stream stream) {
        Predicate<StreamProxy> isProxyForStream = v -> v.getStream().equals(stream);
        this.runIfNotClosed(() -> {
            boolean removed = this.removeFallbackStreamProxy(isProxyForStream);
            if (this.removeStreamProxy(isProxyForStream)) {
                removed = true;
                for (TopicCacheEntry entry : this.byId.values()) {
                    entry.removeStream(this.dataTypes, isProxyForStream, this.fallbackProxies, this.internedStreamProxies);
                }
            }
            if (removed) {
                try {
                    stream.onClose();
                }
                catch (Exception ex) {
                    LOG.error("TOPICS_STREAM_EXCEPTION", (Object)stream, (Object)ex);
                }
            }
        });
    }

    @InboundThreadOnly
    private boolean removeFallbackStreamProxy(Predicate<StreamProxy> isProxyForStream) {
        ImmutableSet<StreamProxy> current = this.fallbackProxies;
        ImmutableSet<StreamProxy> filtered = this.fallbackProxies.withoutFirst(isProxyForStream);
        if (filtered != current) {
            this.fallbackProxies = filtered;
            return true;
        }
        return false;
    }

    @InboundThreadOnly
    private boolean removeStreamProxy(Predicate<StreamProxy> isProxyForStream) {
        boolean removed = false;
        Iterator<Map.Entry<TopicSelector, ImmutableSet<StreamProxy>>> entries = this.streamProxies.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<TopicSelector, ImmutableSet<StreamProxy>> entry = entries.next();
            ImmutableSet<StreamProxy> filtered = entry.getValue().withoutFirst(isProxyForStream);
            if (filtered.isEmpty()) {
                entries.remove();
                removed = true;
                continue;
            }
            if (filtered == entry.getValue()) continue;
            entry.setValue(this.internedStreamProxies.intern(filtered));
            removed = true;
        }
        return removed;
    }

    @InboundThreadOnly
    private ImmutableSet<StreamProxy> streamsForTopicPath(String topicPath) {
        Map<TopicSelector, ImmutableSet<StreamProxy>> proxies = this.streamProxies;
        if (proxies.isEmpty()) {
            return ImmutableSet.empty();
        }
        Set<StreamProxy> selected = this.temporaryHashSet;
        for (Map.Entry<TopicSelector, ImmutableSet<StreamProxy>> entry : proxies.entrySet()) {
            if (!entry.getKey().selects(topicPath)) continue;
            selected.addAll((Collection<StreamProxy>)entry.getValue());
        }
        ImmutableSet<StreamProxy> result = ImmutableSet.from(selected);
        selected.clear();
        return this.internedStreamProxies.intern(result);
    }

    @Override
    public final void disableValueCaching() {
        this.cacheDisabled = true;
    }

    @Override
    public final void dispose() {
        this.runIfNotClosed(() -> {
            this.closed = true;
            this.byId.clear();
            Set<StreamProxy> toClose = this.temporaryHashSet;
            toClose.addAll(this.fallbackProxies);
            for (Set set : this.streamProxies.values()) {
                toClose.addAll(set);
            }
            for (StreamProxy streamProxy : toClose) {
                streamProxy.onError(ErrorReason.SESSION_CLOSED);
            }
            toClose.clear();
            this.streamProxies.clear();
            this.fallbackProxies = ImmutableSet.empty();
        });
    }

    private void runIfNotClosed(Runnable r) {
        if (!this.closed) {
            this.inboundThreadPool.execute(this, () -> {
                if (!this.closed) {
                    r.run();
                }
            });
        }
    }
}

