QMQ源码分析之Actor

前言

QMQ有关actor的一篇文章阐述了actor的应用场景。即client消费消息的请求会先进入一个RequestQueue,在client消费消息时,往往存在多个主题、多个消费组共享一个RequestQueue消费消息。在这个Queue中,存在不同主题的有不同消费组数量,以及不同消费组有不同consumer数量,那么就会存在抢占资源的情况。举个文章中的例子,一个主题下有两个消费组A和B,A有100个consumer,B有200个consumer,那么在RequestQueue中来自B的请求可能会多于A,这个时候就存在消费unfair的情况,所以需要隔离不同主题不同消费组以保证fair。除此之外,当consumer消费能力不足,造成broker消息堆积,这个时候就会导致consumer所在消费组总在消费”老消息”,影响全局整体的一个消费能力。因为”老消息”不会存在page cache中,这个时候很可能就会从磁盘load,那么表现是RequestQueue中来自消费”老消息”消费组的请求处理时间过长,影响到其他主题消费组的消费,因此这个时候也需要做策略来避免不同消费组的相互影响。所以QMQ就有了actor机制,以消除各个消费组之间因消费能力不同、consumer数量不同而造成的相互影响各自的消费能力。

PullMessageWorker

要了解QMQ的actor模式是如何起作用的,就要先来看看Broker是如何处理消息拉取请求的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class PullMessageWorker implements ActorSystem.Processor<PullMessageProcessor.PullEntry> {
// 消息存储层
private final MessageStoreWrapper store;
// actor
private final ActorSystem actorSystem;

private final ConcurrentMap<String, ConcurrentMap<String, Object>> subscribers;

PullMessageWorker(MessageStoreWrapper store, ActorSystem actorSystem) {
this.store = store;
this.actorSystem = actorSystem;
this.subscribers = new ConcurrentHashMap<>();
}

void pull(PullMessageProcessor.PullEntry pullEntry) {
// subject+group作actor调度粒度
final String actorPath = ConsumerGroupUtils.buildConsumerGroupKey(pullEntry.subject, pullEntry.group);
// actor调度
actorSystem.dispatch(actorPath, pullEntry, this);
}

@Override
public boolean process(PullMessageProcessor.PullEntry entry
, ActorSystem.Actor<PullMessageProcessor.PullEntry> self) {
QMon.pullQueueTime(entry.subject, entry.group, entry.pullBegin);

//开始处理请求的时候就过期了,那么就直接不处理了,也不返回任何东西给客户端,客户端等待超时
//因为出现这种情况一般是server端排队严重,暂时挂起客户端可以避免情况恶化
// deadline机制,如果QMQ认为这个消费请求来不及处理,那么就直接返回,避免雪崩
if (entry.expired()) {
QMon.pullExpiredCountInc(entry.subject, entry.group);
return true;
}

if (entry.isInValid()) {
QMon.pullInValidCountInc(entry.subject, entry.group);
return true;
}

// 存储层find消息
final PullMessageResult pullMessageResult = store.findMessages(entry.pullRequest);

if (pullMessageResult == PullMessageResult.FILTER_EMPTY ||
pullMessageResult.getMessageNum() > 0
|| entry.isPullOnce()
|| entry.isTimeout()) {
entry.processMessageResult(pullMessageResult);
return true;
}

// 没有拉取到消息,那么挂起该actor
self.suspend();
// timer task,在超时前唤醒actor
if (entry.setTimerOnDemand()) {
QMon.suspendRequestCountInc(entry.subject, entry.group);
// 订阅消息,一有消息来就唤醒该actor
subscribe(entry.subject, entry.group);
return false;
}

// 已经超时,那么即刻唤醒调度
self.resume();
entry.processNoMessageResult();
return true;
}

// 订阅
private void subscribe(String subject, String group) {
ConcurrentMap<String, Object> map = subscribers.get(subject);
if (map == null) {
map = new ConcurrentHashMap<>();
map = ObjectUtils.defaultIfNull(subscribers.putIfAbsent(subject, map), map);
}
map.putIfAbsent(group, HOLDER);
}

// 有消息来就唤醒订阅的subscriber
void remindNewMessages(final String subject) {
final ConcurrentMap<String, Object> map = this.subscribers.get(subject);
if (map == null) return;

for (String group : map.keySet()) {
map.remove(group);
this.actorSystem.resume(ConsumerGroupUtils.buildConsumerGroupKey(subject, group));
QMon.resumeActorCountInc(subject, group);
}
}
}

// ActorSystem内定义的处理接口
public interface ActorSystem.Processor<T> {
boolean process(T message, Actor<T> self);
}

能看除在这里起作用的是这个 actorSystemPullMessageWorker 继承了 ActorSystem.Processor ,所以真正处理拉取请求的是这个接口里的 process 方法。请求到达 pullMessageWorker ,worker将该次请求交给 actorSystem 调度,调度到这次请求时,worker还有个根据拉取结果做反应的策略,即如果暂时没有消息,那么 suspend ,以一个timer task定时 resume ;如果在timer task执行之前有消息进来,那么也会即时 resume

ActorSystem

接下来就看看ActorSystem里边是如何做到 公平调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ActorSystem {
// 内部维护的是一个ConcurrentMap,key即PullMessageWorker里的subject+group
private final ConcurrentMap<String, Actor> actors;
// 执行actor的executor
private final ThreadPoolExecutor executor;

private final AtomicInteger actorsCount;
private final String name;

public ActorSystem(String name) {
this(name, Runtime.getRuntime().availableProcessors() * 4, true);
}

public ActorSystem(String name, int threads, boolean fair) {
this.name = name;
this.actorsCount = new AtomicInteger();
// 这里根据fair参数初始化一个优先级队列作为executor的参数,处理关于前言里说的"老消息"的情况
BlockingQueue<Runnable> queue = fair ? new PriorityBlockingQueue<>() : new LinkedBlockingQueue<>();
this.executor = new ThreadPoolExecutor(threads, threads, 60, TimeUnit.MINUTES, queue, new NamedThreadFactory("actor-sys-" + name));
this.actors = Maps.newConcurrentMap();
QMon.dispatchersGauge(name, actorsCount::doubleValue);
QMon.actorSystemQueueGauge(name, () -> (double) executor.getQueue().size());
}
}

可以看到,用一个线程池处理actor的调度执行,这个线程池里的队列是一个优先级队列。优先级队列存储的元素是Actor。关于Actor我们稍后来看,先来看一下 ActorSystem 的处理调度流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// PullMessageWorker调用的就是这个方法
public <E> void dispatch(String actorPath, E msg, Processor<E> processor) {
// 取得actor
Actor<E> actor = createOrGet(actorPath, processor);
// 在后文Actor定义里能看到,actor内部维护一个queue,这里actor仅仅是offer(msg)
actor.dispatch(msg);
// 执行调度
schedule(actor, true);
}

// 无消息时,则会挂起
public void suspend(String actorPath) {
Actor actor = actors.get(actorPath);
if (actor == null) return;

actor.suspend();
}

// 有消息则恢复,可以理解成线程的"就绪状态"
public void resume(String actorPath) {
Actor actor = actors.get(actorPath);
if (actor == null) return;

actor.resume();
// 立即调度,可以留意一下那个false
// 当actor是"可调度状态"时,这个actor是否能调度是取决于actor的queue是否有消息
schedule(actor, false);
}

private <E> Actor<E> createOrGet(String actorPath, Processor<E> processor) {
Actor<E> actor = actors.get(actorPath);
if (actor != null) return actor;

Actor<E> add = new Actor<>(this.name, actorPath, this, processor, DEFAULT_QUEUE_SIZE);
Actor<E> old = actors.putIfAbsent(actorPath, add);
if (old == null) {
LOG.info("create actorSystem: {}", actorPath);
actorsCount.incrementAndGet();
return add;
}
return old;
}

// 将actor入队的地方
private <E> boolean schedule(Actor<E> actor, boolean hasMessageHint) {
// 如果actor不能调度,则ret false
if (!actor.canBeSchedule(hasMessageHint)) return false;
// 设置actor为"可调度状态"
if (actor.setAsScheduled()) {
// 提交时间,和actor执行总耗时共同决定在队列里的优先级
actor.submitTs = System.currentTimeMillis();
// 入队,入的是线程池里的优先级队列
this.executor.execute(actor);
return true;
}
// actor.setAsScheduled()里,这里是actor已经是可调度状态,那么没必要再次入队
return false;
}

actorSystem维护一个线程池,线程池队列具有优先级,队列存储元素是actor。actor的粒度是subject+group。Actor是一个Runnable,且因为是优先级队列的存储元素所以需继承Comparable接口(队列并没有传_Comparator参数_),并且actor有四种状态,初始状态、可调度状态、挂起状态、调度状态(这个状态其实不存在,但是暂且这么叫以帮助理解)。
接下来看看Actor这个类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
public static class Actor<E> implements Runnable, Comparable<Actor> {
// 初始状态
private static final int Open = 0;
// 可调度状态
private static final int Scheduled = 2;
// 掩码,二进制表示:11 与Open和Scheduled作&运算
// shouldScheduleMask&currentStatus != Open 则为不可置为调度状态(当currentStatus为挂起状态或调度状态)
private static final int shouldScheduleMask = 3;
private static final int shouldNotProcessMask = ~2;
// 挂起状态
private static final int suspendUnit = 4;
//每个actor至少执行的时间片
private static final int QUOTA = 5;
// status属性内存偏移量,用Unsafe操作
private static long statusOffset;

static {
try {
statusOffset = Unsafe.instance.objectFieldOffset(Actor.class.getDeclaredField("status"));
} catch (Throwable t) {
throw new ExceptionInInitializerError(t);
}
}

final String systemName;
final ActorSystem actorSystem;
// actor内部维护的queue,后文简单分析下
final BoundedNodeQueue<E> queue;
// ActorSystem内部定义接口,PullMessageWorker实现的就是这个接口,用于真正业务逻辑处理的地方
final Processor<E> processor;
private final String name;
// 一个actor执行总耗时
private long total;
// actor执行提交时间,即actor入队时间
private volatile long submitTs;
//通过Unsafe操作
private volatile int status;

Actor(String systemName, String name, ActorSystem actorSystem, Processor<E> processor, final int queueSize) {
this.systemName = systemName;
this.name = name;
this.actorSystem = actorSystem;
this.processor = processor;
this.queue = new BoundedNodeQueue<>(queueSize);

QMon.actorQueueGauge(systemName, name, () -> (double) queue.count());
}

// 入队,是actor内部的队列
boolean dispatch(E message) {
return queue.add(message);
}

// actor执行的地方
@Override
public void run() {
long start = System.currentTimeMillis();
String old = Thread.currentThread().getName();
try {
Thread.currentThread().setName(systemName + "-" + name);
if (shouldProcessMessage()) {
processMessages();
}
} finally {
long duration = System.currentTimeMillis() - start;
// 每次actor执行的耗时累加到total
total += duration;
QMon.actorProcessTime(name, duration);

Thread.currentThread().setName(old);
// 设置为"空闲状态",即初始状态 (currentStatus & ~Scheduled)
setAsIdle();
// 进行下一次调度
this.actorSystem.schedule(this, false);
}
}

void processMessages() {
long deadline = System.currentTimeMillis() + QUOTA;
while (true) {
E message = queue.peek();
if (message == null) return;
// 处理业务逻辑
boolean process = processor.process(message, this);
// 失败,该message不会出队,等待下一次调度
// 如pullMessageWorker中没有消息时将actor挂起
if (!process) return;

// 出队
queue.pollNode();
// 每个actor只有QUOTA个时间片的执行时间
if (System.currentTimeMillis() >= deadline) return;
}
}

final boolean shouldProcessMessage() {
// 能够真正执行业务逻辑的判断
// 一种场景是,针对挂起状态,由于没有拉取到消息该actor置为挂起状态
// 自然就没有抢占时间片的必要了
return (currentStatus() & shouldNotProcessMask) == 0;
}

// 能否调度
private boolean canBeSchedule(boolean hasMessageHint) {
int s = currentStatus();
if (s == Open || s == Scheduled) return hasMessageHint || !queue.isEmpty();
return false;
}

public final boolean resume() {
while (true) {
int s = currentStatus();
int next = s < suspendUnit ? s : s - suspendUnit;
if (updateStatus(s, next)) return next < suspendUnit;
}
}

public final void suspend() {
while (true) {
int s = currentStatus();
if (updateStatus(s, s + suspendUnit)) return;
}
}

final boolean setAsScheduled() {
while (true) {
int s = currentStatus();
// currentStatus为非Open状态,则ret false
if ((s & shouldScheduleMask) != Open) return false;
// 更新actor状态为调度状态
if (updateStatus(s, s | Scheduled)) return true;
}
}

final void setAsIdle() {
while (true) {
int s = currentStatus();
// 更新actor状态位不可调度状态,(这里可以理解为更新为初始状态Open)
if (updateStatus(s, s & ~Scheduled)) return;
}
}

final int currentStatus() {
// 根据status在内存中的偏移量取得status
return Unsafe.instance.getIntVolatile(this, statusOffset);
}

private boolean updateStatus(int oldStatus, int newStatus) {
// Unsafe 原子操作,处理status的轮转变更
return Unsafe.instance.compareAndSwapInt(this, statusOffset, oldStatus, newStatus);
}

// 决定actor在优先级队列里的优先级的地方
// 先看总耗时,以达到动态限速,保证执行"慢"的请求(已经堆积的消息拉取请求)在后执行
// 其次看提交时间,先提交的actor先执行
@Override
public int compareTo(Actor o) {
int result = Long.compare(total, o.total);
return result == 0 ? Long.compare(submitTs, o.submitTs) : result;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Actor<?> actor = (Actor<?>) o;
return Objects.equals(systemName, actor.systemName) &&
Objects.equals(name, actor.name);
}

@Override
public int hashCode() {
return Objects.hash(systemName, name);
}
}

Actor实现了Comparable,在优先级队列里优先级是Actor里的total和submitTs共同决定的。total是actor执行总耗时,submitTs是调度时间。那么对于处理较慢的actor自然就会在队列里相对”尾部”位置,这时就做到了根据actor的执行耗时的一个动态限速。Actor利用Unsafe机制来控制各个状态的轮转原子性更新的,且每个actor执行时间可以简单理解为5个时间片。
其实工作进行到这里就可以结束了,但是抱着研究的态度,不妨接着往下看看。
Actor内部维护一个Queue,这个Queue是自定义的,是一个Lock-free bounded non-blocking multiple-producer single-consumer queue。JDK里的QUEUE多数都是用锁控制,不用锁,猜测也应该是用Unsafe 原子操作实现。那么来看看吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
  private static class BoundedNodeQueue<T> {

// 头结点、尾节点在内存中的偏移量
private final static long enqOffset, deqOffset;

static {
try {
enqOffset = Unsafe.instance.objectFieldOffset(BoundedNodeQueue.class.getDeclaredField("_enqDoNotCallMeDirectly"));
deqOffset = Unsafe.instance.objectFieldOffset(BoundedNodeQueue.class.getDeclaredField("_deqDoNotCallMeDirectly"));
} catch (Throwable t) {
throw new ExceptionInInitializerError(t);
}
}

private final int capacity;
// 尾节点,通过enqOffset操作
private volatile Node<T> _enqDoNotCallMeDirectly;
// 头结点,通过deqOffset操作
private volatile Node<T> _deqDoNotCallMeDirectly;

protected BoundedNodeQueue(final int capacity) {
if (capacity < 0) throw new IllegalArgumentException("AbstractBoundedNodeQueue.capacity must be >= 0");
this.capacity = capacity;
final Node<T> n = new Node<T>();
setDeq(n);
setEnq(n);
}

// 获取尾节点
private Node<T> getEnq() {
// getObjectVolatile这种方式保证拿到的都是最新数据
return (Node<T>) Unsafe.instance.getObjectVolatile(this, enqOffset);
}

// 设置尾节点,仅在初始化时用
private void setEnq(Node<T> n) {
Unsafe.instance.putObjectVolatile(this, enqOffset, n);
}

private boolean casEnq(Node<T> old, Node<T> nju) {
// cas,循环设置,直到成功
return Unsafe.instance.compareAndSwapObject(this, enqOffset, old, nju);
}

// 获取头结点
private Node<T> getDeq() {
return (Node<T>) Unsafe.instance.getObjectVolatile(this, deqOffset);
}

// 仅在初始化时用
private void setDeq(Node<T> n) {
Unsafe.instance.putObjectVolatile(this, deqOffset, n);
}

// cas设置头结点
private boolean casDeq(Node<T> old, Node<T> nju) {
return Unsafe.instance.compareAndSwapObject(this, deqOffset, old, nju);
}

// 与其叫count,不如唤作index,但是是否应该考虑溢出的情况?
public final int count() {
final Node<T> lastNode = getEnq();
final int lastNodeCount = lastNode.count;
return lastNodeCount - getDeq().count;
}

/**
* @return the maximum capacity of this queue
*/
public final int capacity() {
return capacity;
}

public final boolean add(final T value) {
for (Node<T> n = null; ; ) {
final Node<T> lastNode = getEnq();
final int lastNodeCount = lastNode.count;
if (lastNodeCount - getDeq().count < capacity) {
// Trade a branch for avoiding to create a new node if full,
// and to avoid creating multiple nodes on write conflict á la Be Kind to Your GC
if (n == null) {
n = new Node<T>();
n.value = value;
}

n.count = lastNodeCount + 1; // Piggyback on the HB-edge between getEnq() and casEnq()

// Try to putPullLogs the node to the end, if we fail we continue loopin'
// 相当于
// enq -> next = new Node(value); enq = neq -> next;
if (casEnq(lastNode, n)) {
// 注意一下这个Node.setNext方法
lastNode.setNext(n);
return true;
}
} else return false; // Over capacity—couldn't add the node
}
}

public final boolean isEmpty() {
// enq == deq 即为empty
return getEnq() == getDeq();
}

/**
* Removes the first element of this queue if any
*
* @return the value of the first element of the queue, null if empty
*/
public final T poll() {
final Node<T> n = pollNode();
return (n != null) ? n.value : null;
}

public final T peek() {
Node<T> n = peekNode();
return (n != null) ? n.value : null;
}

protected final Node<T> peekNode() {
for (; ; ) {
final Node<T> deq = getDeq();
final Node<T> next = deq.next();
if (next != null || getEnq() == deq)
return next;
}
}

/**
* Removes the first element of this queue if any
*
* @return the `Node` of the first element of the queue, null if empty
*/
public final Node<T> pollNode() {
for (; ; ) {
final Node<T> deq = getDeq();
final Node<T> next = deq.next();
if (next != null) {
if (casDeq(deq, next)) {
deq.value = next.value;
deq.setNext(null);
next.value = null;
return deq;
} // else we retry (concurrent consumers)
// 比较套路的cas操作,就不多说了
} else if (getEnq() == deq) return null; // If we got a null and head meets tail, we are empty
}
}

public static class Node<T> {

private final static long nextOffset;

static {
try {
nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly"));
} catch (Throwable t) {
throw new ExceptionInInitializerError(t);
}
}

protected T value;
protected int count;
// 也是利用偏移量操作
private volatile Node<T> _nextDoNotCallMeDirectly;

public final Node<T> next() {
return (Node<T>) Unsafe.instance.getObjectVolatile(this, nextOffset);
}

protected final void setNext(final Node<T> newNext) {
// 这里有点讲究,下面分析下
Unsafe.instance.putOrderedObject(this, nextOffset, newNext);
}
}
}

如上代码,是通过属性在内存的偏移量,结合cas原子操作来进行更新赋值等操作,以此来实现lock-free,这是比较常规的套路。值得一说的是Node里的setNext方法,这个方法的调用是在cas节点后,对”上一位置”的next节点进行赋值。而这个方法使用的是 Unsafe.instance.putOrderedObject ,要说这个putOrderedObject ,就不得不说MESI,缓存一致性协议。如volatile,当进行写操作时,它是依靠storeload barrier来实现其他线程对此的可见性。而 putOrderedObject 也是依靠内存屏障,只不过是 storestore barrier 。storestore是比storeload快速的一种内存屏障。在硬件层面,内存屏障分两种:Load-Barrier和Store-Barrier。Load-Barrier是让高速缓存中的数据失效,强制重新从主内存加载数据;Store-Barrier是让写入高速缓存的数据更新写入主内存,对其他线程可见。而java层面的四种内存屏障无非是硬件层面的两种内存屏障的组合而已。那么可见,storestore barrier自然比storeload barrier快速。那么有一个问题,我们可不可以在这里也用cas操作呢?答案是可以,但没必要。你可以想想这里为什么没必要。
谢谢。

-------------The End-------------