Dubbo 服务引用

服务引用过程解析

调用服务时,在XML中配置<dubbo:reference>和注册中心即可像调用一个内存方法一样调用远端服务。

1
2
<dubbo:registry address="multicast://224.5.6.7:1234"/>
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" loadbalance="leastactive"/>

<dubbo:reference>同样是由DubboBeanDefinitionParser进行解析,其被解析后成实例ReferenceBean。因此,其是服务引用过程的关键。

可以看出,它是一个工厂Bean(实现FactoryBean接口)、InitializingBean接口。因此当有服务引用demoService时,会调用getObject()返回代理的对象。所以getObject()即是关键服务。因为实现了InitializingBean所以在向Spring暴露bean之前会调用afterPropertiesSet方法。RefrenceBeanafterPropertiesSet方法作用跟ServiceBeanafterPropertiesSet一样,从Spring容器中获取ConsumerApplicationModuleRegisteriesMonitorProtocol对象设置到其属性中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void afterPropertiesSet() throws Exception {
if (getConsumer() == null) {
Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);
if (consumerConfigMap != null && consumerConfigMap.size() > 0) {
ConsumerConfig consumerConfig = null;
for (ConsumerConfig config : consumerConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
consumerConfig = config;
}
}
if (consumerConfig != null) {
setConsumer(consumerConfig);
}
}
}
...//获取`Application`、`Module`、`Registeries`、`Monitor`、`Protocol`
Boolean b = isInit();
if (b == null && getConsumer() != null) {
b = getConsumer().isInit();
}
//若配置了init则会提前初始化。调用getObject方法
if (b != null && b.booleanValue()) {
getObject();
}
}

getObject最终会调用init方法,init方法中会组装所有配置的属性然后根据这些属性来创建代理对象createProxy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void init() {
...
// 获取所有配置属性组装成map
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (!isGeneric()) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}

String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
map.put("methods", Constants.ANY_VALUE);
} else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
...
// 根据组装成的属性 map 创建代理
ref = createProxy(map);
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

当配置injvm=true时,且当前JVM有对应服务时,createProxy会直接应用本地的服务,本章为了涵盖更多的内容,因此会专注于远端服务引用的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
// 当用户配置了url属性,则创建的是点对点的服务。
if (url != null && url.length() > 0) {
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else {
//获取注册中心的配置,装配URL
//URL示例:registry://224.5.6.7:1234/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=1269&qos.port=33333&registry=multicast&timestamp=1520264898587
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
}
if (urls.size() == 1) {
//之前的章节里说过,refprotocol是Protocol的自适应对象,其在执行refer时,会获取`url.getProtocol()`属性决定调用哪个拓展。
//根据上面的URL可知是RegistryProtocol
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
// create service proxy
//经过上面的处理后,invoker现在已是XxxClusterInvoker,下面我们以FailoverClusterInvoker为例来
return (T) proxyFactory.getProxy(invoker);
}

proxyFactory是个扩展点,有JdkProxyFactoryJavassistProxyFactory。进一步可以看到,配置的拦截器都是InvokerInvocationHandler。因此当调用Client方法时,都会被其拦截,调用其invoke方法。

1
2
3
4
5
6
7

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 根据URL获取具体的注册器实例,比如这里`MulticastRegistry`
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// ...
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建目录,创建目录的同时也会创建路由器,在其父类`AbstractDirectory`中`setRouters`中进行创建
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);

Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
//向注册中心注册服务消费者信息
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 目录向注册中心订阅服务,第一次订阅或者当服务提供者信息发生改变时注册中心会将更新通知到目录
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));

// 集群将目录合并到虚拟的`XxxClusterInvoker`中,而`XxxClusterInvoker`是一种虚拟的Invoker,起分发的作用。
// cluster自适应类型会根据`url.cluster`属性判断实际类型,默认为`FailoverCluster`,join后返回的是`FailoverClusterInvoker`,
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
return invoker;
}

//`AbstractDirectory`中`setRouters`逻辑
protected void setRouters(List<Router> routers) {
// copy list
routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);

String routerkey = url.getParameter(Constants.ROUTER_KEY);
if (routerkey != null && routerkey.length() > 0) {
//根据url.route属性创建Router实例
RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);
routers.add(routerFactory.getRouter(url));
}
// 最后加上MockInvokerSelector,作用是当一个请求被配置使用mock,router则保证只有带有MOCK协议的invokers才会出现在提供者列表上
routers.add(new MockInvokersSelector());
Collections.sort(routers);
this.routers = routers;
}

可以看出,实际上Router的作用是起过滤作用的,过滤出所有符合条件的Invoker
再来继续看下第一次订阅或者当服务提供者信息发生改变时注册中心通知目录的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
//分类处理:当时Route信息时,则更新Routers;
// 当为服务提供者时则,则刷新服务提供者
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
}
}
...
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
...
// 刷新Invokers
refreshInvoker(invokerUrls);
}
private void refreshInvoker(List<URL> invokerUrls) {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
//将url转换为Invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
//关闭无用的Invokers
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
}
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Set<String> keys = new HashSet<String>();
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
for (URL providerUrl : urls) {
////URL: dubbo://192.168.199.105:32323/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=leastactive&methods=sayHello&pid=1723&qos.port=33333&register.ip=192.168.199.105&remote.timestamp=1520264874377&scope=remote&side=consumer&timestamp=1520268138014
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
//引用服务的关键,调用DubboProtocol.refer
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
newUrlInvokerMap.put(key, invoker);
}
}
}
keys.clear();
return newUrlInvokerMap;
}

DubboProtocol.refer用来创建传输层的Clients,并建立长连接。

1
2
3
4
5
6
7
8
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
//getClients即是用来创建客户端连接
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}

getClients会间接调用某种类型(XxxTransporter)的传输器connect方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
`getClients`
-> `HeaderExchanger.connect`
->new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true)
-> `Transporter.connect`
```Getclients`会
假定我们使用`Netty`做传输层,则`Transporter.connect`会创建一个`NettyClient`,它会在构造函数内进行初始化后`Netty`客户端模式的配置然后建立连接。
重点来看下初始化`NettyClient`的逻辑:

```java
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
//
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
//这里封装了编码器和解码器逻辑
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
//解码,用途接收结果后的解码
pipeline.addLast("decoder", adapter.getDecoder());
//用于发送请求的编码
pipeline.addLast("encoder", adapter.getEncoder());
// 发送请求的出口和处理请求的入口
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}

另外一方面,值得一提的是HeaderExchangeClientHeaderExchangeServer,都带有心跳功能,他们会周期扫描当前所有的Channel,如果该Channel最近一个周期内没被读或写的话,则发送一次心跳请求,若心跳超时则进行重连。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
...
if (needHeartbeat) {
startHeatbeatTimer();
}
}
private void startHeatbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
// 定时运行心跳任务
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
// 进一步看 HeartBeatTask的run方法
public void run() {
long now = System.currentTimeMillis();
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
Long lastRead = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP);
Long lastWrite = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
//如果上一次rpc操作时间到现在已经超过心跳周期,则发起一起心跳
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
}
if (lastRead != null && now - lastRead > heartbeatTimeout) {
if (channel instanceof Client) {
((Client) channel).reconnect();
} else {
channel.close();
}
}
}
}

针对的,在NettyServerNettyClient构造函数里,可以看到ChannelHandlers.wrap(handler...)这段逻辑。深入进去后会发现:

1
2
3
4
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new `HeartbeatHandler`(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}

会在ChannelHandler中嵌入一个HeartbeatHandler,用来处理心跳请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
// 如果是心跳请求,则直接返回`Response`
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
}
return;
}
//如果是心跳返回结果,...
if (isHeartbeatResponse(message)) {
return;
}
handler.received(channel, message);
}
您的支持是我创作源源不断的动力