當前位置: 妍妍網 > 碼農

ArrayBlockingQueue與LinkedBlockingQueue

2024-04-20碼農

作者:go4it
連結:https://www.jianshu.com/p/5b85c1794351

本文主要簡單介紹下ArrayBlockingQueue與LinkedBlockingQueue。

對比

queue 阻塞與否 是否有界 執行緒安全保障 適用場景 註意事項
ArrayBlockingQueue 阻塞 有界 一把全域鎖 生產消費模型,平衡兩邊處理速度 用於儲存佇列元素的儲存空間是預先分配的,使用過程中記憶體開銷較小(無須動態申請儲存空間)
LinkedBlockingQueue 阻塞 可配置 存取采用2把鎖 生產消費模型,平衡兩邊處理速度 無界的時候註意記憶體溢位問題,用於儲存佇列元素的儲存空間是在其使用過程中動態分配的,因此它可能會增加JVM垃圾回收的負擔。
ConcurrentLinkedQueue 非阻塞 無界 CAS 對全域的集合進行操作的場景 size() 是要遍歷一遍集合,慎用

記憶體方面

  • ArrayBlockingQueue
    用於儲存佇列元素的儲存空間是預先分配的,使用過程中記憶體開銷較小(無須動態申請儲存空間)

  • LinkedBlockingQueue
    用於儲存佇列元素的儲存空間是在其使用過程中動態分配的,因此它可能會增加JVM垃圾回收的負擔。

  • 有界無界

  • ArrayBlockingQueue
    有界,適合已知最大儲存容量的場景

  • LinkedBlockingQueue
    可有界可以無界

  • 吞吐量

    LinkedBlockingQueue在大多數並行的場景下吞吐量比ArrayBlockingQueue,但是效能不穩定。

    Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.

    測試結果表明,LinkedBlockingQueue的可伸縮性要高於ArrayBlockingQueue。初看起來,這個結果有些奇怪:連結串列佇列在每次插入元素時,都必須分配一個連結串列節點物件,這似乎比基於陣列的佇列執行了更多的工作。然而,雖然它擁有更好的記憶體分配與GC等開銷,但與基於陣列的佇列相比,連結串列佇列的put和take等方法支持並行性更高的存取,因為一些最佳化後的連結佇列演算法能將佇列頭節點的更新操作與尾節點的更新操作分離開來。由於記憶體分配操作通常是執行緒原生的,因此如果演算法能透過多執行一些記憶體分配操作來降低競爭程度,那麽這種演算法通常具有更高的可伸縮性。

    並行方面

  • ArrayBlockingQueue
    采用一把鎖,兩個condition

  • /** Main lock guarding all access */
    final ReentrantLock lock;
    /** Condition for waiting takes */
    privatefinal Condition notEmpty;
    /** Condition for waiting puts */
    privatefinal Condition notFull;
    /**
    * Inserts element at current put position, advances, and signals.
    * Call only when holding lock.
    */

    private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
    putIndex = 0;
    count++;
    notEmpty.signal();
    }
    /**
    * Extracts element at current take position, advances, and signals.
    * Call only when holding lock.
    */

    private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
    takeIndex = 0;
    count--;
    if (itrs != null)
    itrs.elementDequeued();
    notFull.signal();
    return x;
    }


    此外還支持公平鎖

    /**
    * Creates an {@code ArrayBlockingQueue} with the given (fixed)
    * capacity and the specified access policy.
    *
    @param capacity the capacity of this queue
    @param fair if {@code true} then queue accesses for threads blocked
    * on insertion or removal, are processed in FIFO order;
    * if {@code false} the access order is unspecified.
    @throws IllegalArgumentException if {@code capacity < 1}
    */

    publicArrayBlockingQueue(int capacity, boolean fair){
    if (capacity <= 0)
    thrownew IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
    }

  • LinkedBlockingQueue
    頭尾各1把鎖

  • /** Lock held by take, poll, etc */
    privatefinal ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    privatefinal Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    privatefinal ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    privatefinal Condition notFull = putLock.newCondition();
    /**
    * Inserts the specified element at the tail of this queue if it is
    * possible to do so immediately without exceeding the queue's capacity,
    * returning {@code true} upon success and {@code false} if this queue
    * is full.
    * When using a capacity-restricted queue, this method is generally
    * preferable to method {@link BlockingQueue#add add}, which can fail to
    * insert an element only by throwing an exception.
    *
    @throws NullPointerException if the specified element is null
    */

    publicbooleanoffer(E e){
    if (e == nullthrownew NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
    returnfalse;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
    if (count.get() < capacity) {
    enqueue(node);
    c = count.getAndIncrement();
    if (c + 1 < capacity)
    notFull.signal();
    }
    finally {
    putLock.unlock();
    }
    if (c == 0)
    signalNotEmpty();
    return c >= 0;
    }
    public E poll(long timeout, TimeUnit unit)throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
    while (count.get() == 0) {
    if (nanos <= 0)
    returnnull;
    nanos = notEmpty.awaitNanos(nanos);
    }
    x = dequeue();
    c = count.getAndDecrement();
    if (c > 1)
    notEmpty.signal();
    finally {
    takeLock.unlock();
    }
    if (c == capacity)
    signalNotFull();
    return x;
    }



    套用例項

    Executors

    裏頭用了LinkedBlockingQueue

    publicstatic ExecutorService newFixedThreadPool(int nThreads){
    returnnew ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
    publicstatic ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory){
    returnnew ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory);
    }
    publicstatic ExecutorService newSingleThreadExecutor(){
    returnnew FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(11,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }
    publicstatic ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory){
    returnnew FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(11,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory));
    }

    使用LinkedBlockingQueue實作logger

    public classBungeeLoggerextendsLogger{
    privatefinal ColouredWriter writer;
    privatefinal Formatter formatter = new ConciseFormatter();
    // private final LogDispatcher dispatcher = new LogDispatcher(this);
    privatefinal BlockingQueue<LogRecord> queue = new LinkedBlockingQueue<>();
    volatileboolean running = true;
    Thread recvThread = new Thread(){
    @Override
    publicvoidrun(){
    while (!isInterrupted() && running) {
    LogRecord record;
    try {
    record = queue.take();
    catch (InterruptedException ex) {
    continue;
    }
    doLog(record);
    }
    for (LogRecord record : queue) {
    doLog(record);
    }
    }
    };
    publicBungeeLogger()throws IOException {
    super("BungeeCord"null);
    this.writer = new ColouredWriter(new ConsoleReader());
    try {
    FileHandler handler = new FileHandler("proxy.log"1 << 248true);
    handler.setFormatter(formatter);
    addHandler(handler);
    catch (IOException ex) {
    System.err.println("Could not register logger!");
    ex.printStackTrace();
    }
    recvThread.start();
    Runtime.getRuntime().addShutdownHook(new Thread(){
    @Override
    publicvoidrun(){
    running = false;
    }
    });
    }
    @Override
    publicvoidlog(LogRecord record){
    if (running) {
    queue.add(record);
    }
    }
    voiddoLog(LogRecord record){
    super.log(record);
    writer.print(formatter.format(record));
    }
    }