本文主要基于 Sharding-JDBC 1.5.0 正式版
  • 1. 概述
  • 2. SQLToken
  • 3.SQL 改写
    • 3.4.1 分页补充
    • 3.1 TableToken
    • 3.2 ItemsToken
    • 3.3 OffsetToken
    • 3.4 RowCountToken
    • 3.5 OrderByToken
    • 3.6 GeneratedKeyToken
  • 4. SQL 生成
  • 666. 彩蛋
🙂🙂🙂关注微信公众号:【芋道源码】有福利:
  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

前置阅读:《SQL 解析(三)之查询SQL》
本文分享SQL 改写的源码实现。主要涉及两方面:
  1. SQL 改写:改写 SQL,解决分库分表后,查询结果需要聚合,需要对 SQL 进行调整,例如分页
  2. SQL 生成:生成分表分库的执行 SQL
SQLRewriteEngine,SQL重写引擎,实现 SQL 改写、生成功能。从 Sharding-JDBC 1.5.0 版本,SQL 改写进行了调整和大量优化。
1.4.x及之前版本,SQL改写是在SQL路由之前完成的,在1.5.x中调整为SQL路由之后,因为SQL改写可以根据路由至单库表还是多库表而进行进一步优化。
😆 很多同学看完《SQL 解析-系列》 可能是一脸懵逼,特别对“SQL 半理解”
希望本文能给你一些启发。
Sharding-JDBC 正在收集使用公司名单:传送门。

🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门

Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门

登记吧,骚年!传送门

2. SQLToken

😁 SQLToken 在本文中很重要,所以即使在《SQL 解析-系列》已经分享过,我们也换个姿势,再来一次。
SQLToken,SQL标记对象接口。SQLRewriteEngine 基于 SQLToken 实现 SQL改写。SQL解析器在 SQL解析过程中,很重要的一个目的是标记需要SQL改写的部分,也就是 SQLToken。
各 SQLToken 生成条件如下(悲伤,做成表格形式排版是乱的):
  1. GeneratedKeyToken 自增主键标记对象
    • 插入SQL自增列不存在: INSERT INTO t_order(nickname)VALUES... 中没有自增列 order_id
  2. TableToken 表标记对象
    • 查询列的表别名: SELECT o.order_id 的 o
    • 查询的表名: SELECT*FROM t_order 的 t_order
  3. ItemsToken 选择项标记对象
    • AVG查询列: SELECT AVG(price)FROM t_order 的 AVG(price)
    • ORDER BY 字段不在查询列: SELECT order_id FROM t_order ORDER BY create_time 的 create_time
    • GROUP BY 字段不在查询列: SELECT COUNT(order_id)FROM t_order GROUP BY user_id 的 user_id
    • 自增主键未在插入列中: INSERT INTO t_order(nickname)VALUES... 中没有自增列 order_id
  4. OffsetToken 分页偏移量标记对象
    • 分页有偏移量,但不是占位符 ?
  5. RowCountToken 分页长度标记对象
    • 分页有长度,但不是占位符 ?
  6. OrderByToken 排序标记对象
    • 有 GROUP BY 条件,无 ORDER BY 条件: SELECT COUNT(*)FROM t_order GROUP BY order_id 的 order_id

3.SQL 改写

SQLRewriteEngine#rewrite() 实现了 SQL改写 功能。
  1. // SQLRewriteEngine.java
  2. /**
  3. * SQL改写.
  4. * @param isRewriteLimit 是否重写Limit
  5. * @return SQL构建器
  6. */
  7. publicSQLBuilder rewrite(finalboolean isRewriteLimit){
  8. SQLBuilder result =newSQLBuilder();
  9. if(sqlTokens.isEmpty()){
  10.       result.appendLiterals(originalSQL);
  11. return result;
  12. }
  13. int count =0;
  14. // 排序SQLToken,按照 beginPosition 递增
  15.   sortByBeginPosition();
  16. for(SQLToken each : sqlTokens){
  17. if(0== count){// 拼接第一个 SQLToken 前的字符串
  18.           result.appendLiterals(originalSQL.substring(0, each.getBeginPosition()));
  19. }
  20. // 拼接每个SQLToken
  21. if(each instanceofTableToken){
  22.           appendTableToken(result,(TableToken) each, count, sqlTokens);
  23. }elseif(each instanceofItemsToken){
  24.           appendItemsToken(result,(ItemsToken) each, count, sqlTokens);
  25. }elseif(each instanceofRowCountToken){
  26.           appendLimitRowCount(result,(RowCountToken) each, count, sqlTokens, isRewriteLimit);
  27. }elseif(each instanceofOffsetToken){
  28.           appendLimitOffsetToken(result,(OffsetToken) each, count, sqlTokens, isRewriteLimit);
  29. }elseif(each instanceofOrderByToken){
  30.           appendOrderByToken(result);
  31. }
  32.       count++;
  33. }
  34. return result;
  35. }
  • SQL改写以 SQLToken 为间隔顺序改写。
    • 顺序:调用 #sortByBeginPosition() 将 SQLToken 按照 beginPosition升序
    • 间隔:遍历 SQLToken,逐个拼接。
例如:

SQLBuilder,SQL构建器。下文会大量用到,我们看下实现代码。
  1. publicfinalclassSQLBuilder{
  2. /**
  3.     * 段集合
  4.     */
  5. privatefinalList<Object> segments;
  6. /**
  7.     * 当前段
  8.     */
  9. privateStringBuilder currentSegment;
  10. publicSQLBuilder(){
  11.        segments =newLinkedList<>();
  12.        currentSegment =newStringBuilder();
  13.        segments.add(currentSegment);
  14. }
  15. /**
  16.     * 追加字面量.
  17.     *
  18.     * @param literals 字面量
  19.     */
  20. publicvoid appendLiterals(finalString literals){
  21.        currentSegment.append(literals);
  22. }
  23. /**
  24.     * 追加表占位符.
  25.     *
  26.     * @param tableName 表名称
  27.     */
  28. publicvoid appendTable(finalString tableName){
  29. // 添加 TableToken
  30.        segments.add(newTableToken(tableName));
  31. // 新建当前段
  32.        currentSegment =newStringBuilder();
  33.        segments.add(currentSegment);
  34. }
  35. publicString toSQL(finalMap<String,String> tableTokens){
  36. // ... 省略代码,【SQL生成】处分享
  37. }
  38. @RequiredArgsConstructor
  39. privateclassTableToken{
  40. /**
  41.         * 表名
  42.         */
  43. privatefinalString tableName;
  44. }
  45. }

现在我们来逐个分析每种 SQLToken 的拼接实现。

3.1 TableToken

调用 #appendTableToken() 方法拼接。
  1. // SQLRewriteEngine.java
  2. /**
  3. * 拼接 TableToken
  4. *
  5. * @param sqlBuilder SQL构建器
  6. * @param tableToken tableToken
  7. * @param count tableToken 在 sqlTokens 的顺序
  8. * @param sqlTokens sqlTokens
  9. */
  10. privatevoid appendTableToken(finalSQLBuilder sqlBuilder,finalTableToken tableToken,finalint count,finalList<SQLToken> sqlTokens){
  11. // 拼接 TableToken
  12. String tableName = sqlStatement.getTables().getTableNames().contains(tableToken.getTableName())? tableToken.getTableName(): tableToken.getOriginalLiterals();
  13.   sqlBuilder.appendTable(tableName);
  14. // 拼接 SQLToken 后面的字符串
  15. int beginPosition = tableToken.getBeginPosition()+ tableToken.getOriginalLiterals().length();
  16. int endPosition = sqlTokens.size()-1== count ? originalSQL.length(): sqlTokens.get(count +1).getBeginPosition();
  17.   sqlBuilder.appendLiterals(originalSQL.substring(beginPosition, endPosition));
  18. }
  • 调用 SQLBuilder#appendTable() 拼接 TableToken。
  • sqlStatement.getTables().getTableNames().contains(tableToken.getTableName()) 目的是处理掉表名前后有的特殊字符,例如 SELECT*FROM't_order' 中 t_order 前后有 ' 符号。
  1. // TableToken.java
  2. /**
  3. * 获取表名称.
  4. */
  5. publicString getTableName(){
  6. returnSQLUtil.getExactlyValue(originalLiterals);
  7. }
  8. // SQLUtil.java
  9. publicstaticString getExactlyValue(finalString value){
  10. returnnull== value ?null:CharMatcher.anyOf("[]`'\"").removeFrom(value);
  11. }
  • 当 SQL 为 SELECT o.*FROM t_order o
    • TableToken 为查询列前的表别名 o 时返回结果: 
    • TableToken 为表名 t_order 时返回结果: 

3.2 ItemsToken

调用 #appendItemsToken() 方法拼接。
  1. // SQLRewriteEngine.java
  2. /**
  3. * 拼接 TableToken
  4. *
  5. * @param sqlBuilder SQL构建器
  6. * @param itemsToken itemsToken
  7. * @param count itemsToken 在 sqlTokens 的顺序
  8. * @param sqlTokens sqlTokens
  9. */
  10. privatevoid appendItemsToken(finalSQLBuilder sqlBuilder,finalItemsToken itemsToken,finalint count,finalList<SQLToken> sqlTokens){
  11. // 拼接 ItemsToken
  12. for(String item : itemsToken.getItems()){
  13.       sqlBuilder.appendLiterals(", ");
  14.       sqlBuilder.appendLiterals(item);
  15. }
  16. // SQLToken 后面的字符串
  17. int beginPosition = itemsToken.getBeginPosition();
  18. int endPosition = sqlTokens.size()-1== count ? originalSQL.length(): sqlTokens.get(count +1).getBeginPosition();
  19.   sqlBuilder.appendLiterals(originalSQL.substring(beginPosition, endPosition));
  20. }
  • 第一种情况,AVG查询列,SQL 为 SELECT AVG(order_id)FROM t_order o 时返回结果: 
  • 第二种情况,ORDER BY 字段不在查询列,SQL 为 SELECT userId FROM t_order o ORDER BY order_id 时返回结果: 
  • 第三种情况,GROUP BY 字段不在查询列,类似第二种情况,就不举例子列。

3.3 OffsetToken

调用 #appendLimitOffsetToken() 方法拼接。
  1. // SQLRewriteEngine.java
  2. /**
  3. * 拼接 OffsetToken
  4. *
  5. * @param sqlBuilder SQL构建器
  6. * @param offsetToken offsetToken
  7. * @param count offsetToken 在 sqlTokens 的顺序
  8. * @param sqlTokens sqlTokens
  9. * @param isRewrite 是否重写。当路由结果为单分片时无需重写
  10. */
  11. privatevoid appendLimitOffsetToken(finalSQLBuilder sqlBuilder,finalOffsetToken offsetToken,finalint count,finalList<SQLToken> sqlTokens,finalboolean isRewrite){
  12. // 拼接 OffsetToken
  13.   sqlBuilder.appendLiterals(isRewrite ?"0":String.valueOf(offsetToken.getOffset()));
  14. // SQLToken 后面的字符串
  15. int beginPosition = offsetToken.getBeginPosition()+String.valueOf(offsetToken.getOffset()).length();
  16. int endPosition = sqlTokens.size()-1== count ? originalSQL.length(): sqlTokens.get(count +1).getBeginPosition();
  17.   sqlBuilder.appendLiterals(originalSQL.substring(beginPosition, endPosition));
  18. }
  • 当分页跨分片时,需要每个分片都查询后在内存中进行聚合。此时 isRewrite=true。为什么是 "0" 开始呢?每个分片在 [0, offset) 的记录可能属于实际分页结果,因而查询每个分片需要从 0 开始。
  • 当分页单分片时,则无需重写,该分片执行的结果即是最终结果。SQL改写在SQL路由之后就有这个好处。如果先改写,因为没办法知道最终是单分片还是跨分片,考虑正确性,只能统一使用跨分片。

3.4 RowCountToken

调用 #appendLimitRowCount() 方法拼接。
  1. // SQLRewriteEngine.java
  2. privatevoid appendLimitRowCount(finalSQLBuilder sqlBuilder,finalRowCountToken rowCountToken,finalint count,finalList<SQLToken> sqlTokens,finalboolean isRewrite){
  3. SelectStatement selectStatement =(SelectStatement) sqlStatement;
  4. Limit limit = selectStatement.getLimit();
  5. if(!isRewrite){// 路由结果为单分片
  6.       sqlBuilder.appendLiterals(String.valueOf(rowCountToken.getRowCount()));
  7. }elseif((!selectStatement.getGroupByItems().isEmpty()||// [1.1] 跨分片分组需要在内存计算,可能需要全部加载
  8. !selectStatement.getAggregationSelectItems().isEmpty())// [1.2] 跨分片聚合列需要在内存计算,可能需要全部加载
  9. &&!selectStatement.isSameGroupByAndOrderByItems()){// [2] 如果排序一致,即各分片已经排序好结果,就不需要全部加载
  10.       sqlBuilder.appendLiterals(String.valueOf(Integer.MAX_VALUE));
  11. }else{// 路由结果为多分片
  12.       sqlBuilder.appendLiterals(String.valueOf(limit.isRowCountRewriteFlag()? rowCountToken.getRowCount()+ limit.getOffsetValue(): rowCountToken.getRowCount()));
  13. }
  14. // SQLToken 后面的字符串
  15. int beginPosition = rowCountToken.getBeginPosition()+String.valueOf(rowCountToken.getRowCount()).length();
  16. int endPosition = sqlTokens.size()-1== count ? originalSQL.length(): sqlTokens.get(count +1).getBeginPosition();
  17.   sqlBuilder.appendLiterals(originalSQL.substring(beginPosition, endPosition));
  18. }
  • [1.1] !selectStatement.getGroupByItems().isEmpty() 跨分片分组需要在内存计算,可能需要全部加载。如果不全部加载,部分结果被分页条件错误结果,会导致结果不正确。
  • [1.2] !selectStatement.getAggregationSelectItems().isEmpty()) 跨分片聚合列需要在内存计算,可能需要全部加载。如果不全部加载,部分结果被分页条件错误结果,会导致结果不正确。
  • [1.1][1.2],可能变成必须的前提是 GROUP BY 和 ORDER BY 排序不一致。如果一致,各分片已经排序完成,无需内存中排序。

3.4.1 分页补充

OffsetToken、RowCountToken 只有在分页对应位置非占位符 ? 才存在。当对应位置是占位符时,会对分页条件对应的预编译 SQL 占位符参数进行重写,整体逻辑和 OffsetToken、RowCountToken 是一致的
  1. // 👼 ParsingSQLRouter#route() 调用 #processLimit()
  2. // ParsingSQLRouter.java
  3. /**
  4. * 处理分页条件
  5. *
  6. * @see SQLRewriteEngine#appendLimitRowCount(SQLBuilder, RowCountToken, int, List, boolean)
  7. * @param parameters 占位符对应参数列表
  8. * @param selectStatement Select SQL语句对象
  9. * @param isSingleRouting 是否单表路由
  10. */
  11. privatevoid processLimit(finalList<Object> parameters,finalSelectStatement selectStatement,finalboolean isSingleRouting){
  12. boolean isNeedFetchAll =(!selectStatement.getGroupByItems().isEmpty()// // [1.1] 跨分片分组需要在内存计算,可能需要全部加载
  13. ||!selectStatement.getAggregationSelectItems().isEmpty())// [1.2] 跨分片聚合列需要在内存计算,可能需要全部加载
  14. &&!selectStatement.isSameGroupByAndOrderByItems();// [2] 如果排序一致,即各分片已经排序好结果,就不需要全部加载
  15.   selectStatement.getLimit().processParameters(parameters,!isSingleRouting, isNeedFetchAll);
  16. }
  17. // Limit.java
  18. /**
  19. * 填充改写分页参数.
  20. * @param parameters 参数
  21. * @param isRewrite 是否重写参数
  22. * @param isFetchAll 是否获取所有数据
  23. */
  24. publicvoid processParameters(finalList<Object> parameters,finalboolean isRewrite,finalboolean isFetchAll){
  25.   fill(parameters);
  26. if(isRewrite){
  27.       rewrite(parameters, isFetchAll);
  28. }
  29. }
  30. /**
  31. * 将占位符参数里是分页的参数赋值给 offset 、rowCount
  32. * 赋值的前提条件是 offset、rowCount 是 占位符
  33. * @param parameters 占位符参数
  34. */
  35. privatevoid fill(finalList<Object> parameters){
  36. int offset =0;
  37. if(null!=this.offset){
  38.       offset =-1==this.offset.getIndex()? getOffsetValue():NumberUtil.roundHalfUp(parameters.get(this.offset.getIndex()));
  39. this.offset.setValue(offset);
  40. }
  41. int rowCount =0;
  42. if(null!=this.rowCount){
  43.       rowCount =-1==this.rowCount.getIndex()? getRowCountValue():NumberUtil.roundHalfUp(parameters.get(this.rowCount.getIndex()));
  44. this.rowCount.setValue(rowCount);
  45. }
  46. if(offset <0|| rowCount <0){
  47. thrownewSQLParsingException("LIMIT offset and row count can not be a negative value.");
  48. }
  49. }
  50. /**
  51. * 重写分页条件对应的参数
  52. * @param parameters 参数
  53. * @param isFetchAll 是否拉取所有
  54. */
  55. privatevoid rewrite(finalList<Object> parameters,finalboolean isFetchAll){
  56. int rewriteOffset =0;
  57. int rewriteRowCount;
  58. // 重写
  59. if(isFetchAll){
  60.       rewriteRowCount =Integer.MAX_VALUE;
  61. }elseif(rowCountRewriteFlag){
  62.       rewriteRowCount =null== rowCount ?-1: getOffsetValue()+ rowCount.getValue();
  63. }else{
  64.       rewriteRowCount = rowCount.getValue();
  65. }
  66. // 参数设置
  67. if(null!= offset && offset.getIndex()>-1){
  68.       parameters.set(offset.getIndex(), rewriteOffset);
  69. }
  70. if(null!= rowCount && rowCount.getIndex()>-1){
  71.       parameters.set(rowCount.getIndex(), rewriteRowCount);
  72. }
  73. }

3.5 OrderByToken

调用 #appendOrderByToken() 方法拼接。数据库里,当无 ORDER BY条件 而有 GROUP BY 条件时候,会使用 GROUP BY条件将结果升序排序:
  • SELECT order_id FROM t_order GROUP BY order_id 等价于 SELECT order_id FROM t_order GROUP BY order_id ORDER BY order_id ASC
  • SELECT order_id FROM t_order GROUP BY order_id DESC 等价于 SELECT order_id FROM t_order GROUP BY order_id ORDER BY order_id DESC
  1. // ParsingSQLRouter.java
  2. /**
  3. * 拼接 OrderByToken
  4. *
  5. * @param sqlBuilder SQL构建器
  6. */
  7. privatevoid appendOrderByToken(finalSQLBuilder sqlBuilder){
  8. SelectStatement selectStatement =(SelectStatement) sqlStatement;
  9. // 拼接 OrderByToken
  10. StringBuilder orderByLiterals =newStringBuilder(" ORDER BY ");
  11. int i =0;
  12. for(OrderItem each : selectStatement.getOrderByItems()){
  13. if(0== i){
  14.           orderByLiterals.append(each.getColumnLabel()).append(" ").append(each.getType().name());
  15. }else{
  16.           orderByLiterals.append(",").append(each.getColumnLabel()).append(" ").append(each.getType().name());
  17. }
  18.       i++;
  19. }
  20.   orderByLiterals.append(" ");
  21.   sqlBuilder.appendLiterals(orderByLiterals.toString());
  22. }
  • 当 SQL 为 SELECT order_id FROM t_order o GROUP BY order_id 返回结果: 

3.6 GeneratedKeyToken

前置阅读:《SQL 解析(四)之插入SQL》
GeneratedKeyToken,和其它 SQLToken 不同,在 SQL解析 完进行处理。
  1. // ParsingSQLRouter.java
  2. @Override
  3. publicSQLStatement parse(finalString logicSQL,finalint parametersSize){
  4. SQLParsingEngine parsingEngine =newSQLParsingEngine(databaseType, logicSQL, shardingRule);
  5. Context context =MetricsContext.start("Parse SQL");
  6. SQLStatement result = parsingEngine.parse();
  7. if(result instanceofInsertStatement){// 处理 GenerateKeyToken
  8. ((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize);
  9. }
  10. MetricsContext.stop(context);
  11. return result;
  12. }
  13. // InsertStatement.java
  14. /**
  15. * 追加自增主键标记对象.
  16. *
  17. * @param shardingRule 分片规则
  18. * @param parametersSize 参数个数
  19. */
  20. publicvoid appendGenerateKeyToken(finalShardingRule shardingRule,finalint parametersSize){
  21. // SQL 里有主键列
  22. if(null!= generatedKey){
  23. return;
  24. }
  25. // TableRule 存在
  26. Optional<TableRule> tableRule = shardingRule.tryFindTableRule(getTables().getSingleTableName());
  27. if(!tableRule.isPresent()){
  28. return;
  29. }
  30. // GeneratedKeyToken 存在
  31. Optional<GeneratedKeyToken> generatedKeysToken = findGeneratedKeyToken();
  32. if(!generatedKeysToken.isPresent()){
  33. return;
  34. }
  35. // 处理 GenerateKeyToken
  36. ItemsToken valuesToken =newItemsToken(generatedKeysToken.get().getBeginPosition());
  37. if(0== parametersSize){
  38.       appendGenerateKeyToken(shardingRule, tableRule.get(), valuesToken);
  39. }else{
  40.       appendGenerateKeyToken(shardingRule, tableRule.get(), valuesToken, parametersSize);
  41. }
  42. // 移除 generatedKeysToken
  43.   getSqlTokens().remove(generatedKeysToken.get());
  44. // 新增 ItemsToken
  45.   getSqlTokens().add(valuesToken);
  46. }
  • 根据占位符参数数量不同,调用的 #appendGenerateKeyToken() 是不同的:
  • 占位符参数数量 = 0 时,直接生成分布式主键,保持无占位符的做法。
  1. // InsertStatement.java
  2. privatevoid appendGenerateKeyToken(finalShardingRule shardingRule,finalTableRule tableRule,finalItemsToken valuesToken){
  3. // 生成分布式主键
  4. Number generatedKey = shardingRule.generateKey(tableRule.getLogicTable());
  5. // 添加到 ItemsToken
  6.   valuesToken.getItems().add(generatedKey.toString());
  7. // 增加 Condition,用于路由
  8.   getConditions().add(newCondition(newColumn(tableRule.getGenerateKeyColumn(), tableRule.getLogicTable()),newSQLNumberExpression(generatedKey)), shardingRule);
  9. // 生成 GeneratedKey
  10. this.generatedKey =newGeneratedKey(tableRule.getLogicTable(),-1, generatedKey);
  11. }
  • 占位符参数数量 > 0 时,生成自增列的占位符,保持有占位符的做法。
  1. privatevoid appendGenerateKeyToken(finalShardingRule shardingRule,finalTableRule tableRule,finalItemsToken valuesToken,finalint parametersSize){
  2. // 生成占位符
  3.   valuesToken.getItems().add("?");
  4. // 增加 Condition,用于路由
  5.   getConditions().add(newCondition(newColumn(tableRule.getGenerateKeyColumn(), tableRule.getLogicTable()),newSQLPlaceholderExpression(parametersSize)), shardingRule);
  6. // 生成 GeneratedKey
  7.   generatedKey =newGeneratedKey(tableRule.getGenerateKeyColumn(), parametersSize,null);
  8. }
  • 因为 GenerateKeyToken 已经处理完,所以移除,避免 SQLRewriteEngine#rewrite() 二次改写。另外,通过 ItemsToken 补充自增列。
  • 生成 GeneratedKey 会在 ParsingSQLRouter 进一步处理。
  1. // ParsingSQLRouter.java
  2. publicSQLRouteResult route(finalString logicSQL,finalList<Object> parameters,finalSQLStatement sqlStatement){
  3. finalContext context =MetricsContext.start("Route SQL");
  4. SQLRouteResult result =newSQLRouteResult(sqlStatement);
  5. // 处理 插入SQL 主键字段
  6. if(sqlStatement instanceofInsertStatement&&null!=((InsertStatement) sqlStatement).getGeneratedKey()){
  7.       processGeneratedKey(parameters,(InsertStatement) sqlStatement, result);
  8. }
  9. // ... 省略部分代码
  10. }
  11. /**
  12. * 处理 插入SQL 主键字段
  13. * 当 主键编号 未生成时,{@link ShardingRule#generateKey(String)} 进行生成
  14. * @param parameters 占位符参数
  15. * @param insertStatement Insert SQL语句对象
  16. * @param sqlRouteResult SQL路由结果
  17. */
  18. privatevoid processGeneratedKey(finalList<Object> parameters,finalInsertStatement insertStatement,finalSQLRouteResult sqlRouteResult){
  19. GeneratedKey generatedKey = insertStatement.getGeneratedKey();
  20. if(parameters.isEmpty()){// 已有主键,无占位符,INSERT INTO t_order(order_id, user_id) VALUES (1, 100);
  21.       sqlRouteResult.getGeneratedKeys().add(generatedKey.getValue());
  22. }elseif(parameters.size()== generatedKey.getIndex()){// 主键字段不存在存在,INSERT INTO t_order(user_id) VALUES(?);
  23. Number key = shardingRule.generateKey(insertStatement.getTables().getSingleTableName());// 生成主键编号
  24.       parameters.add(key);
  25.       setGeneratedKeys(sqlRouteResult, key);
  26. }elseif(-1!= generatedKey.getIndex()){// 主键字段存在,INSERT INTO t_order(order_id, user_id) VALUES(?, ?);
  27.       setGeneratedKeys(sqlRouteResult,(Number) parameters.get(generatedKey.getIndex()));
  28. }
  29. }
  30. /**
  31. * 设置 主键编号 到 SQL路由结果
  32. * @param sqlRouteResult SQL路由结果
  33. * @param generatedKey 主键编号
  34. */
  35. privatevoid setGeneratedKeys(finalSQLRouteResult sqlRouteResult,finalNumber generatedKey){
  36.   generatedKeys.add(generatedKey);
  37.   sqlRouteResult.getGeneratedKeys().clear();
  38.   sqlRouteResult.getGeneratedKeys().addAll(generatedKeys);
  39. }
  • parameters.size()==generatedKey.getIndex() 处对应 #appendGenerateKeyToken() 的 占位符参数数量 > 0 情况,此时会生成分布式主键。😈 该处是不是可以考虑把生成分布式主键挪到 #appendGenerateKeyToken(),这样更加统一一些。

4. SQL 生成

SQL路由完后,会生成各数据分片的执行SQL
  1. // ParsingSQLRouter.java
  2. @Override
  3. publicSQLRouteResult route(finalString logicSQL,finalList<Object> parameters,finalSQLStatement sqlStatement){
  4. SQLRouteResult result =newSQLRouteResult(sqlStatement);
  5. // 省略部分代码... 处理 插入SQL 主键字段
  6. // 路由
  7. RoutingResult routingResult = route(parameters, sqlStatement);
  8. // 省略部分代码... SQL重写引擎
  9. SQLRewriteEngine rewriteEngine =newSQLRewriteEngine(shardingRule, logicSQL, sqlStatement);
  10. boolean isSingleRouting = routingResult.isSingleRouting();
  11. // 省略部分代码... 处理分页
  12. // SQL 重写
  13. SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting);
  14. // 生成 ExecutionUnit
  15. if(routingResult instanceofCartesianRoutingResult){
  16. for(CartesianDataSource cartesianDataSource :((CartesianRoutingResult) routingResult).getRoutingDataSources()){
  17. for(CartesianTableReference cartesianTableReference : cartesianDataSource.getRoutingTableReferences()){
  18. // 👼 生成 SQL
  19.               result.getExecutionUnits().add(newSQLExecutionUnit(cartesianDataSource.getDataSource(), rewriteEngine.generateSQL(cartesianTableReference, sqlBuilder)));
  20. }
  21. }
  22. }else{
  23. for(TableUnit each : routingResult.getTableUnits().getTableUnits()){
  24. // 👼 生成 SQL
  25.           result.getExecutionUnits().add(newSQLExecutionUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder)));
  26. }
  27. }
  28. return result;
  29. }
  • 调用 RewriteEngine#generateSQL() 生成执行SQL。对于笛卡尔积路由结果和简单路由结果传递的参数略有不同:前者使用 CartesianDataSource ( CartesianTableReference ),后者使用路由表单元 ( TableUnit )。对路由结果不是很了解的同学,建议看下 《SQL 路由(二)之分库分表路由》。
RewriteEngine#generateSQL() 对于笛卡尔积路由结果和简单路由结果两种情况,处理上大体是一致的:1. 获得 SQL 相关逻辑表对应的真实表映射,2. 根据映射改写 SQL 相关逻辑表真实表
  1. // SQLRewriteEngine.java
  2. /**
  3. * 生成SQL语句.
  4. * @param tableUnit 路由表单元
  5. * @param sqlBuilder SQL构建器
  6. * @return SQL语句
  7. */
  8. publicString generateSQL(finalTableUnit tableUnit,finalSQLBuilder sqlBuilder){
  9. return sqlBuilder.toSQL(getTableTokens(tableUnit));
  10. }
  11. /**
  12. * 生成SQL语句.
  13. * @param cartesianTableReference 笛卡尔积路由表单元
  14. * @param sqlBuilder SQL构建器
  15. * @return SQL语句
  16. */
  17. publicString generateSQL(finalCartesianTableReference cartesianTableReference,finalSQLBuilder sqlBuilder){
  18. return sqlBuilder.toSQL(getTableTokens(cartesianTableReference));
  19. }
  20. // SQLRewriteEngine.java
  21. // SQLBuilder.java
  22. /**
  23. * 生成SQL语句.
  24. * @param tableTokens 占位符集合(逻辑表与真实表映射)
  25. * @return SQL语句
  26. */
  27. publicString toSQL(finalMap<String,String> tableTokens){
  28. StringBuilder result =newStringBuilder();
  29. for(Object each : segments){
  30. if(each instanceofTableToken&& tableTokens.containsKey(((TableToken) each).tableName)){
  31.           result.append(tableTokens.get(((TableToken) each).tableName));
  32. }else{
  33.           result.append(each);
  34. }
  35. }
  36. return result.toString();
  37. }
  • #toSQL() 结果如图: 
    😜 对 SQL改写 是不是清晰很多了。

下面我们以笛卡尔积路由结果获得 SQL 相关逻辑表对应的真实表映射为例子(简单路由结果基本类似而且简单)。
  1. // SQLRewriteEngine.java
  2. /**
  3. * 获得(笛卡尔积表路由组里的路由表单元逻辑表 和 与其互为BindingTable关系的逻辑表)对应的真实表映射(逻辑表需要在 SQL 中存在)
  4. * @param cartesianTableReference 笛卡尔积表路由组
  5. * @return 集合
  6. */
  7. privateMap<String,String> getTableTokens(finalCartesianTableReference cartesianTableReference){
  8. Map<String,String> tableTokens =newHashMap<>();
  9. for(TableUnit each : cartesianTableReference.getTableUnits()){
  10.       tableTokens.put(each.getLogicTableName(), each.getActualTableName());
  11. // 查找 BindingTableRule
  12. Optional<BindingTableRule> bindingTableRule = shardingRule.findBindingTableRule(each.getLogicTableName());
  13. if(bindingTableRule.isPresent()){
  14.           tableTokens.putAll(getBindingTableTokens(each, bindingTableRule.get()));
  15. }
  16. }
  17. return tableTokens;
  18. }
  19. /**
  20. * 获得 BindingTable 关系的逻辑表对应的真实表映射(逻辑表需要在 SQL 中存在)
  21. * @param tableUnit 路由单元
  22. * @param bindingTableRule Binding表规则配置对象
  23. * @return 映射
  24. */
  25. privateMap<String,String> getBindingTableTokens(finalTableUnit tableUnit,finalBindingTableRule bindingTableRule){
  26. Map<String,String> result =newHashMap<>();
  27. for(String eachTable : sqlStatement.getTables().getTableNames()){
  28. if(!eachTable.equalsIgnoreCase(tableUnit.getLogicTableName())&& bindingTableRule.hasLogicTable(eachTable)){
  29.           result.put(eachTable, bindingTableRule.getBindingActualTable(tableUnit.getDataSourceName(), eachTable, tableUnit.getActualTableName()));
  30. }
  31. }
  32. return result;
  33. }
  • 笛卡尔积表路由组( CartesianTableReference )包含多个路由表单元( TableUnit )。每个路由表单元需要遍历。
  • 路由表单元本身包含逻辑表和真实表,直接添加到映射即可。
  • 互为 BindingTable 关系的表只计算一次路由分片,因此未计算的真实表需要以其对应的已计算的真实表去查找,即 bindingTableRule.getBindingActualTable(tableUnit.getDataSourceName(),eachTable,tableUnit.getActualTableName()) 处逻辑。
  1. // BindingTableRule.java
  2. /**
  3. * 根据其他Binding表真实表名称获取相应的真实Binding表名称.
  4. *
  5. * @param dataSource 数据源名称
  6. * @param logicTable 逻辑表名称
  7. * @param otherActualTable 其他真实Binding表名称
  8. * @return 真实Binding表名称
  9. */
  10. publicString getBindingActualTable(finalString dataSource,finalString logicTable,finalString otherActualTable){
  11. // 计算 otherActualTable 在其 TableRule 的 actualTable 是第几个
  12. int index =-1;
  13. for(TableRule each : tableRules){
  14. if(each.isDynamic()){
  15. thrownewUnsupportedOperationException("Dynamic table cannot support Binding table.");
  16. }
  17.       index = each.findActualTableIndex(dataSource, otherActualTable);
  18. if(-1!= index){
  19. break;
  20. }
  21. }
  22. Preconditions.checkState(-1!= index,String.format("Actual table [%s].[%s] is not in table config", dataSource, otherActualTable));
  23. // 计算 logicTable 在其 TableRule 的 第index 的 真实表
  24. for(TableRule each : tableRules){
  25. if(each.getLogicTable().equalsIgnoreCase(logicTable)){
  26. return each.getActualTables().get(index).getTableName();
  27. }
  28. }
  29. thrownewIllegalStateException(String.format("Cannot find binding actual table, data source: %s, logic table: %s, other actual table: %s", dataSource, logicTable, otherActualTable));
  30. }
可能看起来有些绕,我们看张图:
友情提示:这里不嫌啰嗦在提一句,互为 BindingTable 的表,配置 TableRule 时, actualTables 数量一定要一致,否则多出来的表,可能会无法被路由到。

666. 彩蛋

哈哈哈,看完SQL改写后,SQL解析是不是清晰多了!嘿嘿嘿,反正我现在有点嗨。恩,蛮嗨的。
当然,如果SQL解析理解上有点疑惑的你,欢迎加我的微信,咱 1对1 搞基。关注我的微信公众号:【芋道源码】 即可获得。
道友,转发一波朋友圈可好?
Let's Go! 《分布式主键》、《SQL 执行》、《结果聚合》 继续。
感谢技术牛逼如你耐心的阅读本文。
继续阅读
阅读原文