2. remoting
[TOC]
RocketMQ的通讯模块实现的比较简单,不像Dubbo那样给用户提供多种可选的方式,只有一种基于通讯框架Netty的实现方式。
其最主要的两个类是NettyRemotingServer、NettyRemotingClient。以下为其类图:
首先我们来分析Server部分,再看Client部分。
Server:
1. 初始化
首先初始化Server,初始化Netty 的AcceptorEventLoopGroup和SelectorEventLoopGroup,并设置限流阈值和通道事件监听器。
1 | public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) { |
2. 启动
调用start()方法,启动Server。监听端口,开启服务,并开启后台线程,扫描超时的响应并作回调。
1 | public void start() { |
另外,NettyRemotingServer提供了两个方法给使用方用于注册自己的业务处理器NettyRequestProcessor(每个需要提供服务的组件,实现这个接口并向RemotingServer中注册):
1 | /** |
3. 调用
当请求到达后,Netty进行解码后最终交给NettyServerHandler进行处理:
1 | class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { |
NettyServerHandler调用NettyRemotingAbstract.processRequestCommand来进行处理:
1 | public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { |
继续跟进,processRequestCommand中会根据请求码去看是否注册了相应的RequestProcessor,若没有则使用默认RequestProcessor。
1 | public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) |
Client
1. 初始化与启动
其初始化和启动过程与Server大致相似,且比Server做的事要少。其主要是初始化io.netty.bootstrap.Boostrap,设置参数,并启动事件处理线程和扫描响应线程等。
3. 调用
其主要提供三个调用形式:
- invokeSync:同步阻塞调用;
- invokeOneway:单向调用,不需要反馈结果;
- invokeAsync:异步调用,结果返回后触发回调逻辑。
以invokeSync为例来讲解调用的过程,其他两种方式类似。
1 | public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) |
首先获取Channel,然后通过Channel来发送请求。
1 | /** |
获取Channel后,发起请求:
1 | public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) |
看完后,肯定会很奇怪:这里responseFuture根本没有设置返回值啊,怎么返回的?
这是因为在NettyClient初始化的时候设置了NettyClientHandler,其最终会调用NettyRemotingAbstract的processResponseCommand方法:
1 | public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { |
到此,RocketMQ的通讯模块就分析结束了。可以感受到,其实现的比较粗糙。