7. 特性
顺序消息
RocketMQ无法保证消息的绝对顺序,即所有消息都按发送顺序被消费。因为当有N多个消费者时,可能有的消息者拿到后来的消息但由于消费的比其他消费者快导致后来的消息先消费。
但是使用方可以保证局部有序。
RocketMQ提供API,让用户自己选择消息发送到哪个Queue:
1 | Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); |
根据指定的id来选择发往哪个队列,这样同一标识的消息会按时间先后顺序发往队列。
再者,在Consumer
重负载的时候,如果是顺序消息会锁住消息不然其他Consumer
进行消费。
最后,当消息到达Consumer
本地时,选择以顺序的形式进行消费ConsumeMessageOrderlyService
,则可保证消息按时间顺序消费。
顺序性也带来一定的弊端:
- 并行度会成为消息系统的瓶颈
- 更多的异常处理。消费端出现问题,就会导致真个流程阻塞。
消息重复
RocketMQ并不能保证消息完全不重复,如果业务方需要此属性,需要业务方自己去重。
事务消息
关于事务消息,RocketMQ官方资料给出这样一张图,描述事务消息的流程图。
- 生产者向Broker发送
Prepared
消息后返回消息偏移;(TransactionProducer) - 本地事务执行;
- 根据本地事务的执行结果提交回滚或者提交;(EndTransactionProcessor)
- 在
ReputMessageService
中,只会将非事务消息或者提交的事务消息位移加入ConsumeQueue
中。 - 当生产者异常时,需要
Broker
发起回查。
截止到目前为止,RocketMQ
事务消息这块的实现并不完整,缺少Broker
到Producer
的会查逻辑。
在网上看到其在3.x
版本实现了其逻辑,而在4.0
中进行了移除。
定时消息
RocketMQ
只支持固定精度的定时消息,不能支持任意时间精度的消息。
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
其实现比较简单。大致过程如下:
- 在消息存储的时候会是否判断是延迟消息则更改消息的topic为
SCHEDULE_TOPIC_XXXX
,然后根据延迟级别更改queueId,然后存储到CommitLog
中; ScheduleMessageService
服务则从SCHEDULE_TOPIC_XXXX
出捞出延迟数据,进行消费。如果到达指定时间则发送消息,如果没到则推迟到一定时间后再消费。
CommitLog.putMessage中处理事务消息:
1 | if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE// |
ScheduleMessageService的逻辑:对于每个延迟级别分别提交DeliverDelayedMessageTimerTask
。
1 | public void start() { |
任务执行时,首先从SCHEDULE_TOPIC_XXXX
的每个级别的延时消费队列中取出offset位移之后添加的消息位置信息;其后,计算消息是否到达预期时间,如果没到则提交延时任务到指定时间点,如果到了则取出消息并将消息转变成原有的消息进行发送。
1 | public void executeOnTimeup() { |