3. 初始化
[TOC]
NameServer
和Broker
是RocetMQ
中独立部署的集群。
下面我们将来分别介绍他们。
NameServer
NameServer
管理Broker
集群的注册信息并提供心跳来检测他们是否可用,持有关于Broker集群和队列的全部路由信息。接下来详细了解其具体功能和实现。
启动
NameServer
的启动类为NamesrvController
,其包含成员变量:
1 | // 后台线程,负责扫描失活的Broker,并销毁与其的链接 |
NameServer
的启动过程如下:
1 | final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); |
先初始化,再启动服务。
1 | public boolean initialize() { |
提供服务
NameServer
接收到请求后,会被转发到 xxx.namesrv.processor.DefaultRequestProcessor
:
1 | public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { |
关于NameServer
,我们最关注的即是路由信息管理 RouteInfoManager
,以下为其内部维护的信息。然后围绕这些信息进行提供查询和更新:
1 | // broker连接的过期时间 |
Broker启动初始化
Broker
是RocketMQ
中用于接收并保存生产者消息,并向服务消费者提供数据消费服务。
Broker的启动类为BrokerController
,首先来看下其主要成员变量:
1 | // 消费位移管理器,管理队列当前的消费位移 |
启动
其启动过程先调用initialize()
进行初始化,然后调用start()
方法让Server
监听端口开启服务。
1 | public boolean initialize() throws ... { |
可以看到 Broker
向外提供服务众多服务。
初始化完成后开始启动服务:
1 | public void start() throws Exception { |
Producer
生产者类DefaultMQProducer
是消息发送的壳子,其内部真正用来发送消息的类是DefaultMQProducerImpl
。对于事务消息,其发送者为TransactionMQProducer
。
1 | public void start(final boolean startFactory) throws MQClientException { |
最重要的内容即是创建MQ client,深入其中:
1 | public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) { |
实例化完成员变量后,触发start()
方法;
1 | public void start() throws MQClientException { |
通过已上代码发现,RocektMQ
在处理这块时比较粗糙,在MQClientInstance
里,Producer
和Consumer
的逻辑揉在一起。
Consumer
Consumer
有两种模式,MQPullConsumer
和MQPushConsumer
。推拉模式,拉模式需要自己手动去拉取更新,而推的方式则支持添加监听器,自动拉取(长轮询拉取,Push)消息进行消费。
由于Pull
方式比较简单,因此后面我们主要分析Push
的方式。
1 | public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { |
再初始化前先订阅某个topic
1 | consumer.subscribe(topic, "*"); |
然后,再注册消息监听器。当新消息到达时进行消费。
DefaultMQPushConsumer.start()
进行初始化并开始消费消息。
1 | public void start() throws MQClientException { |
至此各组件启动初始化的过程就介绍完了。
有了前几章节的铺垫,接下来的章节重点带大家一开看 消息发送到接收再到消费的过程。