JDK源码阅读之ReentrantReadWriteLock

写在前面

作者Doug Lea_如此描述这个类:An implementation of {@link java.util.concurrent.locks.ReadWriteLock} supporting similar semantics to {@link java.util.concurrent.locks.ReentrantLock}.
ReentratReadWriteLock是一个可重入的读写锁,实现了ReadWriteLock接口,具有与ReentrantLock同样的语义。此外,ReentrantReadWriteLock只支持_65535个可重入写锁和65535个读锁,以及其他一些特性,如下示例中的特性。

源码分析自 JDK 1.8.0_171
接着看一下使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// 锁降级,write -> read
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}

try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}

// for a large collection,and concurrently accessed.
class RWDictionary {
private final Map<String, Data> m = new TreeMap<String, Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();

public Data get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}

public String[] allKeys() {
r.lock();
try { return m.keySet().toArray(); }
finally { r.unlock(); }
}

public Data put(String key, Data value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}

public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}

以上示例就是ReentrantReadWriteLock的基础用法了。另外的一些用法即特性,如WriteLock也有newCondition的api,写锁降级,可重入,写锁线程能获取读锁但反过来却不行,等等特性。

内部变量

内部共有三个变量,实现 java.util.concurrent.locks.Lock 接口的 ReadLockWriteLock ;以及继承自AQS的抽象类Sync实例, FairSyncNonfairSync 继承自Sync,表示该锁具备公平/非公平语义,有一个带boolean参数的构造函数,根据这个boolean参数决定sync为哪个子类实例。
在展开加解锁流程前,先看一下上述提起的内部类。

Sync

内部类Sync继承自AQS,主要功能是通过这个类提供:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
abstract static class Sync extends AbstractQueuedSynchronizer {
// 高16位才表示共享锁个数,即读锁个数,低位表示写锁
static final int SHARED_SHIFT = 16;
// 一个SHARED_UNIT表示一个读锁,加读锁即:state+=SHARED_UNIT
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 最大的读锁个数
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 互斥锁掩码,即写锁掩码
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count */
// 读锁个数,无符号右移,高位值
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

/** Returns the number of exclusive holds represented in count */
// 写锁个数,低位值
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

/**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
// 线程持有锁个数
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

/**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/
// 注意是继承自TheadLocal,各个线程维护各自的holdCounter
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

private transient ThreadLocalHoldCounter readHolds;

// 缓存上一线程的holder,便于release,大概率会节省ThreadLocal的检索次数
// 因为一般情况下是,上一线程release,下一线程acquire
private transient HoldCounter cachedHoldCounter;

//
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}

abstract boolean readerShouldBlock();

abstract boolean writerShouldBlock();

protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 所有写锁已经完全释放
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
// w == 0说明已经有读锁存在了
// current != getExclusiveOwnerThread() 说明写锁已经被其他线程持有
// 以上两种情况都会失败
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
// 超过了最大锁限制
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 可重入
setState(c + acquires);
return true;
}
// 子类(FairSync or NonfairSync)实现writerShouldBlock
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 设置当前线程为owner
setExclusiveOwnerThread(current);
return true;
}

protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
// 已经全部释放
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
// 该线程不再持有读锁,那么需要清理ThreadLocal
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// count 减1
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
// cas state -= SHARED_UNIT
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// 如果读并发过大,这里会始终return false,最终造成写线程饥饿
return nextc == 0;
}
}

protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// 写锁已经被其他线程持有
// 如果被当前线程持有,则往下接着执行
return -1;
int r = sharedCount(c);
// readerShouldBlock有Sync子类(FairSync or NonfairSync)实现
// 注意这里,公平锁的时候,读写锁都是公平的,先来后到
if (!readerShouldBlock() &&
r < MAX_COUNT &&
// cas state += SHARED_UNIT
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// 初始获取读锁
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 如果cas失败
return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
// 写锁已经被持有
if (getExclusiveOwnerThread() != current)
// 并不是当前线程持有写锁
return -1;
// 否则那么是当前线程持有写锁,往下接着执行
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// 队列里已经有其他线程节点,具体情况,取决于是Fair 还是 NonFair
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// 相当于重入
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
// 不再持有读锁的线程节点,需要清理ThreadLocal
if (rh.count == 0)
readHolds.remove();
}
}
// 已经清理了,那么需要重新入队
// 否则向下执行(重入)
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
// 超过了最大读锁限制
throw new Error("Maximum lock count exceeded");
// cas state += SHARED_UNIT
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 未有线程持有读锁
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

// ... 省略其他代码
}

static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
// 抢占式
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
// from AQS
return apparentlyFirstQueuedIsExclusive();
}
}

// from AQS 当前queue下一节点是否互斥节点
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
// from AQS,解析见下
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}

// from AQS
// 是否有前驱节点
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
// h.next 为null,可以理解为有前驱,因为head为上一刚刚释放锁的最后节点
// 其实也可以理解为目前队列里没有节点
((s = h.next) == null || s.thread != Thread.currentThread());
}

同样,也是利用AQS的state变量来标识锁的个数,不同的是,ReentrantReadWriteLock利用高16位表示读锁,低16位表示写锁。两个抽象方法 writerShouldBlockReaderShouldBlock ,子类 FairSyncNonfairSync 通过实现这两个方法来提供公平/非公平锁的功能。

ReadLock

实现 java.util.concurrent.locks.Lock 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public static class ReadLock implements java.util.concurrent.locks.Lock, java.io.Serializable {
private final Sync sync;

protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

/**
* Acquires the read lock.
*
* <p>Acquires the read lock if the write lock is not held by
* another thread and returns immediately.
*
* <p>If the write lock is held by another thread then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until the read lock has been acquired.
*/
public void lock() {
// 见上Sync的该方法的解析
sync.acquireShared(1);
}

/**
* Attempts to release this lock.
*
* <p>If the number of readers is now zero then the lock
* is made available for write lock attempts.
*/
public void unlock() {
// 见上Sync的该方法的解析
sync.releaseShared(1);
}

public java.util.concurrent.locks.Condition newCondition() {
throw new UnsupportedOperationException();
}

// 省略其他代码,有兴趣可以自行研读源码
// like lockInterruptibly tryLock tryLock(long timeout, TimeUnit unit)
}

ReadLock是不支持 newCondition 这一api的。

WriteLock

同样,WriteLock也是实现了java.util.concurrent.locks.Lock接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static class WriteLock implements java.util.concurrent.locks.Lock, java.io.Serializable {
private final Sync sync;

protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

public void lock() {
// 见上Sync解析
sync.acquire(1);
}

public void unlock() {
// 见上Sync解析
sync.release(1);
}

public java.util.concurrent.locks.Condition newCondition() {
return sync.newCondition();
}

// 省略其他代码
}

WriteLock是支持newCondition api的,这一api是构造一个AQS的内部类实例,具体可以看我之前的文章

WriteLock.lock

先看一下写锁的加锁流程。
WriteLock.lock() 调用内部类Sync的 acquire(1) 方法,acquire方法AQS提供的一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
   // from AQS
public final void acquire(int arg) {
// tryAcquire from Sync 如果已经有读锁或者写锁已经被其他线程持有,则返回false
if (!tryAcquire(arg) &&
// 尝试获取写锁失败,则入队一个EXCLUSIVE互斥节点
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// from AQS
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// tryAcquire from Sync,解析见上
if (p == head && tryAcquire(arg)) {
// 设置当前节点为头结点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 更改node的status为SIGNAL
if (shouldParkAfterFailedAcquire(p, node) &&
// park的地方
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

可见写锁加锁过程就是对state变量进行操作的过程,公平/非公平锁主要是通过 writerShouldBlock 这个方法,非公平这个方法直接返回false,也就是抢占式地去获取锁,而公平则是会查看队列里是否有前驱,如有则失败。

WriteLock.unlock

接着是写锁的解锁过程。
同样,也是调用Sync内部方法 release(1) , release也为AQS的一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
public final boolean release(int arg) {
// tryRelease from Sync,见上解析
if (tryRelease(arg)) {
Node h = head;
// 在acquire的阻塞位置已经将waitStatus更新为SIGNAL了
if (h != null && h.waitStatus != 0)
// 这个方法其实真正唤醒的是h.next
unparkSuccessor(h);
return true;
}
return false;
}

ReadLock.lock

方法内部调用的是AQS的 acquireShared(1) 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
  public final void acquireShared(int arg) {
// tryAcquireShared见上Sync解析
if (tryAcquireShared(arg) < 0)
// 见下
doAcquireShared(arg);
}

// from AQS
private void doAcquireShared(int arg) {
// 在队列尾部添加一SHARED节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 见上Sync解析
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置当前节点为头头结点
// 并唤醒下一节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 更新前驱节点waitStatus为SIGNAL
if (shouldParkAfterFailedAcquire(p, node) &&
// park
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

结合Sync的解析,读锁的获取过程就是对state这一变量的操作过程。解析还是很清晰的,具体过程可以看一下解析。

ReadLock.unlock

unlock调用的仍是AQS内部方法, releaseShared(1) :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  public final boolean releaseShared(int arg) {
// tryReleaseShared from Sync,见上解析
// 如果读线程并发大,那么tryReleaseShared总会返回false,则始终不会执行到doReleaseShared,则造成写线程饥饿
if (tryReleaseShared(arg)) {
// 见下
doReleaseShared();
return true;
}
return false;
}

// from AQS
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 阻塞前已经将waitStatus更新为SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒h.next
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

可见,解锁读锁,是对state操作,state-=_SHARED_UNIT,_接着再查看队列是否有等待节点,如有则需唤醒。

总结

  1. state内部变量的高16位表示所有线程持有读锁个数,低16位表示持有写锁个数
  2. 读写锁都为可重入的
  3. 写锁为互斥锁,同时持有线程持有读锁。加解锁过程均为cas操作state变量。加锁过程state+=1,解锁过程state-=1。加锁时如果其它线程持有锁,则往队列里添加一个EXCLUSIVE节点表示当前线程等待写锁,并在当前位置park住等待唤醒,解锁时,如果队列里有等待节点则需要唤醒节点对应线程
  4. 读锁为共享锁,多个线程能同时获得读锁。加锁过程即为state+=SHARED_UNIT(2^16)。加解锁过程是cas操作state变量高16位。加锁过程为state+=2^16,解锁过程state-=2^16。加锁时如果其它线程持有写锁,则往队列里添加一个SHARED节点标识当前线程等待读锁,并在当前位置park住等待唤醒,解锁时如果队列里有等待节点则需唤醒对应节点线程
  5. 能看到,无论Fari还是Nonfair,写锁的获取都有可能因为读锁阻塞,在一定情况下,造成了获取写锁的线程饥饿的现象
-------------The End-------------