QMQ源码分析之delay-server篇【二】

前言

本来是固定时间周六更博,但是昨天临时失恋了,所以心情不好,晚了一天。那么上一篇我们梳理了下QMQ延迟消息的主要功能,这一篇在此基础上,对照着功能分析一下源码。

整体结构

要了解delay-server源码的一个整体结构,需要我们跟着源码,从初始化开始简单先过一遍。重试工作都在startup这个包里,而这个包只有一个ServerWrapper类。image.png
结合上一篇的内容,通过这个类就基本能看到delay的一个源码结构。delay-server基于netty,init方法完成初始化工作(端口默认为20801、心跳、wheel等),register方法是向meta-server发起请求,获取自己自己的角色delay ,并开始和meta-server的心跳。startServer方法是开始HashWheel的转动,从上次结束的位置继续message_log的回放,开启netty server。另外在做准备工作时知道QMQ是基于一主一从一备的方式,关于这个sync方法,是开启监听一个端口回应同步拉取动作,如果是从节点还要开始向主节点发起同步拉取动作。当这一切都完成了,那么online方法就执行,表示delay开始上线提供服务了。总结一下两个要点,QMQ是基于netty进行通信,并且采用一主一从一备的方式。

存储

关于存储在之前我们也讨论了,delay-server接收到延迟消息,会顺序append到message_log,之后再对message_log进行回放,以生成schedule_log。所以关于存储我们需要关注两个东西,一个是message_log的存储,另一个是schedule_log的生成。

message_log

其实 message_log 的生成很简单,就是顺序append。主要逻辑在 qunar.tc.qmq.delay.receiver.Receiver 这个类里,大致流程就是关于QMQ自定义协议的一个反序列化,然后再对序列化的单个消息进行存储。如图:image.png
主要逻辑在途中标红方法 doInvoke 中。

1
2
3
4
5
6
7
8
9
10
11
private void doInvoke(ReceivedDelayMessage message) {
// ...

try {
// 注:这里是进行append的地方
ReceivedResult result = facade.appendMessageLog(message);
offer(message, result);
} catch (Throwable t) {
error(message, t);
}
}

delay存储层相关逻辑都在facade这个类里,初始化时类似消息的校验等工作也都在这里,而message_log的相关操作都在 messageLog 里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@Override
public AppendMessageRecordResult append(RawMessageExtend record) {
AppendMessageResult<Long> result;
// 注:当前最新的一个segment
LogSegment segment = logManager.latestSegment();
if (null == segment) {
segment = logManager.allocNextSegment();
}

if (null == segment) {
return new AppendMessageRecordResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null);
}

// 注:真正进行append的动作是messageAppender
result = segment.append(record, messageAppender);
switch (result.getStatus()) {
case MESSAGE_SIZE_EXCEEDED:
return new AppendMessageRecordResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
case END_OF_FILE:
if (null == logManager.allocNextSegment()) {
return new AppendMessageRecordResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null);
}
return append(record);
case SUCCESS:
return new AppendMessageRecordResult(PutMessageStatus.SUCCESS, result);
default:
return new AppendMessageRecordResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
}

// 看一下这个appender,也可以通过这里能看到QMQ的delay message 格式定义
private class DelayRawMessageAppender implements MessageAppender<RawMessageExtend, Long> {
private final ReentrantLock lock = new ReentrantLock();
private final ByteBuffer workingBuffer = ByteBuffer.allocate(1024);

@Override
public AppendMessageResult<Long> doAppend(long baseOffset, ByteBuffer targetBuffer, int freeSpace, RawMessageExtend message) {
// 这个lock这里影响不大
lock.lock();
try {
workingBuffer.clear();

final String messageId = message.getHeader().getMessageId();
final byte[] messageIdBytes = messageId.getBytes(StandardCharsets.UTF_8);
final String subject = message.getHeader().getSubject();
final byte[] subjectBytes = subject.getBytes(StandardCharsets.UTF_8);

final long startWroteOffset = baseOffset + targetBuffer.position();
final int recordSize = recordSizeWithCrc(messageIdBytes.length, subjectBytes.length, message.getBodySize());

if (recordSize > config.getSingleMessageLimitSize()) {
return new AppendMessageResult<>(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED, startWroteOffset, freeSpace, null);
}

workingBuffer.flip();
if (recordSize != freeSpace && recordSize + MIN_RECORD_BYTES > freeSpace) {
// 填充
workingBuffer.limit(freeSpace);
workingBuffer.putInt(MESSAGE_LOG_MAGIC_V1);
workingBuffer.put(MessageLogAttrEnum.ATTR_EMPTY_RECORD.getCode());
workingBuffer.putLong(System.currentTimeMillis());
targetBuffer.put(workingBuffer.array(), 0, freeSpace);
return new AppendMessageResult<>(AppendMessageStatus.END_OF_FILE, startWroteOffset, freeSpace, null);
} else {
int headerSize = recordSize - message.getBodySize();
workingBuffer.limit(headerSize);
workingBuffer.putInt(MESSAGE_LOG_MAGIC_V2);
workingBuffer.put(MessageLogAttrEnum.ATTR_MESSAGE_RECORD.getCode());
workingBuffer.putLong(System.currentTimeMillis());
// 注意这里,是schedule_time ,即延迟时间
workingBuffer.putLong(message.getScheduleTime());
// sequence,每个brokerGroup应该是唯一的
workingBuffer.putLong(sequence.incrementAndGet());
workingBuffer.putInt(messageIdBytes.length);
workingBuffer.put(messageIdBytes);
workingBuffer.putInt(subjectBytes.length);
workingBuffer.put(subjectBytes);
workingBuffer.putLong(message.getHeader().getBodyCrc());
workingBuffer.putInt(message.getBodySize());
targetBuffer.put(workingBuffer.array(), 0, headerSize);
targetBuffer.put(message.getBody().nioBuffer());

final long payloadOffset = startWroteOffset + headerSize;
return new AppendMessageResult<>(AppendMessageStatus.SUCCESS, startWroteOffset, recordSize, payloadOffset);
}
} finally {
lock.unlock();
}
}
}

以上基本就是message_log的存储部分,接下来我们来看message_log的回放生成schedule_log。

schedule_log

MessageLogReplayer这个类就是控制回放的地方。那么考虑一个问题,下一次重启的时候,我们该从哪里进行回放?QMQ是会有一个回放的offset,这个offset会定时刷盘,下次重启的时候会从这个offset位置开始回放。细节可以看一下下面这段代码块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final LogVisitor<LogRecord> visitor = facade.newMessageLogVisitor(iterateFrom.longValue());
adjustOffset(visitor);

while (true) {
final Optional<LogRecord> recordOptional = visitor.nextRecord();
if (recordOptional.isPresent() && recordOptional.get() == DelayMessageLogVisitor.EMPTY_LOG_RECORD) {
break;
}

recordOptional.ifPresent((record) -> {
// post以进行存储
dispatcher.post(record);
long checkpoint = record.getStartWroteOffset() + record.getRecordSize();
this.cursor.addAndGet(record.getRecordSize());
facade.updateIterateOffset(checkpoint);
});
}
iterateFrom.add(visitor.visitedBufferSize());

try {
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
LOGGER.warn("message log iterate sleep interrupted");
}

注意这里除了offset还有个cursor,这是为了防止回放失败,sleep 5ms后再次回放的时候从cursor位置开始,避免重复消息。那么我们看一下dispatcher.post这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
  @Override
public void post(LogRecord event) {
// 这里是schedule_log
AppendLogResult<ScheduleIndex> result = facade.appendScheduleLog(event);
int code = result.getCode();
if (MessageProducerCode.SUCCESS != code) {
LOGGER.error("appendMessageLog schedule log error,log:{} {},code:{}", event.getSubject(), event.getMessageId(), code);
throw new AppendException("appendScheduleLogError");
}

// 先看这里
iterateCallback.apply(result.getAdditional());
}

如以上代码,我们看略过schedule_log的存储,看一下那个callback是几个意思:

1
2
3
4
5
6
7
8
9
10
11
12
13
private boolean iterateCallback(final ScheduleIndex index) {
// 延迟时间
long scheduleTime = index.getScheduleTime();
// 这个offset是startOffset,即在delay_segment中的这个消息的起始位置
long offset = index.getOffset();
// 是否add到内存中的HashWheel
if (wheelTickManager.canAdd(scheduleTime, offset)) {
wheelTickManager.addWHeel(index);
return true;
}

return false;
}

这里的意思是,delay-server接收到消息,会判断一下这个消息是否需要add到内存中的wheel中,以防止丢消息。大家记着有这个事情,在投递小节中我们回过头来再说这里。那么回到 facade.appendScheduleLog 这个方法,schedule_log相关操作在scheduleLog里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public RecordResult<T> append(LogRecord record) {
long scheduleTime = record.getScheduleTime();
// 这里是根据延迟时间定位对应的delaySegment的
DelaySegment<T> segment = locateSegment(scheduleTime);
if (null == segment) {
segment = allocNewSegment(scheduleTime);
}

if (null == segment) {
return new NopeRecordResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED);
}

// 具体动作在append里
return retResult(segment.append(record, appender));
}

留意 locateSegment 这个方法,它是根据延迟时间定位 DelaySegment ,比如如果延迟时间是2019-03-03 16:00:00,那么就会定位到201903031600这个DelaySegment(注:这里贴的代码不是最新的,最新的是 DelaySegment 的刻度是可以配置,到分钟级别)。同样,具体动作也是 appender 做的,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public AppendRecordResult<ScheduleSetSequence> appendLog(LogRecord log) {
workingBuffer.clear();
workingBuffer.flip();
final byte[] subjectBytes = log.getSubject().getBytes(StandardCharsets.UTF_8);
final byte[] messageIdBytes = log.getMessageId().getBytes(StandardCharsets.UTF_8);
int recordSize = getRecordSize(log, subjectBytes.length, messageIdBytes.length);
workingBuffer.limit(recordSize);

long scheduleTime = log.getScheduleTime();
long sequence = log.getSequence();
workingBuffer.putLong(scheduleTime);
// message_log中的sequence
workingBuffer.putLong(sequence);
workingBuffer.putInt(log.getPayloadSize());
workingBuffer.putInt(messageIdBytes.length);
workingBuffer.put(messageIdBytes);
workingBuffer.putInt(subjectBytes.length);
workingBuffer.put(subjectBytes);
workingBuffer.put(log.getRecord());
workingBuffer.flip();
ScheduleSetSequence record = new ScheduleSetSequence(scheduleTime, sequence);
return new AppendRecordResult<>(AppendMessageStatus.SUCCESS, 0, recordSize, workingBuffer, record);
}

这里也能看到schedule_log的消息格式。

发现就写了个存储篇幅就已经挺大了,投递涉及到的内容可能更多,那么关于投递就开个下一篇吧。

-------------The End-------------