/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.client.features.impl.update.stream;

import com.pushtechnology.diffusion.client.features.TopicCreationResult;
import com.pushtechnology.diffusion.client.features.impl.update.stream.InternalUpdateStream;
import com.pushtechnology.diffusion.client.features.impl.update.stream.InvalidSetStream;
import com.pushtechnology.diffusion.client.features.impl.update.stream.UpdateStreamImpl;
import com.pushtechnology.diffusion.command.sender.ServiceReference;
import com.pushtechnology.diffusion.io.bytes.IBytes;
import com.pushtechnology.diffusion.topics.update.update.stream.CreateUpdateStreamResponse;
import com.pushtechnology.diffusion.topics.update.update.stream.UpdateStreamId;
import com.pushtechnology.diffusion.topics.update.update.stream.UpdateStreamRequest;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import net.jcip.annotations.ThreadSafe;

/**
 * Update stream delegate that pushes values through the update service and
 * checks the stream through the validate service, reporting every outcome
 * back to the owning {@link UpdateStreamImpl}.
 *
 * <p>The most recently supplied value is kept in a volatile field so that
 * {@link #get()} can return it without any further coordination.
 *
 * <p>NOTE(review): {@link #set(Object)} reads the cached value and then
 * overwrites it as two separate volatile accesses; nothing in this class
 * serializes concurrent callers. Presumably the owner does so - confirm.
 *
 * @param <T> the value type carried by the stream
 */
@ThreadSafe
final class SetStream<T> implements InternalUpdateStream<T> {
    /** Owner that is notified of completions and whose delegate may be swapped. */
    private final UpdateStreamImpl<T> stream;
    /** Response supplying the stream id used on every request. */
    private final CreateUpdateStreamResponse createUpdateStreamResponse;
    /** Service invoked by {@link #validate()}. */
    private final ServiceReference<UpdateStreamId, Void> validateService;
    /** Service invoked by {@link #set(Object)}. */
    private final ServiceReference<UpdateStreamRequest, Void> updateService;
    /** Applied to (currently cached value, new value) to produce the bytes to send. */
    private final BiFunction<T, T, IBytes> toBytes;
    /** Last value handed to {@link #set(Object)}, or the value given at construction. */
    private volatile T cache;

    /**
     * Creates a delegate bound to an existing update stream.
     *
     * @param stream owner to notify and whose delegate is replaced on failure
     * @param createUpdateStreamResponse response providing the stream id
     * @param validateService service used to validate the stream
     * @param updateService service used to send new values
     * @param toBytes function applied to the cached and the new value to obtain the payload
     * @param cache initial cached value
     */
    SetStream(UpdateStreamImpl<T> stream, CreateUpdateStreamResponse createUpdateStreamResponse, ServiceReference<UpdateStreamId, Void> validateService, ServiceReference<UpdateStreamRequest, Void> updateService, BiFunction<T, T, IBytes> toBytes, T cache) {
        this.stream = stream;
        this.createUpdateStreamResponse = createUpdateStreamResponse;
        this.validateService = validateService;
        this.updateService = updateService;
        this.toBytes = toBytes;
        this.cache = cache;
    }

    /**
     * Returns the currently cached value.
     *
     * @return the value most recently stored by the constructor or by {@link #set(Object)}
     */
    @Override
    public T get() {
        return cache;
    }

    /**
     * Converts the new value to bytes (given the cached value), caches the new
     * value, and sends the bytes to the update service.
     *
     * <p>The cache is overwritten before the request is sent, so it holds the
     * new value regardless of how the request later completes.
     *
     * @param value the value to send
     * @return a future yielding {@link TopicCreationResult#EXISTS} once the
     *         service response completes normally
     */
    @Override
    public CompletableFuture<TopicCreationResult> set(T value) {
        final IBytes payload = toBytes.apply(cache, value);
        cache = value;
        final UpdateStreamRequest request =
            new UpdateStreamRequest(createUpdateStreamResponse.getStreamId(), payload);
        final CompletableFuture<Void> pending = updateService.sendCommand(request);
        // Notification and the returned future are both registered directly on
        // the service response; neither is chained after the other.
        pending.whenComplete((ignored, failure) -> {
            if (failure == null) {
                stream.onSetComplete(createUpdateStreamResponse);
            } else {
                stream.onSetFailed(failure);
            }
        });
        return pending.thenApply(ignored -> TopicCreationResult.EXISTS);
    }

    /**
     * Accepts a successful set without changing any state.
     *
     * @param response the response associated with the completed set (unused)
     * @return always {@code true}
     */
    @Override
    public boolean onSetComplete(CreateUpdateStreamResponse response) {
        return true;
    }

    /**
     * Handles a failed set by asking the owner to replace this delegate with
     * an {@link InvalidSetStream}.
     *
     * @param ex the failure reported for the set
     * @return the result of the owner's {@code casDelegate} call
     */
    @Override
    public boolean onSetFailed(Throwable ex) {
        return replaceWithInvalid(ex);
    }

    /**
     * Sends the stream id to the validate service.
     *
     * @return a future yielding {@link TopicCreationResult#EXISTS} once the
     *         service response completes normally
     */
    @Override
    public CompletableFuture<TopicCreationResult> validate() {
        final CompletableFuture<Void> pending =
            validateService.sendCommand(createUpdateStreamResponse.getStreamId());
        pending.whenComplete((ignored, failure) -> {
            if (failure == null) {
                stream.onValidateComplete(createUpdateStreamResponse);
            } else {
                stream.onValidateFailed(failure);
            }
        });
        return pending.thenApply(ignored -> TopicCreationResult.EXISTS);
    }

    /**
     * Accepts a successful validation without changing any state.
     *
     * @param response the response associated with the validation (unused)
     * @return always {@code true}
     */
    @Override
    public boolean onValidateComplete(CreateUpdateStreamResponse response) {
        return true;
    }

    /**
     * Handles a failed validation by asking the owner to replace this delegate
     * with an {@link InvalidSetStream}.
     *
     * @param ex the failure reported for the validation
     * @return the result of the owner's {@code casDelegate} call
     */
    @Override
    public boolean onValidateFailed(Throwable ex) {
        return replaceWithInvalid(ex);
    }

    /**
     * Builds an {@link InvalidSetStream} from the failure and the cached value
     * and offers it to the owner as the replacement for this delegate.
     */
    private boolean replaceWithInvalid(Throwable cause) {
        final InvalidSetStream<T> replacement = new InvalidSetStream<T>(cause, cache);
        // presumably a compare-and-set that only succeeds while this is still
        // the current delegate - verify against UpdateStreamImpl
        return stream.casDelegate(this, replacement);
    }
}

