Zamperini


  • 首页

  • 关于

  • 标签23

  • 分类11

  • 归档56

数据库中间件Zebra

发表于 2019-01-03 | 分类于 database

Zebra

[TOC]

Zebra是什么?能干甚么?

Zebra是一个在JDBC协议上开发的数据库连接池中间件,它不是真连接池(与DB直接交互的连接池),而是对连接池做了一层包装。

功能:

  1. 支持适配目前主流的数据库连接池(如上图)
  2. 读写分离、分库分表
  3. 支持配置动态修改生效(连接池的配置、用户密码、数据库节点访问路由负载均衡配置)
  4. CAT全方位监控(SQL执行情况、数据库连接数、端到端监控)
  5. 支持压测(改写表名)、SQL限流、黑白名单、SQL改写、SQL审计(日志审计,SQL安全监控)…

同类产品有哪些,以及比较?

类别 案例 优点 缺点
基于代理 mycat、cobor、atlas、jed 多语言支持、节省数据库连接 风险大(链路长)、实现难度大、共享连接时有风险
基于客户端(jdbc层) tddl 直连数据库(风险较小)、更灵活 对于每种语言都需要重写sdk、富客户端的常见缺点

基于代理:

基于客户端:
set up-w500

公司目前:北京侧 Altas居多,也有Atlas与zebra搭配使用(使用其压测处理、SQL监控特性),上海侧统一使用zebra。趋势是转向Zebra。Atlas与Zebra的对比。zebra秒杀Atlas?
数据库中间件比较

阅读全文 »

Dubbo 负载均衡

发表于 2018-04-26 | 分类于 dubbo

负载均衡

一致性HASH算法

适合场景:

  • 均衡性(Balance):
    平衡性是指哈希的结果能够尽可能分布到所有的缓冲中去,这样可以使得所有的缓冲空间都得到利用。很多哈希算法都能够满足这一条件。
  • 单调性(Monotonicity):当缓冲区大小变化时一致性哈希(Consistent hashing)尽量保护已分配的内容不会被重新映射到新缓冲区。(线性Hash就不能保证)
  • 分散性(Spread):避免相同内容被存储到不同缓冲中去,降低了系统存储的效率
  • 负载(Load):负载问题实际上是从另一个角度看待分散性问题。既然不同的终端可能将相同的内容映射到不同的缓冲区中,那么对于一个特定的缓冲区而言,也可能被不同的用户映射为不同的内容

CARP: Common Access Redundancy Protocol共用地址冗余协议Common Access Redundancy Protocol,或简称 CARP 能够使多台主机共享同一 IP 地址。在某些配置中,这样做可以提高可用性,或实现负载均衡。这些主机也可以同时使用其他的不同的 IP 地址。

##

阅读全文 »

Dubbo 服务调用

发表于 2018-04-25 | 分类于 dubbo

服务调用过程解析

服务过程解析部分我们分两部分来讲,客户端发送请求、接收结果和服务端接受请求、响应结果过程。

客户端发送请求、接收结果

首先,调用客户端方法时,首先被InvokerInvocationHandler拦截,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
//在服务引用过程分析中,提到这里的invoker。当不是没有`injvm`、`url`配置时,invoker是 XxxClusterInvoker
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

接下来,我们指定XxxClusterInvoker为FailoverClusterInvoker来分析整个流程。
FailoverClusterInvoker的结构如下:

所以,接下来会走进AbstractClusterInvoker的invoker方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public Result invoke(final Invocation invocation) throws RpcException {
LoadBalance loadbalance = null;
//根据参数列出所有可以使用的Invoker
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
// 根据Url,获取负载均衡器,这里先假定为`RandomLoadBalance`
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
// 调用directory的list方法,这里的Directory为`RegistryDirectory`
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
// doList方法直接从Directory保存的InvokerMap中取出对应Service的Invokers
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers;
// 经过设置的Routers进行过滤出可以使用的Invokers
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
}
}
return invokers;
}

获取完所有可用的Invoker后,继续看FailoverClusterInvoker的处理逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
//失败重试次数
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
//len-1次重试
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
}
//
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
return result;
}...
}
}
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty())
return null;
String methodName = invocation == null ? "" : invocation.getMethodName();
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
{
//如果带有粘性设置,则直接返回之前保存的`stickyInvoker`
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
}
// 负载均衡器根据策略选出一个Invoker,这里的Invoker是DubboInvoker
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}

接下来看DubboInvoker的处理逻辑。

阅读全文 »

Dubbo 服务引用

发表于 2018-04-24 | 分类于 dubbo

服务引用过程解析

调用服务时,在XML中配置<dubbo:reference>和注册中心即可像调用一个内存方法一样调用远端服务。

1
2
<dubbo:registry address="multicast://224.5.6.7:1234"/>
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" loadbalance="leastactive"/>

<dubbo:reference>同样是由DubboBeanDefinitionParser进行解析,其被解析后成实例ReferenceBean。因此,其是服务引用过程的关键。

可以看出,它是一个工厂Bean(实现FactoryBean接口)、InitializingBean接口。因此当有服务引用demoService时,会调用getObject()返回代理的对象。所以getObject()即是关键服务。因为实现了InitializingBean所以在向Spring暴露bean之前会调用afterPropertiesSet方法。RefrenceBean的afterPropertiesSet方法作用跟ServiceBean的afterPropertiesSet一样,从Spring容器中获取Consumer、Application、Module、Registeries、Monitor、Protocol对象设置到其属性中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void afterPropertiesSet() throws Exception {
if (getConsumer() == null) {
Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);
if (consumerConfigMap != null && consumerConfigMap.size() > 0) {
ConsumerConfig consumerConfig = null;
for (ConsumerConfig config : consumerConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
consumerConfig = config;
}
}
if (consumerConfig != null) {
setConsumer(consumerConfig);
}
}
}
...//获取`Application`、`Module`、`Registeries`、`Monitor`、`Protocol`
Boolean b = isInit();
if (b == null && getConsumer() != null) {
b = getConsumer().isInit();
}
//若配置了init则会提前初始化。调用getObject方法
if (b != null && b.booleanValue()) {
getObject();
}
}

getObject最终会调用init方法,init方法中会组装所有配置的属性然后根据这些属性来创建代理对象createProxy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void init() {
...
// 获取所有配置属性组装成map
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (!isGeneric()) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}

String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
map.put("methods", Constants.ANY_VALUE);
} else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
...
// 根据组装成的属性 map 创建代理
ref = createProxy(map);
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

当配置injvm=true时,且当前JVM有对应服务时,createProxy会直接应用本地的服务,本章为了涵盖更多的内容,因此会专注于远端服务引用的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
// 当用户配置了url属性,则创建的是点对点的服务。
if (url != null && url.length() > 0) {
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else {
//获取注册中心的配置,装配URL
//URL示例:registry://224.5.6.7:1234/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=1269&qos.port=33333&registry=multicast&timestamp=1520264898587
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
}
if (urls.size() == 1) {
//之前的章节里说过,refprotocol是Protocol的自适应对象,其在执行refer时,会获取`url.getProtocol()`属性决定调用哪个拓展。
//根据上面的URL可知是RegistryProtocol
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
// create service proxy
//经过上面的处理后,invoker现在已是XxxClusterInvoker,下面我们以FailoverClusterInvoker为例来
return (T) proxyFactory.getProxy(invoker);
}

proxyFactory是个扩展点,有JdkProxyFactory和JavassistProxyFactory。进一步可以看到,配置的拦截器都是InvokerInvocationHandler。因此当调用Client方法时,都会被其拦截,调用其invoke方法。

阅读全文 »

Dubbo 服务暴露

发表于 2018-04-23 | 分类于 dubbo

服务暴露过程解析

当在XML文件中配置下面内容,启动时Dubbo即会发布demoService服务,服务调用者皆可并在注册中心查到此服务提供者,并向其发起调用:

1
2
3
4
5
6
<dubbo:application name="demo-provider"/>
<dubbo:registry address="multicast://224.5.6.7:1234"/>
<dubbo:protocol name="dubbo" port="32321"/>
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
//服务发布
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>

其中<dubbo:service>用来初始化发布服务,其会被DubboBeanDefinitionParser解析成ServiceBean,并放在Spring容器中。afterPropertiesSet方法初始化ServiceBean。

其主要是从Spring容器中获取Provider、Application、Module、Registeries、Monitor、Protocol对象设置到属性中。

  • 没有配置delay属性时,直接调用export暴露服务
  • 配置了delay属性时,则会在Spring实例完所有bean后,发布ContextRefreshEvent事件时,调用ServiceBean(实现ApplicationListener接口)的onApplicationEvent方法进行export
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void afterPropertiesSet() throws Exception {
if (getProvider() == null) {
// 获取所有Provider
Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
if (providerConfigMap != null && providerConfigMap.size() > 0) {
Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
if ((protocolConfigMap == null || protocolConfigMap.size() == 0)
&& providerConfigMap.size() > 1) { // backward compatibility
List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
for (ProviderConfig config : providerConfigMap.values()) {
if (config.isDefault() != null && config.isDefault().booleanValue()) {
providerConfigs.add(config);
}
}
if (!providerConfigs.isEmpty()) {
setProviders(providerConfigs);
}
}
}
}
...
if (!isDelay()) {
export();
}
}

export方法最关键的部分是执行doExportUrls方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private void doExportUrls() {
//获取注册中心的URL,向多个注册中心发布服务
List<URL> registryURLs = loadRegistries(true);
// protocols即为`ServiceBean`初始化时从`Spring`容器中获取的对象集合
//用Protocol挨个暴露服务
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
//
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
// 拼装参数
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
...
String scope = url.getParameter(Constants.SCOPE_KEY);
// scope为local时,只向本地暴露服务
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// scope为remote时,向注册中心暴露服务
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
// registryURL:
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
//通过代理工厂生成一个Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//创建一个代理Invoker,包装Invoker并带有元数据
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//调用protocol暴露服务
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} ...
}
}

接下来,就是Protocol进行暴露以及发布服务:
ServiceBean中的Protocol对应的是自适应类型Protocol,调用export方法时,其会根据参数Invoker的URL属性url.getProtocol()来判定调用哪个Protocol对象。上文中的registryURL.getProtocol为registry,所以回去调用RegistryProtocol(会先调用包装类ProtocolListenerWrapper、ProtocolFilterWrapper)。

阅读全文 »
prev1…456…12next
Zamperini

Zamperini

GitHub E-Mail Weibo
© 2020 Zamperini
由 Hexo 强力驱动
|
主题 — NexT.Gemini v6.0.3