6. 消息拉取并消费
本章我们将分别从Consumer
和Broker
两个地方进行介绍。
Consumer 处理
在第四章初始化与启动
中,介绍了Consumer
的初始化启动过程,其中会启动一个RebalanceService
服务。它的作用就是定时去做重负载。
重负载的内容就是去Broker去查询当前所有的消费者然后根据消息分配策略计算出分配给自己的队列有哪些。最后跟本地进行比较,如果是新增的就发起消息的消费,如果是本地不再存在的则从本地删除,其他的则不变。
对于Consumer
刚启动的时候,也会触发一次重负载。
重负载的逻辑在RebalanceImpl.rebalanceByTopic
方法中。
1 | private void rebalanceByTopic(final String topic, final boolean isOrder) { |
特别需要说的是:对于有序消费,在重负载的时候会尝试锁住资源;如果锁资源失败则不进行消费。
对于新增的消费队列,consumer会构造拉取参数PullRequest
调用dispatchPullRequest
进行提交消费。可以看到,最终是将拉取请求丢进PullMessageService
的消息拉取队列中pullRequestQueue
。
1 | public void dispatchPullRequest(List<PullRequest> pullRequestList) { |
PullMessageService
中有个后台线程会从pullRequestQueue
不停取PullRequest
开始拉取消息。
拉取消费过程大致如下:
- 调用
pullAPIWrapper.pullKernelImpl
去拉取消息 - 对拉取回来的消息调用
PullCallback
进行消费
拉取消息请求的是Broker的PullMessageProcessor
,后面我们再来分析它。
1 | if (pullResult != null) { |
Broker 处理
1 | private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) |
拉取消息后,需要进行一次判断:
- 当拉取到数据后,如果配置不通过堆传输的话,就通过zero-copy的方式直接写socket返回
- 如果没有拉取到数据,则将请求挂起,放到
PullRequestHoldService
中。当有新消息产生时,再返回。长轮询
1 | switch (response.getCode()) { |
最后,保存Topic下该Queue的消费进度。
1 | // 保存 消费进度 |