JDK源码阅读之ReentrantLok

写在前面

作者Doug Lea_如此描述这个类:_A reentrant mutual exclusion {@link **java.util.concurrent.locks.Lock} with the same basic behavior and semantics as the implicit monitor lock accessed using {@code **synchronized} methods and statements, but with extended capabilities.

分析自 JDK 1.8.0_171
ReentrantLock是继承 _java.util.concurrent.locks.Lock _的可重入互斥锁,它具有跟隐式监视器锁(Synchronized)同样语义,用于锁定一个方法或代码块,除此之外,它还有一些额外的功能。
ReentrantLock锁,只能被一个线程持有,如果该线程持有的同时尝试去获取该ReentrantLock,会立即返回。当一个线程获取ReentrantLock,即调用lock时,只有当该锁未被其它线程持有时才能成功。
看一下源码中的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...

public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}

另外,还有个lock.newCondition api可以使用,看下面用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Y {
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();

public void waitOn() {
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
// ...
} finally {
lock.unlock();
}
}

public void signal() {
lock.lock();
try{
condition.signal();
// or condition.signalAll()
} finally {
lock.unlock();
}
}
}

注意,condition使用需在持有lock时。

ReentrantLock.lock

还是结合示例,看看整个流程是怎么串起来的。
有如下两个构造函数:

1
2
3
4
5
6
7
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

构造时通过传入一个boolean参数,可以实例化一个公平Lock。而这里的 FairSyncNonfairSync 是继承自内部类Sync,而Sync则继承自AQS。如下代码:
Sync类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

/**
* Performs {@link java.util.concurrent.locks.Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
// 子类实现这个方法,以提供公平/非公平锁的功能
abstract void lock();

/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
// 非公平地尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 利用AQS中的state变量表示锁的获取次数
int c = getState();
if (c == 0) {
// 目前所无持有者
if (compareAndSetState(0, acquires)) {
// 设置当前owner线程
// setExclusiveOwnerThread方法为AbstractOwnableSynchronizer类的方法
// 该类就一个私有变量Thread exclusiveOwnerThread
// 用于表示互斥资源的互斥持有线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 当前线程为锁的持有线程,即可重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 直接设置state
setState(nextc);
return true;
}
// 否则获取失败
return false;
}

// 尝试释放
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 只有state==0时,才能认为锁可以被释放
free = true;
// 设置owner线程为null
setExclusiveOwnerThread(null);
}
// 设置state
setState(c);
return free;
}

protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}

// for api ReentranLock.newCondition
final ConditionObject newCondition() {
return new ConditionObject();
}

// Methods relayed from outer class

final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}

final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}

final boolean isLocked() {
// 锁是否被持有,即需判断state是否等于0
return getState() != 0;
}

/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

内部抽象类 Sync 继承AQS,实现了一些基础功能,内部有个抽象方法lock。子类实现这个方法可提供公平/非公平锁的功能。
NonFairSync类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
   static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// cas 操作state
if (compareAndSetState(0, 1))
// 获取成功,设置当前线程为owner线程
setExclusiveOwnerThread(Thread.currentThread());
else
// state 不等于0, 锁已经被其他线程持有
// acquire为AQS内方法,如下解析
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
// non fair模式,是直接调用Sync类的nonfairTryAcquire方法,如上Sync类的解析
return nonfairTryAcquire(acquires);
}
}

// from AQS
public final void acquire(int arg) {
// tryAcquire为FairSync/NonfairSync重写方法
if (!tryAcquire(arg) &&
// 构造一个node入链表
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// from AQS
// 给定mode构建node节点,入链表,返回当前节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

// from AQS
// 获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 前驱为头结点
// tryAcquire为FairSync/NonfairSync重写方法
if (p == head && tryAcquire(arg)) {
// 已获取,设置当前node为head
// 唤醒总是唤醒头结点的下一节点线程
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 这里阻塞 LockSupport.park
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

FairSync类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
// 与Non fair的区别
// 调用AQS的acquire
// 即先判断锁是否被持有,有没有前驱,如果持有是不是当前线程
// 然后再根据判断情况构造节点加入链表尾部,并阻塞。
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 与non fair 区别:是否有前驱,如有则获取失败
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

以上代码分析,非公平锁,通过cas操作state去获取锁,即0->1,如果获取失败,则再次尝试(cas操作state,持有者是否为当前线程),如果仍然获取失败,则构造一个node并接入链表尾部,并阻塞当前线程直到被唤醒。公平锁则不会直接去获取锁,而是在非公平锁基础上,会先查看链表是否有前驱,有则阻塞并构造新节点加入链表尾部。
辅助下面的图看源码可能会有帮助。
非公平锁:
image.png公平锁:
image.png

ReentrantLock.unlock

释放锁的逻辑相对获取锁要简短很多。unlock方法就是调用AQS的release方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final boolean release(int arg) {
// 尝试释放锁 Sync实现tryRelease,见如上Sync类解析
// 即state减arg,如果state减为0,则释放锁(owner线程置null)
// 另外,这里AQS的state表示持有锁的次数
if (tryRelease(arg)) {
// 释放成功
Node h = head;
// 头结点不为空,说明当前线程入过链表,所以并且头结点的状态应该是SIGNAL
// shouldParkAfterFailedAcquire这里修改node的状态
if (h != null && h.waitStatus != 0)
// 唤醒下一节点线程,还记得获取锁的时候,线程节点入链表之后阻塞的位置是哪里吗?
// 在AQS里的acquireQueued这个方法
unparkSuccessor(h);
return true;
}
return false;
}

ReentrantLock.newCondition

接下来看一下newCondition这个api的内部流程是什么样的。
ReentrantLock.newCondition 方法内部调用Sync类实现的 newCondition 方法,而这个方法是实例化一个AQS的 ConditionObject 类对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
final ConditionObject newCondition() {
return new ConditionObject();
}

public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加一个condition节点,见下面该方法解析
Node node = addConditionWaiter();
// 释放所有获取次数记录,置state为0,见下面该方法解析
// 注意这里,调用condition.await时,是在lock块内
int savedState = fullyRelease(node);
int interruptMode = 0;
// isOnSyncQueue判断是否在Sync队列里,见下面该方法解析
while (!isOnSyncQueue(node)) {
// 阻塞在这里,直到signal唤醒
// signal唤醒后,isOnSyncQueue返回true,调出循环
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// acquireQueued见上面Sync对应方法解析,方法里的这里的queue指Sync的队列
// 这个队列会在signal唤醒时构造
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

// 删除一些取消的节点
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
// 上一节点
trail = t;
t = next;
}
}

public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

// ... 省略其他代码

}

// from AQS
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 置state为0
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

public final boolean release(int arg) {
// 尝试释放,置state -= arg,结合上下文,也就是state=0
if (tryRelease(arg)) {
Node h = head;
// 这里h为null,waitState为CONDITION
// 因为condition的api使用要求都在lock块内,所以h必定为null
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// node是否在Sync等待队列里(也就是调用了condition.signal,将waitStatus置为SIGNAL)
// 根据waitStatus以及队列元素比较
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}

await方法的比较复杂,需要仔细梳理下,并且结合signal的流程才能清晰起来。
以下为signal的流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ConditionObject implements Condition, java.io.Serializable {
public final void signal() {
// 如果本线程不是持有锁线程,则throw
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 还记得吗,firstWaiter为condition队列里的头结点
Node first = firstWaiter;
if (first != null)
// 唤醒,结合上边await的流程看
// 就是改变waitStatus,以及入Sync队列
doSignal(first);
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// transferForSIgnal改变first状态,并入Sync队列
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
}

// from AQS
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 入队,Sync的队列
Node p = enq(node);
int ws = p.waitStatus;
// 将waitStatus置为SIGNAL
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

signal流程大致就是这样,signalAll其实就是从Condition头结点firstWaiter开始依次调用transferForSignal方法,大同小异。

-------------The End-------------