数据库中间件Zebra

Zebra

[TOC]

Zebra是什么?能干甚么?

Zebra是一个在JDBC协议上开发的数据库连接池中间件,它不是真连接池(与DB直接交互的连接池),而是对连接池做了一层包装。

功能:

  1. 支持适配目前主流的数据库连接池(如上图)
  2. 读写分离、分库分表
  3. 支持配置动态修改生效(连接池的配置、用户密码、数据库节点访问路由负载均衡配置)
  4. CAT全方位监控(SQL执行情况、数据库连接数、端到端监控)
  5. 支持压测(改写表名)、SQL限流、黑白名单、SQL改写、SQL审计(日志审计,SQL安全监控)…

同类产品有哪些,以及比较?

类别 案例 优点 缺点
基于代理 mycat、cobor、atlas、jed 多语言支持、节省数据库连接 风险大(链路长)、实现难度大、共享连接时有风险
基于客户端(jdbc层) tddl 直连数据库(风险较小)、更灵活 对于每种语言都需要重写sdk、富客户端的常见缺点

基于代理:

基于客户端:
set up-w500

公司目前:北京侧 Altas居多,也有Atlaszebra搭配使用(使用其压测处理、SQL监控特性),上海侧统一使用zebra。趋势是转向ZebraAtlas与Zebra的对比zebra秒杀Atlas?
数据库中间件比较

为什么美团一开始没有选择基于sdk的方式,而是基于代理的方式来做的?

架构

  • Zebra客户端做读写分离、分库分表、打点、监控
  • RDS、DBA管理平台维护配置信息
  • Lion监听配置更新,通知客户端生效变更
  • MHA保障主库的高可用性
  • zebra-monitor(自研)保障丛库的高可用性

MHA(Master High Availability)是作为MySQL高可用环境下,操作故障切换、主从提升的一套解决方案。MHA(Master High Availability)目前在MySQL高可用方面是一个相对成熟的解决方案。能做到30内完成故障切换。 官网
PS: Atlas维护主的高可用性也是MHA,且和Zebra是同一个MHA。

高可用

主库的高可用

利用MHA进行master节点的可用性监控,在发生故障,master节点不可用时,MHA进行mysql层的主从切换,切换成功后通知zebra新master节点的IP,由zebra客户端负责应用访问层的切换。官方文档
切换流程如下:

  1. MHA对MySQL集群进行监控管理
  2. 当主库发生故障时,MHA通知zebra对主库的写进行关闭,并进行MySQL集群的主从切换(切换期间应用无法写数据)
  3. zebra禁止掉对故障集群的写操作
  4. MHA切换成功,通知zebra新的写数据IP
  5. zebra用新的写IP替换老IP,开放应用访问。

丛库的高可用

zebra-monitor的监控服务负责,实时监控线上MySQL从库的健康状况,如果出现从库“故障”,将会通知zebra将读流量转移到其他可读节点,实现从库的“故障”转移。

丛库状态判断: zebra-monitor监控首先使用select 1 测试是否可以连通数据库,连接没有问题则使用 show slave status 获取到second_behind_master字段来得到该从库上的延迟,从而做出判断:

  • markdown(故障)

    1. 30s内从库连续ping不通; (从库宕机)
    2. 30s内 second_behind_master取到的延迟为null。 (主从同步中断)
    3. 延迟超过阈值。(可根据每个库的敏感程度进行个性化配置,需要进行另外配置)

    故障处理:从库markdown,zebra客户端会收到通知动态刷新连接池配置,重建本地数据源配置,流量不会再走到故障丛库,老的数据源会在全部sql执行完成后被close。

  • markup(恢复)

    • 30s内能够连续ping通并且主从延迟为0.

故障恢复:丛库恢复时,会通知客户端进行动态刷新数据源。

Zebra-client

##总览

推荐使用包搭配

zebra-apizebra-ds-monitormtrace-zebra

zebra-api

ShardDataSource + m x GroupDataSource(Master + n x Slave)
set up-w600

SingleDataSource

  • 屏蔽底层DataSource的差异,通过C3p0DataSourceAdapter适配具体连接池配置,并根据配置创建指定类型的连接池。

GroupDataSource

GroupDataSource主要职能 读写分离、负载均衡与路由。但还有一个好处就是不需要写死配置(jdbcUrl、user|pwd、连接池配置,且配置修改可实时生效)。

读写分离策略: 官方文档
set up-w460

路由策略: 官方文档,见代码片段
set up-w500

ShardDataSource

ShardDataSource的职责是支持分库分表路由,除此之外还有并发执行等。
set up-w600

分片规则:

利用脚本语言的灵活性,支持任意维度的分片:支持HASH、时间等,同时也可以使用Groovy内置的函数

示例脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<shard-dimension dbRule="(#id#.intValue() % 8).intdiv(2)"
dbIndexes="id[0-3]"
tbRule="#id#.intValue() % 2"
tbSuffix="alldb:[0,7]"
isMaster="true">
</shard-dimension>

class RuleEngineBaseImpl extends com.dianping.zebra.shard.router.rule.engine.RuleEngineBase {
Object execute(Map context) {
(context.get("id").intValue() % 8).intdiv(2)
}
}

class RuleEngineBaseImpl extends com.dianping.zebra.shard.router.rule.engine.RuleEngineBase{
Object execute(Map context) {
context.get("id").intValue() % 2
}
}

SQL解析:利用DruidSqlParser解析SQL成语法树,能从中直接获取SQL类型、SQL表名、参数名、Hint等…,以此为基础进行路由分库分表
SQL路由: 根据SQL解析的结果以及配置的规则,通过运行规则运算脚本可以获得应该路由到哪个库的哪个表去执行。
SQL改写:结合SQL解析器解析的结果、SQL路由的结果改写SQL语句(重设表名)
并行执行:当路由结果需要到多个库的多张表中执行,则会使用线程池去并行执行结果并合并结果
结果合并: 客户端将多库返回的所有结果加载到内存,进行合并操作

Filter链(Filter-Chain模式、责任链模式)

Filter链是zebra实现可扩展性的机制。其作用相当于Spring AOP的Interceptor
每个Filter可以在SQL执行的各个阶段起作用。比如getConnectionprepareStatementexecuteSingleStatement。可以在Filter中实现监控连接池状态、SQL执行情况;改写SQL语句;改写SQL表名;SQL流控。

set up-w600
set up-w600

另外,业务方可以完全自行拓展Filter实现SQL执行统计、改写SQL等。(这里的MtraceFilter就是Mtrace的人员开发的。)

怎么拓展?

  1. 覆盖DefaultJdbcFilter,覆盖关注的SQL执行阶段对应的方法。
  2. 在META-INF中添加一个zebra-filter.properties中记录需要加入到Filter链中的自定义Filter:
1
zebra.filter.mtrace=com.meituan.mtrace.zebra.filter.MtraceFilter

zebra-dao

zebra-dao职责是异步化物理分页。是在ORM框架MyBatis上做的一层封装。

异步化:
MyBatis中对每个访问层接口,都会生成一个代理对象,每个代理对象对应的InvocationHandlerMapperProxy
zebra-dao是在MyBatis.MapperProxy的代理对象上加了又加了一层代理AsyncMapperProxy。在AsyncMapperProxy中实现异步化。

值得注意的是,这里异步化将请求放到线程池去执行,然后通过future.get()的形式,从IO模式上来说也只是阻塞模式,而不是NIO的方式;
另外,这里一个进程会共用一个线程池。

分页:

  • 逻辑分页: 先从DB中拿出查询出来的所有的数据,而后再在内存中进行分页;

  • 物理分页: 改写SQL,在SQL中加入分页部分的逻辑limit 10,10

  • 高级物理分页:不仅返回分页数据,同时也返回总数

zebra-dao实现分页的方式是实现MyBatis的Interceptor(PageInterceptor)。Interceptor是MyBatis的插件(另一种拓展性方式),通过它可以在MyBatis的执行过程中插入一些额外的逻辑。这里PageInterceptor主要是作用针对query方法。具体做法是:

  • 改写SQL在尾部添加 limit x,x;
  • 如果是高级物理分页,则会再进行一次select count(*)查询,然后返回。
    见源码片段

工作原理

zebra-api

GroupDataSource

GroupDataSource初始化时,会从配置平台读取所有数据库节点的配置信息。然后依据这些信息来创建两种类型的代理数据源d FailOverDataSourceLoadBalancedDataSource。其中,FailOverDataSource用来连接和管理主节点故障的拒绝写等。LoadBalancedDataSource则是用来连接所有从节点,并负责从节点读流量的路由。

读写策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
LoadBalancedDataSource readDataSource;

FailOverDataSource writeDataSource;

Connection getRealConnection(boolen forceWrite) {
//是否进行了强行配置
if (this.routerType == RouterType.SLAVE_ONLY) {
return getReadConnection();
} else if (this.routerType == RouterType.MASTER_ONLY) {
return getWriteConnection();
}
//主制度
if (forceWrite) {
return getWriteConnection();
} else if (!autoCommit ||
// SQL中带有HINT
StringUtils.trimToEmpty(sql).contains("/*+zebra:w*/")
|| StringUtils.trimToEmpty(sql).contains("/*master*/")) {
return getWriteConnection();
} else if (readWriteStrategy != null && readWriteStrategy.shouldReadFromMaster()) {
//即设置了ZebraForceMasterHelper.forceMasterInLocalContext()
return getWriteConnection();
}

//通过Druid parser解析出SQL的类型
SqlType sqlType = SqlUtils.getSqlType(sql);
if (sqlType.isRead()) {
return getReadConnection();
} else {
return getWriteConnection();
}
}
private Connection getReadConnection() {
return readDataSource.getConnection();
}
private Connection getWriteConnection {
return writeDataSource.getConnection();
}

路由策略(针对读流量):

LoadBalanceDataSource中的router负责物理库和表的路由。

初始化时,会根据用户设置的配置信息(idcAware、..),创建路由

  • 先将所有的数据库节点信息按是否与本地机器在同一个区域,分为两个集合 localRegionRouterremoteRegionRouter
  • 跟据配置的路由策略(IdcAwareRouterCenterWeightRouterRegionAwareRouter)选择不同的实际路由器。
  • (1) 如果设置的只是区域感知的话,则直接按权重路由WeightDataSourceRouter
  • (2) 如果设置的是CenterAware,则将选择的是CenterAwareRouter
  • (3) 如果设置的是IdcAware,也是CenterAwareRouter,只是CenterAwareRouter中还有一个IdcAwareRouter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public RegionAwareRouter(Map<String, DataSourceConfig> dataSourceConfigs, String configManagerType,
String routerStrategy) {
this.regionManager = ZebraRegionManagerLoader.getRegionManager(configManagerType);

Map<String, DataSourceConfig> localRegionDataSourceConfigs = new HashMap<String, DataSourceConfig>();
Map<String, DataSourceConfig> remoteRegionDataSourceConfigs = new HashMap<String, DataSourceConfig>();
//将所有的数据库节点分为同区域节点、和不同区域节点
for (Map.Entry<String, DataSourceConfig> entry : dataSourceConfigs.entrySet()) {
String dsId = entry.getKey();
DataSourceConfig config = entry.getValue();

try {
Matcher matcher = JDBC_URL_PATTERN.matcher(config.getJdbcUrl());
if (matcher.matches()) {
String url = matcher.group(1);
String[] urlAndPort = url.split(":");

if (urlAndPort != null && urlAndPort.length > 0) {
if (this.regionManager.isInLocalRegion(urlAndPort[0])) {
localRegionDataSourceConfigs.put(dsId, config);
} else {
remoteRegionDataSourceConfigs.put(dsId, config);
}
}
} else {
remoteRegionDataSourceConfigs.put(dsId, config);
}
} catch (Throwable t) {
logger.warn(String.format(
"Cannot recognize the idc for jdbcUrl(%s), so put this datasource in the other region by default.",
config.getJdbcUrl()));
remoteRegionDataSourceConfigs.put(dsId, config);
}
}
//如果路由策略配置的只是同区域优先,则同区域内部走权重路由
if (Constants.ROUTER_STRATEGY_REGION_AWARE_ROUTER.equals(routerStrategy)) {
if (localRegionDataSourceConfigs.size() > 0) {
this.localRegionRouter = new WeightDataSourceRouter(localRegionDataSourceConfigs);
}
if (remoteRegionDataSourceConfigs.size() > 0) {
this.remoteRegionRouter = new WeightDataSourceRouter(remoteRegionDataSourceConfigs);
}
} else {
// 如果设置的是中心感知或机房感知
boolean idcAware = false;
if (Constants.ROUTER_STRATEGY_IDC_AWARE_ROUTER.equals(routerStrategy)) {
idcAware = true;
}
if (localRegionDataSourceConfigs.size() > 0) {
this.localRegionRouter = new CenterAwareRouter(localRegionDataSourceConfigs, configManagerType,
idcAware);
}
if (remoteRegionDataSourceConfigs.size() > 0) {
this.remoteRegionRouter = new CenterAwareRouter(remoteRegionDataSourceConfigs, configManagerType,
idcAware);
}
}
}

public class CenterAwareRouter implements DataSourceRouter {
private DataSourceRouter localCenterRouter;

protected CenterAwareRouter(Map<String, DataSourceConfig> dataSourceConfigs, String configManagerType,
boolean idcAware) {
...
if (idcAware) {
localCenterSourceConfigs.putAll(localIdcSourceConfigs);
if (localCenterSourceConfigs.size() > 0) {
//IdcAwareRouter的节点必定在同中心
this.localCenterRouter = new IdcAwareRouter(localCenterSourceConfigs, configManagerType);
}
}
}
public RouterTarget select(RouterContext routerContext) {
RouterTarget routerTarget = null;
//存在同机房路由,则有同机房路由
if (localCenterRouter != null) {
routerTarget = localCenterRouter.select(routerContext);
}
if (routerTarget == null) {
for (WeightDataSourceRouter weightDataSourceRouter : priorityCenterIdcAwareRouters) {
routerTarget = weightDataSourceRouter.select(routerContext);
if (routerTarget != null) {
return routerTarget;
}
}
}
return routerTarget;
}
}

ShardDataSource

分库分表原理

ShardDataSource中的DefaultShardRouter负责分库分表:

1
2
3
4
//路由规则:根据分库分表规则和运行时参数,计算出应该到的库名和表名
RouterRule routerRule;
//负责RouterRule计算的结果改写SQL,形成带有物理表名的SQL
SQLRewrite sqlRewrite = new DefaultSQLRewrite();

执行路由的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public RouterResult router(final String sql, List<Object> params) throws ShardRouterException, ShardParseException {
RouterResult routerResult = new RouterResult();
//解析SQL得出表名等信息
SQLParsedResult parsedResult = SQLParser.parseWithCache(sql);
//根据SQL解析结果找出对应表逻辑名的分片规则。
List<TableShardRule> findShardRules = findShardRules(parsedResult.getRouterContext(), params);
if (findShardRules.size() == 1) {
TableShardRule tableShardRule = findShardRules.get(0);
//执行逻辑表的分片规则获得物理库和表名
ShardEvalResult shardResult = tableShardRule.eval(new ShardEvalContext(parsedResult, params));
routerResult.setMergeContext(new MergeContext(parsedResult.getMergeContext()));
routerResult.setSqls(buildSqls(shardResult.getDbAndTables(), parsedResult, tableShardRule.getTableName()));
routerResult.setParams(buildParams(params, routerResult));
return routerResult;
}...
}

public ShardEvalResult eval(ShardEvalContext matchContext) {
ShardEvalResult result = new ShardEvalResult(matchContext.getTableName());
for (ColumnValue evalContext : matchContext.getColumnValues()) {
if (!evalContext.isUsed()) {
evalContext.setUsed(true);
//通过库规则,获得在库的索引号
Object dbObj = dbRuleEngine.eval(evalContext.getValue());
Number dbPos = (Number)dbObj;
TableSets tableSet = tablesMappingManager.getTableSetsByPos(dbPos.intValue());
//通过表规则,获得表的索引号
Number tablePos = (Number) tableRuleEngine.eval(evalContext.getValue());
String table = tableSet.getTableSets().get(tablePos.intValue());
result.addDbAndTable(tableSet.getDbIndex(), table);
}
}
return result;
}

zebra-dao

异步化

MyBatis中每个DAO都对应一个MapperProxy,zebra-dao中则是AsyncMapperProxy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class AsyncMapperProxy<T> implements InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args) {
// 是否是带有回调的方法
if (isCallbackMethod(method, args)) {
Method _method = getAnnotationMethod(method);
//...
Object[] newArgs = new Object[args.length - 1];
int i = 0;
AsyncDaoCallback callback = null;
for (Object arg : args) {
if (arg != null) {
if (!AsyncDaoCallback.class.isAssignableFrom(arg.getClass())) {
newArgs[i++] = arg;
} else {
callback = (AsyncDaoCallback) arg;
}
} else {
newArgs[i++] = arg;
}
}
//异步执行,并设置回调
AsyncMapperExecutor.executeRunnable(mapper, _method, newArgs, callback);
return null;
} else if (Future.class.isAssignableFrom(method.getReturnType())) {
//返回值是Future,则也异步发起请求
Method _method = getAnnotationMethod(method);
if (_method != null) {
return AsyncMapperExecutor.submitCallback(mapper, _method, args);
}
} else {
//直接调用
return method.invoke(mapper, args);
}
}
}

分页 - PageInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
Object[] args = invocation.getArgs();
Object rowBound = args[2];
MappedStatement ms = (MappedStatement) args[0];
if (rowBound != null) {
RowBounds rb = (RowBounds) rowBound;
// 无分页信息
if (rb.getOffset() == RowBounds.NO_ROW_OFFSET && rb.getLimit() == RowBounds.NO_ROW_LIMIT) {
return invocation.proceed();
} else {
BoundSql boundSql = ms.getBoundSql(args[1]);
if (rowBound instanceof PageModel) {
// 高级物理分页
PageModel pageModel = (PageModel) rowBound;
//首先获得次数
Object count = queryCount(invocation, args, ms, boundSql);
//获取结果
Object records = queryLimit(invocation, args, ms, boundSql, pageModel);
pageModel.setRecordCount((Integer) ((List<?>) count).get(0));
pageModel.setRecords((List<?>) records);
return null;
} else {
// 物理分页
return queryLimit(invocation, args, ms, boundSql, rb);
}
}
} else {
// without pagination
return invocation.proceed();
}
private Object queryCount(Invocation invocation, Object[] args, MappedStatement ms, BoundSql boundSql) {
//通过原始SQL生成select count(*)语句,通过SQL Parser解析SQL,再拼接SQL
String countSql = dialect.getCountSql(boundSql.getSql());
//生成新的SQL
BoundSql newBoundSql = new BoundSql(ms.getConfiguration(), countSql, boundSql.getParameterMappings(),
boundSql.getParameterObject());
//生成新的Statement 去执行select count(*)
countRowStatement = buildMappedStatement(ms, new SqlSqlSourceWrapper(newBoundSql), ms.getId() + "_COUNT",
resultMaps);

args[0] = countRowStatement;
args[2] = new RowBounds();
args[3] = null;
...
return invocation.proceed();
}

private Object queryLimit(Invocation invocation, Object[] args, MappedStatement ms, BoundSql boundSql, RowBounds rb) {
//生成带有limit的SQL
String limitSql = dialect.getLimitSql(boundSql.getSql(), rb.getOffset(), rb.getLimit());
BoundSql newBoundSql = new BoundSql(ms.getConfiguration(), limitSql, boundSql.getParameterMappings(),
boundSql.getParameterObject());
//构造新的Statement去执行分页语句
args[0] = buildMappedStatement(ms, new SqlSqlSourceWrapper(newBoundSql), ms.getId() + "_LIMIT",
ms.getResultMaps());
args[2] = new RowBounds();
args[3] = null;
return invocation.proceed();
}

SQL解析与改写

Zebra在之前的版本中使用Antlr解析SQL,后来的版本替换成了Druid的 Sql ParserDruid是个非常强大的工具,它支持达9种数据库类型SQL(db2|h2|mysql|hive|oracle…)。
这里我们以MySQL为例来介绍其SQL解析过程。

SQL的解析过程,分为词法分析语法分析

  • 词法分析用来识别词汇,如关键字等。Lexer就是用于词法分析的词法分析器。每个词法器都有自己的词汇库,KeyWords就是词汇库,且是所有数据库共用的词汇库。MySQLLexerLexer的子类。是用来专门解析MySQL的SQL语句。它的词汇库也在KeyWords加上了MySQL特有的关键字LIMITIDENTIFIED等;
  • 语法分析是在词法分析的基础上进行语法分析。生成一颗AST(abstract syntax tree抽象语法树)。且其过程中也会判断用户的输入是否符合语法逻辑;

先来看一个词法分析的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
String sql = "select * from order where cinema_id = 11 order by id, user_id";
Lexer lexer = new Lexer(sql);
for (;;) {
//触发下一个词识别,识别过程即在其中。(各种if|switch,有限状态机)
lexer.nextToken();
//获取识别的词
Token tok = lexer.token();
//标识符
if (tok == Token.IDENTIFIER) {
System.out.println(tok.name() + "\t\t" + lexer.stringVal());
//数字
} else if (tok == Token.LITERAL_INT) {
System.out.println(tok.name() + "\t\t" + lexer.numberString());
//其他类型
} else {
System.out.println(tok.name() + "\t\t\t" + tok.name);
}
//条件判断,以及其在SQL中的位置
if (tok == Token.WHERE) {
System.out.println("where pos : " + lexer.pos());
}
//结束标记
if (tok == Token.EOF) {
break;
}
}

其输出结果是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
SELECT			SELECT
STAR *
FROM FROM
ORDER ORDER
WHERE WHERE
where pos : 25
IDENTIFIER cinema_id
EQ =
LITERAL_INT 11
ORDER ORDER
BY BY
IDENTIFIER id
COMMA ,
IDENTIFIER user_id
EOF null

语法分析器在进行语法分析时,会根据词法分析器分析出的TOKEN、值来创建不同类型的节点,加到AST树中。
例如:

SELECT -> SQLSelect
Order by -> SQLOrderBy
* -> SQLSelectItem(每个结果集字段一个SQLSelectItem)
FROM -> SQLTableSource,SQLExprTableSource或者SQLSelect(嵌套查询)

以下面的例子来说明语法分析生成的结果:

1
select id, name, movie_id from order where cinema_id = 11 order by id, user_id

生成的语法树如下:

对AST树信息的访问,Druid定义一套接口SQLASTVisitor。业务方可以实现接口加入特定信息收集逻辑。调用SQLStatement.accept(visitor)visitor就会以类先序的顺序访问树上的所有节点。
下面给出一个自定义查找SQL中出现了哪些表名:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TableNameASTVisitor implements MySqlASTVisitor {
List<String> tableNames = Lists.newArrayList();
@Override
public boolean visit(SQLExprTableSource x) {
SQLName name = x.getName();
if (name!= null) {
tableNames.add(name.getSimpleName());
}
return false;
}
@Override
public boolean visit(SQLSelect x) {
return true;
}
@Override
public boolean visit(SQLSelectStatement astNode) {
return true;
}
public List<String> getTableNames() {
return tableNames;
}
//...
}
String sql = "select id, name, movie_id from order where cinema_id = 11 order by id, user_id";
SQLStatementParser parser = new MySqlStatementParser(sql);
SQLStatement statement = parser.parseStatement();
TableNameASTVisitor visitor = new TableNameASTVisitor();
statement.accept(visitor);
System.out.println(visitor.getTableNames());
//输出结果为 [order]

ShardRouter实现SQL改写的方式就是实现了一个Visitor-SimpleRewriteTableOutputVisitor,在其内部,将逻辑表名改写为路由后的物理表名:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class SimpleRewriteTableOutputVisitor extends MySqlOutputVisitor {
private Map<String, String> tableMapping;
//访问保存表信息的节点
public boolean visit(SQLExprTableSource x) {
SQLName name = (SQLName) x.getExpr();
String simpleName = name.getSimpleName();
boolean hasQuote = simpleName.charAt(0) == '`';
//逻辑表名
String tableName = hasQuote ? parseTableName(simpleName) : simpleName;
//物理表名
String finalTable = tableMapping.get(tableName);
...
print0("`" + finalTable + "`");
...
}
}
您的支持是我创作源源不断的动力