前言
QMQ有关actor的一篇文章阐述了actor的应用场景。即client消费消息的请求会先进入一个RequestQueue,在client消费消息时,往往存在多个主题、多个消费组共享一个RequestQueue消费消息。在这个Queue中,存在不同主题的有不同消费组数量,以及不同消费组有不同consumer数量,那么就会存在抢占资源的情况。举个文章中的例子,一个主题下有两个消费组A和B,A有100个consumer,B有200个consumer,那么在RequestQueue中来自B的请求可能会多于A,这个时候就存在消费unfair的情况,所以需要隔离不同主题不同消费组以保证fair。除此之外,当consumer消费能力不足,造成broker消息堆积,这个时候就会导致consumer所在消费组总在消费”老消息”,影响全局整体的一个消费能力。因为”老消息”不会存在page cache中,这个时候很可能就会从磁盘load,那么表现是RequestQueue中来自消费”老消息”消费组的请求处理时间过长,影响到其他主题消费组的消费,因此这个时候也需要做策略来避免不同消费组的相互影响。所以QMQ就有了actor机制,以消除各个消费组之间因消费能力不同、consumer数量不同而造成的相互影响各自的消费能力。
PullMessageWorker
要了解QMQ的actor模式是如何起作用的,就要先来看看Broker是如何处理消息拉取请求的。
1 | class PullMessageWorker implements ActorSystem.Processor<PullMessageProcessor.PullEntry> { |
能看除在这里起作用的是这个 actorSystem
。 PullMessageWorker
继承了 ActorSystem.Processor
,所以真正处理拉取请求的是这个接口里的 process
方法。请求到达 pullMessageWorker
,worker将该次请求交给 actorSystem
调度,调度到这次请求时,worker还有个根据拉取结果做反应的策略,即如果暂时没有消息,那么 suspend
,以一个timer task定时 resume
;如果在timer task执行之前有消息进来,那么也会即时 resume
。
ActorSystem
接下来就看看ActorSystem里边是如何做到 公平调度 。
1 | public class ActorSystem { |
可以看到,用一个线程池处理actor的调度执行,这个线程池里的队列是一个优先级队列。优先级队列存储的元素是Actor。关于Actor我们稍后来看,先来看一下 ActorSystem
的处理调度流程。
1 | // PullMessageWorker调用的就是这个方法 |
actorSystem维护一个线程池,线程池队列具有优先级,队列存储元素是actor。actor的粒度是subject+group。Actor是一个Runnable,且因为是优先级队列的存储元素所以需继承Comparable接口(队列并没有传_Comparator参数_),并且actor有四种状态,初始状态、可调度状态、挂起状态、调度状态(这个状态其实不存在,但是暂且这么叫以帮助理解)。
接下来看看Actor这个类:
1 | public static class Actor<E> implements Runnable, Comparable<Actor> { |
Actor实现了Comparable
,在优先级队列里优先级是Actor里的total和submitTs共同决定的。total是actor执行总耗时,submitTs是调度时间。那么对于处理较慢的actor自然就会在队列里相对”尾部”位置,这时就做到了根据actor的执行耗时的一个动态限速。Actor利用Unsafe机制来控制各个状态的轮转原子性更新的,且每个actor执行时间可以简单理解为5个时间片。
其实工作进行到这里就可以结束了,但是抱着研究的态度,不妨接着往下看看。
Actor内部维护一个Queue,这个Queue是自定义的,是一个Lock-free bounded non-blocking multiple-producer single-consumer queue。JDK里的QUEUE多数都是用锁控制,不用锁,猜测也应该是用Unsafe 原子操作实现。那么来看看吧:
1 | private static class BoundedNodeQueue<T> { |
如上代码,是通过属性在内存的偏移量,结合cas原子操作来进行更新赋值等操作,以此来实现lock-free,这是比较常规的套路。值得一说的是Node里的setNext方法,这个方法的调用是在cas节点后,对”上一位置”的next节点进行赋值。而这个方法使用的是 Unsafe.instance.putOrderedObject
,要说这个putOrderedObject ,就不得不说MESI,缓存一致性协议。如volatile,当进行写操作时,它是依靠storeload barrier来实现其他线程对此的可见性。而 putOrderedObject
也是依靠内存屏障,只不过是 storestore barrier
。storestore是比storeload快速的一种内存屏障。在硬件层面,内存屏障分两种:Load-Barrier和Store-Barrier。Load-Barrier是让高速缓存中的数据失效,强制重新从主内存加载数据;Store-Barrier是让写入高速缓存的数据更新写入主内存,对其他线程可见。而java层面的四种内存屏障无非是硬件层面的两种内存屏障的组合而已。那么可见,storestore barrier自然比storeload barrier快速。那么有一个问题,我们可不可以在这里也用cas操作呢?答案是可以,但没必要。你可以想想这里为什么没必要。
谢谢。