前言
上篇我们分析了QMQ delay-server关于存储的部分,这一篇我们会对投递的源码进行分析。
投递
投递的相关内容在WheelTickManager这个类。提前加载schedule_log、wheel根据延迟时间到时进行投递等相关工作都在这里完成。而关于真正进行投递的相关类是在sender这个包里。
wheel
wheel包里一共就三个类文件, HashWheelTimer
、 WheelLoadCursor
、 WheelTickManager
, WheelTickManager
就应该是wheel加载文件,wheel中的消息到时投递的管理器; WheelLoadCursor
应该就是上一篇中提到的schedule_log文件加载到哪里的cursor标识;那么 HashWheelTimer
就是一个辅助工具类,简单理解成Java中的 ScheduledExecutorService
,可理解成是根据延迟消息的延迟时间进行投递的timer,所以这里不对这个工具类做更多解读,我们更关心MQ逻辑。
首先来看提前一定时间加载schedule_log,这里的提前一定时间是多长时间呢?这个是根据需要配置的,比如schedule_log的刻度自定义配置为1h,提前加载时间配置为30min,那么在2019-02-10 17:30就应该加载2019021018这个schedule_log。
1 |
|
recover
这个方法,会根据dispatch log中的投递记录,找到上一次最后投递的位置,在delay-server重启的时候,wheel会根据这个位置恢复投递。
1 | private void recover() { |
恢复基本就是以上的这些内容,接下来看看是如何加载的。
1 | private void load() { |
根据配置的提前加载时间,内存中的wheel会提前加载schedule_log,加载是在一个while循环里,直到加载到until delay segment才退出,如果当前没有until 这个delay segment,那么会在配置的 blockingExitTime
时间退出该循环,而为了避免cpu load过高,这里会在每次循环间隔设置100ms sleep。这里加载为什么是在while循环里?以及为什么sleep 100ms,sleep 500ms 或者1s可不可以?以及为什么要设置个blockingExitTime呢?下面的分析之后,应该就能回答这些问题了。主要考虑两种情况,一种是当之前一直没有delay segment或者delay segment是间隔存在的,比如delay segment刻度为1h,2019031001和2019031004之间的2019031002及2019031003不存在这种之类的delay segment不存在的情况,另一种是当正在加载delay segment的时候,位于该segment的延迟消息正在被加载,这种情况是有可能丢消息的。所以这里加载是在一个循环里,以及设置了两个cursor,即loading cursor,和loaded cursor。一个表示正在加载,一个表示已经加载。此外,上面每次循环sleep 100ms,可不可以sleep 500ms or 1s?答案是可以,只是消息是否能容忍500ms 或者1s的延迟。
1 | private boolean loadUntilInternal(int until) { |
还记得上一篇我们提到过,存储的时候,如果这个消息位于正在被wheel加载segment中,那么这个消息应该是会被加载到wheel中的。
1 |
|
sender
sender包里结构如下图:
通过brokerGroup做分组,根据组批量发送,发送时是多线程发送,每个组互不影响,发送时也会根据实时broker的weight进行选择考虑broker进行发送。
1 |
|
可以看到,投递时是根据server broker进行分组投递。看一下 SenderGroup
这个类
可以看到,每个组的投递是多线程,互不影响,不会存在某个组的server挂掉,导致其他组无法投递。并且这里如果存在某个组无法投递,重试时会选择其它的server broker进行重试。与此同时,在选择组时,会根据每个server broker的weight进行综合考量,即当前server broker有多少消息量要发送。
1 | // 具体发送的地方 |
就是以上这些,关于QMQ的delay-server源码分析就是这些了,如果以后有机会会分析一下QMQ的其他模块源码,谢谢。