/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.client.features.impl.update.stream;

import com.pushtechnology.diffusion.client.features.ClusterRoutingException;
import com.pushtechnology.diffusion.client.features.RecoverableUpdateStream;
import com.pushtechnology.diffusion.client.features.TopicCreationResult;
import com.pushtechnology.diffusion.client.features.UpdateStream;
import com.pushtechnology.diffusion.client.features.UpdateStreamRetryLimitException;
import com.pushtechnology.diffusion.client.session.retry.RetryStrategy;
import com.pushtechnology.diffusion.exceptions.DiffusionInterruptedException;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public final class RecoverableUpdateStreamImpl<T>
implements RecoverableUpdateStream<T> {
    private static final Logger LOG = LoggerFactory.getLogger(RecoverableUpdateStreamImpl.class);
    private final Supplier<UpdateStream<T>> streamSupplier;
    private final RetryStrategy strategy;
    @GuardedBy(value="queue")
    private final ConcurrentLinkedQueue<PendingTopicUpdate<T>> queue;
    private final AtomicLong counter = new AtomicLong(Long.MIN_VALUE);
    private final AtomicReference<UpdateStream<T>> delegate;
    private final AtomicReference<Throwable> priorException = new AtomicReference<Object>(null);

    public RecoverableUpdateStreamImpl(Supplier<UpdateStream<T>> supplier, RetryStrategy strategy) {
        this.streamSupplier = Objects.requireNonNull(supplier, "supplier is null");
        this.delegate = new AtomicReference<UpdateStream<T>>(Objects.requireNonNull(supplier.get(), "supplier returns null"));
        this.strategy = Objects.requireNonNull(strategy, "strategy is null");
        this.queue = new ConcurrentLinkedQueue();
    }

    @Override
    public CompletableFuture<TopicCreationResult> set(T t) {
        PendingTopicUpdate<T> update = new PendingTopicUpdate<T>(this.counter.getAndIncrement(), t);
        this.queue.add(update);
        CompletionStage cf = this.delegate.get().set(t).whenComplete((topicCreationResult, throwable) -> this.processUpdateResponse(update, (TopicCreationResult)((Object)topicCreationResult), (Throwable)throwable));
        update.initFuture((CompletableFuture<TopicCreationResult>)cf);
        return cf;
    }

    @Override
    public T get() {
        return this.delegate.get().get();
    }

    @Override
    public CompletableFuture<TopicCreationResult> validate() {
        return this.delegate.get().validate().whenComplete((topicCreationResult, throwable) -> {
            if (throwable != null) {
                this.processException((Throwable)throwable);
            }
        });
    }

    @Override
    public boolean isRecoverable() {
        return this.isRecoverable(this.priorException.get());
    }

    boolean isRecoverable(Throwable cause) {
        if (null == cause) {
            return false;
        }
        return cause instanceof ClusterRoutingException || this.isRecoverable(cause.getCause());
    }

    @Override
    public void recover() throws UpdateStreamRetryLimitException {
        if (!this.isRecoverable(this.priorException.get())) {
            throw new IllegalStateException("Can only recover following a recoverable exception", this.priorException.get());
        }
        this.priorException.set(null);
        this.synchronisePendingWork();
        int reattempts = this.strategy.getAttempts();
        while (!this.queue.isEmpty()) {
            try {
                RecoverableUpdateStreamImpl.uncheckedSleep(this.strategy.getInterval());
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Attempting recovery of {} topic updates", (Object)this.queueSize());
                }
                this.delegate.set(this.streamSupplier.get());
                this.delegate.get().validate().get();
                CompletableFuture[] cfs = (CompletableFuture[])this.queue.stream().map(update -> {
                    RecoverableUpdateStreamImpl recoverableUpdateStreamImpl = this;
                    synchronized (recoverableUpdateStreamImpl) {
                        return this.delegate.get().set(update.getUpdateValue()).whenComplete((topicCreationResult, throwable) -> this.processUpdateResponse((PendingTopicUpdate<T>)update, (TopicCreationResult)((Object)((Object)topicCreationResult)), (Throwable)throwable));
                    }
                }).toArray(CompletableFuture[]::new);
                CompletableFuture.allOf(cfs).get(this.strategy.getInterval(), TimeUnit.MILLISECONDS);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Completed recovery during attempt {}", (Object)(this.strategy.getAttempts() - reattempts));
                }
                return;
            }
            catch (InterruptedException | CancellationException | ExecutionException | TimeoutException ex) {
                if (reattempts != Integer.MAX_VALUE) {
                    --reattempts;
                }
                if (this.isRecoverable(ex) && reattempts > 0) continue;
                throw (UpdateStreamRetryLimitException)new UpdateStreamRetryLimitException("Cannot recover within strategy: " + String.valueOf(this.strategy)).initCause(ex);
            }
        }
    }

    private void synchronisePendingWork() throws UpdateStreamRetryLimitException {
        try {
            CompletableFuture.allOf((CompletableFuture[])this.queue.stream().map(PendingTopicUpdate::getFuture).toArray(CompletableFuture[]::new)).get(this.strategy.getInterval(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException | CancellationException | ExecutionException ex) {
            LOG.debug("Synchronizing CompletableFutures", ex);
        }
        catch (TimeoutException ex) {
            throw (UpdateStreamRetryLimitException)new UpdateStreamRetryLimitException("Cannot recover within strategy: " + String.valueOf(this.strategy)).initCause(ex);
        }
    }

    private void processUpdateResponse(PendingTopicUpdate<T> update, TopicCreationResult topicCreationResult, Throwable throwable) {
        if (throwable == null) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received set response: {}, result: {}", (Object)update.getUpdateValue(), (Object)topicCreationResult);
            }
            if (!this.queue.remove(update)) {
                throw new AssertionError((Object)("Cannot find topic update in queue " + String.valueOf(update)));
            }
        } else {
            this.processException(throwable);
            if (this.isRecoverable(throwable) && LOG.isTraceEnabled()) {
                LOG.trace("Recoverable error detected. {} updates can be recovered.", (Object)this.queue.size(), (Object)throwable);
            }
        }
    }

    private void processException(Throwable throwable) {
        this.priorException.compareAndSet(null, throwable);
    }

    private static void uncheckedSleep(long interval) {
        try {
            LOG.trace("Pausing for {}ms", (Object)interval);
            Thread.sleep(interval);
        }
        catch (InterruptedException e) {
            throw new DiffusionInterruptedException(e);
        }
    }

    int queueSize() {
        return this.queue.size();
    }

    public String toString() {
        return "RecoverableUpdateStream{strategy=" + String.valueOf(this.strategy) + ", delegate=" + String.valueOf(this.delegate) + ", priorException=" + String.valueOf(this.priorException) + "}";
    }

    @ThreadSafe
    private static final class PendingTopicUpdate<T> {
        private final long counter;
        private final T updateValue;
        @GuardedBy(value="this")
        private CompletableFuture<TopicCreationResult> future;

        private PendingTopicUpdate(long counter, T updateValue) {
            this.counter = counter;
            this.updateValue = updateValue;
        }

        private T getUpdateValue() {
            return this.updateValue;
        }

        private synchronized CompletableFuture<TopicCreationResult> getFuture() {
            return this.future;
        }

        private synchronized void initFuture(CompletableFuture<TopicCreationResult> newFuture) {
            if (this.future != null) {
                throw new IllegalStateException("future is already set");
            }
            this.future = newFuture;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            PendingTopicUpdate that = (PendingTopicUpdate)o;
            return Objects.equals(this.counter, that.counter) && Objects.equals(this.updateValue, that.updateValue);
        }

        public int hashCode() {
            return Objects.hash(this.counter, this.updateValue);
        }
    }
}

