5. 消息发送并存储
发送分为同步和异步
的方式,本章主要解析同步方式,异步方式与同步方式很相似。
发送的过程,Producer
发送消息,Broker
接收消息并存储。
发送
发送过程,最主要逻辑在DefaultMQProducerImpl.sendDefaultImpl
中:
1 | private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) throws ... { |
可以看出整个过程:首先获取路由信息,然后按一定策略选择一个队列进行发送消息,当失败时会重试,最多重试timesTotal
次。
1 | private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { |
接收
Broker
的fast Server接收到消息后,交给SendMessageProcessor
进行处理。以下为其主要核心逻辑:
1 | private RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { |
当消息为重试消息且超过设定的最大消费次数时,将会被放入死信队列中。
经过一些校验后,调用MessageStore.putMessage
(详见第3章 store
)存储消息,最后进行broker接收消息的统计和响应。