Zamperini


  • 首页

  • 关于

  • 标签23

  • 分类11

  • 归档56

Dubbo SPI拓展机制

发表于 2018-04-22 | 分类于 dubbo

SPI拓展机制(IOC、AOP、自适应拓展)

Dubbo 官方文档 拓展点加载 介绍了Dubbo的SPI拓展机制,总结起来如下:

  1. 是JDK SPI的加强版:
    • 只有使用的时候才会实例化拓展点,而摒弃JDK SPI一次性实例化所有拓展点
    • 增加了对IOC、AOP的支持
  2. 拓展的方法:在扩展类的 jar 包内,放置扩展点配置文件 META-INF/dubbo/接口全限定名,内容为:配置名=扩展实现类全限定名,多个实现类用换行符分隔。即可被Dubbo自动加载。
  3. 扩展点的特性:
    • 扩展点自动装配(IOC):加载扩展点时,扩展点实现类的成员如果为其它扩展点类型,在会自动注入依赖的扩展点;
    • 扩展点自动包装:AOP的一种实现方式,为拓展点自动添加一层包装,将某些可拓展的共有逻辑放在包装类(Wrapper)中。调用时,会先调用包装类的逻辑再调用拓展点的内容;
    • 扩展点自适应:在拓展点方法执行的时候,根据调用的参数(URL)决定调用的是哪个拓展点实现,而不会在初始化时就直接指定是哪个拓展点;
    • 扩展点自动激活:对于集合类拓展点,可以同时加载多个实现。自动激活可以简化配置,且可以编排它们的顺序。

ExtensionLoader是实现其SPI拓展机制的关键类,Dubbo会为每种类型拓展点都创建有一个ExtensionLoader实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
//首先从缓存里获取
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
//私有构造函数
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

其最主要的属性如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//所处理的拓展点类型(接口类型)
Class<?> type;
//拓展点工厂,用于在拓展点注入属性时获取属性的实例(从SPI中获取或者从Spring的上下文中获取)
ExtensionFactory objectFactory;
//拓展点类和名称的映射
ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
// 名称和类的映射,类可能有多个名称
Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
// 名称与激活拓展点注解的映射。
Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();
//名称与拓展点实例的映射,一般一个拓展点只会有一个实例
ConcurrentMap<String, Holder<Object>> cachedInstances = new
ConcurrentHashMap<String, Holder<Object>>();
// 自适应拓展点实例
Holder<Object> cachedAdaptiveInstance = new Holder<Object>();
//自实例拓展点类型
Class<?> cachedAdaptiveClass = null;
String cachedDefaultName;
//包装类集合 AOP
Set<Class<?>> cachedWrapperClasses;

对外暴露的最主要的方法如下:

1
2
3
4
5
6
//获取自适应拓展
public T getAdaptiveExtension();
//根据name获取特定的拓展实例
public T getExtension(String name);
// 根据条件获取当前扩展可自动激活的实现
public List<T> getActivateExtension(URL url, String[] values, String group);

Dubbo初始化时,获取拓展点时一般是调用getActivateExtension获取自适应拓展点,然后在运行时调用方法时,根据实时的参数URL确定拓展点实例的name,调用getExtension(name)获取实例。下面我们依次看这三个实现方法:

阅读全文 »

Dubbo

发表于 2018-04-21 | 分类于 dubbo

Dubbo

Dubbo是阿里开源的一套基于Java的高性能RPC框架。经常听人提起它,正好前段时间看了下新美大的RPC框架,因此闲暇中翻开代码来咀嚼了一番,收获颇多,也很有感触!!

本文主要涵盖以下内容:

  • SPI拓展机制(IOC、AOP、自适应拓展)
  • 服务暴露解析
  • 服务引用解析
  • 服务调用过程解析(请求、响应、编解码)
  • 服务发布订阅与通知
  • 组件解析与比较(Cluster、LoadBalancer、Directory、Router、Registry)

其中,会大量结合源码讲解整个过程!

内容

  • SPI(IOC、AOP)✅
  • 执行流程
  • 初始化过程
    • 服务注册、发现
    • 服务变更
  • 负载均衡
  • 服务降级
  • 失败重试
  • 服务监控
  • 传输层实现
    • NettyServer
    • NettyClient
    • NettyHandler
阅读全文 »

Netty 内存管理

发表于 2018-04-12 | 分类于 netty

内存管理

Netty的内存管理有两部分要点:自适应内存大小算法和真实的内存管理:

再讲之前我们再来看看数据读取的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 用来分配ByteBuf内存的 PooledByteBufAllocator
final ByteBufAllocator allocator = config.getAllocator();
// 用来决定分配多大的 ByteBuf (自适应缓存分配大小),以防止缓存分配过多或过少
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
do {
// allocator.ioBuffer(allocHandle.guess());
byteBuf = allocHandle.allocate(allocator);
// 在读取的过程中,alloHandle会记录读取了多少字节
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}

allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
// 跟据本次读取的字节,计算下一次应该分配的ByteBuf大小
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

if (close) {
closeOnRead(pipeline);
}
}

可以看到两个在内存分配管理上比较重要的Allocator、AllocHandle。

Allocator 是用来分配内存的,而AllocHandle则是用来辅助分配内存的,决定在不知道内存空间有多大的情况下预分配多大的ByteBuf。

AllocHandle(决定分配ByteBuf大小)

AllocHandle,其实现有多种。这里,重点来看AdaptiveRecvByteBufAllocator.HandleImpl:

在AdaptiveRecvByteBufAllocator内部会维护以下信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 初始化时的配置
static final int DEFAULT_MINIMUM = 64;
static final int DEFAULT_INITIAL = 1024;
static final int DEFAULT_MAXIMUM = 65536;

// index的自增值
private static final int INDEX_INCREMENT = 4;
// index自建值
private static final int INDEX_DECREMENT = 1;

// SIZE_TABLE 用来存放分配大小的尺度
private static final int[] SIZE_TABLE;

// 根据最小配置大小得到的在SIZE_TABLE中的下标
private final int minIndex;
private final int maxIndex;
private final int initial;

static {
List<Integer> sizeTable = new ArrayList<Integer>();
for (int i = 16; i < 512; i += 16) {
sizeTable.add(i);
}
for (int i = 512; i > 0; i <<= 1) {
sizeTable.add(i);
}
SIZE_TABLE = new int[sizeTable.size()];
for (int i = 0; i < SIZE_TABLE.length; i ++) {
SIZE_TABLE[i] = sizeTable.get(i);
}
// 初始化完成后,sizeTable : 16, 32, 48, 64, ..., 496,512,1024,2048...
}
阅读全文 »

Netty 服务接收请求&响应

发表于 2018-04-11 | 分类于 netty

服务端创建连接并响应请求

客户端与服务端通讯必须先创建连接。
在前文介绍服务端初始化的时候说道初始化的时候会向boss线程注册一个OP_ACCEPT事件。客户端请求链接时即会触发此事件。当boss线程池中的NioEventLoop在循环的时候检测到此事件时,会触发NioMessageUnsafe.read方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
do {
// NioServerSocketChannel.doReadMessage
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 触发 ChannelHandler的 channelRead方法,经过LoggingHandler最后到ServerBootstrapAcceptor
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
}
protected int doReadMessages(List<Object> buf) throws Exception {
// 从ServerSocketChannel.accept一个SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
return 0;
}

和Client端的不同之处即在于doReadMessages方法。此处调用的是NioServerSocketChannel.doReadMessage。
其调用的是原生ServerSocketChannel.accept方法来获取一个SocektChannel然后封装在NioSocetChannel中,并将其传递到ServerBootstrapAcceptor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 将初始化时设置的childHandler 加入到pipeline 中
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 往reactor线程池中注入,即跟某个`NioEventLoop`绑定。
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

首先,将初始化时配置的childHandler加到Channel的pipeline中,然后将Channel注册到某个 Reactor线程 上,然后开始监听OP_READ读事件。
此后与客户端的通讯工作即会在Reactor线程中进行,其过程与Client中一致,因此这里不再复述。

阅读全文 »

Netty 客户端请求&响应

发表于 2018-04-09 | 分类于 netty

客户端请求发送与获取响应

在Netty里,发送请求的过程可以分为两步:1. 将请求写到缓存队列中,2.将缓存队列中的请求进行 flush —– 调用原生的SocketChannel进行发送。

在客户端初始化时,讲到客户端的Channel会在Selector上注册OP_READ操作,当有服务端响应结果时,则触发OP_READ事件。

请求发送

首先来看发送请求的过程:SocketChannel 有两个方法用于发送请求,write和writeAndFlush,其中write用于将请求ByteBuf放入发送队列中,writeAndFlush则将请求发送到队列中,并把发送队列中的数据发送出去。这里以writeAndFlush来讲解整个过程
调用writeAndFlush方法最终会调用AbstractChannelHandlerContext.write方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}

其判断发起请求操作 channel 的对应的线程上下文中,不是则创建任务并将其扔进EventLoop的任务队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
}
private void invokeFlush0() {
((ChannelOutboundHandler) handler()).flush(this);
}

这里就可以看到invokeWrite0和invokeFlush0两个过程:

  1. invokeWrite0调用链如下:
阅读全文 »
prev1…567…12next
Zamperini

Zamperini

GitHub E-Mail Weibo
© 2020 Zamperini
由 Hexo 强力驱动
|
主题 — NexT.Gemini v6.0.3