写在前面
根据源码总的描述,SynchronousQueue是一个阻塞队列,所有的入队出队操作都是阻塞的。根据作者的描述,这个工具的定位是在线程间同步对象,以达到在线程间传递一些信息、事件、或者任务的目的(They are well suited for handoff designs, in which an object running in one thread must sync up with an object running in another thread in order to hand it some information, event, or task.)。
关于这个工具的描述,作者只介绍了这么多。SynchronousQueue是实现了BlockingQueue,但是有与一般意义上的queue(比如ArrayBlockingQueue)不一样,它内部并没有存放元素的地方。入队阻塞直到出队成功,反之亦然,在线程间同步传递对象,以在线程间同步信息。
实现原理
SynchronousQueue内部并没有容纳元素的数据结构,也就是说SynchronousQueue并不存储元素。实现采用了一种无锁算法,扩展的Dual stack and Dual Queue算法,算法的大概实现是采用链表,用头结点(head)和尾结点(tail)记录队列状态,而队列可以根据头尾以及当前节点状态,在不需要锁的情况的执行入出队操作。此外,竞争时,支持公平竞争和非公平竞争。公平竞争实现采用先进先出队列( FIFO queue);而非公平竞争实现采用先进后出栈(FILO stack)。
内部结构
1 | abstract static class Transferer<E> { |
没有用于存储队列元素内部变量,并且有表示自旋时间的静态变量。内部有个Transferer抽象类,抽象类只有一个transfer方法。分别有TransferStack和TransferQueue实现了这个类,表示非公平和公平两种模式。
transfer
主要逻辑都在这个transfer方法中,包括出入队也都是通过这个方法实现的。
offer
1 | /** |
poll
1 | /** |
TransferStack.transfer
TransferStack是非公平竞争的实现。
1 | E transfer(E e, boolean timed, long nanos) { |
TransferStack总是先进后出,并不保证公平,甚至在一些极端情况会导致一部分请求总得不到调度。
详情:
- 请求到达,如果栈为空,则入栈相对应状态(consumer->request、producer->data)节点
- 如果栈不为空,并且目前节点状态与栈顶结点状态不一致(即并不是都为consumer or producer),那么入栈一fulfill节点
- 匹配过程,匹配栈顶结点为fulfill以及后继节点为头结点的match,成功则出栈两节点
- 最后根据节点状态返回对应的需要同步的数据对象
TransferQueue.transfer
TransferQueue是公平竞争的实现。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143E transfer(E e, boolean timed, long nanos) {
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
// 初始化还未完成
if (t == null || h == null) // saw uninitialized value
continue; // spin
// 队列为空,或当前节点和尾为相同类型节点
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// 尾巴变了
if (t != tail) // inconsistent read
continue;
// 尾巴变了,但未更新,help一下
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
// 构造节点入队
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
// 入队
advanceTail(t, s); // swing tail and wait
// 阻塞等待fulfill
Object x = awaitFulfill(s, e, timed, nanos);
// QNode 的item:
// 等于this->cancel;null->consumer;not null -> producer
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
// s脱离队列,已不在队列中,这个时候需要重置一下队列
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
// 结合下面的else看,在else中会更改s.item
// x != null 表示当前为consumer
// x == null 表示当前为producer
return (x != null) ? (E)x : e;
} else { // complementary-mode
// m为fulfill节点
QNode m = h.next; // node to fulfill
// 队列改变,或者fulfill为空则需要retry
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
// m的item已经操作过:m.casItem(x,e),即already fulfilled
if (isData == (x != null) || // m already fulfilled
// m节点失效
x == m || // m cancelled
// 相当于TransferStack的匹配过程,将m.item=e
// cas失败,表示others have done,则需要重置头节点retry
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
// 成功,h出队,head=m
advanceHead(h, m); // successfully fulfilled
// 唤醒m节点阻塞线程
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
/**
* Spins/blocks until node s is fulfilled.
*/
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
// QNode.tryCancel:
// s.item = s
s.tryCancel(e);
Object x = s.item;
if (x != e)
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 自旋
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
// 阻塞
else if (!timed)
LockSupport.park(this);
// 同样,需要大于阻塞阈值才会真正阻塞,否则就自旋
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
TransferQueue总是FIFO,保证了公平。
详情:
- 请求到达,如果队列为空,或者尾节点的类型和当前节点相同(同是consumer或producer),则在队列尾部入队当前节点等待,直到被唤醒
- 否则头结点出队,唤醒头结点对应线程,返回对应值