JDK源码阅读之CyclicBarrier

写在前面

_作者Doug Lea_如此描述这个类:A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.

分析自JDK 1.8.0_171
这也是一个多线程协调的辅助工具类。barrier可翻译为栅栏,顾名思义,这个类控制先到的线程则在”栅栏”处等待其他线程,直到所有线程都到达,再接着往下执行。此外, CyclicBarrier 如其名,是循环可复用的。
如下是源码中给出的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;

class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);

try {
// 等待一行处理结束
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}

public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
// 所有行处理完,再merge
Runnable barrierAction =
new Runnable() { public void run() { mergeRows(...); }};
barrier = new CyclicBarrier(N, barrierAction);

List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}

// wait until done
for (Thread thread : threads)
thread.join();
}
}
}

如以上示例,并行处理每行矩阵元素,待所有行处理结束再对每行处理结果进行合并。

内部变量

// 可重入锁控制对barrier的访问
private final ReentrantLock lock = new ReentrantLock();
// 控制线程阻塞,直到所有线程”到达”
private final Condition trip = lock.newCondition();
// 多少个参与方(线程)
private final int parties;
// 到达栅栏后执行的线程
private final Runnable barrierCommand;
// 内部类表示目前是哪一代
private Generation generation = new Generation()
// 还有几个参与方(线程)在未到达
private int count;

Generation 为内部类,当触发栅栏或者重置,generation就会改变。

1
2
3
4
private static class Generation {
// 标识栅栏有没有被"踢翻"
boolean broken = false;
}

# CyclicBarrier.await 由源码中的示例代码,await是CyclicBarrier的主要起作用的方法。
首先先看一下构造方法中对内部变量的初始化:
1
2
3
4
5
6
7
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
// 初始parties个线程
this.count = parties;
this.barrierCommand = barrierAction;
}

构造方法里就初始化了三个变量,分别是表示多少个线程的parties、还有多少个线程未到达的count、后置线程barrierCommand。
await方法是调用内部私有方法dowait:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 当前代
final Generation g = generation;

if (g.broken)
// 抱歉,栅栏已经被踢翻了
throw new BrokenBarrierException();

if (Thread.interrupted()) {
// 线程中断了,需要唤醒所有线程
breakBarrier();
throw new InterruptedException();
}

int index = --count;
if (index == 0) { // tripped
// 所有线程已到达
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 后置线程
command.run();
ranAction = true;
// 重置状态并唤醒所有线程
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
// condition.await 调用AQS里的Condition实现类
trip.await();
else if (nanos > 0L)
// 允许只等待一定时间
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
// 物是人非,已经不是睡之前的时代了
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

CyclicBarrier利用ReentrantLock控制对barrier的加锁访问,ReentrantLock.condition控制线程的阻塞唤醒。内部类Generation表示栅栏的一次生命周期,而每次栅栏被踢翻,generation要换代,即CyclicBarrier是可循环复用的。

-------------The End-------------