RocketMQ 特性

7. 特性

顺序消息

RocketMQ无法保证消息的绝对顺序,即所有消息都按发送顺序被消费。因为当有N多个消费者时,可能有的消息者拿到后来的消息但由于消费的比其他消费者快导致后来的消息先消费。
但是使用方可以保证局部有序。

RocketMQ提供API,让用户自己选择消息发送到哪个Queue:

1
2
3
4
5
6
7
8
9
Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);

根据指定的id来选择发往哪个队列,这样同一标识的消息会按时间先后顺序发往队列。
再者,在Consumer重负载的时候,如果是顺序消息会锁住消息不然其他Consumer进行消费。
最后,当消息到达Consumer本地时,选择以顺序的形式进行消费ConsumeMessageOrderlyService,则可保证消息按时间顺序消费。

顺序性也带来一定的弊端:

  • 并行度会成为消息系统的瓶颈
  • 更多的异常处理。消费端出现问题,就会导致真个流程阻塞。

消息重复

RocketMQ并不能保证消息完全不重复,如果业务方需要此属性,需要业务方自己去重。

事务消息

关于事务消息,RocketMQ官方资料给出这样一张图,描述事务消息的流程图。

  1. 生产者向Broker发送Prepared消息后返回消息偏移;(TransactionProducer
  2. 本地事务执行;
  3. 根据本地事务的执行结果提交回滚或者提交;(EndTransactionProcessor
  4. ReputMessageService中,只会将非事务消息或者提交的事务消息位移加入ConsumeQueue中。
  5. 当生产者异常时,需要Broker发起回查。

截止到目前为止,RocketMQ事务消息这块的实现并不完整,缺少BrokerProducer的会查逻辑。
在网上看到其在3.x版本实现了其逻辑,而在4.0中进行了移除。

定时消息

RocketMQ只支持固定精度的定时消息,不能支持任意时间精度的消息。

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

其实现比较简单。大致过程如下:

  1. 在消息存储的时候会是否判断是延迟消息则更改消息的topic为SCHEDULE_TOPIC_XXXX,然后根据延迟级别更改queueId,然后存储到CommitLog中;
  2. ScheduleMessageService服务则从SCHEDULE_TOPIC_XXXX出捞出延迟数据,进行消费。如果到达指定时间则发送消息,如果没到则推迟到一定时间后再消费。

CommitLog.putMessage中处理事务消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 更改 topic 和 queueId
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}

ScheduleMessageService的逻辑:对于每个延迟级别分别提交DeliverDelayedMessageTimerTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void start() {
// 定时发送消息
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
// 提交延迟提交任务
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 定时持久化发送进度
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
ScheduleMessageService.this.persist();
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}

任务执行时,首先从SCHEDULE_TOPIC_XXXX的每个级别的延时消费队列中取出offset位移之后添加的消息位置信息;其后,计算消息是否到达预期时间,如果没到则提交延时任务到指定时间点,如果到了则取出消息并将消息转变成原有的消息进行发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public void executeOnTimeup() {
ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
//从ConsumeQueue中获取延时消息位移列表
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
long now = System.currentTimeMillis();
// 应该传输的时间点
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
// 到达发送时间点
if (countdown <= 0) {
//取出消息
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt != null) {
// 恢复到原始发送时的消息
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
// 存储正常消息后续可以被消费
PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 发送成功
continue;
} else {
// 存储失败后提交下一次任务
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
}
} else {
//没到时间
//推迟到 指定发送时间进行发送
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
// 更新延迟消息进度
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
}
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// 调度下一次消费
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
// 更新进度
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
}
}
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
}

Reference

您的支持是我创作源源不断的动力