4. store****
[TOC]
存储是RocketMQ中至关重要的一部分。DefaultMessageStore是RocketMQ中用于存储消息的入口。一条消息存储过程大致如图所示:
首先通过调用CommitLog.putMessage方法将消息追加到MappedFile文件中,然后再以异步的形式进行构建ConsumeQueue,如果用户设置了Key则同时进行构建Index。
我们将从DefaultMessageStore的初始化和其追加消息来分析其工作原理。
初始化
首先进行成员变量的初始化:
1 | public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, |
CommitLog是消息存储中最重要的部分,因为其他损失了可以根据此部分来补齐。CommitLog内部也需要实例化成员变量
1 | public CommitLog(final DefaultMessageStore defaultMessageStore) { |
初始化完成员变量后,会执行一次load()方法。判断是否有历史文件,如果有则加载历史文件。
1 | public boolean load() { |
初始化并加载完后,开启后台保障线程:
1 | public void start() throws Exception { |
再细看,CommitLog的start方法:
1 | public void start() { |
初始化过程到此就结束了。
消息存储
消息存储入口在DefaultMessageStore.putMessage中:
1 | public PutMessageResult putMessage(final MessageExtBrokerInner msg) { |
mappedFile.appendMessage操作只是将消息写入buffer中,而FlushCommitLogService负责将buffer内容刷到磁盘中。FlushCommitLogService有三种实现:
- GroupCommitService: 负责同步刷盘;
- CommitRealTimeService: 执行周期200ms,将字节缓存区的字节写到
mappedFile对应的FileChannel中(PageCache)中(FileChannel.write(buffer)); - FlushRealTimeService:执行周期500ms,将
PageCache中的内容刷新到磁盘mappedByteBuffer.fore()(没有字节缓存去时)this.fileChannel.force(false)(有字节缓存区时)
消息回放
写完日志后,再继续看如何构建ConsumeQueue和IndexFile:
构建ConsumeQueue和IndexFile是由ReputMessageService服务发起,从字面意思也可理解:重放消息服务,意思是重新从CommitLog中捞起消息消费。
1 | private void doReput() { |
取出reputFromOffset之后的日志,调用doDispatch()方法进行调度请求:
1 | public void doDispatch(DispatchRequest req) { |
可以看出一个topic下的每个queue为一个文件独立存储。这样读取的时候即为顺序读取。
对于索引IndexFile,RocketMQ保存的结构如下:
1 | public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { |
消息读取
DefaultMessageStore的getMessage用于读消息。
给定查询offset,首先去ConsumeQueue去查询出offset后所有的查询的Queue的消息对应的位移记录。然后通过这些位移信息去CommitLog中一条条将结果取出并返回。
1 | public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, |
下图中, 粉红色的线即代表消息读取的过程;绿线代码消息存储的过程。