/*
 * Decompiled with CFR 0.152.
 */
package com.pushtechnology.diffusion.util.concurrent.threads;

import com.pushtechnology.diffusion.logs.i18n.I18nLogger;
import com.pushtechnology.diffusion.util.concurrent.threads.ExecutionPool;
import com.pushtechnology.diffusion.util.concurrent.threads.ThreadFactoryImpl;
import com.pushtechnology.repackaged.jctools.queues.MpscBlockingConsumerArrayQueue;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;

@ThreadSafe
public final class ExecutionPoolImpl
implements ExecutionPool {
    private static final Logger LOG = I18nLogger.getLogger(ExecutionPoolImpl.class);
    private final ExecutionThread[] pool;
    private final QueueFullHandler queueFullHandler;

    public ExecutionPoolImpl(String name, int size, int queueSize, QueueFullHandler queueFullHandler) {
        this(new ThreadFactoryImpl(name), size, queueSize, queueFullHandler);
    }

    public ExecutionPoolImpl(ThreadFactory threadFactory, int size, int queueSize, QueueFullHandler queueFullHandler) {
        this.queueFullHandler = Objects.requireNonNull(queueFullHandler);
        if (size < 1) {
            throw new IllegalArgumentException("Size: " + size);
        }
        this.pool = new ExecutionThread[size];
        for (int i = 0; i < size; ++i) {
            this.pool[i] = new ExecutionThread(threadFactory, queueSize);
        }
    }

    public void start() {
        this.forEachExecutionThread(ExecutionThread::start);
    }

    public void stop() {
        this.forEachExecutionThread(ExecutionThread::stop);
    }

    public void forEachThread(BiConsumer<Executor, Thread> consumer) {
        this.forEachExecutionThread(et -> consumer.accept((Executor)et, et.thread));
    }

    private void forEachExecutionThread(Consumer<ExecutionThread> consumer) {
        for (ExecutionThread thread : this.pool) {
            consumer.accept(thread);
        }
    }

    @Override
    public void execute(Object affinityKey, Runnable task) {
        int tid = Math.abs(Objects.hashCode(affinityKey) % this.pool.length);
        this.pool[tid].execute(Objects.requireNonNull(task));
    }

    public static interface QueueFullHandler {
        public void handleQueueFull(Runnable var1, BlockingQueue<Runnable> var2, Thread var3);
    }

    private final class ExecutionThread
    implements Executor {
        private final Thread thread;
        private final BlockingQueue<Runnable> queue;
        private volatile boolean stopped = false;

        ExecutionThread(ThreadFactory factory, int queueSize) {
            this.queue = new MpscBlockingConsumerArrayQueue<Runnable>(queueSize);
            this.thread = factory.newThread(this::process);
        }

        @Override
        public void execute(Runnable task) {
            if (this.stopped) {
                throw new RejectedExecutionException(String.valueOf(this.thread) + " stopped");
            }
            if (this.thread == Thread.currentThread()) {
                task.run();
            } else if (!this.queue.offer(task)) {
                ExecutionPoolImpl.this.queueFullHandler.handleQueueFull(task, this.queue, this.thread);
            }
        }

        void start() {
            this.thread.start();
        }

        void stop() {
            this.stopped = true;
            this.thread.interrupt();
        }

        private void process() {
            BlockingQueue<Runnable> q = this.queue;
            while (!this.isStopped()) {
                Runnable task;
                try {
                    task = q.take();
                }
                catch (InterruptedException e) {
                    return;
                }
                this.runTask(task);
            }
        }

        private void runTask(Runnable task) {
            try {
                task.run();
            }
            catch (Exception e) {
                LOG.error("THREADS_UNCAUGHT_EXCEPTION", (Object)this.thread, (Object)e);
            }
        }

        private boolean isStopped() {
            return this.stopped || this.thread.isInterrupted();
        }
    }
}

