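Dubbo Service Export

Walking Through the Service Export Process

With the following configuration in an XML file, Dubbo publishes demoService at startup. Service consumers can then look up this provider in the registry and invoke it: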

<dubbo:application name="demo-provider"/>
<dubbo:registry address="multicast://224.5.6.7:1234"/>
<dubbo:protocol name="dubbo" port="32321"/>
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
<!-- Publish the service -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>

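Here <dubbo:service> declares the service to publish. DubboBeanDefinitionParser parses it into a ServiceBean and registers that bean in the Spring container. ServiceBean is then initialized by its afterPropertiesSet method.

That method mainly fetches the Provider, Application, Module, Registries, Monitor and Protocol configuration objects from the Spring container and sets them as properties on the bean. It then decides when to export, based on isDelay():

  • When isDelay() returns false, afterPropertiesSet calls export directly.
  • When isDelay() returns true, export waits until Spring has instantiated all beans and publishes the ContextRefreshedEvent; ServiceBean (which implements ApplicationListener) then calls export from its onApplicationEvent method.

Despite its name, in the 2.5.x/2.6.x source isDelay() returns true when no delay is configured (delay is null or -1) and the context supports application listeners, so waiting for ContextRefreshedEvent is the default path. When a delay is configured, afterPropertiesSet calls export right away and export applies the delay itself.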
public void afterPropertiesSet() throws Exception {
    if (getProvider() == null) {
        // Collect every ProviderConfig bean from the Spring context
        Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
        if (providerConfigMap != null && providerConfigMap.size() > 0) {
            Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
            if ((protocolConfigMap == null || protocolConfigMap.size() == 0)
                    && providerConfigMap.size() > 1) { // backward compatibility
                List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
                for (ProviderConfig config : providerConfigMap.values()) {
                    if (config.isDefault() != null && config.isDefault().booleanValue()) {
                        providerConfigs.add(config);
                    }
                }
                if (!providerConfigs.isEmpty()) {
                    setProviders(providerConfigs);
                }
            }
        }
    }
    ...
    if (!isDelay()) {
        export();
    }
}
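
The two export paths can be sketched in plain Java without Spring. This is only a minimal illustration, not Dubbo's code: ExportTimingSketch, its deferUntilRefresh flag (standing in for whatever isDelay() returns) and the log list are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ServiceBean; it models only when export() fires.
class ExportTimingSketch {
    private final boolean deferUntilRefresh; // assumption: the value isDelay() would return
    private final List<String> log = new ArrayList<>();
    private boolean exported;

    ExportTimingSketch(boolean deferUntilRefresh) {
        this.deferUntilRefresh = deferUntilRefresh;
    }

    // Called by the container right after the bean's properties are set.
    void afterPropertiesSet() {
        if (!deferUntilRefresh) {
            export("afterPropertiesSet");
        }
    }

    // Called once the container has created all beans and fired its refresh event.
    void onContextRefreshed() {
        if (deferUntilRefresh && !exported) {
            export("onApplicationEvent");
        }
    }

    private void export(String caller) {
        exported = true;
        log.add("export from " + caller);
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        for (boolean defer : new boolean[] {false, true}) {
            ExportTimingSketch bean = new ExportTimingSketch(defer);
            bean.afterPropertiesSet();
            bean.onContextRefreshed();
            System.out.println("deferUntilRefresh=" + defer + " -> " + bean.log());
        }
    }
}
```

Whichever path is taken, the service is exported exactly once; the flag only changes which lifecycle callback triggers it.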

The key step in the export method is the call to doExportUrls:

private void doExportUrls() {
    // Load the registry URLs; the service may be published to several registries
    List<URL> registryURLs = loadRegistries(true);
    // protocols is the collection that ServiceBean fetched from the Spring container during initialization;
    // export the service once per Protocol
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();
    if (name == null || name.length() == 0) {
        name = "dubbo";
    }
    // Assemble the URL parameters
    Map<String, String> map = new HashMap<String, String>();
    map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, provider, Constants.DEFAULT_KEY);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
    ...
    String scope = url.getParameter(Constants.SCOPE_KEY);
    // Export locally unless scope is explicitly remote
    if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
        exportLocal(url);
    }
    // Export remotely (through the registries) unless scope is explicitly local
    if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
        if (logger.isInfoEnabled()) {
            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
        }
        if (registryURLs != null && !registryURLs.isEmpty()) {
            for (URL registryURL : registryURLs) {
                // registryURL:
                url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                URL monitorUrl = loadMonitor(registryURL);
                if (monitorUrl != null) {
                    url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                }
                if (logger.isInfoEnabled()) {
                    logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                }
                // Build an Invoker for ref through the proxy factory
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                // Wrap the Invoker in a delegate that also carries the service metadata
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                // Hand the Invoker to the protocol to export the service
                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        } ...
    }
}
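
The two scope checks are easy to misread, so here is a small sketch of what they imply. It is an illustration only: ScopeSketch and targets are hypothetical names, and the string values "local" and "remote" are assumed to match Constants.SCOPE_LOCAL and Constants.SCOPE_REMOTE. Any checks hidden behind the elided parts of the method are not modeled.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper that mirrors the two scope conditions in doExportUrlsFor1Protocol.
class ScopeSketch {
    // Returns which export paths run for a given scope value (null means "not configured").
    static Set<String> targets(String scope) {
        Set<String> out = new LinkedHashSet<>();
        // Local export happens unless scope is explicitly "remote" (assumed constant value).
        if (!"remote".equalsIgnoreCase(scope)) {
            out.add("local");
        }
        // Remote export happens unless scope is explicitly "local" (assumed constant value).
        if (!"local".equalsIgnoreCase(scope)) {
            out.add("remote");
        }
        return out;
    }

    public static void main(String[] args) {
        for (String scope : new String[] {null, "local", "remote"}) {
            System.out.println(scope + " -> " + targets(scope));
        }
    }
}
```

The point of the negated comparisons is that an unset scope exports both ways; each explicit value only switches the other path off.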

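Next, the Protocol exports and publishes the service.
The Protocol held by ServiceBean is an adaptive extension: when export is called, it looks at url.getProtocol() on the Invoker's URL to decide which Protocol implementation to delegate to. For the registryURL above, getProtocol() returns registry, so the call goes to RegistryProtocol (after first passing through the wrapper classes ProtocolListenerWrapper and ProtocolFilterWrapper).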
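
The idea of dispatching on the URL's protocol can be sketched as a plain lookup table. This is a simplification under stated assumptions, not Dubbo's generated adaptive class: AdaptiveProtocolSketch, protocolOf and the string return values are hypothetical, URLs are plain strings, the wrapper classes are left out, and falling back to "dubbo" when no protocol is present is an assumption about the default extension.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of an adaptive Protocol: pick the implementation by URL protocol.
class AdaptiveProtocolSketch {
    interface Protocol {
        String export(String url);
    }

    private final Map<String, Protocol> extensions = new HashMap<>();

    AdaptiveProtocolSketch() {
        // Each entry stands in for a real Protocol implementation.
        extensions.put("registry", url -> "RegistryProtocol");
        extensions.put("dubbo", url -> "DubboProtocol");
    }

    // Extracts the part before "://"; assumed default is "dubbo" when it is missing.
    static String protocolOf(String url) {
        int i = url.indexOf("://");
        return i < 0 ? "dubbo" : url.substring(0, i);
    }

    // The adaptive entry point: look up the extension by name, then delegate.
    String export(String url) {
        String name = protocolOf(url);
        Protocol target = extensions.get(name);
        if (target == null) {
            throw new IllegalStateException("No Protocol extension named " + name);
        }
        return target.export(url);
    }

    public static void main(String[] args) {
        AdaptiveProtocolSketch protocol = new AdaptiveProtocolSketch();
        System.out.println(protocol.export("registry://224.5.6.7:1234/com.alibaba.dubbo.registry.RegistryService"));
        System.out.println(protocol.export("dubbo://172.19.154.24:32321/com.alibaba.dubbo.demo.DemoService"));
    }
}
```

The same mechanism is used twice during export: first with the registry URL, and later with the provider's dubbo URL. The export method of RegistryProtocol, where the first call lands, reads: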

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // Export the service locally
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    URL registryUrl = getRegistryUrl(originInvoker);
    // Obtain the registry component
    final Registry registry = getRegistry(originInvoker);
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
    boolean register = registedProviderUrl.getParameter("register", true);
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
    if (register) {
        register(registryUrl, registedProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}

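The logic above splits into two parts: exporting the service locally, and registering it with the registry. Each is covered in turn below.

Local service export

doLocalExport is the main body of the local export: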

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                // getProviderUrl extracts the URL of the service to export, for example: dubbo://172.19.154.24:32321/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=172.19.154.24&bind.port=32321&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5464&qos.port=22222&scope=remote&side=provider&timestamp=1520158786026
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                // protocol.export(invokerDelegete) dispatches on url.getProtocol() and ends up in DubboProtocol.export
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}

// DubboProtocol.export():
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    // key: com.alibaba.dubbo.demo.DemoService:32321
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // *** At RPC time, the Exporter is looked up in exporterMap by this same key ***
    exporterMap.put(key, exporter);
    ...
    // Create the server, listen on the port and start serving:
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}
openServer
-> createServer
-> Exchangers.bind(url, requestHandler)

requestHandler is a member variable of DubboProtocol; it handles the requests that clients send:

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            ...
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            // Ultimately calls the business implementation through the Invoker built by the proxy factory
            return invoker.invoke(inv);
        }
    }
    ...

// Look up the Exporter in exporterMap and return its Invoker
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    ...
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    return exporter.getInvoker();
}
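
The role of exporterMap on both sides (filled at export time, read at request time) can be sketched as follows. This is a simplified model, not DubboProtocol itself: ExporterMapSketch, its Invoker interface and the reply signature are hypothetical, and the key layout [group/]path[:version]:port is an assumption chosen so that the demo service yields the key shown in the comment above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of how a service key ties export() to request handling.
class ExporterMapSketch {
    interface Invoker {
        Object invoke(String method, Object... args);
    }

    private final Map<String, Invoker> exporterMap = new ConcurrentHashMap<>();

    // Assumed key layout: optional group prefix, path, optional version, then port.
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder sb = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            sb.append(group).append('/');
        }
        sb.append(path);
        if (version != null && !version.isEmpty()) {
            sb.append(':').append(version);
        }
        return sb.append(':').append(port).toString();
    }

    // Export side: remember the invoker under its service key.
    void export(String key, Invoker invoker) {
        exporterMap.put(key, invoker);
    }

    // Request side: find the invoker by the same key and call it.
    Object reply(String key, String method, Object... args) {
        Invoker invoker = exporterMap.get(key);
        if (invoker == null) {
            throw new IllegalStateException("No exported service for key " + key);
        }
        return invoker.invoke(method, args);
    }

    public static void main(String[] args) {
        ExporterMapSketch protocol = new ExporterMapSketch();
        String key = serviceKey(32321, "com.alibaba.dubbo.demo.DemoService", null, null);
        protocol.export(key, (method, a) -> "Hello " + a[0]);
        System.out.println(key + " -> " + protocol.reply(key, "sayHello", "world"));
    }
}
```

Because the key is derived from data that both provider and consumer know (path, port, and optionally group and version), an incoming request carries enough information to find its Invoker without any further registration step.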

Exchangers.bind(url, requestHandler) listens on the port and starts the server:

Exchangers.bind(url, requestHandler)
-> HeaderExchanger.bind()

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // Transporter is the extension point for the network transport layer; the concrete type is chosen
    // by the server or transporter parameter in the URL. Netty is assumed here.
    return getTransporter().bind(url, handler);
}

// NettyTransporter.bind()
public Server bind(URL url, ChannelHandler handler) throws RemotingException {
    return new NettyServer(url, handler);
}

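NettyServer is the key to listening on the port and exposing the service. It first sets the bindAddress, bindIp and bindPort properties and then calls doOpen() to start the server.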

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    // ChannelHandlers.wrap() adds another layer around handler: heartbeat handling, multi-message handling
    // and the `Dispatcher` logic (the Dispatcher decides how messages are handed off to threads)
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

// ChannelHandlers.wrap
//   -> ChannelHandlers.getInstance().wrapInternal
//   -> new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)))

// Familiar territory for Netty users: create the boss and worker thread pools, configure the pipeline,
// then bind and start listening on the port
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}
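
Taken together, HeaderExchanger.bind and ChannelHandlers.wrap build a chain of decorators around requestHandler. The sketch below only illustrates the nesting order implied by the code above; HandlerChainSketch and its one-method ChannelHandler interface are hypothetical, and each layer does nothing except record its name before delegating.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the handler decorators stacked around requestHandler.
class HandlerChainSketch {
    interface ChannelHandler {
        void received(Object message, List<String> trace);
    }

    // A decorator that records its name, then passes the message to the wrapped handler.
    static ChannelHandler wrap(String name, ChannelHandler next) {
        return (message, trace) -> {
            trace.add(name);
            next.received(message, trace);
        };
    }

    // Innermost layers come from HeaderExchanger.bind, outer ones from ChannelHandlers.wrap.
    static ChannelHandler build(ChannelHandler requestHandler) {
        ChannelHandler h = wrap("HeaderExchangeHandler", requestHandler);
        h = wrap("DecodeHandler", h);
        h = wrap("Dispatcher", h);
        h = wrap("HeartbeatHandler", h);
        return wrap("MultiMessageHandler", h);
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        build((message, t) -> t.add("requestHandler")).received("request", trace);
        System.out.println(String.join(" -> ", trace));
    }
}
```

A message coming off the Netty pipeline therefore passes through the outermost wrapper first and reaches requestHandler last.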

Service registration

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    URL registryUrl = getRegistryUrl(originInvoker);
    // Obtain the registry
    final Registry registry = getRegistry(originInvoker);
    final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

    //to judge to delay publish whether or not
    boolean register = registedProviderUrl.getParameter("register", true);
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
    if (register) {
        // Register the provider
        register(registryUrl, registedProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // Subscribe to override notifications
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    ...
}

// The demo service in the official Dubbo documentation uses `MulticastRegistry` as its registry by default.
public void register(URL registryUrl, URL registedProviderUrl) {
    // Obtain the Registry
    Registry registry = registryFactory.getRegistry(registryUrl);
    // Register the service
    registry.register(registedProviderUrl);
}

// Taking MulticastRegistry as the example, the call ends up in `doRegister(URL url)`:
protected void doRegister(URL url) {
    broadcast(Constants.REGISTER + " " + url.toFullString());
}

// Broadcast the registration message
private void broadcast(String msg) {
    try {
        byte[] data = (msg + "\n").getBytes();
        DatagramPacket hi = new DatagramPacket(data, data.length, mutilcastAddress, mutilcastPort);
        mutilcastSocket.send(hi);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
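
What actually goes over the wire in the multicast case is just a line of text. The sketch below builds such a packet without sending it. It is illustrative only: MulticastMessageSketch and registerPacket are hypothetical names, the literal "register" is assumed to be the value of Constants.REGISTER, and the address and port are the ones from the <dubbo:registry> configuration at the top.

```java
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds (but does not send) a multicast registration packet.
class MulticastMessageSketch {
    static final String REGISTER = "register"; // assumed value of Constants.REGISTER

    static DatagramPacket registerPacket(String providerUrl, String groupIp, int port) {
        try {
            // One message per line: "<command> <full provider URL>\n"
            byte[] data = (REGISTER + " " + providerUrl + "\n").getBytes(StandardCharsets.UTF_8);
            return new DatagramPacket(data, data.length, InetAddress.getByName(groupIp), port);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Bad multicast address: " + groupIp, e);
        }
    }

    public static void main(String[] args) {
        DatagramPacket packet = registerPacket(
                "dubbo://172.19.154.24:32321/com.alibaba.dubbo.demo.DemoService", "224.5.6.7", 1234);
        String text = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
        System.out.print(packet.getAddress().getHostAddress() + ":" + packet.getPort() + " <- " + text);
    }
}
```

Unlike the sketch, the broadcast method above calls getBytes() with the platform default charset; an explicit charset is used here only to keep the example deterministic.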

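To summarize:

  • <dubbo:service> is parsed into a ServiceBean, whose afterPropertiesSet (or onApplicationEvent) calls export.
  • export runs doExportUrls, which assembles the service URL, wraps ref in an Invoker through the proxy factory and hands it to the adaptive Protocol.
  • RegistryProtocol.export first performs the local export: DubboProtocol stores the Exporter in exporterMap and opens a NettyServer on the configured port.
  • RegistryProtocol then registers the provider URL with the registry (MulticastRegistry in the demo) and subscribes to override notifications.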
