QMQ源码分析之delay-server篇【三】

前言

上篇我们分析了QMQ delay-server关于存储的部分,这一篇我们会对投递的源码进行分析。

投递

投递的相关内容在WheelTickManager这个类。提前加载schedule_log、wheel根据延迟时间到时进行投递等相关工作都在这里完成。而关于真正进行投递的相关类是在sender这个包里。

wheel

wheel包里一共就三个类文件, HashWheelTimerWheelLoadCursorWheelTickManagerWheelTickManager 就应该是wheel加载文件,wheel中的消息到时投递的管理器; WheelLoadCursor 应该就是上一篇中提到的schedule_log文件加载到哪里的cursor标识;那么 HashWheelTimer 就是一个辅助工具类,简单理解成Java中的 ScheduledExecutorService ,可理解成是根据延迟消息的延迟时间进行投递的timer,所以这里不对这个工具类做更多解读,我们更关心MQ逻辑。
首先来看提前一定时间加载schedule_log,这里的提前一定时间是多长时间呢?这个是根据需要配置的,比如schedule_log的刻度自定义配置为1h,提前加载时间配置为30min,那么在2019-02-10 17:30就应该加载2019021018这个schedule_log。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void start() {
if (!isStarted()) {
sender.init();
// hash wheel timer,内存中的wheel
timer.start();
started.set(true);
// 根据dispatch log,从上次投递结束的地方恢复开始投递
recover();
// 加载线程,用于加载schedule_log
loadScheduler.scheduleWithFixedDelay(this::load, 0, config.getLoadSegmentDelayMinutes(), TimeUnit.MINUTES);
LOGGER.info("wheel started.");
}
}

recover 这个方法,会根据dispatch log中的投递记录,找到上一次最后投递的位置,在delay-server重启的时候,wheel会根据这个位置恢复投递。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void recover() {
LOGGER.info("wheel recover...");
// 最新的dispatch log segment
DispatchLogSegment currentDispatchedSegment = facade.latestDispatchSegment();
if (currentDispatchedSegment == null) {
LOGGER.warn("load latest dispatch segment null");
return;
}

int latestOffset = currentDispatchedSegment.getSegmentBaseOffset();
DispatchLogSegment lastSegment = facade.lowerDispatchSegment(latestOffset);
if (null != lastSegment) doRecover(lastSegment);

// 根据最新的dispatch log segment进行恢复投递
doRecover(currentDispatchedSegment);
LOGGER.info("wheel recover done. currentOffset:{}", latestOffset);
}

private void doRecover(DispatchLogSegment dispatchLogSegment) {
int segmentBaseOffset = dispatchLogSegment.getSegmentBaseOffset();
ScheduleSetSegment setSegment = facade.loadScheduleLogSegment(segmentBaseOffset);
if (setSegment == null) {
LOGGER.error("load schedule index error,dispatch segment:{}", segmentBaseOffset);
return;
}

// 得到一个关于已投递记录的set
LongHashSet dispatchedSet = loadDispatchLog(dispatchLogSegment);
// 根据这个set,将最新的dispatch log segment中未投递的消息add in wheel。
WheelLoadCursor.Cursor loadCursor = facade.loadUnDispatch(setSegment, dispatchedSet, this::refresh);
int baseOffset = loadCursor.getBaseOffset();
// 记录cursor
loadingCursor.shiftCursor(baseOffset, loadCursor.getOffset());
loadedCursor.shiftCursor(baseOffset);
}

恢复基本就是以上的这些内容,接下来看看是如何加载的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
  private void load() {
// 提前一定时间加载到下一 delay segment
long next = System.currentTimeMillis() + config.getLoadInAdvanceTimesInMillis();
int prepareLoadBaseOffset = resolveSegment(next);
try {
// 加载到prepareLoadBaseOffset这个delay segment
loadUntil(prepareLoadBaseOffset);
} catch (InterruptedException ignored) {
LOGGER.debug("load segment interrupted");
}
}

private void loadUntil(int until) throws InterruptedException {
// 当前wheel已加载到baseOffset
int loadedBaseOffset = loadedCursor.baseOffset();
// 如已加载到until,则break
// have loaded
if (loadedBaseOffset > until) return;

do {
// 加载失败,则break
// wait next turn when loaded error.
if (!loadUntilInternal(until)) break;

// 当前并没有until这个delay segment,即loading cursor小于until
// load successfully(no error happened) and current wheel loading cursor < until
if (loadingCursor.baseOffset() < until) {
// 阻塞,直到thresholdTime+blockingExitTime
// 即如果提前blockingExitTime还未有until这个delay segment的消息进来,则退出
long thresholdTime = System.currentTimeMillis() + config.getLoadBlockingExitTimesInMillis();
// exit in a few minutes in advance
if (resolveSegment(thresholdTime) >= until) {
loadingCursor.shiftCursor(until);
loadedCursor.shiftCursor(until);
break;
}
}

// 避免cpu load过高
Thread.sleep(100);
} while (loadedCursor.baseOffset() < until);

LOGGER.info("wheel load until {} <= {}", loadedCursor.baseOffset(), until);
}

根据配置的提前加载时间,内存中的wheel会提前加载schedule_log,加载是在一个while循环里,直到加载到until delay segment才退出,如果当前没有until 这个delay segment,那么会在配置的 blockingExitTime 时间退出该循环,而为了避免cpu load过高,这里会在每次循环间隔设置100ms sleep。这里加载为什么是在while循环里?以及为什么sleep 100ms,sleep 500ms 或者1s可不可以?以及为什么要设置个blockingExitTime呢?下面的分析之后,应该就能回答这些问题了。主要考虑两种情况,一种是当之前一直没有delay segment或者delay segment是间隔存在的,比如delay segment刻度为1h,2019031001和2019031004之间的2019031002及2019031003不存在这种之类的delay segment不存在的情况,另一种是当正在加载delay segment的时候,位于该segment的延迟消息正在被加载,这种情况是有可能丢消息的。所以这里加载是在一个循环里,以及设置了两个cursor,即loading cursor,和loaded cursor。一个表示正在加载,一个表示已经加载。此外,上面每次循环sleep 100ms,可不可以sleep 500ms or 1s?答案是可以,只是消息是否能容忍500ms 或者1s的延迟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private boolean loadUntilInternal(int until) {
int index = resolveStartIndex();
if (index < 0) return true;

try {
while (index <= until) {
ScheduleSetSegment segment = facade.loadScheduleLogSegment(index);
if (segment == null) {
int nextIndex = facade.higherScheduleBaseOffset(index);
if (nextIndex < 0) return true;
index = nextIndex;
continue;
}

// 具体加载某个segment的地方
loadSegment(segment);
int nextIndex = facade.higherScheduleBaseOffset(index);
if (nextIndex < 0) return true;

index = nextIndex;
}
} catch (Throwable e) {
LOGGER.error("wheel load segment failed,currentSegmentOffset:{} until:{}", loadedCursor.baseOffset(), until, e);
QMon.loadSegmentFailed();
return false;
}

return true;
}

private void loadSegment(ScheduleSetSegment segment) {
final long start = System.currentTimeMillis();
try {
int baseOffset = segment.getSegmentBaseOffset();
long offset = segment.getWrotePosition();
if (!loadingCursor.shiftCursor(baseOffset, offset)) {
LOGGER.error("doLoadSegment error,shift loadingCursor failed,from {}-{} to {}-{}", loadingCursor.baseOffset(), loadingCursor.offset(), baseOffset, offset);
return;
}

WheelLoadCursor.Cursor loadedCursorEntry = loadedCursor.cursor();
// have loaded
// 已经加载
if (baseOffset < loadedCursorEntry.getBaseOffset()) return;

long startOffset = 0;
// last load action happened error
// 如果上次加载失败,则从上一次的位置恢复加载
if (baseOffset == loadedCursorEntry.getBaseOffset() && loadedCursorEntry.getOffset() > -1)
startOffset = loadedCursorEntry.getOffset();

LogVisitor<ScheduleIndex> visitor = segment.newVisitor(startOffset, config.getSingleMessageLimitSize());
try {
loadedCursor.shiftCursor(baseOffset, startOffset);

long currentOffset = startOffset;
// 考虑一种情况,当前delay segment正在append消息,所以是while,而loaded cursor的offset也是没加载一个消息更新的
while (currentOffset < offset) {
Optional<ScheduleIndex> recordOptional = visitor.nextRecord();
if (!recordOptional.isPresent()) break;
ScheduleIndex index = recordOptional.get();
currentOffset = index.getOffset() + index.getSize();
refresh(index);
loadedCursor.shiftOffset(currentOffset);
}
loadedCursor.shiftCursor(baseOffset);
LOGGER.info("loaded segment:{} {}", loadedCursor.baseOffset(), currentOffset);
} finally {
visitor.close();
}
} finally {
Metrics.timer("loadSegmentTimer").update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
}
}

还记得上一篇我们提到过,存储的时候,如果这个消息位于正在被wheel加载segment中,那么这个消息应该是会被加载到wheel中的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

private boolean iterateCallback(final ScheduleIndex index) {
long scheduleTime = index.getScheduleTime();
long offset = index.getOffset();
// 主要看一下这个canAdd
if (wheelTickManager.canAdd(scheduleTime, offset)) {
wheelTickManager.addWHeel(index);
return true;
}

return false;
}

// 就是cursor起作用的地方了
public boolean canAdd(long scheduleTime, long offset) {
WheelLoadCursor.Cursor currentCursor = loadingCursor.cursor();
int currentBaseOffset = currentCursor.getBaseOffset();
long currentOffset = currentCursor.getOffset();

// 根据延迟时间确定该消息位于哪个segment
int baseOffset = resolveSegment(scheduleTime);
// 小于当前loading cursor,则put int wheel
if (baseOffset < currentBaseOffset) return true;

// 正在加载
if (baseOffset == currentBaseOffset) {
// 根据cursor的offset判断
return currentOffset <= offset;
}
return false;
}

sender

sender包里结构如下图:

image.png
通过brokerGroup做分组,根据组批量发送,发送时是多线程发送,每个组互不影响,发送时也会根据实时broker的weight进行选择考虑broker进行发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

@Override
public void send(ScheduleIndex index) {
if (!BrokerRoleManager.isDelayMaster()) {
return;
}

boolean add;
try {
long waitTime = Math.abs(sendWaitTime);
// 入队
if (waitTime > 0) {
add = batchExecutor.addItem(index, waitTime, TimeUnit.MILLISECONDS);
} else {
add = batchExecutor.addItem(index);
}
} catch (InterruptedException e) {
return;
}
if (!add) {
reject(index);
}
}

@Override
public void process(List<ScheduleIndex> indexList) {
try {
// 发送处理逻辑在senderExecutor里
senderExecutor.execute(indexList, this, brokerService);
} catch (Exception e) {
LOGGER.error("send message failed,messageSize:{} will retry", indexList.size(), e);
retry(indexList);
}
}

// 以下为senderExecutor内容
void execute(final List<ScheduleIndex> indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) {
// 分组
Map<SenderGroup, List<ScheduleIndex>> groups = groupByBroker(indexList, brokerService);
for (Map.Entry<SenderGroup, List<ScheduleIndex>> entry : groups.entrySet()) {
doExecute(entry.getKey(), entry.getValue(), handler);
}
}

private void doExecute(final SenderGroup group, final List<ScheduleIndex> list, final SenderGroup.ResultHandler handler) {
// 分组发送
group.send(list, sender, handler);
}

可以看到,投递时是根据server broker进行分组投递。看一下 SenderGroup 这个类

image.png
可以看到,每个组的投递是多线程,互不影响,不会存在某个组的server挂掉,导致其他组无法投递。并且这里如果存在某个组无法投递,重试时会选择其它的server broker进行重试。与此同时,在选择组时,会根据每个server broker的weight进行综合考量,即当前server broker有多少消息量要发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 具体发送的地方
private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List<ScheduleIndex> list) {
try {
long start = System.currentTimeMillis();
List<ScheduleSetRecord> records = store.recoverLogRecord(list);
QMon.loadMsgTime(System.currentTimeMillis() - start);

Datagram response = sendMessages(records, sender);
release(records);
monitor(list, groupName);
if (response == null) {
handler.fail(list);
} else {
final int responseCode = response.getHeader().getCode();
final Map<String, SendResult> resultMap = getSendResult(response);

if (resultMap == null || responseCode != CommandCode.SUCCESS) {
if (responseCode == CommandCode.BROKER_REJECT || responseCode == CommandCode.BROKER_ERROR) {
// 该组熔断
groupInfo.markFailed();
}

monitorSendFail(list, groupInfo.getGroupName());

// 重试
handler.fail(list);
return;
}

Set<String> failedMessageIds = new HashSet<>();
boolean brokerRefreshed = false;
for (Map.Entry<String, SendResult> entry : resultMap.entrySet()) {
int resultCode = entry.getValue().getCode();
if (resultCode != MessageProducerCode.SUCCESS) {
failedMessageIds.add(entry.getKey());
}
if (!brokerRefreshed && resultCode == MessageProducerCode.BROKER_READ_ONLY) {
groupInfo.markFailed();
brokerRefreshed = true;
}
}
if (!brokerRefreshed) groupInfo.markSuccess();

// dispatch log 记录在这里产生
handler.success(records, failedMessageIds);
}
} catch (Throwable e) {
LOGGER.error("sender group send batch failed,broker:{},batch size:{}", groupName, list.size(), e);
handler.fail(list);
}
}

就是以上这些,关于QMQ的delay-server源码分析就是这些了,如果以后有机会会分析一下QMQ的其他模块源码,谢谢。

-------------The End-------------