本文主要基于 Sharding-JDBC 1.5.0 正式版
  • 1. 概述
  • 2. SQLRouteResult
  • 3. 路由策略 x 算法
  • 4. SQL 路由
  • 5. DatabaseHintSQLRouter
  • 6. ParsingSQLRouter
    • 6.1 SimpleRoutingEngine
    • 6.2 ComplexRoutingEngine
    • 6.3 CartesianRoutingEngine
    • 6.3 ParsingSQLRouter 主#route()
  • 666. 彩蛋

🙂🙂🙂关注微信公众号:【芋道源码】有福利:
  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文分享分表分库路由相关的实现。涉及内容如下:
  1. SQL 路由结果
  2. 路由策略 x 算法
  3. SQL 路由器
内容顺序如编号。
Sharding-JDBC 正在收集使用公司名单:传送门。

🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门

Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门

登记吧,骚年!传送门
SQL 路由大体流程如下:

2. SQLRouteResult

经过 SQL解析SQL路由后,产生SQL路由结果,即 SQLRouteResult。根据路由结果,生成SQL执行SQL
  • sqlStatement :SQL语句对象,经过SQL解析的结果对象。
  • executionUnits :SQL最小执行单元集合。SQL执行时,执行每个单元。
  • generatedKeys :插入SQL语句生成的主键编号集合。目前不支持批量插入而使用集合的原因,猜测是为了未来支持批量插入做准备。

3. 路由策略 x 算法

ShardingStrategy,分片策略。目前支持两种分片:
分片资源:在分库策略里指的是库,在分表策略里指的是表。
【1】 计算静态分片(常用)
  1. // ShardingStrategy.java
  2. /**
  3. * 计算静态分片.
  4. * @param sqlType SQL语句的类型
  5. * @param availableTargetNames 所有的可用分片资源集合
  6. * @param shardingValues 分片值集合
  7. * @return 分库后指向的数据源名称集合
  8. */
  9. publicCollection<String> doStaticSharding(finalSQLType sqlType,finalCollection<String> availableTargetNames,finalCollection<ShardingValue<?>> shardingValues){
  10. Collection<String> result =newTreeSet<>(String.CASE_INSENSITIVE_ORDER);
  11. if(shardingValues.isEmpty()){
  12. Preconditions.checkState(!isInsertMultiple(sqlType, availableTargetNames),"INSERT statement should contain sharding value.");// 插入不能有多资源对象
  13.       result.addAll(availableTargetNames);
  14. }else{
  15.       result.addAll(doSharding(shardingValues, availableTargetNames));
  16. }
  17. return result;
  18. }
  19. /**
  20. * 插入SQL 是否插入多个分片
  21. * @param sqlType SQL类型
  22. * @param availableTargetNames 所有的可用分片资源集合
  23. * @return 是否
  24. */
  25. privateboolean isInsertMultiple(finalSQLType sqlType,finalCollection<String> availableTargetNames){
  26. returnSQLType.INSERT == sqlType && availableTargetNames.size()>1;
  27. }
  • 插入SQL 需要有片键值,否则无法判断单个分片资源。(Sharding-JDBC 目前仅支持单条记录插入)
【2】计算动态分片
  1. // ShardingStrategy.java
  2. /**
  3. * 计算动态分片.
  4. * @param shardingValues 分片值集合
  5. * @return 分库后指向的分片资源集合
  6. */
  7. publicCollection<String> doDynamicSharding(finalCollection<ShardingValue<?>> shardingValues){
  8. Preconditions.checkState(!shardingValues.isEmpty(),"Dynamic table should contain sharding value.");// 动态分片必须有分片值
  9. Collection<String> availableTargetNames =Collections.emptyList();
  10. Collection<String> result =newTreeSet<>(String.CASE_INSENSITIVE_ORDER);
  11.   result.addAll(doSharding(shardingValues, availableTargetNames));
  12. return result;
  13. }
  • 动态分片对应 TableRule.dynamic=true
  • 动态分片必须有分片值
😈 闷了,看起来两者没啥区别?答案在分片算法上。我们先看 #doSharding() 方法的实现。
  1. // ShardingStrategy.java
  2. /**
  3. * 计算分片
  4. * @param shardingValues 分片值集合
  5. * @param availableTargetNames 所有的可用分片资源集合
  6. * @return 分库后指向的分片资源集合
  7. */
  8. privateCollection<String> doSharding(finalCollection<ShardingValue<?>> shardingValues,finalCollection<String> availableTargetNames){
  9. // 无片键
  10. if(shardingAlgorithm instanceofNoneKeyShardingAlgorithm){
  11. returnCollections.singletonList(((NoneKeyShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues.iterator().next()));
  12. }
  13. // 单片键
  14. if(shardingAlgorithm instanceofSingleKeyShardingAlgorithm){
  15. SingleKeyShardingAlgorithm<?> singleKeyShardingAlgorithm =(SingleKeyShardingAlgorithm<?>) shardingAlgorithm;
  16. ShardingValue shardingValue = shardingValues.iterator().next();
  17. switch(shardingValue.getType()){
  18. case SINGLE:
  19. returnCollections.singletonList(singleKeyShardingAlgorithm.doEqualSharding(availableTargetNames, shardingValue));
  20. case LIST:
  21. return singleKeyShardingAlgorithm.doInSharding(availableTargetNames, shardingValue);
  22. case RANGE:
  23. return singleKeyShardingAlgorithm.doBetweenSharding(availableTargetNames, shardingValue);
  24. default:
  25. thrownewUnsupportedOperationException(shardingValue.getType().getClass().getName());
  26. }
  27. }
  28. // 多片键
  29. if(shardingAlgorithm instanceofMultipleKeysShardingAlgorithm){
  30. return((MultipleKeysShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues);
  31. }
  32. thrownewUnsupportedOperationException(shardingAlgorithm.getClass().getName());
  33. }
  • 无分片键算法:对应 NoneKeyShardingAlgorithm 分片算法接口。
  1. publicinterfaceNoneKeyShardingAlgorithm<T extendsComparable<?>>extendsShardingAlgorithm{
  2. String doSharding(Collection<String> availableTargetNames,ShardingValue<T> shardingValue);
  3. }
  • 单片键算法:对应 SingleKeyShardingAlgorithm 分片算法接口。
  1. publicinterfaceSingleKeyShardingAlgorithm<T extendsComparable<?>>extendsShardingAlgorithm{
  2. String doEqualSharding(Collection<String> availableTargetNames,ShardingValue<T> shardingValue);
  3. Collection<String> doInSharding(Collection<String> availableTargetNames,ShardingValue<T> shardingValue);
  4. Collection<String> doBetweenSharding(Collection<String> availableTargetNames,ShardingValue<T> shardingValue);
  5. }
ShardingValueTypeSQL 操作符接口方法
SINGLE=#doEqualSharding()
LISTIN#doInSharding()
RANGEBETWEEN#doBetweenSharding()
  • 多片键算法:对应 MultipleKeysShardingAlgorithm 分片算法接口。
  1. publicinterfaceMultipleKeysShardingAlgorithmextendsShardingAlgorithm{
  2. Collection<String> doSharding(Collection<String> availableTargetNames,Collection<ShardingValue<?>> shardingValues);
  3. }
分片算法类结构如下:
来看看 Sharding-JDBC 实现的无需分库的分片算法 NoneDatabaseShardingAlgorithm (NoneTableShardingAlgorithm 基本一模一样):
  1. publicfinalclassNoneDatabaseShardingAlgorithmimplementsSingleKeyDatabaseShardingAlgorithm<String>,MultipleKeysDatabaseShardingAlgorithm{
  2. @Override
  3. publicCollection<String> doSharding(finalCollection<String> availableTargetNames,finalCollection<ShardingValue<?>> shardingValues){
  4. return availableTargetNames;
  5. }
  6. @Override
  7. publicString doEqualSharding(finalCollection<String> availableTargetNames,finalShardingValue<String> shardingValue){
  8. return availableTargetNames.isEmpty()?null: availableTargetNames.iterator().next();
  9. }
  10. @Override
  11. publicCollection<String> doInSharding(finalCollection<String> availableTargetNames,finalShardingValue<String> shardingValue){
  12. return availableTargetNames;
  13. }
  14. @Override
  15. publicCollection<String> doBetweenSharding(finalCollection<String> availableTargetNames,finalShardingValue<String> shardingValue){
  16. return availableTargetNames;
  17. }
  18. }
  • 一定要注意,NoneXXXXShardingAlgorithm 只适用于无分库/表的需求,否则会是错误的路由结果。例如, #doEqualSharding() 返回的是第一个分片资源。

再来看测试目录下实现的余数基偶分表算法 ModuloTableShardingAlgorithm 的实现:
  1. // com.dangdang.ddframe.rdb.integrate.fixture.ModuloTableShardingAlgorithm.java
  2. publicfinalclassModuloTableShardingAlgorithmimplementsSingleKeyTableShardingAlgorithm<Integer>{
  3. @Override
  4. publicString doEqualSharding(finalCollection<String> tableNames,finalShardingValue<Integer> shardingValue){
  5. for(String each : tableNames){
  6. if(each.endsWith(shardingValue.getValue()%2+"")){
  7. return each;
  8. }
  9. }
  10. thrownewUnsupportedOperationException();
  11. }
  12. @Override
  13. publicCollection<String> doInSharding(finalCollection<String> tableNames,finalShardingValue<Integer> shardingValue){
  14. Collection<String> result =newLinkedHashSet<>(tableNames.size());
  15. for(Integer value : shardingValue.getValues()){
  16. for(String tableName : tableNames){
  17. if(tableName.endsWith(value %2+"")){
  18.                    result.add(tableName);
  19. }
  20. }
  21. }
  22. return result;
  23. }
  24. @Override
  25. publicCollection<String> doBetweenSharding(finalCollection<String> tableNames,finalShardingValue<Integer> shardingValue){
  26. Collection<String> result =newLinkedHashSet<>(tableNames.size());
  27. Range<Integer> range = shardingValue.getValueRange();
  28. for(Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++){
  29. for(String each : tableNames){
  30. if(each.endsWith(i %2+"")){
  31.                    result.add(each);
  32. }
  33. }
  34. }
  35. return result;
  36. }
  37. }
  • 我们可以参考这个例子编写自己的分片算哟 👼。
  • 多片键分库算法接口实现例子:MultipleKeysModuloDatabaseShardingAlgorithm.java

😈 来看看动态计算分片需要怎么实现分片算法。
  1. // com.dangdang.ddframe.rdb.integrate.fixture.SingleKeyDynamicModuloTableShardingAlgorithm.java
  2. publicfinalclassSingleKeyDynamicModuloTableShardingAlgorithmimplementsSingleKeyTableShardingAlgorithm<Integer>{
  3. /**
  4.    * 表前缀
  5.    */
  6. privatefinalString tablePrefix;
  7. @Override
  8. publicString doEqualSharding(finalCollection<String> availableTargetNames,finalShardingValue<Integer> shardingValue){
  9. return tablePrefix + shardingValue.getValue()%10;
  10. }
  11. @Override
  12. publicCollection<String> doInSharding(finalCollection<String> availableTargetNames,finalShardingValue<Integer> shardingValue){
  13. Collection<String> result =newLinkedHashSet<>(shardingValue.getValues().size());
  14. for(Integer value : shardingValue.getValues()){
  15.            result.add(tablePrefix + value %10);
  16. }
  17. return result;
  18. }
  19. @Override
  20. publicCollection<String> doBetweenSharding(finalCollection<String> availableTargetNames,finalShardingValue<Integer> shardingValue){
  21. Collection<String> result =newLinkedHashSet<>(availableTargetNames.size());
  22. Range<Integer> range = shardingValue.getValueRange();
  23. for(Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++){
  24.            result.add(tablePrefix + i %10);
  25. }
  26. return result;
  27. }
  28. }
  • 骚年,是不是明白了一些?动态表无需把真实表配置到 TableRule,而是通过分片算法计算出真实表

4. SQL 路由

SQLRouter,SQL 路由器接口,共有两种实现:
  • DatabaseHintSQLRouter:通过提示且仅路由至数据库的SQL路由器
  • ParsingSQLRouter:需要解析的SQL路由器
它们实现 #parse()进行SQL解析#route()进行SQL路由

RoutingEngine,路由引擎接口,共有四种实现:
  • DatabaseHintRoutingEngine:基于数据库提示的路由引擎
  • SimpleRoutingEngine:简单路由引擎
  • CartesianRoutingEngine:笛卡尔积的库表路由
  • ComplexRoutingEngine:混合多库表路由引擎
ComplexRoutingEngine 根据路由结果会转化成 SimpleRoutingEngine 或 ComplexRoutingEngine。下文会看相应源码。

路由结果有两种:
  • RoutingResult:简单路由结果
  • CartesianRoutingResult:笛卡尔积路由结果
从图中,我们已经能大概看到两者有什么区别,更具体的下文随源码一起分享。
😈 SQLRouteResult 和 RoutingResult 有什么区别?
  • SQLRouteResult:整个SQL路由返回的路由结果
  • RoutingResult:RoutingEngine返回路由结果

一下子看到这么多"对象",可能有点紧张。不要紧张,我们一起在整理下。
路由器路由引擎路由结果
DatabaseHintSQLRouterDatabaseHintRoutingEngineRoutingResult
ParsingSQLRouterSimpleRoutingEngineRoutingResult
ParsingSQLRouterCartesianRoutingEngineCartesianRoutingResult
😈 逗比博主给大家解决了"对象",是不是应该分享朋友圈

5. DatabaseHintSQLRouter

DatabaseHintSQLRouter,基于数据库提示的路由引擎。路由器工厂 SQLRouterFactory 创建路由器时,判断到使用数据库提示( Hint ) 时,创建 DatabaseHintSQLRouter。
  1. // DatabaseHintRoutingEngine.java
  2. publicstaticSQLRouter createSQLRouter(finalShardingContext shardingContext){
  3. returnHintManagerHolder.isDatabaseShardingOnly()?newDatabaseHintSQLRouter(shardingContext):newParsingSQLRouter(shardingContext);
  4. }
先来看下 HintManagerHolder、HintManager 部分相关的代码:
  1. // HintManagerHolder.java
  2. publicfinalclassHintManagerHolder{
  3. /**
  4.     * HintManager 线程变量
  5.     */
  6. privatestaticfinalThreadLocal<HintManager> HINT_MANAGER_HOLDER =newThreadLocal<>();
  7. /**
  8.     * 判断是否当前只分库.
  9.     *
  10.     * @return 是否当前只分库.
  11.     */
  12. publicstaticboolean isDatabaseShardingOnly(){
  13. returnnull!= HINT_MANAGER_HOLDER.get()&& HINT_MANAGER_HOLDER.get().isDatabaseShardingOnly();
  14. }
  15. /**
  16.     * 清理线索分片管理器的本地线程持有者.
  17.     */
  18. publicstaticvoid clear(){
  19.        HINT_MANAGER_HOLDER.remove();
  20. }
  21. }
  22. // HintManager.java
  23. publicfinalclassHintManagerimplementsAutoCloseable{
  24. /**
  25.     * 库分片值集合
  26.     */
  27. privatefinalMap<ShardingKey,ShardingValue<?>> databaseShardingValues =newHashMap<>();
  28. /**
  29.     * 只做库分片
  30.     * {@link DatabaseHintRoutingEngine}
  31.     */
  32. @Getter
  33. privateboolean databaseShardingOnly;
  34. /**
  35.     * 获取线索分片管理器实例.
  36.     *
  37.     * @return 线索分片管理器实例
  38.     */
  39. publicstaticHintManager getInstance(){
  40. HintManager result =newHintManager();
  41. HintManagerHolder.setHintManager(result);
  42. return result;
  43. }
  44. /**
  45.     * 设置分库分片值.
  46.     *
  47.     * <p>分片操作符为等号.该方法适用于只分库的场景</p>
  48.     *
  49.     * @param value 分片值
  50.     */
  51. publicvoid setDatabaseShardingValue(finalComparable<?> value){
  52.        databaseShardingOnly =true;
  53.        addDatabaseShardingValue(HintManagerHolder.DB_TABLE_NAME,HintManagerHolder.DB_COLUMN_NAME, value);
  54. }
  55. }
那么如果要使用 DatabaseHintSQLRouter,我们只需要 HintManager.getInstance().setDatabaseShardingValue(库分片值) 即可。这里有两点要注意下:
  • HintManager#getInstance(),每次获取到的都是的 HintManager,多次赋值需要小心。
  • HintManager#close(),使用完需要去清理,避免下个请求读到遗漏的线程变量。

看看 DatabaseHintSQLRouter 的实现:
  1. // DatabaseHintSQLRouter.java
  2. @Override
  3. publicSQLStatement parse(finalString logicSQL,finalint parametersSize){
  4. returnnewSQLJudgeEngine(logicSQL).judge();// 只解析 SQL 类型
  5. }
  6. @Override
  7. // TODO insert的SQL仍然需要解析自增主键
  8. publicSQLRouteResult route(finalString logicSQL,finalList<Object> parameters,finalSQLStatement sqlStatement){
  9. Context context =MetricsContext.start("Route SQL");
  10. SQLRouteResult result =newSQLRouteResult(sqlStatement);
  11. // 路由
  12. RoutingResult routingResult =newDatabaseHintRoutingEngine(shardingRule.getDataSourceRule(), shardingRule.getDatabaseShardingStrategy(), sqlStatement.getType())
  13. .route();
  14. // SQL最小执行单元
  15. for(TableUnit each : routingResult.getTableUnits().getTableUnits()){
  16.       result.getExecutionUnits().add(newSQLExecutionUnit(each.getDataSourceName(), logicSQL));
  17. }
  18. MetricsContext.stop(context);
  19. if(showSQL){
  20. SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits(), parameters);
  21. }
  22. return result;
  23. }
  • #parse() 只解析了 SQL 类型,即 SELECT / UPDATE / DELETE / INSERT 。
  • 使用的分库策略来自 ShardingRule,不是 TableRule,这个一定要留心。❓因为 SQL 未解析表名。因此,即使在 TableRule 设置了 actualTables 属性也是没有效果的。
  • 目前不支持 Sharding-JDBC 的主键自增。❓因为 SQL 未解析自增主键。从代码上的 TODO应该会支持。
  • HintManager.getInstance().setDatabaseShardingValue(库分片值) 设置的库分片值使用的是 EQUALS,因而分库策略计算出来的只有一个库分片,即 TableUnit 只有一个,SQLExecutionUnit 只有一个。

看看 DatabaseHintSQLRouter 的实现:
  1. // DatabaseHintRoutingEngine.java
  2. @Override
  3. publicRoutingResult route(){
  4. // 从 Hint 获得 分片键值
  5. Optional<ShardingValue<?>> shardingValue =HintManagerHolder.getDatabaseShardingValue(newShardingKey(HintManagerHolder.DB_TABLE_NAME,HintManagerHolder.DB_COLUMN_NAME));
  6. Preconditions.checkState(shardingValue.isPresent());
  7.   log.debug("Before database sharding only db:{} sharding values: {}", dataSourceRule.getDataSourceNames(), shardingValue.get());
  8. // 路由。表分片规则使用的是 ShardingRule 里的。因为没 SQL 解析。
  9. Collection<String> routingDataSources = databaseShardingStrategy.doStaticSharding(sqlType, dataSourceRule.getDataSourceNames(),Collections.<ShardingValue<?>>singleton(shardingValue.get()));
  10. Preconditions.checkState(!routingDataSources.isEmpty(),"no database route info");
  11.   log.debug("After database sharding only result: {}", routingDataSources);
  12. // 路由结果
  13. RoutingResult result =newRoutingResult();
  14. for(String each : routingDataSources){
  15.       result.getTableUnits().getTableUnits().add(newTableUnit(each,"",""));
  16. }
  17. return result;
  18. }
  • 调用 databaseShardingStrategy.doStaticSharding() 方法计算分片。
  • newTableUnit(each,"","") 的 logicTableName, actualTableName 都是空串,相信原因你已经知道。

6. ParsingSQLRouter

ParsingSQLRouter,需要解析的SQL路由器。
ParsingSQLRouter 使用 SQLParsingEngine 解析SQL。对SQL解析有兴趣的同学可以看看拙作《Sharding-JDBC 源码分析 —— SQL 解析》。
  1. // ParsingSQLRouter.java
  2. publicSQLStatement parse(finalString logicSQL,finalint parametersSize){
  3. SQLParsingEngine parsingEngine =newSQLParsingEngine(databaseType, logicSQL, shardingRule);
  4. Context context =MetricsContext.start("Parse SQL");
  5. SQLStatement result = parsingEngine.parse();
  6. if(result instanceofInsertStatement){
  7. ((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize);
  8. }
  9. MetricsContext.stop(context);
  10. return result;
  11. }
  • #appendGenerateKeyToken() 会在《SQL 改写》分享

ParsingSQLRouter 在路由时,会根据表情况使用 SimpleRoutingEngine 或 CartesianRoutingEngine 进行路由。
  1. privateRoutingResult route(finalList<Object> parameters,finalSQLStatement sqlStatement){
  2. Collection<String> tableNames = sqlStatement.getTables().getTableNames();
  3. RoutingEngine routingEngine;
  4. if(1== tableNames.size()|| shardingRule.isAllBindingTables(tableNames)){
  5.       routingEngine =newSimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);
  6. }else{
  7. // TODO 可配置是否执行笛卡尔积
  8.       routingEngine =newComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
  9. }
  10. return routingEngine.route();
  11. }
  • 当只进行一张表或者多表互为BindingTable关系时,使用 SimpleRoutingEngine 简单路由引擎。多表互为BindingTable关系时,每张表的路由结果是相同的,所以只要计算第一张表的分片即可。
  • tableNames.iterator().next() 注意下, tableNames 变量是 newTreeMap<>(String.CASE_INSENSITIVE_ORDER)。所以 SELECT*FROM t_order o join t_order_item i ON o.order_id=i.order_id 即使 t_order_item 排在 t_order 前面, tableNames.iterator().next() 返回的是 t_order。当 t_order 和 t_order_item 为 BindingTable关系 时,计算的是 t_order 路由分片。
  • BindingTable关系在 ShardingRule 的 tableRules 配置。配置该关系 TableRule 有如下需要遵守的规则:
    • 分片策略与算法相同
    • 数据源配置对象相同
    • 真实表数量相同
举个例子
  • SQL : SELECT*FROM t_order o join t_order_item i ON o.order_id=i.order_id
  • 分库分表情况:
  1. multi_db_multi_table_01
  2. ├── t_order_0                        ├── t_order_item_01
  3. └── t_order_1                        ├── t_order_item_02
  4. ├── t_order_item_03
  5. ├── t_order_item_04
  6. multi_db_multi_table_02
  7. ├── t_order_0                        ├── t_order_item_01
  8. └── t_order_1                        ├── t_order_item_02
  9. ├── t_order_item_03
  10. ├── t_order_item_04
最终执行的SQL如下:
  1. SELECT * FROM t_order_item_01 i JOIN t_order_01 o ON o.order_id = i.order_id
  2. SELECT * FROM t_order_item_01 i JOIN t_order_01 o ON o.order_id = i.order_id
  3. SELECT * FROM t_order_item_02 i JOIN t_order_02 o ON o.order_id = i.order_id
  4. SELECT * FROM t_order_item_02 i JOIN t_order_02 o ON o.order_id = i.order_id
  • t_order_item_03、 t_order_item_04 无法被查询到。
下面我们看看 #isAllBindingTables() 如何实现多表互为BindingTable关系
  1. // ShardingRule.java
  2. // 调用顺序 #isAllBindingTables()=>#filterAllBindingTables()=>#findBindingTableRule()=>#findBindingTableRule()
  3. /**
  4. * 判断逻辑表名称集合是否全部属于Binding表.
  5. * @param logicTables 逻辑表名称集合
  6. */
  7. publicboolean isAllBindingTables(finalCollection<String> logicTables){
  8. Collection<String> bindingTables = filterAllBindingTables(logicTables);
  9. return!bindingTables.isEmpty()&& bindingTables.containsAll(logicTables);
  10. }
  11. /**
  12. * 过滤出所有的Binding表名称.
  13. */
  14. publicCollection<String> filterAllBindingTables(finalCollection<String> logicTables){
  15. if(logicTables.isEmpty()){
  16. returnCollections.emptyList();
  17. }
  18. Optional<BindingTableRule> bindingTableRule = findBindingTableRule(logicTables);
  19. if(!bindingTableRule.isPresent()){
  20. returnCollections.emptyList();
  21. }
  22. // 交集
  23. Collection<String> result =newArrayList<>(bindingTableRule.get().getAllLogicTables());
  24.   result.retainAll(logicTables);
  25. return result;
  26. }
  27. /**
  28. * 获得包含<strong>任一</strong>在逻辑表名称集合的binding表配置的逻辑表名称集合
  29. */
  30. privateOptional<BindingTableRule> findBindingTableRule(finalCollection<String> logicTables){
  31. for(String each : logicTables){
  32. Optional<BindingTableRule> result = findBindingTableRule(each);
  33. if(result.isPresent()){
  34. return result;
  35. }
  36. }
  37. returnOptional.absent();
  38. }
  39. /**
  40. * 根据逻辑表名称获取binding表配置的逻辑表名称集合.
  41. */
  42. publicOptional<BindingTableRule> findBindingTableRule(finalString logicTable){
  43. for(BindingTableRule each : bindingTableRules){
  44. if(each.hasLogicTable(logicTable)){
  45. returnOptional.of(each);
  46. }
  47. }
  48. returnOptional.absent();
  49. }
  • 逻辑看起来比较长,目的是找到一条 BindingTableRule 包含所有逻辑表集合
  • 不支持《传递关系》:配置 BindingTableRule 时,相同绑定关系一定要配置在一条,必须是 [a,b,c],而不能是 [a,b],[b,c]

6.1 SimpleRoutingEngine

SimpleRoutingEngine,简单路由引擎。
  1. // SimpleRoutingEngine.java
  2. // ... 超过微信30000字限制,省略代码。请点击原文阅读。
  • 可以使用 HintManager 设置分片值进行强制路由
  • #getShardingValues() 我们看到了《SQL 解析(二)之SQL解析》分享的 Condition 对象。之前我们提到过Parser 半理解SQL的目的之一是:提炼分片上下文,此处即是该目的的体现。Condition 里只放明确影响路由的条件,例如: order_id=1order_id IN(1,2)order_id BETWEEN(1,3),不放无法计算的条件,例如: o.order_id=i.order_id。该方法里,使用分片键从 Condition 查找 分片值。🙂 是不是对 Condition 的认识更加清晰一丢丢落。
  1. // SimpleRoutingEngine.java
  2. // ... 超过微信30000字限制,省略代码。请点击原文阅读。
  • 可以使用 HintManager 设置分片值进行强制路由
  • 根据 dynamic 属性来判断调用 #doDynamicSharding() 还是 #doStaticSharding() 计算分片。
  1. // SimpleRoutingEngine.java
  2. // ... 超过微信30000字限制,省略代码。请点击原文阅读。
  • 在 SimpleRoutingEngine 只生成了当前表的 TableUnits。如果存在与其互为BindingTable关系的表的 TableUnits 怎么获得?你可以想想噢,当然在后文《SQL 改写》也会给出答案,看看和你想的是否一样。

6.2 ComplexRoutingEngine

ComplexRoutingEngine,混合多库表路由引擎。
  1. // ComplexRoutingEngine.java
  2. // ... 超过微信30000字限制,省略代码。请点击原文阅读。
  • ComplexRoutingEngine 计算每个逻辑表的简单路由分片,路由结果交给 CartesianRoutingEngine 继续路由形成笛卡尔积结果。
  • 由于目前 ComplexRoutingEngine 路由前已经判断全部表互为 BindingTable 关系,因而不会出现 result.size==1,属于防御性编程。
  • 部分表互为 BindingTable 关系时,ComplexRoutingEngine 不重复计算分片。

6.3 CartesianRoutingEngine

CartesianRoutingEngine,笛卡尔积的库表路由。
实现逻辑上相对复杂,请保持耐心哟,😈 其实目的就是实现连连看的效果:
  • RoutingResult[0] x RoutingResult[1] …… x RoutingResult[n- 1] x RoutingResult[n]
  • 同库 才可以进行笛卡尔积
  1. // CartesianRoutingEngine.java
  2. // ... 超过微信30000字限制,省略代码。请点击原文阅读。
  • 第一步,获得同库对应的逻辑表集合,即 Entry<数据源(库), Set<逻辑表>> entry
  • 第二步,遍历数据源(库),获得当前数据源(库)路由表单元分组
  • 第三步,对路由表单元分组进行笛卡尔积,并合并到路由结果。
下面,我们一起逐步看看代码实现。
  • SQL : SELECT*FROM t_order o join t_order_item i ON o.order_id=i.order_id
  • 分库分表情况:
  1. multi_db_multi_table_01
  2. ├── t_order_0                        ├── t_order_item_01
  3. └── t_order_1                        ├── t_order_item_02
  4. multi_db_multi_table_02
  5. ├── t_order_0                        ├── t_order_item_01
  6. └── t_order_1                        ├── t_order_item_02
  1. // 第一步
  2. // CartesianRoutingEngine.java
  3. /**
  4. * 获得同库对应的逻辑表集合
  5. */
  6. // ... 超过微信30000字限制,省略代码。请点击原文阅读。
  • #getDataSourceLogicTablesMap() 返回如图:

  1. // 第二步
  2. // CartesianRoutingEngine.java
  3. privateList<Set<String>> getActualTableGroups(finalString dataSource,finalSet<String> logicTables){
  4. List<Set<String>> result =newArrayList<>(logicTables.size());
  5. for(RoutingResult each : routingResults){
  6.       result.addAll(each.getTableUnits().getActualTableNameGroups(dataSource, logicTables));
  7. }
  8. return result;
  9. }
  10. privateList<Set<TableUnit>> toTableUnitGroups(finalString dataSource,finalList<Set<String>> actualTableGroups){
  11. List<Set<TableUnit>> result =newArrayList<>(actualTableGroups.size());
  12. for(Set<String> each : actualTableGroups){
  13.       result.add(newHashSet<>(Lists.transform(newArrayList<>(each),newFunction<String,TableUnit>(){
  14. @Override
  15. publicTableUnit apply(finalString input){
  16. return findTableUnit(dataSource, input);
  17. }
  18. })));
  19. }
  20. return result;
  21. }
  • #getActualTableGroups() 返回如图:
  • #toTableUnitGroups() 返回如图:

  1. // CartesianRoutingEngine.java
  2. privateList<CartesianTableReference> getCartesianTableReferences(finalSet<List<TableUnit>> cartesianTableUnitGroups){
  3. List<CartesianTableReference> result =newArrayList<>(cartesianTableUnitGroups.size());
  4. for(List<TableUnit> each : cartesianTableUnitGroups){
  5.       result.add(newCartesianTableReference(each));
  6. }
  7. return result;
  8. }
  9. // CartesianRoutingResult.java
  10. @Getter
  11. privatefinalList<CartesianDataSource> routingDataSources =newArrayList<>();
  12. void merge(finalString dataSource,finalCollection<CartesianTableReference> routingTableReferences){
  13. for(CartesianTableReference each : routingTableReferences){
  14.       merge(dataSource, each);
  15. }
  16. }
  17. privatevoid merge(finalString dataSource,finalCartesianTableReference routingTableReference){
  18. for(CartesianDataSource each : routingDataSources){
  19. if(each.getDataSource().equalsIgnoreCase(dataSource)){
  20.           each.getRoutingTableReferences().add(routingTableReference);
  21. return;
  22. }
  23. }
  24.   routingDataSources.add(newCartesianDataSource(dataSource, routingTableReference));
  25. }
  • Sets.cartesianProduct(tableUnitGroups) 返回如图(Guava 工具库真强大):
  • #getCartesianTableReferences() 返回如图:
    CartesianTableReference,笛卡尔积表路由组,包含多条 TableUnit,即 TableUnit[0] x TableUnit[1] …… x TableUnit[n]。例如图中: t_order_01 x t_order_item_02,最终转换成 SQL 为 SELECT*FROM t_order_01 o join t_order_item_02 i ON o.order_id=i.order_id
  • #merge() 合并笛卡尔积路由结果。CartesianRoutingResult 包含多个 CartesianDataSource,因此需要将 CartesianTableReference 合并(添加)到对应的 CartesianDataSource。当然,目前在实现时已经是按照数据源(库)生成对应的 CartesianTableReference。

6.4 ParsingSQLRouter 主#route()

  1. // ParsingSQLRouter.java
  2. // ... 超过微信30000字限制,省略代码。请点击原文阅读。
  • RoutingResultroutingResult=route(parameters,sqlStatement);调用的就是上文分析的 SimpleRoutingEngine、ComplexRoutingEngine、CartesianRoutingEngine 的 #route() 方法。
  • #processGeneratedKey()、 #processLimit()、 #rewrite()、 #generateSQL() 等会放在《SQL 改写》 分享。

666. 彩蛋

篇幅有些长,希望能让大家对
路由
有比较完整的认识。

如果内容有错误,烦请您指正,我会
认真
修改。

如果表述不清晰,不太理解的,欢迎加我微信(wangwenbin-server)一起探讨。
谢谢你技术这么好,还耐心看完了本文。
强制路由 HintManager 讲的相对略过,可以看如下内容进一步了解:
  1. 《官方文档-强制路由》
  2. HintManager.java 源码
厚着脸皮,道友,辛苦分享朋友圈可好?!
继续阅读
阅读原文