/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.client.internal.routing;

import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.internal.routing.StreamProxy;
import com.pushtechnology.diffusion.client.internal.routing.TopicCacheEntry;
import com.pushtechnology.diffusion.client.internal.routing.TopicStreamProxy;
import com.pushtechnology.diffusion.client.internal.routing.ValueStreamProxy;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.collections.ImmutableSet;
import com.pushtechnology.diffusion.collections.InternSet;
import com.pushtechnology.diffusion.datatype.BinaryDelta;
import com.pushtechnology.diffusion.datatype.DataTypes;
import com.pushtechnology.diffusion.io.bytes.IBytes;
import com.pushtechnology.diffusion.threads.InboundThreadOnly;
import java.util.function.Consumer;
import java.util.function.Predicate;

@InboundThreadOnly
abstract class TopicCacheEntryImpl
implements TopicCacheEntry {
    private final String path;
    private final TopicSpecification specification;
    private ImmutableSet<StreamProxy> proxies;

    TopicCacheEntryImpl(String topicPath, TopicSpecification topicSpecification, ImmutableSet<StreamProxy> streamProxies) {
        this.path = topicPath;
        this.specification = topicSpecification;
        this.proxies = streamProxies;
    }

    @Override
    public final String getTopicPath() {
        return this.path;
    }

    @Override
    public final void notifyInitialStreamsOfSubscription(ImmutableSet<StreamProxy> fallbackStreams) {
        this.invokeStreams(fallbackStreams, a -> a.onSubscription(this.path, this.specification));
    }

    @Override
    public final void notifyFallbackSubscription(DataTypes dataTypes, StreamProxy fallbackStream) {
        if (this.useFallbackStreams() && this.isCompatibleStream(fallbackStream)) {
            this.notifySubscription(dataTypes, fallbackStream);
        }
    }

    private void notifySubscription(DataTypes dataTypes, StreamProxy stream) {
        stream.onSubscription(this.path, this.specification);
        IBytes value = this.currentValue();
        if (value != null && stream instanceof ValueStreamProxy) {
            this.valueStreamOnValue(dataTypes, (ValueStreamProxy)stream, this.path, this.specification, null, value);
        }
    }

    @Override
    public final void notifyStreamsOfUnsubscription(Topics.UnsubscribeReason reason, ImmutableSet<StreamProxy> fallbackStreams) {
        this.invokeStreams(fallbackStreams, a -> a.onUnsubscription(this.path, this.specification, reason));
    }

    private void invokeStreams(ImmutableSet<StreamProxy> fallbackStreams, Consumer<StreamProxy> action) {
        if (this.useFallbackStreams()) {
            this.invokeCompatibleFallbackStreams(fallbackStreams, action);
        } else {
            for (StreamProxy f : this.proxies) {
                action.accept(f);
            }
        }
    }

    private void invokeCompatibleFallbackStreams(ImmutableSet<StreamProxy> streamProxies, Consumer<StreamProxy> action) {
        for (StreamProxy f : streamProxies) {
            if (!this.isCompatibleStream(f)) continue;
            action.accept(f);
        }
    }

    private boolean isCompatibleStream(StreamProxy streamProxy) {
        if (streamProxy instanceof ValueStreamProxy) {
            return this.isCompatibleStream((ValueStreamProxy)streamProxy, this.specification);
        }
        return true;
    }

    protected abstract <V> boolean isCompatibleStream(ValueStreamProxy<V> var1, TopicSpecification var2);

    protected final void notifyValue(DataTypes dataTypes, IBytes oldValue, IBytes newValue, ImmutableSet<StreamProxy> fallbackStreams) {
        this.invokeStreams(fallbackStreams, s -> {
            if (s instanceof ValueStreamProxy) {
                this.valueStreamOnValue(dataTypes, (ValueStreamProxy)s, this.path, this.specification, oldValue, newValue);
            } else {
                ((TopicStreamProxy)s).onValue(this.path, this.specification, newValue);
            }
        });
    }

    protected abstract <V> void valueStreamOnValue(DataTypes var1, ValueStreamProxy<V> var2, String var3, TopicSpecification var4, IBytes var5, IBytes var6);

    protected final void notifyDelta(DataTypes dataTypes, IBytes data, BinaryDelta delta, IBytes oldValue, IBytes newValue, ImmutableSet<StreamProxy> fallbackStreams) {
        this.invokeStreams(fallbackStreams, s -> {
            if (s instanceof ValueStreamProxy) {
                this.valueStreamOnDelta(dataTypes, (ValueStreamProxy)s, this.path, this.specification, delta, oldValue, newValue);
            } else {
                IBytes copy = IBytes.toIBytes(data.toByteArray());
                ((TopicStreamProxy)s).onDelta(this.path, this.specification, copy);
            }
        });
    }

    protected final void notifyError(ImmutableSet<StreamProxy> fallbackStreams, ErrorReason reason) {
        this.invokeStreams(fallbackStreams, s -> s.onError(reason));
    }

    protected abstract <V> void valueStreamOnDelta(DataTypes var1, ValueStreamProxy<V> var2, String var3, TopicSpecification var4, BinaryDelta var5, IBytes var6, IBytes var7);

    private boolean useFallbackStreams() {
        return this.proxies.isEmpty();
    }

    @Override
    public final void addStream(DataTypes dataTypes, StreamProxy streamProxy, ImmutableSet<StreamProxy> fallbackStreams, InternSet<ImmutableSet<StreamProxy>> internedStreamProxies) {
        if (this.isCompatibleStream(streamProxy)) {
            ImmutableSet<StreamProxy> newProxies;
            if (this.useFallbackStreams()) {
                this.invokeCompatibleFallbackStreams(fallbackStreams, a -> a.onUnsubscription(this.path, this.specification, Topics.UnsubscribeReason.STREAM_CHANGE));
            }
            if (this.proxies != (newProxies = this.proxies.with(streamProxy))) {
                this.proxies = internedStreamProxies.intern(newProxies);
                this.notifySubscription(dataTypes, streamProxy);
            }
        }
    }

    @Override
    public final void removeStream(DataTypes dataTypes, Predicate<StreamProxy> predicate, ImmutableSet<StreamProxy> fallbackStreams, InternSet<ImmutableSet<StreamProxy>> internedStreamProxies) {
        if (!this.useFallbackStreams()) {
            this.proxies = internedStreamProxies.intern(this.proxies.withoutFirst(predicate));
            if (this.useFallbackStreams()) {
                this.invokeCompatibleFallbackStreams(fallbackStreams, p -> this.notifySubscription(dataTypes, (StreamProxy)p));
            }
        }
    }

    protected abstract IBytes currentValue();

    public final String toString() {
        StringBuilder sb = new StringBuilder(128);
        sb.append(this.path).append(' ').append(this.specification).append(" -> ").append(this.proxies.size()).append(" stream(s)");
        return sb.toString();
    }
}

