Dubbo 服务调用

服务调用过程解析

服务过程解析部分我们分两部分来讲,客户端发送请求、接收结果和服务端接受请求、响应结果过程。

客户端发送请求、接收结果

首先,调用客户端方法时,首先被InvokerInvocationHandler拦截,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
//在服务引用过程分析中,提到这里的invoker。当不是没有`injvm`、`url`配置时,invoker是 XxxClusterInvoker
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

接下来,我们指定XxxClusterInvokerFailoverClusterInvoker来分析整个流程。
FailoverClusterInvoker的结构如下:

所以,接下来会走进AbstractClusterInvoker的invoker方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public Result invoke(final Invocation invocation) throws RpcException {
LoadBalance loadbalance = null;
//根据参数列出所有可以使用的Invoker
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
// 根据Url,获取负载均衡器,这里先假定为`RandomLoadBalance`
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
// 调用directory的list方法,这里的Directory为`RegistryDirectory`
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
// doList方法直接从Directory保存的InvokerMap中取出对应Service的Invokers
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers;
// 经过设置的Routers进行过滤出可以使用的Invokers
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
}
}
return invokers;
}

获取完所有可用的Invoker后,继续看FailoverClusterInvoker的处理逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
//失败重试次数
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
//len-1次重试
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
}
//
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
return result;
}...
}
}
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty())
return null;
String methodName = invocation == null ? "" : invocation.getMethodName();
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
{
//如果带有粘性设置,则直接返回之前保存的`stickyInvoker`
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
}
// 负载均衡器根据策略选出一个Invoker,这里的Invoker是DubboInvoker
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}

接下来看DubboInvoker的处理逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);

ExchangeClient currentClient;
// 当有多个client(对同一个服务提供者建立多个链接)时,则按轮询的方式选择一个Client发起调用。
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
//单向调用,调用完即返回
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
//异步调用
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
//同步调用
RpcContext.getContext().setFuture(null);
//这里最终的Client实现为`HeaderExchangeClient`
return (Result) currentClient.request(inv, timeout).get();
}
}

而后,HeaderExchangeClient会调用HeaderExchangeChannelrequest方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// 创建一个Request,其带有Id属性,这个Id属性就是用来唯一标识请求
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
//创建DefaultFuture实例时,会将其放在一个全局的Map中,用于请求返回时,设置结果
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
//这里的channel即是NettyClient
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}

这里,值得一提的是DefaultFuture是客户端发送请求到获取结果的桥梁。
channel.send下面即为Netty的处理逻辑:
这里会先经过EncoderHandlerNettyCodecAdapter.InternalEncoder)进行编码处理,最后由NettyHandler发送请求。
服务端结果响应后,会由DecodeHandlerNettyCodecAdapter.InternalDncoder)进行解码后,交由NettyHanlder处理。

您的支持是我创作源源不断的动力