当前位置: 欣欣网 > 码农

ArrayBlockingQueue与LinkedBlockingQueue

2024-04-20码农

作者:go4it
链接:https://www.jianshu.com/p/5b85c1794351

本文主要简单介绍下ArrayBlockingQueue与LinkedBlockingQueue。

对比

queue 阻塞与否 是否有界 线程安全保障 适用场景 注意事项
ArrayBlockingQueue 阻塞 有界 一把全局锁 生产消费模型,平衡两边处理速度 用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间)
LinkedBlockingQueue 阻塞 可配置 存取采用2把锁 生产消费模型,平衡两边处理速度 无界的时候注意内存溢出问题,用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担。
ConcurrentLinkedQueue 非阻塞 无界 CAS 对全局的集合进行操作的场景 size() 是要遍历一遍集合,慎用

内存方面

  • ArrayBlockingQueue
    用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间)

  • LinkedBlockingQueue
    用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担。

  • 有界无界

  • ArrayBlockingQueue
    有界,适合已知最大存储容量的场景

  • LinkedBlockingQueue
    可有界可以无界

  • 吞吐量

    LinkedBlockingQueue在大多数并发的场景下吞吐量比ArrayBlockingQueue,但是性能不稳定。

    Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.

    测试结果表明,LinkedBlockingQueue的可伸缩性要高于ArrayBlockingQueue。初看起来,这个结果有些奇怪:链表队列在每次插入元素时,都必须分配一个链表节点对象,这似乎比基于数组的队列执行了更多的工作。然而,虽然它拥有更好的内存分配与GC等开销,但与基于数组的队列相比,链表队列的put和take等方法支持并发性更高的访问,因为一些优化后的链接队列算法能将队列头节点的更新操作与尾节点的更新操作分离开来。由于内存分配操作通常是线程本地的,因此如果算法能通过多执行一些内存分配操作来降低竞争程度,那么这种算法通常具有更高的可伸缩性。

    并发方面

  • ArrayBlockingQueue
    采用一把锁,两个condition

  • /** Main lock guarding all access */
    final ReentrantLock lock;
    /** Condition for waiting takes */
    privatefinal Condition notEmpty;
    /** Condition for waiting puts */
    privatefinal Condition notFull;
    /**
    * Inserts element at current put position, advances, and signals.
    * Call only when holding lock.
    */

    private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
    putIndex = 0;
    count++;
    notEmpty.signal();
    }
    /**
    * Extracts element at current take position, advances, and signals.
    * Call only when holding lock.
    */

    private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
    takeIndex = 0;
    count--;
    if (itrs != null)
    itrs.elementDequeued();
    notFull.signal();
    return x;
    }


    此外还支持公平锁

    /**
    * Creates an {@code ArrayBlockingQueue} with the given (fixed)
    * capacity and the specified access policy.
    *
    @param capacity the capacity of this queue
    @param fair if {@code true} then queue accesses for threads blocked
    * on insertion or removal, are processed in FIFO order;
    * if {@code false} the access order is unspecified.
    @throws IllegalArgumentException if {@code capacity < 1}
    */

    publicArrayBlockingQueue(int capacity, boolean fair){
    if (capacity <= 0)
    thrownew IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
    }

  • LinkedBlockingQueue
    头尾各1把锁

  • /** Lock held by take, poll, etc */
    privatefinal ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    privatefinal Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    privatefinal ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    privatefinal Condition notFull = putLock.newCondition();
    /**
    * Inserts the specified element at the tail of this queue if it is
    * possible to do so immediately without exceeding the queue's capacity,
    * returning {@code true} upon success and {@code false} if this queue
    * is full.
    * When using a capacity-restricted queue, this method is generally
    * preferable to method {@link BlockingQueue#add add}, which can fail to
    * insert an element only by throwing an exception.
    *
    @throws NullPointerException if the specified element is null
    */

    publicbooleanoffer(E e){
    if (e == nullthrownew NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
    returnfalse;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
    if (count.get() < capacity) {
    enqueue(node);
    c = count.getAndIncrement();
    if (c + 1 < capacity)
    notFull.signal();
    }
    finally {
    putLock.unlock();
    }
    if (c == 0)
    signalNotEmpty();
    return c >= 0;
    }
    public E poll(long timeout, TimeUnit unit)throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
    while (count.get() == 0) {
    if (nanos <= 0)
    returnnull;
    nanos = notEmpty.awaitNanos(nanos);
    }
    x = dequeue();
    c = count.getAndDecrement();
    if (c > 1)
    notEmpty.signal();
    finally {
    takeLock.unlock();
    }
    if (c == capacity)
    signalNotFull();
    return x;
    }



    应用实例

    Executors

    里头用了LinkedBlockingQueue

    publicstatic ExecutorService newFixedThreadPool(int nThreads){
    returnnew ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
    publicstatic ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory){
    returnnew ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory);
    }
    publicstatic ExecutorService newSingleThreadExecutor(){
    returnnew FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(11,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }
    publicstatic ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory){
    returnnew FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(11,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory));
    }

    使用LinkedBlockingQueue实现logger

    public classBungeeLoggerextendsLogger{
    privatefinal ColouredWriter writer;
    privatefinal Formatter formatter = new ConciseFormatter();
    // private final LogDispatcher dispatcher = new LogDispatcher(this);
    privatefinal BlockingQueue<LogRecord> queue = new LinkedBlockingQueue<>();
    volatileboolean running = true;
    Thread recvThread = new Thread(){
    @Override
    publicvoidrun(){
    while (!isInterrupted() && running) {
    LogRecord record;
    try {
    record = queue.take();
    catch (InterruptedException ex) {
    continue;
    }
    doLog(record);
    }
    for (LogRecord record : queue) {
    doLog(record);
    }
    }
    };
    publicBungeeLogger()throws IOException {
    super("BungeeCord"null);
    this.writer = new ColouredWriter(new ConsoleReader());
    try {
    FileHandler handler = new FileHandler("proxy.log"1 << 248true);
    handler.setFormatter(formatter);
    addHandler(handler);
    catch (IOException ex) {
    System.err.println("Could not register logger!");
    ex.printStackTrace();
    }
    recvThread.start();
    Runtime.getRuntime().addShutdownHook(new Thread(){
    @Override
    publicvoidrun(){
    running = false;
    }
    });
    }
    @Override
    publicvoidlog(LogRecord record){
    if (running) {
    queue.add(record);
    }
    }
    voiddoLog(LogRecord record){
    super.log(record);
    writer.print(formatter.format(record));
    }
    }