CountDownLatch

CountDownLatchCyclicBarrier是jdk concurrent包下非常有用的两个并发工具类,它们提供了一种控制并发流程的手段。

CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。

CountDownLatch可以把它看作一个计数器,只不过这个计数器的操作是原子操作,同时只能有一个线程去操作这个计数器,也就是同时只能有一个线程去减这个计数器里面的值。

应用场景

有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。假如我们这个想要继续往下执行的任务调用一个CountDownLatch对象的await()方法,其他的任务执行完自己的任务后调用同一个CountDownLatch对象上的countDown()方法则计数减1,这个调用await()方法的任务将一直阻塞等待,直到这个CountDownLatch对象的计数值减到0为止。

案例

比如:有2个工人在为老板干活,当2个工人把一天的活都干完了的时候,老板就来检查所有工人所干的活。记住这个条件:三个工人先全部干完活,老板才检查。所以在这里设计两个类,Worker代表工人,Boss代表老板。代码如下:

  • Worker.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.zsr.test.countDownLatch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class Worker implements Runnable {
private CountDownLatch downLatch;
private String name;
public Worker(CountDownLatch downLatch, String name) {
this.downLatch = downLatch;
this.name = name;
}
@Override
public void run() {
this.doWork();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ie) {
}
System.out.println(this.name + "活干完了!");
this.downLatch.countDown();
}
private void doWork() {
System.out.println(this.name + "正在干活!");
}
}
  • Boss.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.zsr.test.countDownLatch;
import java.util.concurrent.CountDownLatch;
public class Boss implements Runnable {
private CountDownLatch downLatch;
public Boss(CountDownLatch downLatch) {
this.downLatch = downLatch;
}
@Override
public void run() {
System.out.println("老板正在等所有的工人干完活......");
try {
this.downLatch.await();
} catch (InterruptedException e) {
}
System.out.println("工人活都干完了,老板开始检查了!");
}
}
  • CountDownLatchTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.zsr.test.countDownLatch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(2);
Worker w1 = new Worker(latch, "张三");
Worker w2 = new Worker(latch, "李四");
Boss boss = new Boss(latch);
executor.execute(w2);
executor.execute(w1);
executor.execute(boss);
executor.shutdown();
}
}

CountDownLatch源码分析

  • 自定义同步器Sync实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
  • 构造方法
1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

从构造方法的具体实现可以看出,通过构造方法传入的int型参数count其实就是同步器的状态。

  • countDown方法
1
2
3
public void countDown() {
sync.releaseShared(1);
}

整个countDown只做了一件事情,释放同步状态,同步状态在这里的实际意义也就是需要等待的完成的点的数量,只要每完成一个点,就调用countDown方法释放同步状态。

  • await方法

CountDownLatch提供带超时时间的await和不带超时时间的await

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

await的实质是在获取同步状态,同步状态state == 0成立,当前等待完成的点均已完成,主线程继续往下执行,否则,主线程进入等待队列自旋等待直到同步状态释放后state == 0。有些时候主线程是不能一直自旋等待,这个时候带超时时间的await就派上用场了,设置超时时间,如果在指定时间内N个点都未完成,返回false,主线程不再等待,继续往下执行。

CountDownLatch总结

CountDownLatch实质上就是一个AQS计数器,通过AQS来实现线程的等待与唤醒。

热评文章