/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.client.internal.routing;

import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.TimeSeries;
import com.pushtechnology.diffusion.client.internal.routing.StreamProxy;
import com.pushtechnology.diffusion.client.internal.routing.TimeSeriesEventStreamProxy;
import com.pushtechnology.diffusion.client.internal.routing.TopicCacheEntryImpl;
import com.pushtechnology.diffusion.client.internal.routing.ValueStreamProxy;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.collections.ImmutableSet;
import com.pushtechnology.diffusion.datatype.BinaryDelta;
import com.pushtechnology.diffusion.datatype.Bytes;
import com.pushtechnology.diffusion.datatype.DataType;
import com.pushtechnology.diffusion.datatype.DataTypes;
import com.pushtechnology.diffusion.datatype.DeltaType;
import com.pushtechnology.diffusion.datatype.InvalidDataException;
import com.pushtechnology.diffusion.io.bytes.IBytes;
import com.pushtechnology.diffusion.logs.i18n.I18nLogger;
import com.pushtechnology.diffusion.threads.InboundThreadOnly;
import com.pushtechnology.diffusion.timeseries.datatype.TimeSeriesEventDataType;
import org.slf4j.Logger;

/**
 * Topic cache entry for a topic whose values are read through a {@link DataType}.
 *
 * <p>The entry keeps the most recently received serialized value (unless the topic
 * specification sets the {@code DONT_RETAIN_VALUE} property to {@code true}), applies
 * binary deltas to that value, and converts the serialized bytes to the value class
 * requested by each {@link ValueStreamProxy} before dispatching to it.
 *
 * <p>The class is annotated {@link InboundThreadOnly}; the cached value is a plain
 * mutable field with no synchronization of its own.
 *
 * @param <T> the value type of the topic's data type
 */
@InboundThreadOnly
class DataTypeTopicCacheEntry<T> extends TopicCacheEntryImpl {
    private static final Logger LOG = I18nLogger.getLogger(DataTypeTopicCacheEntry.class);

    /** Data type used to read values and to obtain the binary delta type. */
    private final DataType<T> dataType;

    /** {@code true} when received values must not be cached by {@link #handleValue}. */
    private final boolean dontRetainValue;

    /** Last retained serialized value, or {@code null} if none has been retained. */
    private IBytes value;

    /**
     * Creates an entry, dropping every proxy from {@code unfilteredProxies} that is not
     * compatible with the topic.
     *
     * @param topicPath the topic path, passed to the superclass
     * @param topicSpecification the topic specification; its {@code DONT_RETAIN_VALUE}
     *        property decides whether values are cached
     * @param dataType the data type used to read the topic's values
     * @param unfilteredProxies the candidate stream proxies, before compatibility filtering
     */
    DataTypeTopicCacheEntry(String topicPath, TopicSpecification topicSpecification, DataType<T> dataType, ImmutableSet<StreamProxy> unfilteredProxies) {
        super(topicPath, topicSpecification, unfilteredProxies.without(s -> !DataTypeTopicCacheEntry.isCompatibleStream(dataType, s, topicSpecification)));
        this.dataType = dataType;
        // parseBoolean(null) is false, so an absent property means "retain the value".
        this.dontRetainValue = Boolean.parseBoolean(topicSpecification.getProperties().get("DONT_RETAIN_VALUE"));
    }

    /**
     * Decides whether a stream proxy of any kind may receive updates for a topic.
     *
     * <ul>
     * <li>A {@link TimeSeriesEventStreamProxy} is compatible only with
     *     {@link TopicType#TIME_SERIES} topics.</li>
     * <li>Any other {@link ValueStreamProxy} is compatible when the data type can be read
     *     as the proxy's value class.</li>
     * <li>Every other proxy is always compatible.</li>
     * </ul>
     */
    private static boolean isCompatibleStream(DataType<?> dataType, StreamProxy streamProxy, TopicSpecification topicSpecification) {
        if (streamProxy instanceof TimeSeriesEventStreamProxy) {
            return topicSpecification.getType() == TopicType.TIME_SERIES;
        }
        if (streamProxy instanceof ValueStreamProxy) {
            return DataTypeTopicCacheEntry.isCompatibleStream(dataType, (ValueStreamProxy<?>) streamProxy);
        }
        return true;
    }

    /**
     * Decides whether {@code dataType} can produce the value class requested by a value
     * stream. For a {@link TimeSeriesEventDataType} the check is made against the event's
     * value ({@code canReadEventValueAs}); otherwise against the data type itself
     * ({@code canReadAs}).
     */
    private static boolean isCompatibleStream(DataType<?> dataType, ValueStreamProxy<?> streamProxy) {
        if (dataType instanceof TimeSeriesEventDataType) {
            return ((TimeSeriesEventDataType<?>) dataType).canReadEventValueAs(streamProxy.getValueClass());
        }
        return dataType.canReadAs(streamProxy.getValueClass());
    }

    /**
     * Notifies streams of a new value, reporting the currently cached value as the old
     * value, and then caches {@code data} unless value retention is disabled.
     */
    @Override
    public final void handleValue(DataTypes dataTypes, IBytes data, ImmutableSet<StreamProxy> fallbackStreamProxies) {
        this.notifyValue(dataTypes, this.value, data, fallbackStreamProxies);
        if (!this.dontRetainValue) {
            this.value = data;
        }
    }

    /**
     * Applies a serialized binary delta to the cached value and notifies streams.
     *
     * <p>If there is no cached value the delta cannot be applied: the condition is logged
     * and {@link ErrorReason#COMMUNICATION_FAILURE} is reported. If the delta cannot be
     * read or applied, the failure is logged, {@link ErrorReason#INVALID_DATA} is
     * reported, and the cached value is left unchanged.
     */
    @Override
    public final void handleDelta(DataTypes dataTypes, IBytes data, ImmutableSet<StreamProxy> fallbackStreamProxies) {
        final IBytes oldValue = this.value;
        if (oldValue == null) {
            LOG.error("TOPIC_CACHE_DELTA_NO_VALUE", (Object) this);
            this.notifyError(fallbackStreamProxies, ErrorReason.COMMUNICATION_FAILURE);
            return;
        }

        final BinaryDelta binaryDelta;
        try {
            final DeltaType<Bytes, BinaryDelta> deltaType = this.dataType.serializedBinaryDeltaType();
            binaryDelta = deltaType.readDelta(data);
            // The field is only overwritten once both reading and applying have succeeded.
            this.value = IBytes.toIBytes(deltaType.apply(oldValue, binaryDelta));
        }
        catch (InvalidDataException e) {
            LOG.error("TOPIC_CACHE_CANT_APPLY_DELTA", (Object) this, (Object) e);
            this.notifyError(fallbackStreamProxies, ErrorReason.INVALID_DATA);
            return;
        }
        this.notifyDelta(dataTypes, data, binaryDelta, oldValue, this.value, fallbackStreamProxies);
    }

    /**
     * @return the cached serialized value, or {@code null} if none is retained
     */
    @Override
    protected final IBytes currentValue() {
        return this.value;
    }

    /**
     * Checks a value stream against this entry's data type only.
     *
     * <p>NOTE(review): {@code specification} is not consulted here, and a
     * {@link TimeSeriesEventStreamProxy} gets no special treatment, unlike the filter
     * applied to the proxies passed to the constructor. Confirm against the callers in
     * the superclass that this difference is intended.
     */
    @Override
    protected <V> boolean isCompatibleStream(ValueStreamProxy<V> streamProxy, TopicSpecification specification) {
        return DataTypeTopicCacheEntry.isCompatibleStream(this.dataType, streamProxy);
    }

    /**
     * Converts the old and new serialized values to the stream's value class and delivers
     * them through {@code onValue}. A {@code null} {@code oldValue} is delivered as
     * {@code null}. On a conversion failure the stream receives
     * {@link ErrorReason#INVALID_DATA} instead and {@code onValue} is not called.
     */
    @Override
    protected final <V> void valueStreamOnValue(DataTypes dataTypes, ValueStreamProxy<V> streamProxy, String path, TopicSpecification specification, IBytes oldValue, IBytes newValue) {
        final Class<V> valueClass = streamProxy.getValueClass();
        V oldV;
        V newV;
        try {
            final DataType<?> readType = this.readDataTypeFor(dataTypes, streamProxy);
            oldV = oldValue != null ? DataTypeTopicCacheEntry.readStreamValue(readType, streamProxy, valueClass, oldValue) : null;
            newV = DataTypeTopicCacheEntry.readStreamValue(readType, streamProxy, valueClass, newValue);
        }
        catch (InvalidDataException | IllegalArgumentException e) {
            this.reportConversionFailure(streamProxy, valueClass, e);
            return;
        }
        streamProxy.onValue(path, specification, oldV, newV);
    }

    /**
     * Converts the old and new serialized values to the stream's value class and delivers
     * them, with the delta, through {@code onDelta}. Both values are converted
     * unconditionally. On a conversion failure the stream receives
     * {@link ErrorReason#INVALID_DATA} instead and {@code onDelta} is not called.
     */
    @Override
    protected final <V> void valueStreamOnDelta(DataTypes dataTypes, ValueStreamProxy<V> streamProxy, String path, TopicSpecification specification, BinaryDelta delta, IBytes oldValue, IBytes newValue) {
        final Class<V> valueClass = streamProxy.getValueClass();
        V oldV;
        V newV;
        try {
            final DataType<?> readType = this.readDataTypeFor(dataTypes, streamProxy);
            oldV = DataTypeTopicCacheEntry.readStreamValue(readType, streamProxy, valueClass, oldValue);
            newV = DataTypeTopicCacheEntry.readStreamValue(readType, streamProxy, valueClass, newValue);
        }
        catch (InvalidDataException | IllegalArgumentException e) {
            this.reportConversionFailure(streamProxy, valueClass, e);
            return;
        }
        streamProxy.onDelta(path, specification, delta, oldV, newV);
    }

    /**
     * Selects the data type used to read values for a stream: a time series event data
     * type built from the proxy's event value class when the proxy is a
     * {@link TimeSeriesEventStreamProxy}, otherwise this entry's own data type.
     */
    // Raw use of TimeSeriesEventStreamProxy: its type parameters are not visible from this
    // class, so the expression is kept exactly as it was and the warnings are scoped here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private DataType<?> readDataTypeFor(DataTypes dataTypes, ValueStreamProxy<?> streamProxy) {
        if (streamProxy instanceof TimeSeriesEventStreamProxy) {
            return TimeSeriesEventDataType.create(dataTypes.getByClass(((TimeSeriesEventStreamProxy) streamProxy).getEventValueClass()));
        }
        return this.dataType;
    }

    /**
     * Reads {@code bytes} as the stream's value class. When {@code readType} is a time
     * series event data type and the stream is not a time series event stream, the value
     * carried by the event is returned; otherwise {@code readType.readAs} is used.
     */
    private static <V> V readStreamValue(DataType<?> readType, ValueStreamProxy<V> streamProxy, Class<V> valueClass, IBytes bytes) {
        if (DataTypeTopicCacheEntry.isDeserializeAsTimeSeriesEvent(readType, streamProxy)) {
            return DataTypeTopicCacheEntry.getEventValue((TimeSeriesEventDataType<?>) readType, valueClass, bytes);
        }
        return readType.readAs(valueClass, bytes);
    }

    /**
     * Logs a value conversion failure (when error logging is enabled) and reports
     * {@link ErrorReason#INVALID_DATA} to the stream.
     */
    private void reportConversionFailure(ValueStreamProxy<?> streamProxy, Class<?> valueClass, Throwable cause) {
        if (LOG.isErrorEnabled()) {
            LOG.error("TOPIC_CACHE_CANT_CONVERT_VALUE", valueClass, this.getTopicPath(), cause);
        }
        streamProxy.onError(ErrorReason.INVALID_DATA);
    }

    /**
     * Reads a serialized time series event and returns the event's value converted to
     * {@code valueClass}.
     */
    private static <V, U> V getEventValue(TimeSeriesEventDataType<U> timeSeriesEventDataType, Class<? extends V> valueClass, IBytes event) {
        TimeSeries.Event e = (TimeSeries.Event)timeSeriesEventDataType.readValue(event);
        return timeSeriesEventDataType.readEventValueAs(valueClass, e);
    }

    /**
     * @return {@code true} when values must be read as time series events and unwrapped,
     *         i.e. the data type is a {@link TimeSeriesEventDataType} but the stream is
     *         not a {@link TimeSeriesEventStreamProxy}
     */
    private static boolean isDeserializeAsTimeSeriesEvent(DataType<?> dataType, ValueStreamProxy<?> streamProxy) {
        return dataType instanceof TimeSeriesEventDataType && !(streamProxy instanceof TimeSeriesEventStreamProxy);
    }
}

