JDK源码阅读之LinkedTransferQueue

前言

An unbounded {*@link *java.util.concurrent.TransferQueue} based on linked nodes.This queue orders elements FIFO (first-in-first-out) with respect to any given producer.
LinkedTransferQueue基于链表实现于TransferQueue接口,是一个遵循FIFO的队列。TransferQueue具有阻塞队列的行为(继承BlockingQueue接口),并且producer也可以阻塞等待consumer从队列中取出该element消费。

源码分析自 JDK 1.8.0_171

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public interface TransferQueue<E> extends BlockingQueue<E> {
/*
* 如果有consumer正在队列阻塞等待获取数据,那么tryTransfer成功,否则失败。
* 该方法并不会往队列里put元素,而是直接交给等待着的consumer。
*/
boolean tryTransfer(E e);

/*
* producer阻塞等待直到该element被consumer消费
*/
void transfer(E e) throws InterruptedException;

/*
* 与第一个方法的区别为,会等待上unit.toMillis(timeout)段时间
*/
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

/*
* 队列中是否有等待的consumer
*/
boolean hasWaitingConsumer();

/*
* 有多少个等待着的consumer
*/
int getWaitingConsumerCount();
}

LinkedTransferQueue除了有BlockingQueue的行为外,还具有以上行为。

概述

LinkedTransferQueue实现了TransferQueue接口,而TransferQueue又继承自BlockingQueue。
众所周知,阻塞队列是生产者往队列放入元素,消费者往队列取出元素进行消费,如果队列无空闲空间/无可用元素则生产者/消费者会相应阻塞。

一般的阻塞队列,生产者和消费者是互不关心的,即两者完全解耦。正常情况下一般是不会互相阻塞(队列有足够的空间,生产者不会因为队列无空闲空间而阻塞;队列有足够的元素,消费者不会因为队列无元素可取而阻塞)。生产者将元素入队就可以离开了,不关心谁取走了它的元素;消费者将元素取出就可以离开了,不关心谁放的这个元素。但是TransferQueue不是,它的生产者和消费者允许互相阻塞。

LinkedTransferQueue的算法采用的是一种Dual Queue的变种,Dual Queues with Slack。
Dual Queue不仅能存放数据节点,还能存放请求节点。一个数据节点想要入队,这个时候队列里正好有请求节点,此时”匹配”,并移除该请求节点。Blocking Dual Queue入队一个未匹配的请求节点时,会阻塞直到匹配节点入队。Dual Synchronous Queue在Blocking Dual Queue基础上,还会在一个未匹配的数据节点入队时也会阻塞。而Dual Transfer Queue支持以上任何一种模式,具体是哪一种取决于调用者,也就是有不同的api支持。

FIFO Dual Queue使用的是名叫M&S的一种lock-free算法(http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf)的变种。

M&S算法维护一个”head”指针和一个”tail”指针。head指向一个匹配节点M,M指向第一个未匹配节点(如果未空则指向null),tail指向未匹配节点。如下
image.png
在Dual Queue内部,由链表组成,对于每个链表节点维护一个match status。在LinkedTransferQueue中为item。对于一个data节点,匹配过程体现在item上就是:non-null -> null,反过来,对于一个request节点,匹配过程就是:null->non null。Dual Queue的这个match status一经改变,就不会再变更它了。也就是说这个item cas操作过后就不会动了。

基于此,现在的Dual Queue的组成可能就是在一个分割线前全是M节点,分割线后全是U节点。假设我们不关心时间和空间,入队和出队的过程就是遍历这个链表找到第一个U和最后一个U,这样我们就不会有cas竞争的问题了。

基于以上的分析,我们就可以有一个这样的tradeoff来减少head tail的cas竞争。如下:
image.png
将head和第一个未匹配的节点U之间的距离叫做”slack”,根据一些经验和实验数据发现,slack在1-3之间时工作的更好,所以,LinkedTransferQueue将其定为2。

源码阅读

关于BlockingQueue的相关行为不做过多解读,主要看继承自TransferQueue的行为。

内部变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
	/**
* head of the queue; null until first enqueue
*/
transient volatile Node head;

/**
* tail of the queue; null until first append
*/
private transient volatile Node tail;

/*
* Possible values for "how" argument in xfer method.
*/
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer

// Node 类结构
static final class Node {
final boolean isData; // false if this is a request node
// 作者解释说item用Object,没用泛型的原因是利于垃圾回收
// quoto:Uses Object, not E, for items to allow forgetting them after use.
// 没查到为啥利于垃圾回收
volatile Object item; // initially non-null if isData; CASed to match
volatile Node next;
volatile Thread waiter; // null until waiting

/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(Object item, boolean isData) {
// 之所以可以用putObject,是因为item为volatile
// 其次Node构造是在pred.casNext之后(casNext之后,构造node才算能够可达)
// 所以可见性是能够保证的
UNSAFE.putObject(this, itemOffset, item); // relaxed write
this.isData = isData;
}

/**
* Links node to itself to avoid garbage retention. Called
* only after CASing head field, so uses relaxed write.
*/
final void forgetNext() {
UNSAFE.putObject(this, nextOffset, this);
}

/**
* Sets item to self and waiter to null, to avoid garbage
* retention after matching or cancelling. Uses relaxed writes
* because order is already constrained in the only calling
* contexts: item is forgotten only after volatile/atomic
* mechanics that extract items. Similarly, clearing waiter
* follows either CAS or return from park (if ever parked;
* else we don't care).
*/
final void forgetContents() {
UNSAFE.putObject(this, itemOffset, this);
UNSAFE.putObject(this, waiterOffset, null);
}

/**
* Returns true if this node has been matched, including the
* case of artificial matches due to cancellation.
*/
// 判断是否该Node是否匹配过
final boolean isMatched() {
Object x = item;
return (x == this) // manual cancel
// matched
|| ((x == null) == isData);
}
}

由以上代码能看到,内部维护的变量相对来说还是比较少的,主要是链表的head 和tail。最下边的四个静态场景是作为xfer方法how参数,以区分不通方法用于不同场景的调用。

  • NOW 用于poll()、tryTransfer(E e)方法调用
  • ASYNC 用于offset()、put()、add()方法调用
  • SYNC 用于transfer()、take()方法调用
  • TIMED 用于poll(long timeout,TimeUnit unit)、tryTransfer(E e,long timeout,TimeUnit unit)方法调用

行为分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/*
* 通过xfer方法实现,同步不同的参数来区分不同的场景。
*/

/**
* producer,要是有等待的consumer成功,否则失败
*/
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}

/**
* producer,失败会interrupt,并抛出异常
*/
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}

/**
* producer,在timeout时间内,成功,否则
*/
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}

/**
* consumer,在unit.toNanos(timeout)时间内成功,否则会抛出异常
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}

/**
* consumer,队列有即返回,否则null,不会阻塞等待
*/
public E poll() {
return xfer(null, false, NOW, 0);
}

/**
* consumer,阻塞等待直到队列有元素(producer)可取
*/
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}

/**
* 队列中是否有等待的consuemr
*/
public boolean hasWaitingConsumer() {
return firstOfMode(false) != null;
}

/**
* 队列中等待的consumer总数
*/
public int getWaitingConsumerCount() {
return countOfMode(false);
}

以上是主要我们要分析的行为,可以看见,出来 hasWaitingConsumergetWaitingConsumerCount ,其他对队列操作元素的方法,都是通过xfer方法实现的。
先看 hasWaitingConsumergetWaitingConsumerCount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
   public boolean hasWaitingConsumer() {
return firstOfMode(false) != null;
}

/**
* Returns the first unmatched node of the given mode, or null if
* none. Used by methods isEmpty, hasWaitingConsumer.
*/
private Node firstOfMode(boolean isData) {
for (Node p = head; p != null; p = succ(p)) {
// p.isMatched:
// return (item == this) || ((x == null) == isData);
// item == this 表示手动取消
// ((x == null) == isData) 表示已匹配
if (!p.isMatched())
// 注意,队列里,未匹配的所有节点一定都是同一种类型的节点
// p是队列中第一个未匹配的节点
// 无非分两种情况分析:isData为true 和false
// 1.true 如果p.isData == isData ,那么p为producer,那么返回当前节点,否则返回null。
// 2.false 如果p.isData == isData,那么p为consumer....
return (p.isData == isData) ? p : null;
}
return null;
}

/**
* Returns the successor of p, or the head node if p.next has been
* linked to self, which will only be true if traversing with a
* stale pointer that is now off the list.
*/
final Node succ(Node p) {
Node next = p.next;
return (p == next) ? head : next;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public int getWaitingConsumerCount() {
return countOfMode(false);
}

/**
* Traverses and counts unmatched nodes of the given mode.
* Used by methods size and getWaitingConsumerCount.
*/
private int countOfMode(boolean data) {
int count = 0;
for (Node p = head; p != null; ) {
// p.isMatched:
// return (item == this) || ((x == null) == isData);
// item == this 表示手动取消
// ((x == null) == isData) 表示已匹配
if (!p.isMatched()) {
// 如果不等,那么后面的节点就不能有p.isData类型的节点了。
if (p.isData != data)
return 0;
if (++count == Integer.MAX_VALUE) // saturated
break;
}
Node n = p.next;
if (n != p)
p = n;
else {
count = 0;
p = head;
}
}
return count;
}

以上两个方法还是挺清晰的,主要是要注意一个地方,就是第一个未匹配节点的类型,决定了后边未匹配节点的类型。
接下来看看 xfer 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
private E xfer(E e, boolean haveData, int how, long nanos) {
// 不允许,说自己有货,口袋确实空的,这谁都顶不住的
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed

retry:
for (; ; ) { // restart on append race

for (Node h = head, p = h; p != null; ) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
// 未匹配过
// 如果p.item == p || (p.item == null) == p.isData 则已匹配 p.item == p cancel的情况
// match consumer:p.item -> itself producer:p.item->null
if (item != p && (item != null) == isData) { // unmatched
// 同类型,则break,在下一个循环里进行入队操作
if (isData == haveData) // can't match
break;
// match 过程:
// consumer:p.item -> data; producer:p.item -> null
// 试图匹配,item->e
if (p.casItem(item, e)) { // match
for (Node q = p; q != h; ) {
Node n = q.next; // update by 2 unless singleton
// 移动head,如果q.next == null 则head->q,否则head->n
if (head == h && casHead(h, n == null ? q : n)) {
// 更新head成功,将其next指针指向自己,以作清理
h.forgetNext();
break;
} // advance and retry
// 失败:1.head改变;2.cas竞争失败。总之就是有其他线程已经移动了head

if ((h = head) == null ||
// 即 slack >= 2,需要重试,以调整slack的值在[0,2)范围内
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
// 失败了,已经有其他线程捷足先登,重试
}
// 前边都是匹配过的节点,接着往后找
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}

// 当how == NOW时,匹配失败立即返回,不会入队
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);
Node pred = tryAppend(s, haveData);
// 当pred == null时,表示当前队列的"分割线"后节点的mode,已经时过境迁了。
if (pred == null)
continue retry; // lost race vs opposite mode
// how != ASYNC 需要阻塞等待
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}

private Node tryAppend(Node s, boolean haveData) {
for (Node t = tail, p = t; ; ) { // move p to last node and append
Node n, u; // temps for reads of next & tail
if (p == null && (p = head) == null) { // 队列为空
// head并不需要dummy node
if (casHead(null, s))
return s; // initialize
} else if (p.cannotPrecede(haveData))
// haveData mode的节点s并不能成为p的后继
return null; // lost race vs opposite mode
// tail并不是"tail",继续向后
else if ((n = p.next) != null) // not last; keep traversing
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
// cas失败,已经有其他线程已经先行cas成功
else if (!p.casNext(null, s))
p = p.next; // re-read on CAS failure
else {
// tail后还有其他节点
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) && // tail已经改变或者移动tail失败
(t = tail) != null &&
// (s = t.next) != null && (s = s.next) != null means slack >= 2
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
// how = timed == false ? SYNC : TIMED
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = -1; // initialized after first item and cancel checks
java.util.concurrent.ThreadLocalRandom randomYields = null; // bound if needed

for (; ; ) {
Object item = s.item;
if (item != e) { // matched
// assert item != s;
s.forgetContents(); // avoid garbage
// 匹配完成
return LinkedTransferQueue.<E>cast(item);
}
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
// 这里 mark sweep
unsplice(pred, s);
return e;
}

// 自旋
if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
} else if (spins > 0) { // spin
--spins;
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
} else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
// 阻塞等待,等待唤醒
} else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
} else {
LockSupport.park(this);
}
}
}

结合xfer注释,对照着前边的几个操作队列元素的方法,多理解几遍,效果更佳。

总结

  • 内部采用了Dual Stack With Slack的结构,slack根据实践数据采用了slack=2
  • 操作队列的元素的各个方法通过xfer方法实现,具体是根据不同的how参数来区分不同场景
  • 对于一个data节点,匹配过程体现在item上就是:non-null -> null,反过来,对于一个request节点,匹配过程就是:null->non null。
  • 一个可阻塞的节点入队时,如果队列中有等待可匹配的节点,则匹配并移除该节点,否则,入队自旋等待或阻塞等待直至匹配成功或唤醒。
-------------The End-------------