摘要: 原创出处 http://www.iocoder.cn/Sharding-JDBC/result-merger/ 「芋道源码」欢迎转载,保留摘要,谢谢!
微信排版崩崩的,建议使用 PC 点击【阅读原文】。
本文主要基于 Sharding-JDBC 1.5.0 正式版
  • 1. 概述
  • 2. MergeEngine
    • 2.2.1 AbstractStreamResultSetMerger
    • 2.2.2 AbstractMemoryResultSetMerger
    • 2.2.3 AbstractDecoratorResultSetMerger
    • 2.1 SelectStatement#setIndexForItems()
    • 2.2 ResultSetMerger
  • 3. OrderByStreamResultSetMerger
    • 3.1 归并算法
    • 3.2 #next()
  • 4. GroupByStreamResultSetMerger
    • 4.1 AggregationUnit
    • 4.2 #next()
  • 5. GroupByMemoryResultSetMerger
    • 5.1 #next()
  • 6. IteratorStreamResultSetMerger
  • 7. LimitDecoratorResultSetMerger
  • 666. 彩蛋
🙂🙂🙂关注微信公众号:【芋道源码】有福利:
  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文分享查询结果归并的源码实现。
正如前文《SQL 执行》提到的“分表分库,需要执行的 SQL 数量从单条变成了多条”,多个SQL执行结果必然需要进行合并,例如:
  1. SELECT * FROM t_order ORDER BY create_time
在各分片排序完后,Sharding-JDBC 获取到结果后,仍然需要再进一步排序。目前有 分页分组排序聚合列迭代 五种场景需要做进一步处理。当然,如果单分片SQL执行结果是无需合并的。在《SQL 执行》不知不觉已经分享了插入、更新、删除操作的结果合并,所以下面我们一起看看查询结果归并的实现。

Sharding-JDBC 正在收集使用公司名单:传送门。

🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门

Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门

登记吧,骚年!传送门

2. MergeEngine

MergeEngine,分片结果集归并引擎。
  1. // MergeEngine.java
  2. /**
  3. * 数据库类型
  4. */
  5. privatefinalDatabaseType databaseType;
  6. /**
  7. * 结果集集合
  8. */
  9. privatefinalList<ResultSet> resultSets;
  10. /**
  11. * Select SQL语句对象
  12. */
  13. privatefinalSelectStatement selectStatement;
  14. /**
  15. * 查询列名与位置映射
  16. */
  17. privatefinalMap<String,Integer> columnLabelIndexMap;
  18. publicMergeEngine(finalDatabaseType databaseType,finalList<ResultSet> resultSets,finalSelectStatement selectStatement)throwsSQLException{
  19. this.databaseType = databaseType;
  20. this.resultSets = resultSets;
  21. this.selectStatement = selectStatement;
  22. // 获得 查询列名与位置映射
  23.   columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0));
  24. }
  25. /**
  26. * 获得 查询列名与位置映射
  27. *
  28. * @param resultSet 结果集
  29. * @return 查询列名与位置映射
  30. * @throws SQLException 当结果集已经关闭
  31. */
  32. privateMap<String,Integer> getColumnLabelIndexMap(finalResultSet resultSet)throwsSQLException{
  33. ResultSetMetaData resultSetMetaData = resultSet.getMetaData();// 元数据(包含查询列信息)
  34. Map<String,Integer> result =newTreeMap<>(String.CASE_INSENSITIVE_ORDER);
  35. for(int i =1; i <= resultSetMetaData.getColumnCount(); i++){
  36.       result.put(SQLUtil.getExactlyValue(resultSetMetaData.getColumnLabel(i)), i);
  37. }
  38. return result;
  39. }
  • 当 MergeEngine 被创建时,会传入 resultSets 结果集集合,并根据其获得 columnLabelIndexMap 查询列名与位置映射。通过 columnLabelIndexMap,可以很方便的使用查询列名获得在返回结果记录列( header )的第几列。

MergeEngine 的 #merge() 方法作为入口提供查询结果归并功能。
  1. /**
  2. * 合并结果集.
  3. *
  4. * @return 归并完毕后的结果集
  5. * @throws SQLException SQL异常
  6. */
  7. publicResultSetMerger merge()throwsSQLException{
  8.   selectStatement.setIndexForItems(columnLabelIndexMap);
  9. return decorate(build());
  10. }
  • #merge() 主体逻辑就两行代码,设置查询列位置信息,并返回合适的归并结果集接口( ResultSetMerger ) 实现。

2.1 SelectStatement#setIndexForItems()

  1. // SelectStatement.java
  2. /**
  3. * 为选择项设置索引.
  4. *
  5. * @param columnLabelIndexMap 列标签索引字典
  6. */
  7. publicvoid setIndexForItems(finalMap<String,Integer> columnLabelIndexMap){
  8.   setIndexForAggregationItem(columnLabelIndexMap);
  9.   setIndexForOrderItem(columnLabelIndexMap, orderByItems);
  10.   setIndexForOrderItem(columnLabelIndexMap, groupByItems);
  11. }
  • 部分查询列是经过推到出来,在 SQL解析 过程中,未获得到查询列位置,需要通过该方法进行初始化。对这块不了解的同学,回头可以看下《SQL 解析(三)之查询SQL》。🙂 现在不用回头,皇冠会掉。
  • #setIndexForAggregationItem() 处理 AVG聚合计算列 推导出其对应的 SUM/COUNT 聚合计算列的位置:
    1. privatevoid setIndexForAggregationItem(finalMap<String,Integer> columnLabelIndexMap){
    2. for(AggregationSelectItem each : getAggregationSelectItems()){
    3. Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()),String.format("Can't find index: %s, please add alias for aggregate selections", each));
    4.       each.setIndex(columnLabelIndexMap.get(each.getColumnLabel()));
    5. for(AggregationSelectItem derived : each.getDerivedAggregationSelectItems()){
    6. Preconditions.checkState(columnLabelIndexMap.containsKey(derived.getColumnLabel()),String.format("Can't find index: %s", derived));
    7.           derived.setIndex(columnLabelIndexMap.get(derived.getColumnLabel()));
    8. }
    9. }
    10. }
  • #setIndexForOrderItem() 处理 ORDER BY / GROUP BY 列不在查询列 推导出的查询列的位置:
    1. privatevoid setIndexForOrderItem(finalMap<String,Integer> columnLabelIndexMap,finalList<OrderItem> orderItems){
    2. for(OrderItem each : orderItems){
    3. if(-1!= each.getIndex()){
    4. continue;
    5. }
    6. Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()),String.format("Can't find index: %s", each));
    7. if(columnLabelIndexMap.containsKey(each.getColumnLabel())){
    8.          each.setIndex(columnLabelIndexMap.get(each.getColumnLabel()));
    9. }
    10. }
    11. }

2.2 ResultSetMerger

ResultSetMerger,归并结果集接口。
我们先来看看整体的类结构关系:
功能 上分成四种:
  • 分组:GroupByMemoryResultSetMerger、GroupByStreamResultSetMerger;包含聚合列
  • 排序:OrderByStreamResultSetMerger
  • 迭代:IteratorStreamResultSetMerger
  • 分页:LimitDecoratorResultSetMerger
实现方式 上分成三种:
  • Stream 流式:AbstractStreamResultSetMerger
  • Memory 内存:AbstractMemoryResultSetMerger
  • Decorator 装饰者:AbstractDecoratorResultSetMerger
什么时候该用什么实现方式?
  • Stream 流式:将数据游标与结果集的游标保持一致,顺序的从结果集中一条条的获取正确的数据。看完下文第三节OrderByStreamResultSetMerger 可以形象的理解。
  • Memory 内存:需要将结果集的所有数据都遍历并存储在内存中,再通过内存归并后,将内存中的数据伪装成结果集返回。看完下文第五节 GroupByMemoryResultSetMerger 可以形象的理解。
  • Decorator 装饰者:可以和前二者任意组合
  1. // MergeEngine.java
  2. /**
  3. * 合并结果集.
  4. *
  5. * @return 归并完毕后的结果集
  6. * @throws SQLException SQL异常
  7. */
  8. publicResultSetMerger merge()throwsSQLException{
  9.   selectStatement.setIndexForItems(columnLabelIndexMap);
  10. return decorate(build());
  11. }
  12. privateResultSetMerger build()throwsSQLException{
  13. if(!selectStatement.getGroupByItems().isEmpty()||!selectStatement.getAggregationSelectItems().isEmpty()){// 分组 或 聚合列
  14. if(selectStatement.isSameGroupByAndOrderByItems()){
  15. returnnewGroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
  16. }else{
  17. returnnewGroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement, getNullOrderType());
  18. }
  19. }
  20. if(!selectStatement.getOrderByItems().isEmpty()){
  21. returnnewOrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems(), getNullOrderType());
  22. }
  23. returnnewIteratorStreamResultSetMerger(resultSets);
  24. }
  25. privateResultSetMerger decorate(finalResultSetMerger resultSetMerger)throwsSQLException{
  26. ResultSetMerger result = resultSetMerger;
  27. if(null!= selectStatement.getLimit()){
  28.       result =newLimitDecoratorResultSetMerger(result, selectStatement.getLimit());
  29. }
  30. return result;
  31. }

2.2.1 AbstractStreamResultSetMerger

AbstractStreamResultSetMerger,流式归并结果集抽象类,提供从当前结果集获得行数据。
  1. publicabstractclassAbstractStreamResultSetMergerimplementsResultSetMerger{
  2. /**
  3.     * 当前结果集
  4.     */
  5. privateResultSet currentResultSet;
  6. protectedResultSet getCurrentResultSet()throwsSQLException{
  7. if(null== currentResultSet){
  8. thrownewSQLException("Current ResultSet is null, ResultSet perhaps end of next.");
  9. }
  10. return currentResultSet;
  11. }
  12. @Override
  13. publicObject getValue(finalint columnIndex,finalClass<?> type)throwsSQLException{
  14. if(Object.class== type){
  15. return getCurrentResultSet().getObject(columnIndex);
  16. }
  17. if(int.class== type){
  18. return getCurrentResultSet().getInt(columnIndex);
  19. }
  20. if(String.class== type){
  21. return getCurrentResultSet().getString(columnIndex);
  22. }
  23. // .... 省略其他数据类型读取类似代码
  24. return getCurrentResultSet().getObject(columnIndex);
  25. }
  26. }

2.2.2 AbstractMemoryResultSetMerger

AbstractMemoryResultSetMerger,内存归并结果集抽象类,提供从内存数据行对象( MemoryResultSetRow ) 获得行数据。
  1. publicabstractclassAbstractMemoryResultSetMergerimplementsResultSetMerger{
  2. privatefinalMap<String,Integer> labelAndIndexMap;
  3. /**
  4.     * 内存数据行对象
  5.     */
  6. @Setter
  7. privateMemoryResultSetRow currentResultSetRow;
  8. @Override
  9. publicObject getValue(finalint columnIndex,finalClass<?> type)throwsSQLException{
  10. if(Blob.class== type ||Clob.class== type ||Reader.class== type ||InputStream.class== type || SQLXML.class== type){
  11. thrownewSQLFeatureNotSupportedException();
  12. }
  13. return currentResultSetRow.getCell(columnIndex);
  14. }
  15. }
  • 和 AbstractStreamResultSetMerger 对比,貌似区别不大?!确实,从抽象父类上看,两种实现方式差不多。抽象父类提供给实现子类的是数据读取的功能,真正的流式归并、内存归并是在子类实现上体现。
  1. publicclassMemoryResultSetRow{
  2. /**
  3.     * 行数据
  4.     */
  5. privatefinalObject[] data;
  6. publicMemoryResultSetRow(finalResultSet resultSet)throwsSQLException{
  7.        data = load(resultSet);
  8. }
  9. /**
  10.     * 加载 ResultSet 当前行数据到内存
  11.     * @param resultSet 结果集
  12.     * @return 行数据
  13.     * @throws SQLException 当结果集关闭
  14.     */
  15. privateObject[] load(finalResultSet resultSet)throwsSQLException{
  16. int columnCount = resultSet.getMetaData().getColumnCount();
  17. Object[] result =newObject[columnCount];
  18. for(int i =0; i < columnCount; i++){
  19.            result[i]= resultSet.getObject(i +1);
  20. }
  21. return result;
  22. }
  23. /**
  24.     * 获取数据.
  25.     *
  26.     * @param columnIndex 列索引
  27.     * @return 数据
  28.     */
  29. publicObject getCell(finalint columnIndex){
  30. Preconditions.checkArgument(columnIndex >0&& columnIndex < data.length +1);
  31. return data[columnIndex -1];
  32. }
  33. /**
  34.     * 设置数据.
  35.     *
  36.     * @param columnIndex 列索引
  37.     * @param value 值
  38.     */
  39. publicvoid setCell(finalint columnIndex,finalObject value){
  40. Preconditions.checkArgument(columnIndex >0&& columnIndex < data.length +1);
  41.        data[columnIndex -1]= value;
  42. }
  43. }
  • 调用 #load() 方法,将当前结果集的一条行数据加载到内存。

2.2.3 AbstractDecoratorResultSetMerger

AbstractDecoratorResultSetMerger,装饰结果集归并抽象类,通过调用其装饰的归并对象#getValue() 方法获得行数据。
  1. publicabstractclassAbstractDecoratorResultSetMergerimplementsResultSetMerger{
  2. /**
  3.     * 装饰的归并对象
  4.     */
  5. privatefinalResultSetMerger resultSetMerger;
  6. @Override
  7. publicObject getValue(finalint columnIndex,finalClass<?> type)throwsSQLException{
  8. return resultSetMerger.getValue(columnIndex, type);
  9. }
  10. }

3. OrderByStreamResultSetMerger

OrderByStreamResultSetMerger,基于 Stream 方式排序归并结果集实现。

3.1 归并算法

因为各个分片结果集已经排序完成,使用《归并算法》能够充分利用这个优势。
归并操作(merge),也叫归并算法,指的是将两个已经排序的序列合并成一个序列的操作。归并排序算法依赖归并操作。
【迭代法】
  1. 申请空间,使其大小为两个已经排序序列之和,该空间用来存放合并后的序列
  2. 设定两个指针,最初位置分别为两个已经排序序列的起始位置
  3. 比较两个指针所指向的元素,选择相对小的元素放入到合并空间,并移动指针到下一位置
  4. 重复步骤3直到某一指针到达序列尾
  5. 将另一序列剩下的所有元素直接复制到合并序列尾
从定义上看,是不是超级符合我们这个场景。😈 此时此刻,你是不是捂着胸口,感叹:“大学怎么没好好学数据结构与算法呢”?反正我是捂着了,都是眼泪。
  1. publicclassOrderByStreamResultSetMergerextendsAbstractStreamResultSetMerger{
  2. /**
  3.     * 排序列
  4.     */
  5. @Getter(AccessLevel.NONE)
  6. privatefinalList<OrderItem> orderByItems;
  7. /**
  8.     * 排序值对象队列
  9.     */
  10. privatefinalQueue<OrderByValue> orderByValuesQueue;
  11. /**
  12.     * 默认排序类型
  13.     */
  14. privatefinalOrderType nullOrderType;
  15. /**
  16.     * 是否第一个 ResultSet 已经调用 #next()
  17.     */
  18. privateboolean isFirstNext;
  19. publicOrderByStreamResultSetMerger(finalList<ResultSet> resultSets,finalList<OrderItem> orderByItems,finalOrderType nullOrderType)throwsSQLException{
  20. this.orderByItems = orderByItems;
  21. this.orderByValuesQueue =newPriorityQueue<>(resultSets.size());
  22. this.nullOrderType = nullOrderType;
  23.        orderResultSetsToQueue(resultSets);
  24.        isFirstNext =true;
  25. }
  26. privatevoid orderResultSetsToQueue(finalList<ResultSet> resultSets)throwsSQLException{
  27. for(ResultSet each : resultSets){
  28. OrderByValue orderByValue =newOrderByValue(each, orderByItems, nullOrderType);
  29. if(orderByValue.next()){
  30.                orderByValuesQueue.offer(orderByValue);
  31. }
  32. }
  33. // 设置当前 ResultSet,这样 #getValue() 能拿到记录
  34.        setCurrentResultSet(orderByValuesQueue.isEmpty()? resultSets.get(0): orderByValuesQueue.peek().getResultSet());
  35. }
  • 属性 orderByValuesQueue 使用的队列实现是优先级队列( PriorityQueue )。有兴趣的同学可以看看《JDK源码研究PriorityQueue》,本文不展开讲,不是主角戏份不多。我们记住几个方法的用途:
    • #offer():增加元素。增加时,会将该元素和已有元素们按照优先级进行排序
    • #peek():获得优先级第一的元素
    • #pool():获得优先级第一的元素并移除
  • 一个 ResultSet 构建一个 OrderByValue 用于排序,即上文归并算法提到的“空间”
    • 调用 OrderByValue#next() 方法时,获得其对应结果集排在第一条的记录,通过 #getOrderValues() 计算该记录的排序字段值。这样两个OrderByValue 通过 #compareTo() 方法可以比较两个结果集的第一条记录。
    1. publicfinalclassOrderByValueimplementsComparable<OrderByValue>{
      /**
    2. * 已排序结果集
    3. */
    4. @Getter
    5. privatefinalResultSet resultSet;
    6. /**
    7. * 排序列
    8. */
    9. privatefinalList&lt;OrderItem&gt; orderByItems;
    10. /**
    11. * 默认排序类型
    12. */
    13. privatefinalOrderType nullOrderType;
    14. /**
    15. * 排序列对应的值数组
    16. * 因为一条记录可能有多个排序列,所以是数组
    17. */
    18. privateList&lt;Comparable&lt;?&gt;&gt; orderValues;
    19. /**
    20. * 遍历下一个结果集游标.
    21. *
    22. * @return 是否有下一个结果集
    23. * @throws SQLException SQL异常
    24. */
    25. publicbooleannext()throwsSQLException{
    26. boolean result = resultSet.next();
    27.    orderValues = result ? getOrderValues():Collections.&lt;Comparable&lt;?&gt;&gt;emptyList();
    28. return result;
    29. }
    30. /**
    31. * 获得 排序列对应的值数组
    32. *
    33. * @return 排序列对应的值数组
    34. * @throws SQLException 当结果集关闭时
    35. */
    36. privateList&lt;Comparable&lt;?&gt;&gt; getOrderValues()throwsSQLException{
    37. List&lt;Comparable&lt;?&gt;&gt; result =newArrayList&lt;&gt;(orderByItems.size());
    38. for(OrderItem each : orderByItems){
    39. Object value = resultSet.getObject(each.getIndex());
    40. Preconditions.checkState(null== value || value instanceofComparable,"Order by value must implements Comparable");
    41.        result.add((Comparable&lt;?&gt;) value);
    42. }
    43. return result;
    44. }
    45. /**
    46. * 对比 {@link #orderValues},即两者的第一条记录
    47. *
    48. * @param o 对比 OrderByValue
    49. * @return -1 0 1
    50. */
    51. @Override
    52. publicint compareTo(finalOrderByValue o){
    53. for(int i =0; i &lt; orderByItems.size(); i++){
    54. OrderItem thisOrderBy = orderByItems.get(i);
    55. int result =ResultSetUtil.compareTo(orderValues.get(i), o.orderValues.get(i), thisOrderBy.getType(), nullOrderType);
    56. if(0!= result){
    57. return result;
    58. }
    59. }
    60. return0;
    61. }
    62. }
  • if(orderByValue.next()){ 处,调用 OrderByValue#next() 后,添加到 PriorityQueue。因此, orderByValuesQueue.peek().getResultSet() 能够获得多个 ResultSet 中排在第一的。

3.2 #next()

通过调用 OrderByStreamResultSetMerger#next() 不断获得当前排在第一的记录。 #next() 每次调用后,实际做的是当前 ResultSet 的替换,以及当前的 ResultSet 的记录指向下一条。这样说起来可能比较绕,我们来看一张图:
  • 白色向下箭头:OrderByStreamResultSetMerger 对 ResultSet 的指向。
  • 黑色箭头:ResultSet 对当前记录的指向。
  • ps:这块如果分享的不清晰让您费劲,十分抱歉。欢迎加我微信(wangwenbin-server)交流下,这样我也可以优化表述。
  1. // OrderByStreamResultSetMerger.java
  2. @Override
  3. publicbooleannext()throwsSQLException{
  4. if(orderByValuesQueue.isEmpty()){
  5. returnfalse;
  6. }
  7. if(isFirstNext){
  8.       isFirstNext =false;
  9. returntrue;
  10. }
  11. // 移除上一次获得的 ResultSet
  12. OrderByValue firstOrderByValue = orderByValuesQueue.poll();
  13. // 如果上一次获得的 ResultSet还有下一条记录,继续添加到 排序值对象队列
  14. if(firstOrderByValue.next()){
  15.       orderByValuesQueue.offer(firstOrderByValue);
  16. }
  17. if(orderByValuesQueue.isEmpty()){
  18. returnfalse;
  19. }
  20. // 设置当前 ResultSet
  21.   setCurrentResultSet(orderByValuesQueue.peek().getResultSet());
  22. returntrue;
  23. }
  • orderByValuesQueue.poll() 移除上一次获得的 ResultSet。为什么不能 #setCurrentResultSet() 就移除呢?如果该 ResultSet 里面还存在下一条记录,需要继续参加排序。而判断是否有下一条,需要调用 ResultSet#next() 方法,这会导致 ResultSet 指向了下一条记录。因而 orderByValuesQueue.poll() 调用是后置的。
  • isFirstNext 变量那的判断看着是不是很“灵异”?因为 #orderResultSetsToQueue() 处设置了第一次的 ResultSet。如果不加这个标记,会导致第一条记录“不见”了。
  • 通过不断的 Queue#poll()Queue#offset() 实现排序。巧妙!仿佛 Get 新技能了:
    1. // 移除上一次获得的 ResultSet
    2. OrderByValue firstOrderByValue = orderByValuesQueue.poll();
    3. // 如果上一次获得的 ResultSet还有下一条记录,继续添加到 排序值对象队列
    4. if(firstOrderByValue.next()){
    5.  orderByValuesQueue.offer(firstOrderByValue);
    6. }

在看下,我们上文 Stream 方式归并的定义:将数据游标与结果集的游标保持一致,顺序的从结果集中一条条的获取正确的数据。是不是能够清晰的对上了?!🙂

4. GroupByStreamResultSetMerger

GroupByStreamResultSetMerger,基于 Stream 方式分组归并结果集实现。 它继承自 OrderByStreamResultSetMerger,在排序的逻辑上,实现分组功能。实现原理也较为简单:
  1. publicfinalclassGroupByStreamResultSetMergerextendsOrderByStreamResultSetMerger{
  2. /**
  3.     * 查询列名与位置映射
  4.     */
  5. privatefinalMap<String,Integer> labelAndIndexMap;
  6. /**
  7.     * Select SQL语句对象
  8.     */
  9. privatefinalSelectStatement selectStatement;
  10. /**
  11.     * 当前结果记录
  12.     */
  13. privatefinalList<Object> currentRow;
  14. /**
  15.     * 下一条结果记录 GROUP BY 条件
  16.     */
  17. privateList<?> currentGroupByValues;
  18. publicGroupByStreamResultSetMerger(
  19. finalMap<String,Integer> labelAndIndexMap,finalList<ResultSet> resultSets,finalSelectStatement selectStatement,finalOrderType nullOrderType)throwsSQLException{
  20. super(resultSets, selectStatement.getOrderByItems(), nullOrderType);
  21. this.labelAndIndexMap = labelAndIndexMap;
  22. this.selectStatement = selectStatement;
  23.        currentRow =newArrayList<>(labelAndIndexMap.size());
  24. // 初始化下一条结果记录 GROUP BY 条件
  25.        currentGroupByValues = getOrderByValuesQueue().isEmpty()?Collections.emptyList():newGroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
  26. }
  27. @Override
  28. publicObject getValue(finalint columnIndex,finalClass<?> type)throwsSQLException{
  29. return currentRow.get(columnIndex -1);
  30. }
  31. @Override
  32. publicObject getValue(finalString columnLabel,finalClass<?> type)throwsSQLException{
  33. Preconditions.checkState(labelAndIndexMap.containsKey(columnLabel),String.format("Can't find columnLabel: %s", columnLabel));
  34. return currentRow.get(labelAndIndexMap.get(columnLabel)-1);
  35. }
  36. }
  • currentRow 为当前结果记录,使用 #getValue()#getCalendarValue() 方法获得当前结果记录的查询列值。
  • currentGroupByValues下一条结果记录 GROUP BY 条件,通过 GroupByValue 生成:
    1. publicfinalclassGroupByValue{
      /**
    2. * 分组条件值数组
    3. */
    4. privatefinalList&lt;?&gt; groupValues;
    5. publicGroupByValue(finalResultSet resultSet,finalList&lt;OrderItem&gt; groupByItems)throwsSQLException{
    6.    groupValues = getGroupByValues(resultSet, groupByItems);
    7. }
    8. /**
    9. * 获得分组条件值数组
    10. * 例如,`GROUP BY user_id, order_status` 返回的某条记录结果为 `userId = 1, order_status = 3`,对应的 `groupValues = [1, 3]`
    11. * @param resultSet 结果集(单分片)
    12. * @param groupByItems 分组列
    13. * @return 分组条件值数组
    14. * @throws SQLException 当结果集关闭
    15. */
    16. privateList&lt;?&gt; getGroupByValues(finalResultSet resultSet,finalList&lt;OrderItem&gt; groupByItems)throwsSQLException{
    17. List&lt;Object&gt; result =newArrayList&lt;&gt;(groupByItems.size());
    18. for(OrderItem each : groupByItems){
    19.        result.add(resultSet.getObject(each.getIndex()));// 从结果集获得每个分组条件的值
    20. }
    21. return result;
    22. }
    23. }
  • GroupByStreamResultSetMerger 在创建时,当前结果记录实际未合并,需要先调用 #next(),在使用 #getValue() 等方法获取值,这个和 OrderByStreamResultSetMerger 不同,可能是个 BUG。

4.1 AggregationUnit

AggregationUnit,归并计算单元接口,有两个接口方法:
  • #merge():归并聚合值
  • #getResult():获取计算结果
一共有三个实现类:
  • AccumulationAggregationUnit:累加聚合单元,解决 COUNT、SUM 聚合列
  • ComparableAggregationUnit:比较聚合单元,解决 MAX、MIN 聚合列
  • AverageAggregationUnit:平均值聚合单元,解决 AVG 聚合列
实现都比较易懂,直接点击链接查看源码,我们就不浪费篇幅贴代码啦。

4.2 #next()

我们先看看大体的调用流程:
😈 看起来代码比较多,逻辑其实比较清晰,对照着顺序图顺序往下读即可。
  1. // GroupByStreamResultSetMerger.java
  2. @Override
  3. publicbooleannext()throwsSQLException{
  4. // 清除当前结果记录
  5.   currentRow.clear();
  6. if(getOrderByValuesQueue().isEmpty()){
  7. returnfalse;
  8. }
  9. //
  10. if(isFirstNext()){
  11. super.next();
  12. }
  13. // 顺序合并下面相同分组条件的记录
  14. if(aggregateCurrentGroupByRowAndNext()){
  15. // 生成下一条结果记录 GROUP BY 条件
  16.       currentGroupByValues =newGroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
  17. }
  18. returntrue;
  19. }
  20. privateboolean aggregateCurrentGroupByRowAndNext()throwsSQLException{
  21. boolean result =false;
  22. // 生成计算单元
  23. Map<AggregationSelectItem,AggregationUnit> aggregationUnitMap =Maps.toMap(selectStatement.getAggregationSelectItems(),newFunction<AggregationSelectItem,AggregationUnit>(){
  24. @Override
  25. publicAggregationUnit apply(finalAggregationSelectItem input){
  26. returnAggregationUnitFactory.create(input.getType());
  27. }
  28. });
  29. // 循环顺序合并下面相同分组条件的记录
  30. while(currentGroupByValues.equals(newGroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues())){
  31. // 归并聚合值
  32.       aggregate(aggregationUnitMap);
  33. // 缓存当前记录到结果记录
  34.       cacheCurrentRow();
  35. // 获取下一条记录
  36.       result =super.next();
  37. if(!result){
  38. break;
  39. }
  40. }
  41. // 设置当前记录的聚合字段结果
  42.   setAggregationValueToCurrentRow(aggregationUnitMap);
  43. return result;
  44. }
  45. privatevoid aggregate(finalMap<AggregationSelectItem,AggregationUnit> aggregationUnitMap)throwsSQLException{
  46. for(Entry<AggregationSelectItem,AggregationUnit> entry : aggregationUnitMap.entrySet()){
  47. List<Comparable<?>> values =newArrayList<>(2);
  48. if(entry.getKey().getDerivedAggregationSelectItems().isEmpty()){// SUM/COUNT/MAX/MIN 聚合列
  49.           values.add(getAggregationValue(entry.getKey()));
  50. }else{
  51. for(AggregationSelectItem each : entry.getKey().getDerivedAggregationSelectItems()){// AVG 聚合列
  52.               values.add(getAggregationValue(each));
  53. }
  54. }
  55.       entry.getValue().merge(values);
  56. }
  57. }
  58. privatevoid cacheCurrentRow()throwsSQLException{
  59. for(int i =0; i < getCurrentResultSet().getMetaData().getColumnCount(); i++){
  60.       currentRow.add(getCurrentResultSet().getObject(i +1));
  61. }
  62. }
  63. privateComparable<?> getAggregationValue(finalAggregationSelectItem aggregationSelectItem)throwsSQLException{
  64. Object result = getCurrentResultSet().getObject(aggregationSelectItem.getIndex());
  65. Preconditions.checkState(null== result || result instanceofComparable,"Aggregation value must implements Comparable");
  66. return(Comparable<?>) result;
  67. }
  68. privatevoid setAggregationValueToCurrentRow(finalMap<AggregationSelectItem,AggregationUnit> aggregationUnitMap){
  69. for(Entry<AggregationSelectItem,AggregationUnit> entry : aggregationUnitMap.entrySet()){
  70.       currentRow.set(entry.getKey().getIndex()-1, entry.getValue().getResult());// 获取计算结果
  71. }
  72. }

5. GroupByMemoryResultSetMerger

GroupByMemoryResultSetMerger,基于 内存 分组归并结果集实现。
区别于 GroupByStreamResultSetMerger,其无法使用每个分片结果集的有序的特点,只能在内存中合并后,进行整个重新排序。因而,性能和内存都较 GroupByStreamResultSetMerger 会差。
主流程如下:
  1. publicfinalclassGroupByMemoryResultSetMergerextendsAbstractMemoryResultSetMerger{
  2. /**
  3.     * Select SQL语句对象
  4.     */
  5. privatefinalSelectStatement selectStatement;
  6. /**
  7.     * 默认排序类型
  8.     */
  9. privatefinalOrderType nullOrderType;
  10. /**
  11.     * 内存结果集
  12.     */
  13. privatefinalIterator<MemoryResultSetRow> memoryResultSetRows;
  14. publicGroupByMemoryResultSetMerger(
  15. finalMap<String,Integer> labelAndIndexMap,finalList<ResultSet> resultSets,finalSelectStatement selectStatement,finalOrderType nullOrderType)throwsSQLException{
  16. super(labelAndIndexMap);
  17. this.selectStatement = selectStatement;
  18. this.nullOrderType = nullOrderType;
  19.        memoryResultSetRows = init(resultSets);
  20. }
  21. privateIterator<MemoryResultSetRow> init(finalList<ResultSet> resultSets)throwsSQLException{
  22. Map<GroupByValue,MemoryResultSetRow> dataMap =newHashMap<>(1024);// 分组条件值与内存记录映射
  23. Map<GroupByValue,Map<AggregationSelectItem,AggregationUnit>> aggregationMap =newHashMap<>(1024);// 分组条件值与聚合列映射
  24. // 遍历结果集
  25. for(ResultSet each : resultSets){
  26. while(each.next()){
  27. // 生成分组条件
  28. GroupByValue groupByValue =newGroupByValue(each, selectStatement.getGroupByItems());
  29. // 初始化分组条件到 dataMap、aggregationMap 映射
  30.                initForFirstGroupByValue(each, groupByValue, dataMap, aggregationMap);
  31. // 归并聚合值
  32.                aggregate(each, groupByValue, aggregationMap);
  33. }
  34. }
  35. // 设置聚合列结果到内存记录
  36.        setAggregationValueToMemoryRow(dataMap, aggregationMap);
  37. // 内存排序
  38. List<MemoryResultSetRow> result = getMemoryResultSetRows(dataMap);
  39. // 设置当前 ResultSet,这样 #getValue() 能拿到记录
  40. if(!result.isEmpty()){
  41.            setCurrentResultSetRow(result.get(0));
  42. }
  43. return result.iterator();
  44. }
  45. }
  • #initForFirstGroupByValue() 初始化分组条件dataMapaggregationMap 映射中,这样可以调用 #aggregate() 将聚合值归并到 aggregationMap 里的该分组条件。
    1. privatevoid initForFirstGroupByValue(finalResultSet resultSet,finalGroupByValue groupByValue,finalMap<GroupByValue,MemoryResultSetRow> dataMap,
    2. finalMap<GroupByValue,Map<AggregationSelectItem,AggregationUnit>> aggregationMap)throwsSQLException{
    3. // 初始化分组条件到 dataMap
    4. if(!dataMap.containsKey(groupByValue)){
    5.        dataMap.put(groupByValue,newMemoryResultSetRow(resultSet));
    6. }
    7. // 初始化分组条件到 aggregationMap
    8. if(!aggregationMap.containsKey(groupByValue)){
    9. Map<AggregationSelectItem,AggregationUnit> map =Maps.toMap(selectStatement.getAggregationSelectItems(),newFunction<AggregationSelectItem,AggregationUnit>(){
      @Override
    10. publicAggregationUnit apply(finalAggregationSelectItem input){
    11. returnAggregationUnitFactory.create(input.getType());
    12. }
    13. });
    14.    aggregationMap.put(groupByValue, map);
    15. }
    16. }
  • 聚合完每个分组条件后,将聚合列结果 aggregationMap 合并到 dataMap
    1. privatevoid setAggregationValueToMemoryRow(finalMap<GroupByValue,MemoryResultSetRow> dataMap,finalMap<GroupByValue,Map<AggregationSelectItem,AggregationUnit>> aggregationMap){
    2. for(Entry<GroupByValue,MemoryResultSetRow> entry : dataMap.entrySet()){// 遍 历内存记录
    3. for(AggregationSelectItem each : selectStatement.getAggregationSelectItems()){// 遍历 每个聚合列
    4.           entry.getValue().setCell(each.getIndex(), aggregationMap.get(entry.getKey()).get(each).getResult());
    5. }
    6. }
    7. }
  • 调用 #getMemoryResultSetRows() 方法对内存记录进行内存排序
  1. // GroupByMemoryResultSetMerger.java
  2. privateList<MemoryResultSetRow> getMemoryResultSetRows(finalMap<GroupByValue,MemoryResultSetRow> dataMap){
  3. List<MemoryResultSetRow> result =newArrayList<>(dataMap.values());
  4. Collections.sort(result,newGroupByRowComparator(selectStatement, nullOrderType));// 内存排序
  5. return result;
  6. }
  7. // GroupByRowComparator.java
  8. privateint compare(finalMemoryResultSetRow o1,finalMemoryResultSetRow o2,finalList<OrderItem> orderItems){
  9. for(OrderItem each : orderItems){
  10. Object orderValue1 = o1.getCell(each.getIndex());
  11. Preconditions.checkState(null== orderValue1 || orderValue1 instanceofComparable,"Order by value must implements Comparable");
  12. Object orderValue2 = o2.getCell(each.getIndex());
  13. Preconditions.checkState(null== orderValue2 || orderValue2 instanceofComparable,"Order by value must implements Comparable");
  14. int result =ResultSetUtil.compareTo((Comparable) orderValue1,(Comparable) orderValue2, each.getType(), nullOrderType);
  15. if(0!= result){
  16. return result;
  17. }
  18. }
  19. return0;
  20. }
  • 总的来说,GROUP BY 内存归并和我们日常使用 Map 计算用户订单数是比较相似的。

5.1 #next()

  1. @Override
  2. publicbooleannext()throwsSQLException{
  3. if(memoryResultSetRows.hasNext()){
  4.       setCurrentResultSetRow(memoryResultSetRows.next());
  5. returntrue;
  6. }
  7. returnfalse;
  8. }
  • 内存归并完成后,使用 memoryResultSetRows 不断获得下一条记录。

6. IteratorStreamResultSetMerger

IteratorStreamResultSetMerger,基于 Stream 迭代归并结果集实现。
  1. publicfinalclassIteratorStreamResultSetMergerextendsAbstractStreamResultSetMerger{
  2. /**
  3.     * ResultSet 数组迭代器
  4.     */
  5. privatefinalIterator<ResultSet> resultSets;
  6. publicIteratorStreamResultSetMerger(finalList<ResultSet> resultSets){
  7. this.resultSets = resultSets.iterator();
  8. // 设置当前 ResultSet,这样 #getValue() 能拿到记录
  9.        setCurrentResultSet(this.resultSets.next());
  10. }
  11. @Override
  12. publicbooleannext()throwsSQLException{
  13. // 当前 ResultSet 迭代下一条记录
  14. if(getCurrentResultSet().next()){
  15. returntrue;
  16. }
  17. if(!resultSets.hasNext()){
  18. returnfalse;
  19. }
  20. // 获得下一个ResultSet, 设置当前 ResultSet
  21.        setCurrentResultSet(resultSets.next());
  22. boolean hasNext = getCurrentResultSet().next();
  23. if(hasNext){
  24. returntrue;
  25. }
  26. while(!hasNext && resultSets.hasNext()){
  27.            setCurrentResultSet(resultSets.next());
  28.            hasNext = getCurrentResultSet().next();
  29. }
  30. return hasNext;
  31. }
  32. }

7. LimitDecoratorResultSetMerger

LimitDecoratorResultSetMerger,基于 Decorator 分页结果集归并实现。
  1. publicfinalclassLimitDecoratorResultSetMergerextendsAbstractDecoratorResultSetMerger{
  2. /**
  3.     * 分页条件
  4.     */
  5. privatefinalLimit limit;
  6. /**
  7.     * 是否全部记录都跳过了,即无符合条件记录
  8.     */
  9. privatefinalboolean skipAll;
  10. /**
  11.     * 当前已返回行数
  12.     */
  13. privateint rowNumber;
  14. publicLimitDecoratorResultSetMerger(finalResultSetMerger resultSetMerger,finalLimit limit)throwsSQLException{
  15. super(resultSetMerger);
  16. this.limit = limit;
  17.        skipAll = skipOffset();
  18. }
  19. privateboolean skipOffset()throwsSQLException{
  20. // 跳过 skip 记录
  21. for(int i =0; i < limit.getOffsetValue(); i++){
  22. if(!getResultSetMerger().next()){
  23. returntrue;
  24. }
  25. }
  26. // 行数
  27.        rowNumber = limit.isRowCountRewriteFlag()?0: limit.getOffsetValue();
  28. returnfalse;
  29. }
  30. @Override
  31. publicbooleannext()throwsSQLException{
  32. if(skipAll){
  33. returnfalse;
  34. }
  35. // 获得下一条记录
  36. if(limit.getRowCountValue()>-1){
  37. return++rowNumber <= limit.getRowCountValue()&& getResultSetMerger().next();
  38. }
  39. // 部分db 可以直 offset,不写 limit 行数,例如 oracle
  40. return getResultSetMerger().next();
  41. }
  42. }
  • LimitDecoratorResultSetMerger 可以对其他 ResultSetMerger 进行装饰,调用其他 ResultSetMerger 的 #next() 不断获得下一条记录。

666. 彩蛋

诶?应该是有蛮多地方解释的不是很清晰,如果让您阅读误解或是阻塞,非常抱歉。代码读起来比较易懂,使用文字来解释,对表述能力较差的自己,可能就绞尽脑汁,一脸懵逼。
恩,如果可以,还烦请把读起来不太爽的地方告诉我,谢谢。
厚着脸皮,道友,分享一波朋友圈可好?
如下是小礼包,嘿嘿
归并结果集接口SQL
OrderByStreamResultSetMergerSELECT*FROM t_order ORDER BY id
GroupByStreamResultSetMergerSELECT uid,AVG(id)FROM t_order GROUP BY uid
GroupByMemoryResultSetMergerSELECT uid FROM t_order GROUP BY id ORDER BY id DESC
IteratorStreamResultSetMergerSELECT*FROM t_order
LimitDecoratorResultSetMergerSELECT*FROM t_order ORDER BY id LIMIT10
继续阅读
阅读原文