JDK源码阅读之StampedLock

写在前面

作者Doug Lea_如此描述这个类:_A capability-based lock with three modes for controlling read/write access.
这是一个有三种模式的锁。具体哪三种模式,请看源码中的示例。

源码分析自 JDK 1.8.0_171

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* 三种模式用法
* reading & writing & optimistic read
*/
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();

// writing mode
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}

// Optimistic Reading mode
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
// 会有一个验证的步骤
if (!sl.validate(stamp)) {
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX currentX + currentY currentY);
}

// lock upgrade
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
} else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}

ReentrantReadWriteLock中我们知道,如果在读并发比较高的情况下,那么可能会导致写线程饥饿。而StampedLock并不会发生写饥饿。另外,它也有锁升级的特性(ReentrantReadWriteLock只有锁降级)。那么看看它是怎么实现的吧。

实现原理

锁算法是借鉴了序列锁(linux内核,参考文章1文章2)和顺序读写锁(参考文章)。具体算法细节还可以参考源码中的描述。
简单描述一下,用一个long类型(state)的变量表示锁状态(写锁、读锁),低7位表示读锁状态,其他位表示写锁(第8位是否未1表示是否持有写锁,前面说的sequence lock),如果低7位不够表示读锁位,那么会用一个int值表示”溢出”的读锁。那么写锁可以表示为:state+=2^7;读锁可以表示为state+=1。此外,源码中还用了大量自旋来减少饥饿的概率,头部节点表示的线程不会阻塞,而是会一直自旋等待锁释放。
结合内部的变量声明理解一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
   /** The number of bits to use for reader count before overflowing */
// 在state上用低7位表示reader count
private static final int LG_READERS = 7;

// Values for lock state and stamp operations
// 表示一个reader
private static final long RUNIT = 1L;
// 表示一个writer
private static final long WBIT = 1L << LG_READERS;
// reader掩码
private static final long RBITS = WBIT - 1L;
// state上能表示的最大reader count
private static final long RFULL = RBITS - 1L;
// state上表示锁的所有位的掩码
private static final long ABITS = RBITS | WBIT;
// 反码
private static final long SBITS = ~RBITS; // note overlap with ABITS

// Values for node status; order matters
// 表示等待
private static final int WAITING = -1;
// 表示取消
private static final int CANCELLED = 1;

// Modes for nodes (int not boolean to allow arithmetic)
// 读
private static final int RMODE = 0;
// 写
private static final int WMODE = 1;

/** Wait nodes */
static final class WNode {
volatile WNode prev;
volatile WNode next;
// 表示当前节点上的reader
volatile WNode cowait; // list of linked readers
volatile Thread thread; // non-null while possibly parked
// 相当于CLH上的那个locked
volatile int status; // 0, WAITING, or CANCELLED
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}

/** Head of CLH queue */
private transient volatile WNode whead;
/** Tail (last) of CLH queue */
private transient volatile WNode wtail;

/** Lock sequence/state */
// 锁状态,或者叫锁序列
private transient volatile long state;
/** extra reader count when state read count saturated */
// state上只有低7位用来表示reader,超出的部分用这个int值表示
// 注意这个变量并不是volatile
// 改变这个值时,都是cas操作state,并且改变之后也会对state赋值,可见性和原子性通过这两个操作保证的
// 即 s=state; cmp(state,RBITS);++readerOverflow(or readerOverflow -= 1);state=s
// 自旋锁保证原子性,内存屏障保证可见性
private transient int readerOverflow;

总结一下,用一个long型变量表示锁的计数,读锁计数用低7位表示,超出部分用一个int值表示。其他位表示写锁状态,第8位如果为1则表示写锁被持有,为0表示未被持有,也就是序列锁,这样即保证了锁(包括读写)的原子性,也能提高效率(操作读或写锁要保证互斥)。用一个CLH队列表示等待的节点,避免饥饿现象,节点首先会采用自旋的方式尝试获取锁。
接下来就跟着3个示例解析一下加解锁的流程。

Reading Mode

经过上面的分析,state上的低7位是用来表示reader count,而溢出部分,则用readerOverflow表示。

readLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
   /**
* 非互斥获取读锁,如果暂时不可取,则会阻塞直到可取
*/
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
// 队列为空
return ((whead == wtail
// 并且reader count 小于RFULL(2^7 - 2=126)
&& (s & ABITS) < RFULL
// cas操作state,state += 1
&& U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
// 获取成功返回最新state值,否则,进入acquireRead(false,0L)
next : acquireRead(false, 0L));
}

/**
* 方法有点长,这个方法主要做的是在一定时间内通过自旋去请求锁,如果仍没请求到,则进入阻塞状态
*/
private long acquireRead(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) {
WNode h;
// 队列为空(or仅有一个节点)
if ((h = whead) == (p = wtail)) {
for (long m, s, ns;;) {
// 写锁未被占有,并且读锁数量小于RFULL(126),ABITS=255,2^8-1
if ((m = (s = state) & ABITS) < RFULL ?
// cas尝试获取读锁,state+=1
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
// 写锁未被占用,读锁数量大于RFULL,执行tryIncReaderOverflow方法
// 关于tryIncReaderOverflow见下该方法解析
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
// 更新成功,始终返回当前最新锁状态
return ns;
// 到这里那写锁就被占用了
else if (m >= WBIT) {
// 自旋等待
if (spins > 0) {
// 这个随机是几个意思?分散线程状态聚集在一个状态?避免都在一个时间点阻塞or唤醒?
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else {
// 自旋结束
if (spins == 0) {
WNode nh = whead, np = wtail;
// 头尾未改变
if ((nh == h && np == p)
// 头尾改变且队列不再为空
|| (h = nh) != (p = np))
break;
}
// 因为在头部,上一轮自旋未拿到锁那紧接着下一轮自旋
// SPINS = (NCPU > 1) ? 1 << 6 : 0;
spins = SPINS;
}
}
}
}
// 尾巴为空,即 队列为空,需要初始化队列
if (p == null) { // initialize queue
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
// 构造节点入队
else if (node == null)
node = new WNode(RMODE, p);
// 头尾相等或尾节点表示非读锁节点(也就是尾节点是写锁请求线程节点)
else if (h == p || p.mode != RMODE) {
// 队列尾巴改变
if (node.prev != p)
// 重新置尾巴
node.prev = p;
// 往队列追加节点
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
// 跳出当前自旋循环,进入下一个for自旋循环
// 唯一跳出本次for循环的地方,也就是只有在往队列新增了个节点的时候出去进入下一个for自旋循环
break;
}
}
// 将当前节点放在尾巴(是读节点或者头节点)cowait(表示读锁等待节点)链表位置头部
else if (!U.compareAndSwapObject(p, WCOWAIT,
node.cowait = p.cowait, node))
// 失败
node.cowait = null;
// cas成功
else {
// 阻塞等待读锁,直到唤醒时,前驱已为头节点
for (;;) {
WNode pp, c; Thread w;
// 唤醒头结点上的第一个wcowait
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
// 唤醒头结点上的cowait读节点线程
U.unpark(w);
// 前驱为头结点,或当前节点为头结点,或前驱的前驱为空
// 也就是在队列的头部位置
// 则一直自旋,尝试获取读锁,除非已经有写锁被占有
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
do {
// 获取读锁
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
// 如果是写锁已被占有则跳出
} while (m < WBIT);
}
// 前驱未改变(未cancel)以及头节点未改变(即未有锁的释放获取)
if (whead == h && p.prev == pp) {
long time;
// 前驱的前驱为空,或前驱节点为头结点,或节点已经cancel
// 当目前节点是头节点,则跳出当前for,进入下一个for循环,自旋获取锁
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
// 将node.status=CANCELLED
// 将p的cowait链表上取消的节点剔除
return cancelWaiter(node, p, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
// 前驱不为头结点
if ((h != pp
// 或写锁已经被其它线程获取且当前头结点未改变且前驱未改变
|| (state & ABITS) == WBIT) && whead == h && p.prev == pp)
// 阻塞当前线程等待唤醒
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
}
}

// 这个循环就是头节点"进一步的深度"自旋,"其他"节点则阻塞
for (int spins = -1;;) {
WNode h, np, pp; int ps;
// 队列为空,即当前节点在头部位置
if ((h = whead) == p) {
if (spins < 0)
// HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0
// 首次自旋2^10
spins = HEAD_SPINS;
// (NCPU > 1) ? 1 << 16 : 0
else if (spins < MAX_HEAD_SPINS)
// 指数型递增
spins <<= 1;
// 自旋等待锁,注意这里是在头部自旋
for (int k = spins;;) { // spin at head
long m, s, ns;
// 下面是获取读锁
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
// 获得读锁
WNode c; Thread w;
whead = node;
node.prev = null;
// 唤醒所有在node上的wcowait读锁等待线程
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
return ns;
}
// 已经有写锁被占用了,且自旋结束,结束此次自旋
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
break;
}
}
// 队列不为空,且头节点部位空
else if (h != null) {
WNode c; Thread w;
// 依次唤醒头结点上的读锁等待线程,如果有cowait的话
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
// 头节点未改变
if (whead == h) {
// 即前驱变了,如cancel了
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
// 更改前驱节点状态为WAITING
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
// 取消的节点要剔除掉
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
// 这个cancelWaiter跟前边的参数不一样
// 这里是除了前边的cancelWaiter的功能,还需要做唤醒node上的cowait
// 并且将cowait链表的第一个节点唤醒,且用这个cowait替换掉node的位置
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
// 前驱节点状态为WAITING
if (p.status < 0 &&
// 前驱不为头结点,或当前写锁被其它线程持有
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
// 阻塞在这里
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}

/**
* 相当于是把自旋锁对state更新
* 此外,readerOverflow并不是volatile,但是它的可见性和原子性是怎么保证的?
* 可见性:state是volatile,可以借助它的内存屏障,因为readerOverflow是在state前操作的
* 原子性:读锁在state上最大只有126个,当超过126是将state低位置为127,操作结束后将其置回126
*/
private long tryIncReaderOverflow(long s) {
// assert (s & ABITS) >= RFULL;
if ((s & ABITS) == RFULL) {
// RBITS = 2^7 - 1
if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
++readerOverflow;
state = s;
return s;
}
}
else if ((LockSupport.nextSecondarySeed() &
OVERFLOW_YIELD_RATE) == 0)
Thread.yield();
return 0L;
}

首先明确获取读锁并不是互斥的,而是通过cas操作去尝试获取。其次是通过自旋去尝试获取锁,对于头部位置的节点总是在自旋等待锁。

unlockRead

接下来看一下时如何释放读锁的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* If the lock state matches the given stamp, releases the
* non-exclusive lock.
*
* @param stamp a stamp returned by a read-lock operation
* @throws IllegalMonitorStateException if the stamp does
* not match the current state of this lock
*/
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
// SBITS=~(2^7-1),低8位的掩码
// 如果写锁的状态发生过变更
if (((s = state) & SBITS) != (stamp & SBITS) ||
// 或stamp目前表示没有锁,或state表示目前没有锁
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
// 读锁小于RFULL=126
if (m < RFULL) {
// cas state-=1
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
// 最后一把锁,且队列头节点不为空,头节点状态也不是初始状态(WAITING状态)
if (m == RUNIT && (h = whead) != null && h.status != 0)
// 则释放h.next等待的线程节点
release(h);
break;
}
}
// 目前的写锁状态时溢出的,需要对readerOverflow进行处理,看下面该方法的解析
else if (tryDecReaderOverflow(s) != 0L)
break;
}
}

/**
* Tries to decrement readerOverflow.
*
* @param s a reader overflow stamp: (s & ABITS) >= RFULL
* @return new stamp on success, else zero
*/
private long tryDecReaderOverflow(long s) {
// assert (s & ABITS) >= RFULL;
// state读锁状态满了
if ((s & ABITS) == RFULL) {
// 获取spinlock,即"自旋锁",正常应该时state=127
if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
int r; long next;
if ((r = readerOverflow) > 0) {
readerOverflow = r - 1;
next = s;
}
else
next = s - RUNIT;
// state为volatile,有barrier,加上上边那个127
//所以readerOverflow的可见性原子性得以保证
// state=126
state = next;
return next;
}
}
// 可能会放弃CPU时钟
else if ((LockSupport.nextSecondarySeed() &
OVERFLOW_YIELD_RATE) == 0)
Thread.yield();
return 0L;
}

Writing Mode

writeLock

读锁状态也是也是在state上表示,除了表示读锁状态的低7位,剩下的高25位都表示写锁状态。第8位为1表示写锁被占用,否则表示未被占用,写锁状态时往上递增的,也就是说获取读锁state需要+WBIT,释放也是+WBIT。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
   public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
// ABITS=2^8-1=255
return ((((s = state) & ABITS) == 0L &&
// WBIT=2^7=128
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
// 如果低8位为0,且cas state+=WBIT成功,即表示获取读锁成功
// 否则进入acquireWrite(false,0L),这个方法也是在无锁状态下才会获取写锁成功
next : acquireWrite(false, 0L));
}

// 这个方法时跟acquireRead一样,较长,也比较复杂,不过做的事情比较简单
// 就是自旋等待写锁,或者入队阻塞等待唤醒
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) { // spin while enqueuing
long m, s, ns;
// 无锁状态,则尝试获取写锁
if ((m = (s = state) & ABITS) == 0L) {
// state += WBIT(2^7=128)
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
return ns;
}
// 自旋等待
else if (spins < 0)
// 如果是写锁且当前队列仅有一个写锁节点(也就是这个节点持有锁),则自旋尝试获取锁
// 否则就入队囖
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
// 自旋
else if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
// 随机,原因应该是上边提过的那个
--spins;
}
// 自旋结束,如队列为空则初始化队列
else if ((p = wtail) == null) { // initialize queue
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
// 构造当前节点入队
else if (node == null)
node = new WNode(WMODE, p);
else if (node.prev != p)
// CLH队列,需要指定一下prev
node.prev = p;
// 入队
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
// 成功入队,就需要进入下面的for循环,进入"深度自旋"
break;
}
}

for (int spins = -1;;) {
WNode h, np, pp; int ps;
// 前驱就是头结点,那么就一直在这儿自旋尝试获取锁
if ((h = whead) == p) {
// 自旋
if (spins < 0)
// HEAD_SPINS = 2^10,先自旋少一点意思意思
spins = HEAD_SPINS;
// MAX_HEAD_SPINS = 2^16
else if (spins < MAX_HEAD_SPINS)
// 如果一直没有获取到锁,自旋的时间是指数型增长,直到MAX_HEAD_SPINS
spins <<= 1;
// 一轮自旋
for (int k = spins;;) { // spin at head
long s, ns;
// state上并无锁状态
if (((s = state) & ABITS) == 0L) {
// 则去尝试获取,cas state+=WBIT
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) {
// 获取成功,将当前节点node置为带头大哥
whead = node;
// GC
node.prev = null;
return ns;
}
}
// 随机递减
else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)
break;
}
}
// 当前节点并不是太子,登不了基,前边排着队呢
// 头节点上的cowait,有些可能过时了,可以尝试唤醒
else if (h != null) { // help release stale waiters
WNode c; Thread w;
// 挨个唤醒在head上的reader链表,看看当前能不能获取读锁
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
// 如果当前头结点未改变
if (whead == h) {
// 前驱改变,清空已经失效的节点
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
// 更新前驱节点状态为WAITING,0->WAITING
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
// 清空已经取消的节点
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
// 同acquireRead里的解析
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
// 未被cancel
if (p.status < 0
// 前驱节点不为头结点,或锁状态为仍然被持有,当锁置为可获取状态时,如果不是队列头部的节点,那么也可以进入park
&& (p != h || (state & ABITS) != 0L)
// 头结点未改变
&& whead == h
// 前驱仍是p
&& node.prev == p)
// 则park阻塞
U.park(false, time); // emulate LockSupport.park
// 从阻塞中唤醒
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}

由此可见,写锁的获取也是通过自旋,如果队列里有排队的节点,那么入队,保证个公平公正,这也是CLH队列的作用的地方。并不会有开始说的写锁饥饿的现象。

unlockWrite

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 如果stamp是当前锁状态,并且表示持有写锁,那么释放写锁
*/
public void unlockWrite(long stamp) {
WNode h;
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
// ORIGIN = 128 << 1 = 256
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
if ((h = whead) != null && h.status != 0)
// status由WATING->0,剔除队列中cancel的节点
// 并且唤醒下一等待节点线程,如果有的话
release(h);
}

upgrade

文章开头的示例中,第三个示例就是读锁升级写锁的例子。在ReentranReadWriteLock中,写锁可以降级为读锁,而StampedLock可以由读锁能直接升级为写锁。首先是需要持有读锁(readLock),接着会尝试升级写锁(tryConvertToWriteLock),如果升级成功,则直接操作业务并在最后释放锁(unlock),否则需要释放读锁(unlockRead)获取写锁(writeLock)。我们主要来看_tryConvertToWriteLock_和unlock,其它的逻辑都已经在上边讨论过了。来看看具体是怎么实现的吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
   /**
* If the lock state matches the given stamp, performs one of
* the following actions. If the stamp represents holding a write
* lock, returns it. Or, if a read lock, if the write lock is
* available, releases the read lock and returns a write stamp.
* Or, if an optimistic read, returns a write stamp only if
* immediately available. This method returns zero in all other
* cases.
*
* @param stamp a stamp
* @return a valid write stamp, or zero on failure
*/
/**
* 上边说的很明白了:
* 1.如果stamp已经表示写锁,则直接返回
* 2.如果是读锁,会看看写锁是不是可获取,如果可以则会释放读锁返回写锁的stamp
* 3.如果是乐观读锁,会立即返回写锁当且仅当目前时间点写锁可取
* 4.其他情况就都只会返回0
*/
public long tryConvertToWriteLock(long stamp) {
long a = stamp & ABITS, m, s, next;
// stamp和state表示写锁状态相同
while (((s = state) & SBITS) == (stamp & SBITS)) {
// 锁状态(低8位)目前并不持有任何锁
if ((m = s & ABITS) == 0L) {
// stamp表示有读锁,与state不一致,则整个方法返回0
if (a != 0L)
break;
// 尝试获取写锁(目前state并无锁)
if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
return next;
}
// 写锁位已被占用,即写锁已被占用
else if (m == WBIT) {
// 并不是当前stamp获取的当前写锁
// 即写锁被其他线程占有
if (a != m)
break;
// 如果已持有写锁,则直接返回
return stamp;
}
// 当前读锁升级为写锁
// 目前state上仅有一个读锁状态位,且stamp上是有锁额
else if (m == RUNIT && a != 0L) {
// 则尝试释放读锁,加写锁
if (U.compareAndSwapLong(this, STATE, s,
next = s - RUNIT + WBIT))
return next;
}
else
// 其他情况一概返回0
break;
}
return 0L;
}

/**
* If the lock state matches the given stamp, releases the
* corresponding mode of the lock.
*
* @param stamp a stamp returned by a lock operation
* @throws IllegalMonitorStateException if the stamp does
* not match the current state of this lock
*/
/**
* 如果stamp确实是当前的"邮戳"(相当于邮票上的邮戳,与state上对比),则释放对应的锁
*/
public void unlock(long stamp) {
long a = stamp & ABITS, m, s; WNode h;
// 写锁状态未改变
while (((s = state) & SBITS) == (stamp & SBITS)) {
// 低8位,等于0表示未获取任何锁
if ((m = s & ABITS) == 0L)
break;
// 写锁被获取
else if (m == WBIT) {
// 改变
if (a != m)
break;
state = (s += WBIT) == 0L ? ORIGIN : s;
// 唤醒队列
if ((h = whead) != null && h.status != 0)
// unpark h.next
release(h);
return;
}
// 未持有任何锁,a>=WBIT表示stamp表示持有写锁,但是目前m!=WBIT,表示写锁已经被释放
else if (a == 0L || a >= WBIT)
break;
// 接下来就是读锁的释放了
else if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return;
}
}
else if (tryDecReaderOverflow(s) != 0L)
return;
}
throw new IllegalMonitorStateException();
}

Optimistic Reading Mode

示例中的第二个例子就是乐观读。先尝试获取读锁(tryOptimisticRead),接着校验(validate)下stamp是否变更,如果校验通过未发生变更则直接进行下一步,否则需要获取读锁(readLock)并操作完成之后释放(unlockRead)读锁。主要来看看_tryOptimisticRead和_validate,其它的流程逻辑参考前边的讨论。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
   /**
* Returns a stamp that can later be validated, or zero
* if exclusively locked.
*
* @return a stamp, or zero if exclusively locked
*/
/**
* 如果当前并无任何写锁被持有,则返回写锁状态(高25位),否则返回0
*/
public long tryOptimisticRead() {
long s;
// SBITS = ~(2^7 - 1) WBIT = 2^8
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

/**
* Returns true if the lock has not been exclusively acquired
* since issuance of the given stamp. Always returns false if the
* stamp is zero. Always returns true if the stamp represents a
* currently held lock. Invoking this method with a value not
* obtained from {@link #tryOptimisticRead} or a locking method
* for this lock has no defined effect or result.
*
* @param stamp a stamp
* @return {@code true} if the lock has not been exclusively acquired
* since issuance of the given stamp; else false
*/
/**
* 方法体很小,但是方法注释倒是挺长的嘛
* 其实就是验证stamp和当前state的锁状态是否一致
*/
public boolean validate(long stamp) {
// load barrier,保证state的可见性
U.loadFence();
// SBITS = ~(2^7 - 1)
return (stamp & SBITS) == (state & SBITS);
}

如上,tryOptimisticRead并不会去尝试获取读锁(即不会更改state),而是通过validate验证写锁状态是否在这期间改变过,如果未改变,则可以认为可以共享读锁,否则(即写线程操作过数据)需要实际去获取读锁。

总结

有以下几点值得我们注意一下:

  • 读写锁状态在一个volatile long型变量上表示,低7位表示读锁状态,溢出的读锁状态用readerOverflow表示,高25位表示写锁,并且写锁是序列递增的;
  • 申请锁时,使用了大量自旋操作。如果是在队列头部位置的等待线程节点,会一致自旋下去。否则当前线程节点会入队进行阻塞等待,虽然读和写具体的入队方式可能有点差别。通过这样的方式,不仅避免了饥饿现象,一定程度行还体现了公平性;
  • 读锁是共享的,写锁是互斥的。此外读锁还有种乐观读的方式,即尝试获取锁时,并不会更改当前读锁状态,而是通过验证期间写锁状态是否被更改的方式保证数据一致。此外还有一个读锁升级的功能,这是跟ReentrantReadWriteLock的区别。
-------------The End-------------