/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.client.features.impl.update.stream;

import com.pushtechnology.diffusion.client.features.InvalidUpdateStreamException;
import com.pushtechnology.diffusion.client.features.TopicCreationResult;
import com.pushtechnology.diffusion.client.features.impl.update.stream.InternalUpdateStream;
import com.pushtechnology.diffusion.client.features.impl.update.stream.InvalidSetStream;
import com.pushtechnology.diffusion.client.features.impl.update.stream.SetStream;
import com.pushtechnology.diffusion.client.features.impl.update.stream.UpdateStreamImpl;
import com.pushtechnology.diffusion.command.sender.ServiceReference;
import com.pushtechnology.diffusion.datatype.DataType;
import com.pushtechnology.diffusion.datatype.primitive.impl.DoubleDataTypeImpl;
import com.pushtechnology.diffusion.datatype.primitive.impl.Int64DataTypeImpl;
import com.pushtechnology.diffusion.datatype.primitive.impl.StringDataTypeImpl;
import com.pushtechnology.diffusion.io.bytes.IBytes;
import com.pushtechnology.diffusion.topics.update.update.stream.CreateUpdateStreamResponse;
import com.pushtechnology.diffusion.topics.update.update.stream.UpdateStreamId;
import com.pushtechnology.diffusion.topics.update.update.stream.UpdateStreamRequest;
import com.pushtechnology.diffusion.util.concurrent.threads.WaitProtectedCompletableFuture;
import com.pushtechnology.diffusion.utils.tuple.Pair;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

@ThreadSafe
final class PendingSetStream<T>
implements InternalUpdateStream<T> {
    private final UpdateStreamImpl<T> stream;
    private final CompletableFuture<TopicCreationResult> pendingResult;
    private final T initialValue;
    private final DataType<T> dataType;
    private final BiFunction<T, T, IBytes> toValueBytes;
    private final BiFunction<T, T, IBytes> toValueOrDeltaBytes;
    private final ServiceReference<UpdateStreamId, Void> validateService;
    private final ServiceReference<UpdateStreamRequest, Void> updateService;
    private final ServiceReference<UpdateStreamRequest, Void> setService;
    @GuardedBy(value="stream")
    private T nextValue;
    @GuardedBy(value="stream")
    private final List<Pair<CompletableFuture<TopicCreationResult>, T>> deferredUpdates;

    PendingSetStream(UpdateStreamImpl<T> stream, CompletableFuture<TopicCreationResult> pendingResult, T initialValue, DataType<T> dataType, BiFunction<T, T, IBytes> toValueBytes, BiFunction<T, T, IBytes> toValueOrDeltaBytes, ServiceReference<UpdateStreamId, Void> validateService, ServiceReference<UpdateStreamRequest, Void> updateService, ServiceReference<UpdateStreamRequest, Void> setService) {
        this.stream = stream;
        this.pendingResult = pendingResult;
        this.initialValue = initialValue;
        this.dataType = dataType;
        this.toValueBytes = toValueBytes;
        this.toValueOrDeltaBytes = toValueOrDeltaBytes;
        this.validateService = validateService;
        this.updateService = updateService;
        this.setService = setService;
        this.nextValue = initialValue;
        this.deferredUpdates = new ArrayList<Pair<CompletableFuture<TopicCreationResult>, T>>(0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public T get() {
        UpdateStreamImpl<T> updateStreamImpl = this.stream;
        synchronized (updateStreamImpl) {
            return this.nextValue;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<TopicCreationResult> set(T value) {
        if (!(value != null || this.dataType instanceof Int64DataTypeImpl || this.dataType instanceof DoubleDataTypeImpl || this.dataType instanceof StringDataTypeImpl)) {
            throw new IllegalArgumentException("null can only be passed for int64, double or string topics");
        }
        WaitProtectedCompletableFuture<TopicCreationResult> result = new WaitProtectedCompletableFuture<TopicCreationResult>();
        UpdateStreamImpl<T> updateStreamImpl = this.stream;
        synchronized (updateStreamImpl) {
            this.deferredUpdates.add(Pair.of(result, value));
            this.nextValue = value;
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean onSetComplete(CreateUpdateStreamResponse response) {
        UpdateStreamImpl<T> updateStreamImpl = this.stream;
        synchronized (updateStreamImpl) {
            this.stream.setDelegate(new SetStream<T>(this.stream, response, this.validateService, response.isRetainsValue() ? this.updateService : this.setService, response.isRetainsValue() ? this.toValueOrDeltaBytes : this.toValueBytes, this.nextValue));
            if (!this.deferredUpdates.isEmpty()) {
                if (response.isRetainsValue() && response.isSupportsConflation()) {
                    this.processPendingUpdatesRetainsValue(response);
                } else {
                    this.processPendingUpdatesDoesntRetainValue(response);
                }
            }
        }
        return true;
    }

    private void processPendingUpdatesRetainsValue(CreateUpdateStreamResponse response) {
        T last = this.deferredUpdates.get(this.deferredUpdates.size() - 1).getSecond();
        if (!Objects.equals(this.initialValue, last)) {
            this.updateService.sendCommand(new UpdateStreamRequest(response.getStreamId(), this.toValueOrDeltaBytes.apply(this.initialValue, last))).whenComplete((id, e) -> {
                if (e != null) {
                    this.stream.onSetFailed((Throwable)e);
                    this.deferredUpdates.forEach(update -> ((CompletableFuture)update.getFirst()).completeExceptionally((Throwable)e));
                } else {
                    this.stream.onSetComplete(response);
                    this.deferredUpdates.forEach(update -> ((CompletableFuture)update.getFirst()).complete(TopicCreationResult.EXISTS));
                }
            });
        } else {
            this.deferredUpdates.forEach(update -> ((CompletableFuture)update.getFirst()).complete(TopicCreationResult.EXISTS));
        }
    }

    private void processPendingUpdatesDoesntRetainValue(CreateUpdateStreamResponse response) {
        this.deferredUpdates.forEach(update -> ((CompletableFuture)this.setService.sendCommand(new UpdateStreamRequest(response.getStreamId(), (IBytes)this.dataType.toBytes(update.getSecond()))).thenAccept(r -> {
            this.stream.onSetComplete(response);
            ((CompletableFuture)update.getFirst()).complete(TopicCreationResult.EXISTS);
        })).exceptionally(e -> {
            this.stream.onSetFailed((Throwable)e);
            ((CompletableFuture)update.getFirst()).completeExceptionally((Throwable)e);
            return null;
        }));
    }

    @Override
    public CompletableFuture<TopicCreationResult> validate() {
        return this.pendingResult.exceptionally(ex -> {
            throw new InvalidUpdateStreamException((Throwable)ex);
        });
    }

    @Override
    public boolean onValidateComplete(CreateUpdateStreamResponse response) {
        throw new IllegalStateException("No validate request has been sent");
    }

    @Override
    public boolean onValidateFailed(Throwable ex) {
        throw new IllegalStateException("No validate request has been sent");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean onSetFailed(Throwable ex) {
        UpdateStreamImpl<T> updateStreamImpl = this.stream;
        synchronized (updateStreamImpl) {
            this.stream.setDelegate(new InvalidSetStream<T>(ex, this.nextValue));
            this.deferredUpdates.forEach(update -> ((CompletableFuture)update.getFirst()).completeExceptionally(new InvalidUpdateStreamException(ex)));
        }
        return true;
    }
}

