package com.ibm.nosql.json.api;

import com.ibm.nosql.json.internal.Debug;
import com.ibm.nosql.json.internal.NoSQLProperties;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/ibm/nosql/json/api/AsyncDispatcher.class */
public class AsyncDispatcher {
    BlockingQueue<WriteResult> recordQueue_;
    AsyncInserter asyncInserter_;
    int commitFrequency_;
    volatile boolean shutdown_;
    volatile Exception exception_;
    static final int MAX_QUE_CAPACITY = Integer.parseInt(NoSQLProperties.getProperty(NoSQLProperties.ASYNC_MAX_BATCH_SIZE));
    static final WriteResult SHUTDOWN_REQUEST__ = new WriteResult(null, null, null);
    private static int asyncMaxThreadCount__ = Integer.parseInt(NoSQLProperties.getProperty(NoSQLProperties.ASYNC_MAX_THREAD_COUNT));
    Thread dataDispatcher_ = null;
    ThreadLocal<Counter> counter_ = new ThreadLocal<Counter>() { // from class: com.ibm.nosql.json.api.AsyncDispatcher.1
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public Counter initialValue() {
            return new Counter();
        }
    };
    boolean done_ = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/ibm/nosql/json/api/AsyncDispatcher$InsertWorker.class */
    public class InsertWorker implements Runnable {
        private final BlockingQueue<WriteResult> queue_;
        private int maxBatchSize_;
        long committedCount_;

        InsertWorker(BlockingQueue<WriteResult> blockingQueue, int i) {
            this.queue_ = blockingQueue;
            this.maxBatchSize_ = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                AsyncDispatcher.this.shutdown_ = false;
                while (!AsyncDispatcher.this.shutdown_) {
                    consumeQueue();
                }
            } catch (InterruptedException e) {
            } catch (Throwable th) {
                synchronized (AsyncDispatcher.this.dataDispatcher_) {
                    AsyncDispatcher.this.done_ = true;
                    AsyncDispatcher.this.dataDispatcher_.notify();
                    throw th;
                }
            }
            synchronized (AsyncDispatcher.this.dataDispatcher_) {
                AsyncDispatcher.this.done_ = true;
                AsyncDispatcher.this.dataDispatcher_.notify();
            }
        }

        void consumeQueue() throws InterruptedException {
            ArrayList<WriteResult> arrayList = new ArrayList<>(this.maxBatchSize_);
            WriteResult take = this.queue_.take();
            if (take == AsyncDispatcher.SHUTDOWN_REQUEST__) {
                AsyncDispatcher.this.shutdown_ = true;
                return;
            }
            arrayList.add(take);
            this.queue_.drainTo(arrayList, this.maxBatchSize_ - 1);
            if (arrayList.get(arrayList.size() - 1) == AsyncDispatcher.SHUTDOWN_REQUEST__) {
                arrayList.remove(arrayList.size() - 1);
                AsyncDispatcher.this.shutdown_ = true;
                if (arrayList.size() == 0) {
                    return;
                }
            }
            try {
                AsyncDispatcher.this.asyncInserter_.insertList(arrayList);
            } catch (Exception e) {
                AsyncDispatcher.this.exception_ = e;
                Debug.out.println(e.getMessage());
            }
        }
    }

    public void init(int i, DB db) {
        this.exception_ = null;
        this.shutdown_ = false;
        if (this.dataDispatcher_ == null) {
            this.asyncInserter_ = new AsyncInserter();
            this.asyncInserter_.init(i, db, asyncMaxThreadCount__, asyncMaxThreadCount__);
            this.recordQueue_ = new ArrayBlockingQueue(MAX_QUE_CAPACITY);
            this.dataDispatcher_ = (Thread) AccessController.doPrivileged(new PrivilegedAction<Thread>() { // from class: com.ibm.nosql.json.api.AsyncDispatcher.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.security.PrivilegedAction
                public Thread run() {
                    Thread thread = new Thread(new InsertWorker(AsyncDispatcher.this.recordQueue_, AsyncDispatcher.MAX_QUE_CAPACITY + 1), "nosql-dispatch");
                    thread.setDaemon(true);
                    return thread;
                }
            });
            this.done_ = false;
            this.dataDispatcher_.start();
        }
    }

    public void stop() throws InterruptedException {
        Debug.out.println("Shutdown...");
        this.recordQueue_.put(SHUTDOWN_REQUEST__);
        synchronized (this.dataDispatcher_) {
            if (!this.done_) {
                Debug.out.println("Waiting...");
                try {
                    this.dataDispatcher_.wait();
                } catch (Exception e) {
                }
            }
        }
        this.asyncInserter_.executor_.shutdown();
        this.asyncInserter_.executor_.awaitTermination(60L, TimeUnit.SECONDS);
        Debug.out.println("Total records: " + AsyncInserter.totalRecords_);
        Debug.out.println("Total workers: " + AsyncInserter.totalWorkers_);
        if (AsyncInserter.totalWorkers_ > 0) {
            Debug.out.println("Average records per worker: " + (AsyncInserter.totalRecords_ / AsyncInserter.totalWorkers_));
        }
    }

    public void insertList(List<WriteResult> list) throws InterruptedException {
        Counter counter = this.counter_.get();
        for (WriteResult writeResult : list) {
            writeResult.counter_ = counter;
            synchronized (counter) {
                counter.queueCount_++;
            }
            this.recordQueue_.put(writeResult);
        }
    }

    public void insert(WriteResult writeResult) throws InterruptedException {
        Counter counter = this.counter_.get();
        writeResult.counter_ = counter;
        synchronized (counter) {
            counter.queueCount_++;
        }
        this.recordQueue_.put(writeResult);
    }

    public Exception getLastError() {
        return this.exception_;
    }

    int getThreadCount() {
        return 10;
    }
}
