JDK Source Code Reading: SynchronousQueue

Preface

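According to the description in the source, SynchronousQueue is a blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. The author positions it as a tool for synchronizing objects between threads, so that information, events, or tasks can be passed from one thread to another ("They are well suited for handoff designs, in which an object running in one thread must sync up with an object running in another thread in order to hand it some information, event, or task.").
That is about all the author says about the tool. SynchronousQueue implements BlockingQueue, but unlike a queue in the usual sense (ArrayBlockingQueue, for example) it has no internal place to hold elements. A blocking insert waits until a remove succeeds, and vice versa, so an object is handed directly from one thread to another and the two threads synchronize on that handoff.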
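
To make the handoff behaviour concrete, here is a minimal sketch using only the public `java.util.concurrent.SynchronousQueue` API. The class name `HandoffDemo` and the method `handOff` are made up for illustration; they are not part of the JDK.

```java
import java.util.concurrent.SynchronousQueue;

class HandoffDemo {
    /** Hands one message from a producer thread to the calling thread. */
    static String handOff(String message) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put(message); // blocks until the consumer below takes it
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        String received = queue.take(); // blocks until the producer puts
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handOff("task-1"));
    }
}
```

Neither side can get ahead of the other: `put` returns only after `take` has received the element, which is the "sync up" the Javadoc quote refers to.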

The analysis is based on the JDK 1.8.0_171 source.

Implementation principle

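SynchronousQueue has no internal data structure for holding elements; in other words, it does not store elements. The implementation uses a lock-free algorithm, an extension of the dual stack and dual queue algorithms. Roughly, it is built on a linked list that records its state in a head node (and, for the queue, a tail node); based on head, tail, and the state of the current node, insert and remove operations can proceed without locks. Under contention it supports both fair and non-fair modes: the fair mode is implemented with a first-in-first-out queue (FIFO queue), the non-fair mode with a last-in-first-out stack (LIFO stack).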
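
The difference between the two modes can be observed from outside with the public constructor `SynchronousQueue(boolean fair)`. The sketch below is only an illustration: `FairnessDemo`, `awaitParked`, `consumer`, and `firstServed` are hypothetical names, and it assumes that a thread blocked in `take()` eventually reports `Thread.State.WAITING` because it parks via `LockSupport.park`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;

class FairnessDemo {
    /** Polls until thread t is parked (assumption: take() parks, so the state becomes WAITING). */
    static void awaitParked(Thread t) throws InterruptedException {
        while (t.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
    }

    /** Creates a consumer that records which item it received under its own name. */
    static Thread consumer(String name, SynchronousQueue<String> queue,
                           Map<String, String> receiverOf) {
        return new Thread(() -> {
            try {
                receiverOf.put(queue.take(), name);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
    }

    /** Parks consumer A, then consumer B, and reports who receives the first item. */
    static String firstServed(boolean fair) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>(fair);
        Map<String, String> receiverOf = new ConcurrentHashMap<>();
        Thread a = consumer("A", queue, receiverOf);
        Thread b = consumer("B", queue, receiverOf);
        a.start();
        awaitParked(a); // A is waiting first
        b.start();
        awaitParked(b); // B arrives after A
        queue.put("first");
        queue.put("second");
        a.join();
        b.join();
        return receiverOf.get("first");
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fair:     first item went to " + firstServed(true));
        System.out.println("non-fair: first item went to " + firstServed(false));
    }
}
```

With the implementation analysed here, the fair queue should serve the longest-waiting consumer (A) first, while the non-fair stack should serve the most recent arrival (B) first.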

Internal structure

abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}

/** The number of CPUs, for spin control */
static final int NCPUS = Runtime.getRuntime().availableProcessors();

static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

static final int maxUntimedSpins = maxTimedSpins * 16;

/**
 * The number of nanoseconds for which it is faster to spin
 * rather than to use timed park. A rough estimate suffices.
 */
static final long spinForTimeoutThreshold = 1000L;

static final class TransferStack<E> extends Transferer<E> {
    /** Node represents an unfulfilled consumer */
    // a consumer: a request to take an element
    static final int REQUEST = 0;
    /** Node represents an unfulfilled producer */
    // a producer: a request to insert an element
    static final int DATA = 1;
    /** Node is fulfilling another unfulfilled DATA or REQUEST */
    // this node is in the middle of matching a waiting node in the stack
    static final int FULFILLING = 2;

    /** Node class for TransferStacks. */
    static final class SNode {
        volatile SNode next;    // next node in stack
        // the node this one has been matched with
        volatile SNode match;   // the node matched to this
        volatile Thread waiter; // to control park/unpark
        Object item;            // data; or null for REQUESTs
        int mode;

        // ... other code omitted
    }

    /** Puts or takes an item. */
    E transfer(E e, boolean timed, long nanos) {
        // ...
    }

    // ... other code omitted
}

static final class TransferQueue<E> extends Transferer<E> {
    static final class QNode {
        volatile QNode next;    // next node in queue
        // note the difference from SNode: item is volatile here
        volatile Object item;   // CAS'ed to or from null
        volatile Thread waiter; // to control park/unpark
        final boolean isData;

        // ...
    }

    /** Head of queue */
    transient volatile QNode head;
    /** Tail of queue */
    transient volatile QNode tail;
    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it was cancelled.
     */
    transient volatile QNode cleanMe;

    /** Puts or takes an item. */
    E transfer(E e, boolean timed, long nanos) {
        // ...
    }
}

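There is no field for storing queue elements, only static constants that control how long to spin. Internally there is an abstract class, Transferer, with a single method, transfer. TransferStack and TransferQueue both extend it and represent the non-fair and fair modes respectively.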
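
As a small worked example of the spin constants, the sketch below repeats the same arithmetic with the CPU count passed in as a parameter. `SpinBudget` and its method names are made up for illustration; only the formulas come from the source shown above.

```java
class SpinBudget {
    /** Same rule as maxTimedSpins: do not spin at all on a single CPU. */
    static int maxTimedSpins(int ncpus) {
        return (ncpus < 2) ? 0 : 32;
    }

    /** Same rule as maxUntimedSpins: untimed waits spin 16 times longer. */
    static int maxUntimedSpins(int ncpus) {
        return maxTimedSpins(ncpus) * 16;
    }

    public static void main(String[] args) {
        int ncpus = Runtime.getRuntime().availableProcessors();
        System.out.println("timed spins:   " + maxTimedSpins(ncpus));
        System.out.println("untimed spins: " + maxUntimedSpins(ncpus));
    }
}
```

On a single-CPU machine both values are 0, since spinning cannot help when the other thread has no processor to run on; on any multi-CPU machine they are 32 and 512.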

transfer

The main logic lives in the transfer method; both insertion and removal are implemented through it.
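
In the JDK 8 source the public methods differ only in the arguments they pass: `put(e)` calls `transfer(e, false, 0)`, `offer(e)` calls `transfer(e, true, 0)`, `take()` calls `transfer(null, false, 0)`, and `poll()` calls `transfer(null, true, 0)`. A timed call with zero nanoseconds cannot wait, so the untimed `offer` and `poll` only succeed if a counterpart is already waiting. The following sketch shows this from the outside; `NoCapacityDemo` and its methods are hypothetical names.

```java
import java.util.concurrent.SynchronousQueue;

class NoCapacityDemo {
    /** offer(e) with no waiting consumer cannot hand the element over. */
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("x");
    }

    /** poll() with no waiting producer has nothing to receive. */
    static String pollWithoutProducer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.poll();
    }

    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        System.out.println("offer accepted: " + offerWithoutConsumer());
        System.out.println("poll result:    " + pollWithoutProducer());
        System.out.println("size:           " + queue.size());
        System.out.println("peek:           " + queue.peek());
    }
}
```

Because nothing is ever stored, `size()` always reports 0 and `peek()` always returns null.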

offer

/**
 * Inserts the specified element into this queue, waiting if necessary
 * up to the specified wait time for another thread to receive it.
 *
 * Inserts an element, blocking for at most the given time until
 * another thread receives it.
 */
public boolean offer(E e, long timeout, java.util.concurrent.TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // offer and poll go through the same transfer method;
    // the only difference is whether the first argument is null
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

poll

/**
 * Retrieves and removes the head of this queue, waiting
 * if necessary up to the specified wait time, for another thread
 * to insert it.
 *
 * Retrieves and removes the head element, blocking for at most the
 * given time until another thread inserts an element.
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // offer and poll go through the same transfer method;
    // the only difference is whether the first argument is null
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}
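
The timed variants can be exercised as follows. This is a minimal sketch; `TimedHandoffDemo` and its methods are hypothetical names, and the timeouts are arbitrary values chosen for the demo.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class TimedHandoffDemo {
    /** A timed poll on a queue with no producer gives up and returns null. */
    static String timedPollWithoutProducer() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.poll(50, TimeUnit.MILLISECONDS);
    }

    /** A timed offer succeeds once a consumer shows up within the timeout. */
    static boolean timedOfferWithConsumer() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        boolean accepted = queue.offer("x", 5, TimeUnit.SECONDS);
        consumer.join();
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("timed poll:  " + timedPollWithoutProducer());
        System.out.println("timed offer: " + timedOfferWithConsumer());
    }
}
```

Both calls pass `timed = true` to `transfer`; a null result from `transfer` is reported as `false` by `offer` and as `null` by `poll`, unless the thread was interrupted, in which case `InterruptedException` is thrown.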

TransferStack.transfer

TransferStack is the implementation of the non-fair mode.

E transfer(E e, boolean timed, long nanos) {
    /*
     * Basic algorithm is to loop trying one of three actions:
     *
     * 1. If apparently empty or already containing nodes of same
     *    mode, try to push node on stack and wait for a match,
     *    returning it, or null if cancelled.
     *
     * 2. If apparently containing node of complementary mode,
     *    try to push a fulfilling node on to stack, match
     *    with corresponding waiting node, pop both from
     *    stack, and return matched item. The matching or
     *    unlinking might not actually be necessary because of
     *    other threads performing action 3:
     *
     * 3. If top of stack already holds another fulfilling node,
     *    help it out by doing its match and/or pop
     *    operations, and then continue. The code for helping
     *    is essentially the same as for fulfilling, except
     *    that it doesn't return the item.
     */

    SNode s = null; // constructed/reused as needed
    // the first argument decides the kind of request:
    // null -> REQUEST (poll/take), non-null -> DATA (offer/put)
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        // the stack is empty, or its top node has the same mode as this request
        if (h == null || h.mode == mode) {  // empty or same-mode
            if (timed && nanos <= 0) {      // can't wait
                // cannot wait: pop a cancelled head and retry, otherwise return null at once
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            // build a node and push it: s.next = h
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // spin or block until a fulfiller matches s
                SNode m = awaitFulfill(s, timed, nanos);
                // in TransferStack a cancelled node (timed out or interrupted)
                // has its match set to itself, so m == s marks a node to clean up
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                // matched: h is the fulfilling node, s is the node it matched
                if ((h = head) != null && h.next == s)
                    // pop s (together with its fulfiller) off the stack
                    casHead(h, s.next);     // help s's fulfiller
                // a consumer returns the item of the fulfilling node m,
                // a producer returns its own item
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        // complementary mode and h is not a fulfilling node: try to fulfill
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            // push a fulfilling node: s.next = h, and h should become s's match
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // loop in case the node to be matched has just timed out and been cancelled
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    // no waiting node is left in the stack
                    if (m == null) {        // all waiters are gone
                        // nothing to fulfill, so pop the fulfilling node just pushed
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    // i.e. CAS m.match to s, then unpark m.waiter
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        // same rule as above for the returned item
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        // m has been cancelled, so unlink it
                        s.casNext(m, mn);   // help unlink
                }
            }
        // h is a fulfilling node pushed by another thread; help it finish
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

// builds (or reuses) an SNode; next is normally the current head h
static SNode snode(SNode s, Object e, SNode next, int mode) {
    if (s == null) s = new SNode(e);
    s.mode = mode;
    s.next = next;
    return s;
}

// Spins/blocks until node s is matched by a fulfill operation.
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // spin first if s is at the head, the stack is empty,
    // or an active fulfilling node is at the head
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            // SNode.tryCancel: CAS s.match from null to s
            s.tryCancel();
        // the node matched with s, if any
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        // spin
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        // block
        else if (!timed)
            LockSupport.park(this);
        // park only when the remaining time exceeds the threshold
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

/**
 * Returns true if node s is at head or there is an active
 * fulfiller.
 */
boolean shouldSpin(SNode s) {
    SNode h = head;
    return (h == s || h == null || isFulfilling(h.mode));
}
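
The spin-then-park pattern in awaitFulfill can be isolated in a much smaller form. The sketch below is a simplified illustration, not the JDK code: `SpinThenPark`, `await`, and `fulfill` are made-up names, and it deliberately leaves out timeouts, interruption, and cancellation.

```java
import java.util.concurrent.locks.LockSupport;

class SpinThenPark {
    private volatile Object match;  // set once by the fulfilling thread
    private volatile Thread waiter; // published by the waiting thread before it parks

    /** Spins up to the given number of iterations, then parks until fulfilled. */
    Object await(int spins) {
        Thread w = Thread.currentThread();
        for (;;) {
            Object m = match;
            if (m != null)
                return m;           // fulfilled, possibly without ever parking
            if (spins > 0)
                --spins;            // still within the spin budget
            else if (waiter == null)
                waiter = w;         // publish first, re-check match on the next pass
            else
                LockSupport.park(this);
        }
    }

    /** Publishes the value and wakes the waiter if it has registered itself. */
    void fulfill(Object value) {
        match = value;
        LockSupport.unpark(waiter); // unpark(null) is a no-op
    }

    public static void main(String[] args) throws InterruptedException {
        SpinThenPark slot = new SpinThenPark();
        Thread t = new Thread(() -> System.out.println("got " + slot.await(512)));
        t.start();
        slot.fulfill("payload");
        t.join();
    }
}
```

As in the original, the waiter is registered one iteration before parking, so the match field is always re-read after `waiter` is published; a fulfiller that ran in between is therefore never missed.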

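TransferStack is always last-in-first-out and does not guarantee fairness; in extreme cases some requests may never get served.
In detail:

  • When a request arrives and the stack is empty (or its top node has the same mode), push a node of the corresponding mode (consumer -> REQUEST, producer -> DATA) and wait
  • If the stack is not empty and the mode of the current request differs from that of the top node (that is, they are not both consumers or both producers), push a fulfilling node
  • Matching: the fulfilling node on top of the stack is matched with its successor, which records the fulfilling node in its match field; on success both nodes are popped
  • Finally, return the object being handed over according to the node's mode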
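
The three mode constants are combined as bit flags, which is how a fulfilling node remembers whether it carries data. The sketch below reproduces the constants and the `isFulfilling` check from the source; the class `ModeBits` and the helper `describe` are hypothetical and exist only for illustration.

```java
class ModeBits {
    static final int REQUEST = 0;    // unfulfilled consumer
    static final int DATA = 1;       // unfulfilled producer
    static final int FULFILLING = 2; // flag OR-ed onto REQUEST or DATA

    /** True if the FULFILLING bit is set in the mode. */
    static boolean isFulfilling(int m) {
        return (m & FULFILLING) != 0;
    }

    /** Hypothetical helper that renders a mode value as text. */
    static String describe(int m) {
        String base = ((m & DATA) != 0) ? "DATA" : "REQUEST";
        return isFulfilling(m) ? "FULFILLING|" + base : base;
    }

    public static void main(String[] args) {
        for (int m = 0; m <= 3; m++) {
            System.out.println(m + " = " + describe(m));
        }
    }
}
```

A producer that finds a waiting consumer therefore pushes a node with mode `FULFILLING|DATA` (3), and a consumer that finds a waiting producer pushes `FULFILLING|REQUEST` (2).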

TransferQueue.transfer

TransferQueue is the implementation of the fair mode.
E transfer(E e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of
     * two actions:
     *
     * 1. If queue apparently empty or holding same-mode nodes,
     *    try to add node to queue of waiters, wait to be
     *    fulfilled (or cancelled) and return matching item.
     *
     * 2. If queue apparently contains waiting items, and this
     *    call is of complementary mode, try to fulfill by CAS'ing
     *    item field of waiting node and dequeuing it, and then
     *    returning matching item.
     *
     * In each case, along the way, check for and try to help
     * advance head and tail on behalf of other stalled/slow
     * threads.
     *
     * The loop starts off with a null check guarding against
     * seeing uninitialized head or tail values. This never
     * happens in current SynchronousQueue, but could if
     * callers held non-volatile/final ref to the
     * transferer. The check is here anyway because it places
     * null checks at top of loop, which is usually faster
     * than having them implicitly interspersed.
     */

    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        // initialization has not completed yet
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        // the queue is empty, or the tail node has the same mode as this request
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            // tail has changed
            if (t != tail)                  // inconsistent read
                continue;
            // a node was linked after t but tail was not advanced yet; help advance it
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            if (timed && nanos <= 0)        // can't wait
                return null;
            if (s == null)
                // build the node to enqueue
                s = new QNode(e, isData);
            if (!t.casNext(null, s))        // failed to link in
                continue;

            // linked in; move tail to s
            advanceTail(t, s);              // swing tail and wait
            // spin or block until fulfilled
            Object x = awaitFulfill(s, e, timed, nanos);
            // QNode.item: == this -> cancelled; null -> consumer; non-null -> producer
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            // s is still linked in the queue: unlink it and clear its fields
            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            // see the else branch below, where the fulfiller changes s.item:
            // x != null: this thread is a consumer and x is the received item
            // x == null: this thread is a producer and returns its own e
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            // m is the waiting node to be fulfilled
            QNode m = h.next;               // node to fulfill
            // the queue changed, or there is no node to fulfill: retry
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            // m.item was already swapped by another m.casItem(x, e), i.e. already fulfilled
            if (isData == (x != null) ||    // m already fulfilled
                // m has been cancelled
                x == m ||                   // m cancelled
                // counterpart of the matching step in TransferStack: CAS m.item to e;
                // if the CAS fails another thread got there first, so advance head and retry
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            // success: h is dequeued and m becomes the new head
            advanceHead(h, m);              // successfully fulfilled
            // wake the thread blocked on m
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

/**
 * Spins/blocks until node s is fulfilled.
 */
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    /* Same idea as TransferStack.awaitFulfill */
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            // QNode.tryCancel: CAS s.item from e to s
            s.tryCancel(e);
        Object x = s.item;
        if (x != e)
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        // spin
        if (spins > 0)
            --spins;
        else if (s.waiter == null)
            s.waiter = w;
        // block
        else if (!timed)
            LockSupport.park(this);
        // likewise, park only if the remaining time exceeds the threshold; otherwise keep spinning
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

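TransferQueue is always FIFO, which guarantees fairness.
In detail:

  • When a request arrives, if the queue is empty or the tail node has the same mode as the current request (both consumers or both producers), enqueue the current node at the tail and wait until woken
  • Otherwise fulfill the first waiting node (head.next), advance head past the old dummy head, wake that node's thread, and return the corresponding value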
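
The way QNode encodes its state in the single item field (non-null for a waiting producer, null for a waiting consumer, the node itself once cancelled) can be sketched with an `AtomicReference`. This is an illustration under simplifying assumptions, not the JDK code: `ItemSlot`, `tryFulfill`, and `isFulfilled` are hypothetical names, and the real QNode uses an Unsafe-based CAS on a volatile field.

```java
import java.util.concurrent.atomic.AtomicReference;

class ItemSlot {
    final boolean isData;               // true for a producer, false for a consumer
    final AtomicReference<Object> item; // element, null, or this slot when cancelled

    ItemSlot(Object e) {
        this.isData = (e != null);
        this.item = new AtomicReference<>(e);
    }

    /** Fulfiller side: a consumer slot goes null -> e, a producer slot goes e -> null. */
    boolean tryFulfill(Object expected, Object replacement) {
        return item.compareAndSet(expected, replacement);
    }

    /** Waiter side: cancel by pointing the item at the slot itself. */
    boolean tryCancel(Object original) {
        return item.compareAndSet(original, this);
    }

    boolean isCancelled() {
        return item.get() == this;
    }

    /** Fulfilled once the item no longer agrees with the slot's mode. */
    boolean isFulfilled() {
        Object x = item.get();
        return x != this && isData != (x != null);
    }

    public static void main(String[] args) {
        ItemSlot consumer = new ItemSlot(null);
        System.out.println("fulfil consumer: " + consumer.tryFulfill(null, "payload"));
        System.out.println("late cancel:     " + consumer.tryCancel(null));

        ItemSlot producer = new ItemSlot("payload");
        System.out.println("cancel producer: " + producer.tryCancel("payload"));
        System.out.println("late fulfil:     " + producer.tryFulfill("payload", null));
    }
}
```

Because fulfilling and cancelling are both a CAS from the same original value, exactly one of them can win, which is why the fulfiller in transfer treats a failed casItem as "dequeue and retry".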