RocketMQ Initialization

3. Initialization


NameServer and Broker are clusters that RocketMQ deploys independently.
The sections below cover each of them in turn.

NameServer

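NameServer manages the registration information of the Broker cluster and uses heartbeats to check whether Brokers are available. It holds the complete routing information for the Broker cluster and its queues. The following sections look at what it does and how it is implemented.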

Startup

NameServer's startup is driven by NamesrvController, which has the following member variables:

// Background thread that scans for inactive Brokers and closes their connections
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
    "NSScheduledThread"));
// Loads the NameServer configuration parameters; config path: $HOME/namesrv/kvConfig.json
private final KVConfigManager kvConfigManager;
// Route information management
private final RouteInfoManager routeInfoManager;
// Netty-based remoting server: listens for requests and forwards them to `DefaultRequestProcessor`, which handles each one according to its request code
private RemotingServer remotingServer;
// Broker connection housekeeping: listens for Netty Channel events and, on an abnormal event, removes that Channel's routing information from RouteInfoManager
private BrokerHousekeepingService brokerHousekeepingService;
private ExecutorService remotingExecutor;
private Configuration configuration;

NameServer starts up as follows:

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// Initialize the services
boolean initResult = controller.initialize();
if (!initResult) {
    controller.shutdown();
    System.exit(-3);
}
// Open the socket connection and start serving
controller.start();

Initialization happens first; the service is started afterwards.

public boolean initialize() {
    // Load the local configuration
    this.kvConfigManager.load();
    // Initialize the Netty remoting server
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // Thread pool for remote requests: incoming requests are handed to remotingExecutor so they do not
    // hold Netty's worker threads for long, which would reduce Netty's concurrency.
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    // Register the default processor `xxx.namesrv.processor.DefaultRequestProcessor` with the Netty server
    this.registerProcessor();
    // Scan for Brokers that are no longer active and remove them from the routing information
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // Periodically print the configuration to the info log
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    return true;
}
public void start() throws Exception {
    // The Netty remoting server starts listening on its port and accepting requests.
    this.remotingServer.start();
}
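
The hand-off from Netty's worker threads to a separate business pool can be illustrated in isolation. The following is a minimal sketch using only the JDK; `OffloadSketch`, `named` and `handleOnPool` are hypothetical names for illustration, and `named` is only a stand-in for what `ThreadFactoryImpl` presumably does (prefix plus counter).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class OffloadSketch {
    // Hypothetical stand-in for ThreadFactoryImpl: names threads "<prefix><n>".
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
    }

    // Runs a "request handler" on the business pool instead of the caller's thread
    // and returns the name of the thread that actually did the work.
    static String handleOnPool(ExecutorService pool) {
        Callable<String> handler = () -> Thread.currentThread().getName();
        Future<String> result = pool.submit(handler);
        try {
            return result.get();
        } catch (Exception e) {
            throw new IllegalStateException("handler failed", e);
        }
    }
}
```

In this sketch the caller still blocks on `get()`; a real IO thread would instead return immediately and let the pool thread write the response.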

Serving requests

When NameServer receives a request, the request is forwarded to xxx.namesrv.processor.DefaultRequestProcessor:

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            // Register Broker information
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            // Unregister a Broker
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            // Look up routing information by topic
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}
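
The switch above is essentially a lookup from request code to handler. As a rough illustration of that idea, here is a self-contained sketch that dispatches through a map instead; the class name, the two numeric codes, and the string-based request bodies are all assumptions made for the example and are not RocketMQ's real values or types. Like the original, an unknown code yields `null`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class DispatchSketch {
    // Hypothetical request codes; the real constants live in RequestCode.
    static final int PUT_KV = 1;
    static final int GET_KV = 2;

    private final Map<String, String> kv = new HashMap<>();
    private final Map<Integer, Function<String, String>> handlers = new HashMap<>();

    DispatchSketch() {
        // "key=value" stores a pair; anything else is rejected.
        handlers.put(PUT_KV, body -> {
            String[] parts = body.split("=", 2);
            if (parts.length < 2) {
                return "BAD_REQUEST";
            }
            kv.put(parts[0], parts[1]);
            return "OK";
        });
        // The body is the key to read back.
        handlers.put(GET_KV, kv::get);
    }

    // Mirrors the switch: known codes go to their handler, unknown codes return null.
    String process(int code, String body) {
        Function<String, String> handler = handlers.get(code);
        return handler == null ? null : handler.apply(body);
    }
}
```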

The part of NameServer we care about most is route information management, RouteInfoManager. It maintains the data below internally and serves queries and updates against it:

// Expiry time of a broker connection
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
// topic -> broker queues
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// broker name -> broker details; one broker name can map to multiple broker machines
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// cluster name -> set of broker names
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// broker address -> live broker connection
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// broker address -> filter servers
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
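
To make the interplay between these tables and the periodic scan concrete, here is a heavily simplified sketch. It keeps only two maps (topic to broker addresses, and broker address to last heartbeat time) and reuses the two-minute expiry from `BROKER_CHANNEL_EXPIRED_TIME`. `RouteTableSketch` and its methods are hypothetical; the real RouteInfoManager tracks considerably more state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class RouteTableSketch {
    static final long EXPIRE_MS = 1000L * 60 * 2; // same value as BROKER_CHANNEL_EXPIRED_TIME

    private final Map<String, List<String>> topicToBrokers = new HashMap<>();
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    // A registration doubles as a heartbeat: it refreshes the timestamp and the topic routes.
    synchronized void register(String brokerAddr, List<String> topics, long nowMs) {
        lastHeartbeat.put(brokerAddr, nowMs);
        for (String topic : topics) {
            List<String> brokers = topicToBrokers.computeIfAbsent(topic, k -> new ArrayList<>());
            if (!brokers.contains(brokerAddr)) {
                brokers.add(brokerAddr);
            }
        }
    }

    // Removes brokers whose last heartbeat is older than EXPIRE_MS; returns how many were removed.
    synchronized int scanNotActive(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > EXPIRE_MS) {
                String addr = entry.getKey();
                it.remove();
                topicToBrokers.values().forEach(brokers -> brokers.remove(addr));
                topicToBrokers.values().removeIf(List::isEmpty);
                removed++;
            }
        }
        return removed;
    }

    // Returns a copy of the broker addresses serving a topic (empty if none).
    synchronized List<String> route(String topic) {
        List<String> brokers = topicToBrokers.get(topic);
        return brokers == null ? new ArrayList<>() : new ArrayList<>(brokers);
    }
}
```

Passing the clock in as a parameter keeps the sketch deterministic; the real code would read the system time.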

Broker startup and initialization

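Broker is the RocketMQ component that receives and stores messages from producers and serves them to consumers.
Broker's startup is driven by BrokerController. Its main member variables are: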

// Consumer offset manager: tracks the current consume offset of each queue
private final ConsumerOffsetManager consumerOffsetManager;
// Manages the consumers connected to this Broker
private final ConsumerManager consumerManager;
// Manages the producers connected to this Broker
private final ProducerManager producerManager;
// Netty channel event notifier: when a connection closes or fails, removes it from the corresponding manager
private final ClientHousekeepingService clientHousekeepingService;
// Message pull processor that serves consumers
private final PullMessageProcessor pullMessageProcessor;
// Suspends a pull when the consumer finds no messages, and triggers the pull again once messages arrive
private final PullRequestHoldService pullRequestHoldService;
// Message-arrival listener (covered in the store chapter): triggered when a new message is produced,
// it in turn notifies the suspended pulls held by pullRequestHoldService so they can fetch the message
private final MessageArrivingListener messageArrivingListener;
// Used by adminProcessor to communicate with clients connected to the Broker
private final Broker2Client broker2Client;
private final SubscriptionGroupManager subscriptionGroupManager;
private final ConsumerIdsChangeListener consumerIdsChangeListener;
// Message store
private MessageStore messageStore;
// Topic configuration management
private TopicConfigManager topicConfigManager;
// Client for outbound RPC requests
private final BrokerOuterAPI brokerOuterAPI;
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
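
The cooperation between the pull hold service and the arrival listener is a form of long polling: an empty pull is parked rather than answered, and a later arrival wakes it. The sketch below shows only that mechanism, under simplifying assumptions: queues are identified by a string key, a parked pull is just an offset plus a callback, and timeouts are omitted. None of these names come from RocketMQ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.LongConsumer;

final class PullHoldSketch {
    // A parked pull: the offset the consumer wants next and how to answer it.
    private static final class Held {
        final long wantedOffset;
        final LongConsumer reply;

        Held(long wantedOffset, LongConsumer reply) {
            this.wantedOffset = wantedOffset;
            this.reply = reply;
        }
    }

    private final Map<String, List<Held>> heldByQueue = new HashMap<>();

    // Called when a pull finds nothing to read: park it instead of replying "empty".
    synchronized void suspend(String queueKey, long wantedOffset, LongConsumer reply) {
        heldByQueue.computeIfAbsent(queueKey, k -> new ArrayList<>()).add(new Held(wantedOffset, reply));
    }

    // Called when messages arrive: wakes every parked pull whose wanted offset now exists.
    // Returns the number of pulls woken.
    synchronized int notifyArrived(String queueKey, long maxOffset) {
        List<Held> held = heldByQueue.get(queueKey);
        if (held == null) {
            return 0;
        }
        int woken = 0;
        Iterator<Held> it = held.iterator();
        while (it.hasNext()) {
            Held h = it.next();
            if (h.wantedOffset < maxOffset) {
                it.remove();
                h.reply.accept(maxOffset);
                woken++;
            }
        }
        return woken;
    }
}
```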

Startup

Startup first calls initialize() to initialize the Broker, then calls start() so that the servers listen on their ports and begin serving.

public boolean initialize() throws ... {
    boolean result = true;
    // On a restart, load the previously persisted state
    result = result && this.topicConfigManager.load();
    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    if (result) {
        // Initialize the message store
        this.messageStore =
            new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                this.brokerConfig);
        this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
        // Load plugins
        MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
        this.messageStore = MessageStoreFactory.build(context, this.messageStore);
    }
    result = result && this.messageStore.load();
    if (result) {
        // Start two remoting servers: one on the configured port, one on the configured port minus 2
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
        // Register the processors with the servers
        this.registerProcessor();
        // Periodically persist consumer offsets
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                BrokerController.this.consumerOffsetManager.persist();
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        // ...
        if (this.brokerConfig.getNamesrvAddr() != null) {
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
        }
    }
    return result;
}
public void registerProcessor() {
    // Receives messages sent by producers
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    // Serves message pulls for consumers
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    // QueryMessageProcessor
    // ClientManageProcessor
    // ConsumerManageProcessor
    // EndTransactionProcessor
    // ...
}

As shown, the Broker exposes a large number of services.
Once initialization completes, the services are started:

public void start() throws Exception {
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }
    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }
    // ...
    // Register this Broker's own information, including its topics, with NameServer
    this.registerBrokerAll(true, false);
    // Re-register every 30s (first run after 10s)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            BrokerController.this.registerBrokerAll(true, false);
        }
    }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
    // ...
}
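
The "register once now, then re-register on a timer" pattern at the end of start() can be sketched with the JDK alone. One detail worth showing: `scheduleAtFixedRate` cancels all further runs if the task throws, so a periodic heartbeat usually catches exceptions inside the task. `HeartbeatSketch` and `startRegistering` are hypothetical names, and whether the real task guards itself this way is not shown in the excerpt above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class HeartbeatSketch {
    // Registers once immediately, then keeps re-registering at a fixed rate.
    // Returns the scheduler so the caller can shut it down.
    static ScheduledExecutorService startRegistering(Runnable registerAll, long initialDelayMs, long periodMs) {
        registerAll.run(); // the eager first registration
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread t = new Thread(runnable, "HeartbeatSketchThread");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(() -> {
            try {
                registerAll.run();
            } catch (RuntimeException e) {
                // Swallow the failure: an exception escaping here would stop all future runs.
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

With the values from the Broker code, the call would be `startRegistering(task, 1000 * 10, 1000 * 30)`.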

Producer

The producer class DefaultMQProducer is a shell around message sending; the class that actually sends messages is DefaultMQProducerImpl. Transactional messages are sent with TransactionMQProducer.

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            this.checkConfig();
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // Create the MQ client
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // Register the producer
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            // ...
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            // Start the MQ client
            if (startFactory) {
                mQClientFactory.start();
            }
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "//
                + this.serviceState//
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    // Send a heartbeat to all Brokers
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
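
The state handling in start() follows a small pattern worth isolating: the state is set to START_FAILED before any work is done and only promoted to RUNNING at the end, so an exception midway leaves the service marked as failed and a second start() is rejected. A minimal sketch of that pattern, with a hypothetical `LifecycleSketch` class and a plain `Runnable` standing in for the startup work:

```java
final class LifecycleSketch {
    enum ServiceState { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private ServiceState state = ServiceState.CREATE_JUST;
    private final Runnable startup;

    LifecycleSketch(Runnable startup) {
        this.startup = startup;
    }

    synchronized void start() {
        switch (state) {
            case CREATE_JUST:
                // Assume failure up front; only a complete startup flips this to RUNNING.
                state = ServiceState.START_FAILED;
                startup.run();
                state = ServiceState.RUNNING;
                break;
            default:
                throw new IllegalStateException("service state not OK, maybe started once: " + state);
        }
    }

    synchronized ServiceState state() {
        return state;
    }
}
```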

The most important step is creating the MQ client. Looking inside:

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    // ...
    // Processor for callbacks from the Broker to the client
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
    if (this.clientConfig.getNamesrvAddr() != null) {
        // Update the NameServer address list
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }
    // ...
}
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor,
    RPCHook rpcHook, final ClientConfig clientConfig) {
    this.clientConfig = clientConfig;
    topAddressing = new TopAddressing(MixAll.WS_ADDR, clientConfig.getUnitName());
    this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
    this.clientRemotingProcessor = clientRemotingProcessor;
    this.remotingClient.registerRPCHook(rpcHook);
    // Register processors for the requests the Broker may send to the client
    this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
    this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
    this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);
    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);
    this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
    this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
}
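
The name getAndCreateMQClientInstance suggests get-or-create semantics: producers and consumers that resolve to the same client id would share one client instance. The sketch below illustrates that idea only. It assumes instances are cached in a concurrent map keyed by a client id of the form `ip@instanceName`; both the key format and the class names are assumptions for the example, not a copy of MQClientManager.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ClientManagerSketch {
    // Hypothetical stand-in for MQClientInstance.
    static final class ClientInstance {
        final String clientId;

        ClientInstance(String clientId) {
            this.clientId = clientId;
        }
    }

    private final ConcurrentMap<String, ClientInstance> table = new ConcurrentHashMap<>();

    // Returns the cached instance for this client id, creating it on first use.
    ClientInstance getAndCreate(String clientIp, String instanceName) {
        String clientId = clientIp + "@" + instanceName; // assumed id format
        ClientInstance existing = table.get(clientId);
        if (existing != null) {
            return existing;
        }
        ClientInstance created = new ClientInstance(clientId);
        // If another thread won the race, use its instance and discard ours.
        ClientInstance raced = table.putIfAbsent(clientId, created);
        return raced != null ? raced : created;
    }
}
```

Under this reading, a call such as changeInstanceNameToPID() matters because the instance name feeds into the key that decides which client instance is shared.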

After the member variables are instantiated, start() is invoked:

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                if (null == this.clientConfig.getNamesrvAddr()) {
                    // Fetch the NameServer address
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start the remoting channel; note that a long-lived connection can both send and receive requests
                this.mQClientAPIImpl.start();
                // Start the scheduled tasks
                this.startScheduledTask();
                // Start the pull service; only relevant to consumers
                this.pullMessageService.start();
                // Start the rebalance service; only relevant to consumers
                this.rebalanceService.start();
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                // ...
        }
    }
}
// Start the background asynchronous tasks
private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // Fetch the NameServer address
                MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            // Fetch routing information
            MQClientInstance.this.updateTopicRouteInfoFromNameServer();
        }
    }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            // Clean up brokers that have gone offline
            MQClientInstance.this.cleanOfflineBroker();
            // Send a heartbeat to all brokers
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            // Periodically persist consumer offsets
            MQClientInstance.this.persistAllConsumerOffset();
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            MQClientInstance.this.adjustThreadPool();
        }
    }, 1, 1, TimeUnit.MINUTES);
}

The code above shows that RocketMQ is rather rough here: inside MQClientInstance, the Producer and Consumer logic are mixed together.

Consumer

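Consumer comes in two flavors, MQPullConsumer and MQPushConsumer, that is, pull mode and push mode. In pull mode you fetch updates manually; push mode lets you add a listener and pulls messages for consumption automatically (the "push" is implemented with long-polling pulls).

Pull mode is fairly simple, so the rest of this section focuses on push mode.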

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
    this.consumerGroup = consumerGroup;
    // Queue allocation strategy: how all queues under a topic are divided among the consumers
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    // Consumer implementation
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
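
An allocation strategy answers one question: given all queues of a topic and all consumers in the group, which queues belong to this consumer? As an illustration, here is one possible even-split strategy written from scratch. It is not taken from RocketMQ, queues and consumer ids are plain strings for simplicity, and `AllocateSketch` is a hypothetical name. Every consumer sorts both lists the same way, so all of them arrive at the same, non-overlapping split without coordinating.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class AllocateSketch {
    // Splits the sorted queues into contiguous blocks, one block per sorted consumer id.
    // The first (queues % consumers) consumers receive one extra queue.
    static List<String> allocate(String currentCid, List<String> queues, List<String> consumerIds) {
        List<String> sortedQueues = new ArrayList<>(queues);
        List<String> sortedCids = new ArrayList<>(consumerIds);
        Collections.sort(sortedQueues);
        Collections.sort(sortedCids);

        int index = sortedCids.indexOf(currentCid);
        if (index < 0) {
            return new ArrayList<>(); // unknown consumer: nothing to consume
        }
        int base = sortedQueues.size() / sortedCids.size();
        int extra = sortedQueues.size() % sortedCids.size();
        int size = base + (index < extra ? 1 : 0);
        int start = index * base + Math.min(index, extra);
        return new ArrayList<>(sortedQueues.subList(start, start + size));
    }
}
```

With 8 queues and 3 consumers, the blocks have sizes 3, 3 and 2; with more consumers than queues, the surplus consumers get an empty list.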

Before initializing, subscribe to a topic:

consumer.subscribe(topic, "*");

Then register a message listener, which consumes new messages as they arrive.

DefaultMQPushConsumer.start() performs initialization and begins consuming messages.

public void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // Check the configuration
            this.checkConfig();
            // Copy the subscription data
            this.copySubscription();
            // Set the instanceName
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
            // Get or create the MQ client, same as in Producer initialization
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
            // Configure the rebalancer
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
            // Wrap the message pull API
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
            // Choose the offset store: RemoteBrokerOffsetStore for clustering consumption (offsets come from the Broker),
            // LocalFileOffsetStore for broadcasting consumption.
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
            }
            this.offsetStore.load();
            // Choose the consume service: ConsumeMessageOrderlyService for ordered consumption,
            // otherwise ConsumeMessageConcurrentlyService.
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }
            this.consumeMessageService.start();
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            mQClientFactory.start();
            // Set the service state
            this.serviceState = ServiceState.RUNNING;
            break;
        // ...
    }
    // Update the routing information
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    // Send a heartbeat to the Brokers
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // First rebalance: decide which queues to consume and start pulling from them.
    // In addition, `RebalanceService` re-runs the rebalance every 20s. For newly assigned queues it builds a
    // `PullRequest` and issues a pull; removed queues are dropped locally so that the local consumer stops consuming them.
    this.mQClientFactory.rebalanceImmediately();
}
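
The rebalance step described in the final comment boils down to a set difference between the queues currently owned and the queues newly allocated. The sketch below shows just that bookkeeping, with queues as strings and two lists recording what a real client would do (issue a pull request, or stop consuming). `RebalanceDiffSketch` and its fields are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class RebalanceDiffSketch {
    private final Set<String> owned = new TreeSet<>();
    final List<String> started = new ArrayList<>(); // queues for which a pull would be issued
    final List<String> dropped = new ArrayList<>(); // queues that are no longer consumed

    // Applies a new allocation and returns true if the owned set changed.
    boolean update(Collection<String> allocated) {
        Set<String> target = new TreeSet<>(allocated);
        boolean changed = false;

        // Queues we own but were not allocated this round: stop consuming them.
        Iterator<String> it = owned.iterator();
        while (it.hasNext()) {
            String queue = it.next();
            if (!target.contains(queue)) {
                it.remove();
                dropped.add(queue);
                changed = true;
            }
        }
        // Queues allocated for the first time: start pulling from them.
        for (String queue : target) {
            if (owned.add(queue)) {
                started.add(queue);
                changed = true;
            }
        }
        return changed;
    }

    List<String> ownedQueues() {
        return new ArrayList<>(owned);
    }
}
```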

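That concludes the startup and initialization of each component.
With the previous chapters as groundwork, the following chapters walk through the path of a message from sending, to receipt, to consumption.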
