CyclicBarrier

CountDownLatchCyclicBarrier是jdk concurrent包下非常有用的两个并发工具类,它们提供了一种控制并发流程的手段。

CyclicBarrier

CyclicBarrier,让一组线程到达一个同步点后再一起继续运行,其中任意一个线程未达到同步点,其他到达的线程均会被阻塞。

  • CountDownLatch : 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;
  • CyclicBarrier: 多个线程互相等待,直到到达同一个同步点,再继续一起执行。

对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。

CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。

案例

假设有只有的一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个人没有准备好,大家都等待.

  • Runner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.zsr.test.cyclicBarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Runner implements Runnable {
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
super();
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(100);
System.out.println(name + " 准备OK.");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " Go!!");
}
}
  • CyclicBarrierTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.zsr.test.cyclicBarrier;
import java.io.IOException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
public static void main(String[] args) throws IOException, InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Thread(new Runner(barrier, "zhangsan")));
executor.submit(new Thread(new Runner(barrier, "lisi")));
executor.submit(new Thread(new Runner(barrier, "wangwu")));
executor.shutdown();
}
}

CyclicBarrier源码分析

  • 构造方法

CyclicBarrier提供两个构造方法CyclicBarrier(int parties)CyclicBarrier(int parties, Runnable barrierAction)

1
2
3
public CyclicBarrier(int parties) {
this(parties, null);
}

默认构造方法,参数表示拦截的线程数量。

1
2
3
4
5
6
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

由于线程之前的调度是由CPU决定的,所以默认的构造方法无法设置线程执行优先级,CyclicBarrier提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达同步点时,优先执行线程barrierAction,这样可以更加方便的处理一些负责的业务场景。

  • await方法

创建CyclicBarrier后,每个线程调用await方法告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。接下来我们来看看await方法的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

CyclicBarrier同样提供带超时时间的await和不带超时时间的await

  • dowait方法
  1. dowait的前段部分,主要完成了当所有线程都到达同步点(barrier)时,唤醒所有的等待线程,一起往下继续运行,可根据参数barrierAction决定优先执行的线程。
  2. dowait的实现后半部分,主要实现了线程未到达同步点(barrier)时,线程进入Condition自旋等待,直到等待超时或者所有线程都到达barrier时被唤醒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

在整个dowait:

  1. 使用ReentrantLock保证每一次操作线程安全;
  2. 线程等待/唤醒使用Lock配合Condition来实现;
  3. 线程被唤醒的条件:等待超时或者所有线程都到达barrier。

总结:

CyclicBarrier就是一个栅栏,等待所有线程到达后再执行相关的操作。CyclicBarrier在释放等待线程后可以重用。

热评文章