2. remoting
[TOC]
RocketMQ
的通讯模块实现的比较简单,不像Dubbo
那样给用户提供多种可选的方式,只有一种基于通讯框架Netty
的实现方式。
其最主要的两个类是NettyRemotingServer
、NettyRemotingClient
。以下为其类图:
首先我们来分析Server
部分,再看Client
部分。
Server:
1. 初始化
首先初始化Server
,初始化Netty 的Acceptor
EventLoopGroup和Selector
EventLoopGroup,并设置限流阈值和通道事件监听器。
1 | public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) { |
2. 启动
调用start()
方法,启动Server
。监听端口,开启服务,并开启后台线程,扫描超时的响应并作回调。
1 | public void start() { |
另外,NettyRemotingServer
提供了两个方法给使用方用于注册自己的业务处理器NettyRequestProcessor
(每个需要提供服务的组件,实现这个接口并向RemotingServer中注册):
1 | /** |
3. 调用
当请求到达后,Netty
进行解码后最终交给NettyServerHandler
进行处理:
1 | class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { |
NettyServerHandler
调用NettyRemotingAbstract.processRequestCommand
来进行处理:
1 | public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { |
继续跟进,processRequestCommand
中会根据请求码去看是否注册了相应的RequestProcessor
,若没有则使用默认RequestProcessor
。
1 | public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) |
Client
1. 初始化与启动
其初始化和启动过程与Server
大致相似,且比Server
做的事要少。其主要是初始化io.netty.bootstrap.Boostrap
,设置参数,并启动事件处理线程和扫描响应线程等。
3. 调用
其主要提供三个调用形式:
- invokeSync:同步阻塞调用;
- invokeOneway:单向调用,不需要反馈结果;
- invokeAsync:异步调用,结果返回后触发回调逻辑。
以invokeSync
为例来讲解调用的过程,其他两种方式类似。
1 | public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) |
首先获取Channel
,然后通过Channel
来发送请求。
1 | /** |
获取Channel
后,发起请求:
1 | public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) |
看完后,肯定会很奇怪:这里responseFuture
根本没有设置返回值啊,怎么返回的?
这是因为在Netty
Client初始化的时候设置了NettyClientHandler
,其最终会调用NettyRemotingAbstract
的processResponseCommand
方法:
1 | public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { |
到此,RocketMQ的通讯模块就分析结束了。可以感受到,其实现的比较粗糙。