前言
本来是固定时间周六更博,但是昨天临时失恋了,所以心情不好,晚了一天。那么上一篇我们梳理了下QMQ延迟消息的主要功能,这一篇在此基础上,对照着功能分析一下源码。
整体结构
要了解delay-server源码的一个整体结构,需要我们跟着源码,从初始化开始简单先过一遍。重试工作都在startup这个包里,而这个包只有一个ServerWrapper类。
结合上一篇的内容,通过这个类就基本能看到delay的一个源码结构。delay-server基于netty,init方法完成初始化工作(端口默认为20801、心跳、wheel等),register方法是向meta-server发起请求,获取自己自己的角色
为 delay
,并开始和meta-server的心跳。startServer方法是开始HashWheel的转动,从上次结束的位置继续message_log的回放,开启netty server。另外在做准备工作时知道QMQ是基于一主一从一备的方式,关于这个sync方法,是开启监听一个端口回应同步拉取
动作,如果是从节点还要开始向主节点发起同步拉取
动作。当这一切都完成了,那么online方法就执行,表示delay开始上线提供服务了。总结一下两个要点,QMQ是基于netty进行通信,并且采用一主一从一备的方式。
存储
关于存储在之前我们也讨论了,delay-server接收到延迟消息,会顺序append到message_log,之后再对message_log进行回放,以生成schedule_log。所以关于存储我们需要关注两个东西,一个是message_log的存储,另一个是schedule_log的生成。
message_log
其实 message_log
的生成很简单,就是顺序append。主要逻辑在 qunar.tc.qmq.delay.receiver.Receiver
这个类里,大致流程就是关于QMQ自定义协议的一个反序列化,然后再对序列化的单个消息进行存储。如图:
主要逻辑在途中标红方法 doInvoke
中。
1 | private void doInvoke(ReceivedDelayMessage message) { |
delay存储层相关逻辑都在facade这个类里,初始化时类似消息的校验等工作也都在这里,而message_log的相关操作都在 messageLog
里。
1 |
|
以上基本就是message_log的存储部分,接下来我们来看message_log的回放生成schedule_log。
schedule_log
MessageLogReplayer这个类就是控制回放的地方。那么考虑一个问题,下一次重启的时候,我们该从哪里进行回放?QMQ是会有一个回放的offset,这个offset会定时刷盘,下次重启的时候会从这个offset位置开始回放。细节可以看一下下面这段代码块。
1 | final LogVisitor<LogRecord> visitor = facade.newMessageLogVisitor(iterateFrom.longValue()); |
注意这里除了offset还有个cursor,这是为了防止回放失败,sleep 5ms后再次回放的时候从cursor位置开始,避免重复消息。那么我们看一下dispatcher.post这个方法:
1 |
|
如以上代码,我们看略过schedule_log的存储,看一下那个callback是几个意思:
1 | private boolean iterateCallback(final ScheduleIndex index) { |
这里的意思是,delay-server接收到消息,会判断一下这个消息是否需要add到内存中的wheel中,以防止丢消息。大家记着有这个事情,在投递小节中我们回过头来再说这里。那么回到 facade.appendScheduleLog
这个方法,schedule_log相关操作在scheduleLog里:
1 |
|
留意 locateSegment
这个方法,它是根据延迟时间定位 DelaySegment
,比如如果延迟时间是2019-03-03 16:00:00,那么就会定位到201903031600这个DelaySegment(注:这里贴的代码不是最新的,最新的是 DelaySegment
的刻度是可以配置,到分钟级别)。同样,具体动作也是 appender
做的,如下:
1 |
|
这里也能看到schedule_log的消息格式。
发现就写了个存储篇幅就已经挺大了,投递涉及到的内容可能更多,那么关于投递就开个下一篇吧。