RocketMQ 存储

4. store

[TOC]

存储是RocketMQ中至关重要的一部分。
DefaultMessageStore 是 RocketMQ 中用于存储消息的入口。一条消息的存储过程大致如图所示:

首先通过调用 CommitLog.putMessage 方法将消息追加到 MappedFile 文件中,然后再以异步的形式构建 ConsumeQueue;如果用户设置了 Key,则同时构建 Index。

我们将从DefaultMessageStore的初始化和其追加消息来分析其工作原理。

初始化

首先进行成员变量的初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
 * Builds the message store: records the injected collaborators, instantiates the
 * internal services, optionally initializes the transient store pool, and starts
 * the mapped-file allocation service and the index service.
 *
 * @param messageStoreConfig store configuration
 * @param brokerStatsManager broker statistics manager
 * @param messageArrivingListener listener notified when a new message arrives
 * @param brokerConfig broker configuration
 * @throws IOException declared by the constructor; the throwing call is not visible in this excerpt
 */
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
// Listener triggered when a new message arrives (revisited later when consumer long polling is covered).
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
// Broker statistics manager.
this.brokerStatsManager = brokerStatsManager;
// Service that allocates space for MappedFile files.
// Must be assigned before the CommitLog is created: the CommitLog constructor reads it from this store.
this.allocateMappedFileService = new AllocateMappedFileService(this);
// Commit log: the place where messages are stored; the most critical part.
this.commitLog = new CommitLog(this);
// Consume-queue table. The original note says it keeps, for every queue of every topic,
// the consumption progress. NOTE(review): presumably it holds the ConsumeQueue instances
// looked up by topic and queue id -- confirm against findConsumeQueue.
this.consumeQueueTable = new ConcurrentHashMap<>(32);
// Service that flushes the consume-queue table to disk.
this.flushConsumeQueueService = new FlushConsumeQueueService();
// Background service that periodically cleans up old message files.
this.cleanCommitLogService = new CleanCommitLogService();
// Background service that periodically cleans up old consume-queue files.
this.cleanConsumeQueueService = new CleanConsumeQueueService();
// Store statistics service.
this.storeStatsService = new StoreStatsService();
// Index service.
this.indexService = new IndexService(this);
// Master/slave replication (HA) service.
this.haService = new HAService(this);
// Reput (replay) service thread: builds the ConsumeQueue entries and the Index
// for newly appended messages from the CommitLog.
this.reputMessageService = new ReputMessageService();
// Delayed (scheduled) message service.
this.scheduleMessageService = new ScheduleMessageService(this);
// In-memory byte buffer pool; initialized only when enabled in the configuration.
this.transientStorePool = new TransientStorePool(messageStoreConfig);
if (messageStoreConfig.isTransientStorePoolEnable()) {
this.transientStorePool.init();
}
// Start the background file-allocation thread.
this.allocateMappedFileService.start();
this.indexService.start();
}

CommitLog 是消息存储中最重要的部分,因为其他部分(如 ConsumeQueue、Index)即使丢失,也可以根据 CommitLog 重新构建。CommitLog 内部也需要实例化成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Creates the commit log for the given store: sets up the mapped-file queue from the
 * store configuration, picks the flush service matching the configured flush type,
 * and creates the commit service and the append callback.
 *
 * @param defaultMessageStore the owning message store, source of configuration and services
 */
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    this.defaultMessageStore = defaultMessageStore;

    this.mappedFileQueue = new MappedFileQueue(
        defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
        defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
        defaultMessageStore.getAllocateMappedFileService());

    // Synchronous flushing uses GroupCommitService; any other flush type uses FlushRealTimeService.
    this.flushCommitLogService =
        defaultMessageStore.getMessageStoreConfig().getFlushDiskType() == FlushDiskType.SYNC_FLUSH
            ? new GroupCommitService()
            : new FlushRealTimeService();

    // Service used for committing when the in-memory byte buffer pool is enabled.
    this.commitLogService = new CommitRealTimeService();

    // Callback that writes the detailed message content into the log.
    this.appendMessageCallback =
        new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}

初始化完成员变量后,会执行一次load()方法。判断是否有历史文件,如果有则加载历史文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Loads previously persisted store data. Each step runs only if all earlier steps
 * succeeded; on failure the mapped-file allocation service is shut down.
 *
 * @return {@code true} if every load step succeeded, {@code false} otherwise
 */
public boolean load() {
    // The temp file still existing means the previous run did not shut down normally.
    final boolean lastExitOK = !this.isTempFileExist();
    log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

    boolean loaded = true;

    // Delayed-message data.
    if (null != scheduleMessageService) {
        loaded = this.scheduleMessageService.load();
    }
    // Commit log.
    if (loaded) {
        loaded = this.commitLog.load();
    }
    // Consume queues.
    if (loaded) {
        loaded = this.loadConsumeQueue();
    }

    if (loaded) {
        this.storeCheckpoint =
            new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
        // Load the index files.
        this.indexService.load(lastExitOK);
        // Recompute the flushed / committed offsets.
        this.recover(lastExitOK);
        log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
    } else {
        this.allocateMappedFileService.shutdown();
    }
    return loaded;
}

初始化并加载完后,开启后台保障线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Starts the store's background services in order: consume-queue flushing, commit log,
 * statistics, delayed messages (skipped on a SLAVE), message reput, HA replication,
 * then creates the temp file and registers the scheduled tasks.
 *
 * @throws Exception if any of the started services fails to start
 */
public void start() throws Exception {
    // Consume-queue flush service.
    this.flushConsumeQueueService.start();
    // Commit-log background flush services (sync and async).
    this.commitLog.start();
    this.storeStatsService.start();

    // Delayed messages are started only when the service exists and this broker is not a SLAVE.
    if (null != this.scheduleMessageService && messageStoreConfig.getBrokerRole() != SLAVE) {
        this.scheduleMessageService.start();
    }

    // Reput service builds the Index and ConsumeQueue; it resumes from the confirm offset
    // when duplication is enabled, otherwise from the commit log's max offset.
    this.reputMessageService.setReputFromOffset(
        this.getMessageStoreConfig().isDuplicationEnable()
            ? this.commitLog.getConfirmOffset()
            : this.commitLog.getMaxOffset());
    this.reputMessageService.start();

    // Master/slave replication service.
    this.haService.start();
    this.createTempFile();
    // Periodic cleanup of old files.
    this.addScheduleTask();
    this.shutdown = false;
}

再细看 CommitLog 的 start 方法:

1
2
3
4
5
6
7
8
/**
 * Starts the commit log's background services: the flush service always, and the
 * commit service only when the transient store pool is enabled.
 */
public void start() {
    // Background flush service.
    this.flushCommitLogService.start();

    final boolean poolEnabled =
        this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable();
    if (!poolEnabled) {
        return;
    }
    // The commit service is needed only when the in-memory byte buffer pool is in use.
    this.commitLogService.start();
}

初始化过程到此就结束了。

消息存储

消息存储入口在DefaultMessageStore.putMessage中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/**
 * Appends a message to the commit log, then performs the configured disk flush and,
 * on a SYNC_MASTER broker, waits for replication to the slave.
 *
 * Steps visible in this excerpt: stamp the message and compute its body CRC; redirect
 * delayed messages to the schedule topic; append to the last mapped file under the
 * put-message lock (rolling to a new file on END_OF_FILE); trigger a sync or async
 * flush; optionally wait for the slave.
 *
 * NOTE(review): this is an abridged excerpt -- the switch contains an elided
 * "case ...;" placeholder and eclipseTimeInLock is not declared here.
 *
 * @param msg the message to store; its topic/queueId are rewritten for delayed messages
 * @return the put result; status is PUT_OK unless mapped-file creation, flush or slave sync reports otherwise
 */
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
msg.setStoreTimestamp(System.currentTimeMillis());
// CRC checksum of the body, to guard against file corruption.
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
// Delayed (scheduled) message handling; applies to non-transactional and commit-type messages only.
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delayed messages are put into the queues of the SCHEDULE_TOPIC_XXXX topic.
if (msg.getDelayTimeLevel() > 0) {
// Clamp the delay level to the maximum supported level.
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
// Messages of different delay levels go to different queues: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// Get the mapped file to write into.
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// Acquire the write lock.
lockForPutMessage(); //spin...
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// The timestamp taken under the lock is used to keep a global ordering.
msg.setStoreTimestamp(beginLockTimestamp);
// Create a mapped file when none exists or the last one is full.
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
// Store the message: DefaultAppendMessageCallback writes msg, in the format RocketMQ requires,
// into the MappedFile's buffer (writeBuffer or mappedByteBuffer).
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE: // At end of file: get a new mapped file and append again.
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
// The remaining status cases are elided in this excerpt (placeholder, not valid Java).
case ...;
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
// Release the write lock.
releasePutMessageLock();
}
// NOTE(review): the result is created as PUT_OK here; other append statuses are presumably
// handled by the elided switch cases above -- confirm against the full source.
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Perform the synchronous or asynchronous flush / commit.
GroupCommitRequest request = null;
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// Synchronous flush: flush the buffer contents to disk.
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (msg.isWaitStoreMsgOK()) {
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
// Block until the flush completes or the timeout elapses.
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
+ " client address: " + msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
} else { // Asynchronous flush.
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup(); // important: wake up the commitLog thread to flush
} else {
// The buffer pool is in use, so its contents must first be written to the fileChannel and then flushed to disk.
commitLogService.wakeup();
}
}
// If this broker is a SYNC_MASTER, replicate to the slave.
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (msg.isWaitStoreMsgOK()) {
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
// Reuse the flush request if one was created above; otherwise create one for the same end offset.
if (null == request) {
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
}
service.putRequest(request);
// Wake up WriteSocketService.
service.getWaitNotifyObject().wakeupAll();
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
+ msg.getTags() + " client address: " + msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
} else {
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
return putMessageResult;
}

mappedFile.appendMessage操作只是将消息写入buffer中,而FlushCommitLogService负责将buffer内容刷到磁盘中。
FlushCommitLogService有三种实现:

  • GroupCommitService: 负责同步刷盘;
  • CommitRealTimeService: 执行周期200ms,将字节缓存区的字节写到mappedFile对应的FileChannel(PageCache)中(FileChannel.write(buffer));
  • FlushRealTimeService:执行周期500ms,将PageCache中的内容刷新到磁盘
    • mappedByteBuffer.force()(没有字节缓存区时)
    • this.fileChannel.force(false)(有字节缓存区时)

消息回放

写完日志后,再继续看如何构建 ConsumeQueue 和 IndexFile:

构建 ConsumeQueue 和 IndexFile 是由 ReputMessageService 服务发起的。从字面意思也可理解:重放消息服务,即重新从 CommitLog 中取出消息并逐条分发处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
 * Replays commit-log data starting at reputFromOffset: reads messages one at a time,
 * dispatches each to doDispatch, notifies the arriving listener when applicable, and
 * advances reputFromOffset. The loop ends when the commit log has no more data, a
 * read fails on a blank region, or (with duplication enabled) the confirm offset is reached.
 */
private void doReput() {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
// With duplication enabled, stop once the reput offset has reached the confirm offset.
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
// Fetch the commit-log data that follows the previous reput offset, reputFromOffset.
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
// Walk through the MappedByteBuffer.
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// Take messages one at a time and extract their log info: offset, size, store timestamp, tagsCode, etc.
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize(); // message length
// Handle according to the outcome of the read.
if (dispatchRequest.isSuccess()) { // read succeeded
if (size > 0) {
// Dispatch the request: append the message's offset info to the ConsumeQueue and build the index.
DefaultMessageStore.this.doDispatch(dispatchRequest);
// Notify listeners of the new message (not on a SLAVE, and only with long polling enabled).
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
// Signal that a new message has arrived.
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode());
}
// FIXED BUG By shijia
this.reputFromOffset += size;
readSize += size;
// Statistics (recorded here only on a SLAVE).
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) { // reached the end of the MappedFile
// Jump to the start of the next file and end the inner loop.
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) { // read failed
if (size > 0) { // expected a message, but the data is not a valid message
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else { // expected a blank region, but the data is not a valid blank
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);

// On the master, skip the unread remainder of this buffer.
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}

取出reputFromOffset之后的日志,调用doDispatch()方法进行调度请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Dispatches one replayed commit-log entry: appends its position to the ConsumeQueue
 * for non-transactional and commit-type messages, then builds the index entry when
 * message indexing is enabled.
 *
 * @param req the dispatch request extracted from the commit log
 */
public void doDispatch(DispatchRequest req) {
    // Only non-transactional messages and the commit phase of a transaction reach the ConsumeQueue.
    final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
    switch (tranType) {
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(),
                req.getMsgSize(), req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
            break;
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            // Prepared and rolled-back messages are not exposed through the ConsumeQueue.
            break;
    }
    if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
        // Build the index entry. This call had been swallowed by the preceding line comment,
        // which left the if-body empty and the index never built.
        DefaultMessageStore.this.indexService.buildIndex(req);
    }
}
/**
 * Appends a message's position information to the ConsumeQueue of the given topic and queue.
 *
 * @param topic message topic
 * @param queueId queue id within the topic
 * @param offset the message's offset in the commit log
 * @param size message size
 * @param tagsCode the message's tags code
 * @param storeTimestamp the message's store timestamp
 * @param logicOffset the message's offset in the consume queue
 */
public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
    long logicOffset) {
    // Look up the ConsumeQueue by topic and queueId (created when not found), then append to it.
    this.findConsumeQueue(topic, queueId)
        .putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
}

可以看出一个topic下的每个queue为一个文件独立存储。这样读取的时候即为顺序读取。

对于索引 IndexFile,RocketMQ 保存的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * Writes one index entry for {@code key} into this index file.
 *
 * The entry is placed after the header and the hash-slot area, at the position given by
 * the current index count. It holds, in order: the key hash (4 bytes), the physical
 * offset (8 bytes), the time difference from the file's begin timestamp (4 bytes), and
 * the previous value of the hash slot (4 bytes), which chains entries that share a slot.
 * The slot is then updated to point at the new entry.
 *
 * @param key the message key to index
 * @param phyOffset the message's physical offset
 * @param storeTimestamp the message's store timestamp
 * @return {@code true} if the entry was written; {@code false} if the file already holds
 *         {@code indexNum} entries
 */
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        // Hash of the key.
        int keyHash = indexKeyHashMethod(key);
        // Slot that the hash maps to.
        int slotPos = keyHash % this.hashSlotNum;
        // Byte position of that slot within the file.
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        // Index number of the entry most recently stored in the same slot;
        // anything out of range is treated as "no previous entry".
        int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
        if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
            slotValue = invalidIndex;
        }

        // Time difference from the begin timestamp, divided by 1000
        // (presumably milliseconds to seconds -- confirm) and clamped to [0, Integer.MAX_VALUE].
        long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
        timeDiff = timeDiff / 1000;
        if (this.indexHeader.getBeginTimestamp() <= 0) {
            timeDiff = 0;
        } else if (timeDiff > Integer.MAX_VALUE) {
            timeDiff = Integer.MAX_VALUE;
        } else if (timeDiff < 0) {
            timeDiff = 0;
        }

        // Byte position of the next free index entry.
        int absIndexPos =
            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                + this.indexHeader.getIndexCount() * indexSize;

        // Write the entry.
        this.mappedByteBuffer.putInt(absIndexPos, keyHash);
        this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
        this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
        this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

        // Point the slot at the entry just written.
        this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

        // The first entries also set the file's begin offset and timestamp.
        if (this.indexHeader.getIndexCount() <= 1) {
            this.indexHeader.setBeginPhyOffset(phyOffset);
            this.indexHeader.setBeginTimestamp(storeTimestamp);
        }

        // NOTE(review): the slot counter is incremented on every insert, even when the slot
        // was already occupied (slotValue != invalidIndex) -- confirm whether it is meant to
        // count distinct occupied slots.
        this.indexHeader.incHashSlotCount();
        this.indexHeader.incIndexCount();
        this.indexHeader.setEndPhyOffset(phyOffset);
        this.indexHeader.setEndTimestamp(storeTimestamp);
        return true;
    }
    return false;
}

消息读取

DefaultMessageStore 的 getMessage 方法用于读取消息。
给定查询 offset,首先到对应的 ConsumeQueue 中查出该 offset 之后的消息位移记录,然后根据这些位移信息到 CommitLog 中逐条取出消息并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
 * Reads messages for a consumer: looks up the ConsumeQueue for the topic/queue, walks its
 * position entries starting at {@code offset}, filters them by subscription, and fetches
 * each matching message from the commit log until the batch is full.
 *
 * NOTE(review): abridged excerpt -- some branches are elided (see the "//..." marker).
 *
 * @param group consumer group (used here for the disk-fall-behind statistic)
 * @param topic topic to read from
 * @param queueId queue id within the topic
 * @param offset consume-queue offset to start reading at
 * @param maxMsgNums maximum number of messages for the batch
 * @param subscriptionData subscription used to filter entries by tags code
 * @return the result carrying the messages found, the status, the next begin offset and
 *         the queue's min/max offsets
 */
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
final SubscriptionData subscriptionData) {
long beginTime = this.getSystemClock().now();
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;
GetMessageResult getResult = new GetMessageResult();
final long maxOffsetPy = this.commitLog.getMaxOffset();
// Find the ConsumeQueue by topic and queueId.
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
minOffset = consumeQueue.getMinOffsetInQueue(); // smallest offset in the consume queue
maxOffset = consumeQueue.getMaxOffsetInQueue(); // largest offset in the consume queue
// From the consume offset, get the consume-queue buffer covering the entries after that offset.
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
final int maxFilterMessageCount = 16000;
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// Read position entries one at a time from the consume queue: [physical offset, message size, message tagsCode].
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
maxPhyOffsetPulling = offsetPy;
// After a failed commit-log read, skip entries that fall before the next file's start offset.
if (nextPhyFileStartOffset != Long.MIN_VALUE) {
if (offsetPy < nextPhyFileStartOffset)
continue;
}
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) {
break;
}
// Keep only the entries the consumer subscribed to.
if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
// Fetch the message from the CommitLog by its offset and size.
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (selectResult != null) {
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
} else {
// The commit-log data is unavailable; report it only if nothing has been collected yet.
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
}
} //...
}
if (diskFallRecorded) {
long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
}
// Advance by the number of consume-queue entries examined.
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// Suggest pulling from the slave when the unread backlog exceeds the configured share of physical memory.
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
bufferConsumeQueue.release();
}
}
// Populate the result.
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}

下图中,粉红色的线代表消息读取的过程,绿线代表消息存储的过程。

Reference

您的支持是我创作源源不断的动力