BlockingQueue

OpenJDK 源码阅读之 BlockingQueue

引言

在java.util.Concurrent包中,BlockingQueue很好的解决了在多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。同时,BlockingQueue也用于java自带线程池的缓冲队列中,了解BlockingQueue也有助于理解线程池的工作模型。

BlockingQueue接口

该接口属于队列,所以继承了Queue接口,该接口最重要的五个方法分别是offer方法,poll方法,put方法,take方法和drainTo方法。

offer方法和poll方法分别有一个静态重载方法,分别是offer(E e, long timeout, TimeUnit unit)和poll(long timeout, TimeUnit unit)方法。其意义是在限定时间内存入或取出对象,如果不能存入取出则返回false。

put方法会在当队列存储对象达到限定值时阻塞线程,而在队列不为空时唤醒被take方法所阻塞的线程。take方法是相反的。

drainTo方法可批量获取队列中的元素。

常见的BlockingQueue实现

  • LinkedBlockingQueue

LinkedBlockingQueue是比较常见的BlockingQueue的实现,他是基于链表的阻塞队列。在创建该对象时如果不指定可存储对象个数大小时,默认为Integer.MAX_VALUE。当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时,才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。

LinkedBlockingQueue内部使用了独立的两把锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

offer方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
}

这两个方法的区别是put方法在容量达到上限时会阻塞,而offer方法则会直接返回false。

  • ArrayBlockingQueue

ArrayBlockingQueue是基于数组的阻塞队列,除了有一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue。
ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。

  • SynchronousQueue

是一种没有缓冲的阻塞队列,在生产者put的同时必须要有一个消费者进行take,否则就会阻塞。声明一个SynchronousQueue有两种不同的方式。公平模式和非公平模式的区别:如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

  • PriorityBlockingQueue和DelayQueue

PriorityBlockingQueue是基于优先级的阻塞队列,该队列不会阻塞生产者,只会阻塞消费者。
DelayQueue队列存储的对象只有指定的延迟时间到了才能被取出,该队列也不会阻塞生产者。

BlockingQueue的使用

在处理多线程生产者消费者问题时的演示代码:

main()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTest {
public static void main(String[] args)
{
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1000);
Thread p1 = new Thread(new Producer(queue),"producer1");
Thread p2 = new Thread(new Producer(queue),"producer2");
Thread c1 = new Thread(new Consumer(queue),"consumer1");
Thread c2 = new Thread(new Consumer(queue),"consumer2");
p1.start();
p2.start();
c1.start();
c2.start();
}
}

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Producer implements Runnable{
private BlockingQueue<String> queue;
public Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
int i = 0;
while (!Thread.currentThread().isInterrupted())
{
try {
queue.put(Thread.currentThread().getName()+" product "+i);
} catch (InterruptedException e) {
System.err.println(Thread.currentThread().getName() + " error");
}
i++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Consumer implements Runnable{
private BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
int i = 0;
while (!Thread.currentThread().isInterrupted())
{
try {
String str = queue.take();
System.out.println(str);
} catch (InterruptedException e) {
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
}
}
}
}

总结

BlockingQueue在并发编程中扮演着重要的角色,既可以自己用来解决生产者消费者问题,也用于java自带线程池的缓冲队列。

参考:

BlockingQueue

热评文章