JDK源码阅读之Semaphore

写在前面

作者_Doug Lea_如此描述这个类:A counting semaphore. Conceptually, a semaphore maintains a set of permits.

分析自JDK 1.8.0_171
顾名思义。计数信号量,它维护许可数量。acquire一个许可阻塞至池里有可用许可,release一个许可即往池里添加一个许可。
如下为源码中示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}

public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}

// Not a particularly efficient data structure; just for demo

protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];

protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}

protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
}

如上,示例中用到的api就两个,即acquire和release,意为获取一个许可及释放一个许可。

Sync变量

内部抽象类Sync继承自AQS,用AQS中的volatile int state变量表示许可数量。Sync的子类有两个版本,fair和nonfair。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;

Sync(int permits) {
// state 表示当前许可数量
setState(permits);
}

final int getPermits() {
return getState();
}

// 非公平式获取许可,cas操作,state减去acquires
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
// 注意这里remaining < 0
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

// 释放许可就是state + releases
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}

final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}

static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
// 非公平式获取许可,调用父类(Sync)的nonfairTryAcquireShared
return nonfairTryAcquireShared(acquires);
}
}

static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
// 先查看有没有前驱在阻塞等着获取许可,如果有,当前线程获取失败
// 这就是跟非公平的区别
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

Sync的两个子类,NonfairSync和FairSync,分别表示在获取许可时是非公平式(抢占式)和公平式。

Semaphore.acquire

获取许可,调用Sync.acquireSharedInterruptibly(1)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 调用内部实现类tryAcquireShared
if (tryAcquireShared(arg) < 0)
// 池中许可数量小于0,即state<0
doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 在链表尾部添加一个node表示当前阻塞的节点
// 注意头结点为一个标识节点,如下addWaiter方法
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 前驱
final Node p = node.predecessor();
if (p == head) {
// 调用内部类(fair or nonfair)实现tryAcquireShared,获取许可
int r = tryAcquireShared(arg);
if (r >= 0) {
// 许可数量恢复>0,设置当前节点为头节点并且唤醒下一节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 没有许可可获取,阻塞在这里,等待唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 链表中没有前驱
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
// 先要设置一个"空"头结点
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

需要注意,当没有阻塞的节点时,即链表为空,这时往链表添加节点时是往一个”空”头节点后添加。唤醒时,在阻塞位置恢复再次循环,如果前驱是头结点且当前池中有许可,那么设置当前节点为头结点,并唤醒下一节点,否则再次阻塞。

Semaphore.release

调用Sync.releaseShared(1)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public final boolean releaseShared(int arg) {
// sync中的tryReleaseShared, state += arg
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
// 如果有阻塞的节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// 阻塞时,在shouldParkAfterFailedAcquire这个方法里,将node的前驱已经设置为SIGNAL
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒h的下一节点,如下unparkSuccessor分析
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找,去掉已经cancel的节点,见AQS类waitStatus的可取类型
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒node的下一可用节点
LockSupport.unpark(s.thread);
}

唤醒时总是唤醒头结点的下一节点。注意waitStatus这个状态,在阻塞时,会在shouldParkAfterFailedAcquire这个方法里,将当前阻塞节点的前缀设置为SIGNAL。

-------------The End-------------