4. store****
[TOC]
存储是RocketMQ
中至关重要的一部分。DefaultMessageStore
是RocketMQ
中用于存储消息的入口。一条消息存储过程大致如图所示:
首先通过调用CommitLog.putMessage
方法将消息追加到MappedFile
文件中,然后再以异步的形式进行构建ConsumeQueue
,如果用户设置了Key则同时进行构建Index
。
我们将从DefaultMessageStore
的初始化和其追加消息来分析其工作原理。
初始化
首先进行成员变量的初始化:
1 | public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, |
CommitLog
是消息存储中最重要的部分,因为其他损失了可以根据此部分来补齐。CommitLog
内部也需要实例化成员变量
1 | public CommitLog(final DefaultMessageStore defaultMessageStore) { |
初始化完成员变量后,会执行一次load()
方法。判断是否有历史文件,如果有则加载历史文件。
1 | public boolean load() { |
初始化并加载完后,开启后台保障线程:
1 | public void start() throws Exception { |
再细看,CommitLog
的start
方法:
1 | public void start() { |
初始化过程到此就结束了。
消息存储
消息存储入口在DefaultMessageStore.putMessage
中:
1 | public PutMessageResult putMessage(final MessageExtBrokerInner msg) { |
mappedFile.appendMessage
操作只是将消息写入buffer
中,而FlushCommitLogService
负责将buffer
内容刷到磁盘中。FlushCommitLogService
有三种实现:
- GroupCommitService: 负责同步刷盘;
- CommitRealTimeService: 执行周期200ms,将字节缓存区的字节写到
mappedFile
对应的FileChannel
中(PageCache)
中(FileChannel.write(buffer)); - FlushRealTimeService:执行周期500ms,将
PageCache
中的内容刷新到磁盘mappedByteBuffer.fore()
(没有字节缓存去时)this.fileChannel.force(false)
(有字节缓存区时)
消息回放
写完日志后,再继续看如何构建ConsumeQueue
和IndexFile
:
构建ConsumeQueue
和IndexFile
是由ReputMessageService
服务发起,从字面意思也可理解:重放消息服务
,意思是重新从CommitLog
中捞起消息消费。
1 | private void doReput() { |
取出reputFromOffset
之后的日志,调用doDispatch()
方法进行调度请求:
1 | public void doDispatch(DispatchRequest req) { |
可以看出一个topic下的每个queue为一个文件独立存储。这样读取的时候即为顺序读取。
对于索引IndexFile
,RocketMQ
保存的结构如下:
1 | public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { |
消息读取
DefaultMessageStore
的getMessage
用于读消息。
给定查询offset
,首先去ConsumeQueue
去查询出offset
后所有的查询的Queue的消息对应的位移记录。然后通过这些位移信息去CommitLog
中一条条将结果取出并返回。
1 | public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, |
下图中, 粉红色的线即代表消息读取的过程;绿线代码消息存储的过程。