原文地址:
http://www.yunai.me/MyCAT/two-table-share-join/?mp
MyCat-Server带注释代码
地址 :https://github.com/YunaiV/Mycat-Server

😈本系列每 --- -更新一篇,欢迎订阅、关注、收藏 公众号
QQ :7685413

  • 1. 概述
  • 2. 主流程
  • 3. ShareJoin
    • 3.1 JoinParser
    • 3.2 ShareJoin.processSQL(...)
    • 3.3 BatchSQLJob
    • 3.4 ShareDBJoinHandler
    • 3.5 ShareRowOutPutDataHandler
  • 4. 彩蛋

1. 概述

MyCAT 支持跨库表 Join,目前版本仅支持跨库表 Join。虽然如此,已经能够满足我们大部分的业务场景。况且,Join 过多的表可能带来的性能问题也是很麻烦的。
本文主要分享:
  1. 整体流程、调用顺序图
  2. 核心代码的分析
前置阅读:《MyCAT 源码分析 —— 【单库单表】查询》。
OK,Let's Go。

2. 主流程

当执行跨库两表 Join SQL 时,经历的大体流程如下:
SQL 上,需要添加注解 /*!mycat:catlet=io.mycat.catlets.ShareJoin */${SQL}RouteService#route(...) 解析注解 mycat:catlet 后,路由给 HintCatletHandler 作进一步处理。
HintCatletHandler 获取注解对应的 Catlet 实现类, io.mycat.catlets.ShareJoin 就是其中一种实现(目前也只有这一种实现),提供了跨库两表 Join 的功能。从类命名上看, ShareJoin 很大可能性后续会提供完整的跨库多表的 Join 功能。
核心代码如下:
  1. // HintCatletHandler.java
  2. publicRouteResultset route(SystemConfig sysConfig,SchemaConfig schema,
  3. int sqlType,String realSQL,String charset,ServerConnection sc,
  4. LayerCachePool cachePool,String hintSQLValue,int hintSqlType,Map hintMap)
  5. throwsSQLNonTransientException{
  6. String cateletClass = hintSQLValue;
  7. if(LOGGER.isDebugEnabled()){
  8.       LOGGER.debug("load catelet class:"+ hintSQLValue +" to run sql "+ realSQL);
  9. }
  10. try{
  11. Catlet catlet =(Catlet)MycatServer.getInstance().getCatletClassLoader().getInstanceofClass(cateletClass);
  12.       catlet.route(sysConfig, schema, sqlType, realSQL, charset, sc, cachePool);
  13.       catlet.processSQL(realSQL,newEngineCtx(sc.getSession2()));
  14. }catch(Exception e){
  15.       LOGGER.warn("catlet error "+ e);
  16. thrownewSQLNonTransientException(e);
  17. }
  18. returnnull;
  19. }

3. ShareJoin

目前支持跨库表 Join。 ShareJoin 将 SQL 拆分成左表 SQL 和 右表 SQL,发送给各数据节点执行,汇总数据结果进行合后返回。
伪代码如下:
  1. // SELECT u.id, o.id FROM t_order o
  2. // INNER JOIN t_user u ON o.uid = u.id
  3. // 【顺序】查询左表
  4. String leftSQL ="SELECT o.id, u.id FROM t_order o";
  5. List leftList = dn[0].select(leftSQL)+ dn[1].select(leftSQL)+...+ dn[n].select(leftsql);
  6. // 【并行】查询右表
  7. String rightSQL ="SELECT u.id FROM t_user u WHERE u.id IN (${leftList.uid})";
  8. for(dn : dns){// 此处是并行执行,使用回调逻辑
  9. for(rightRecord : dn.select(rightSQL)){// 查询右表
  10. // 合并结果
  11. for(leftRecord : leftList){
  12. if(leftRecord.uid == rightRecord.id){
  13.                write(leftRecord + leftRecord.uid 拼接结果);
  14. }
  15. }
  16. }
  17. }
实际情况会更加复杂,我们接下来一点点往下看。

3.1 JoinParser

JoinParser 负责对 SQL 进行解析。整体流程如下:
举个例子, /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id; 解析后, TableFilter 结果如下:
  • tName :表名
  • tAlia :表自定义命名
  • where :过滤条件
  • order :排序条件
  • parenTable :左连接的 Join 的表名。 t_user表 在 join属性 的 parenTable 为 "o",即 t_order
  • joinParentkey :左连接的 Join 字段
  • joinKey :join 字段。 t_user表 在 join属性 为 id
  • join :子 tableFilter。即,该表连接的右边的表。
  • parent :和 join属性 相对。
看到此处,大家可能有疑问,为什么要把 SQL 解析成 TableFilterJoinParser 根据 TableFilter 生成数据节点执行 SQL。代码如下:
  1. // TableFilter.java
  2. publicString getSQL(){
  3. String sql ="";
  4. // fields
  5. for(Entry<String,String> entry : fieldAliasMap.entrySet()){
  6. String key = entry.getKey();
  7. String val = entry.getValue();
  8. if(val ==null){
  9.           sql = unionsql(sql, getFieldfrom(key),",");
  10. }else{
  11.           sql = unionsql(sql, getFieldfrom(key)+" as "+ val,",");
  12. }
  13. }
  14. // where
  15. if(parent ==null){// on/where 等于号左边的表
  16. String parentJoinKey = getJoinKey(true);
  17. // fix sharejoin bug:
  18. // (AbstractConnection.java:458) -close connection,reason:program err:java.lang.IndexOutOfBoundsException:
  19. // 原因是左表的select列没有包含 join 列,在获取结果时报上面的错误
  20. if(sql !=null&& parentJoinKey !=null&&
  21. !sql.toUpperCase().contains(parentJoinKey.trim().toUpperCase())){
  22.           sql +=", "+ parentJoinKey;
  23. }
  24.       sql ="select "+ sql +" from "+ tName;
  25. if(!(where.trim().equals(""))){
  26.           sql +=" where "+where.trim();
  27. }
  28. }else{// on/where 等于号右边边的表
  29. if(allField){
  30.           sql ="select "+ sql +" from "+ tName;
  31. }else{
  32.           sql = unionField("select "+ joinKey, sql,",");
  33.           sql = sql +" from "+ tName;
  34. //sql="select "+joinKey+","+sql+" from "+tName;
  35. }
  36. if(!(where.trim().equals(""))){
  37.           sql +=" where "+where.trim()+" and ("+ joinKey +" in %s )";
  38. }else{
  39.           sql +=" where "+ joinKey +" in %s ";
  40. }
  41. }
  42. // order
  43. if(!(order.trim().equals(""))){
  44.       sql +=" order by "+ order.trim();
  45. }
  46. // limit
  47. if(parent ==null){
  48. if((rowCount >0)&&(offset >0)){
  49.           sql +=" limit"+ offset +","+ rowCount;
  50. }else{
  51. if(rowCount >0){
  52.               sql +=" limit "+ rowCount;
  53. }
  54. }
  55. }
  56. return sql;
  57. }
  • 当 parent 为空时,即on/where 等于号左边的表。例如: selectid,uidfromt_order
  • 当 parent 不为空时,即on/where 等于号右边的表。例如: selectid,usernamefromt_userwhereidin(1,2,3)

3.2 ShareJoin.processSQL(...)

当 SQL 解析完后,生成左边的表执行的 SQL,发送给对应的数据节点查询数据。大体流程如下:
当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id; 时, sql=getSql() 的返回结果为 selectid,uidfromt_order
生成左边的表执行的 SQL 后,顺序顺序顺序发送给对应的数据节点查询数据。具体顺序查询是怎么实现的,我们来看下章 BatchSQLJob

3.3 BatchSQLJob

EngineCtxBatchSQLJob 封装,提供上层两个方法:
  1. executeNativeSQLSequnceJob :顺序(非并发)在每个数据节点执行SQL任务
  2. executeNativeSQLParallJob :并发在每个数据节点执行SQL任务
核心代码如下:
  1. // EngineCtx.java
  2. publicvoid executeNativeSQLSequnceJob(String[] dataNodes,String sql,
  3. SQLJobHandler jobHandler){
  4. for(String dataNode : dataNodes){
  5. SQLJob job =newSQLJob(jobId.incrementAndGet(), sql, dataNode,
  6.                jobHandler,this);
  7.        bachJob.addJob(job,false);
  8. }
  9. }
  10. publicvoid executeNativeSQLParallJob(String[] dataNodes,String sql,
  11. SQLJobHandler jobHandler){
  12. for(String dataNode : dataNodes){
  13. SQLJob job =newSQLJob(jobId.incrementAndGet(), sql, dataNode,
  14.                jobHandler,this);
  15.        bachJob.addJob(job,true);
  16. }
  17. }

BatchSQLJob 通过执行中任务列表待执行任务列表来实现顺序/并发执行任务。核心代码如下:
  1. // BatchSQLJob.java
  2. /**
  3. * 执行中任务列表
  4. */
  5. privateConcurrentHashMap<Integer,SQLJob> runningJobs =newConcurrentHashMap<Integer,SQLJob>();
  6. /**
  7. * 待执行任务列表
  8. */
  9. privateConcurrentLinkedQueue<SQLJob> waitingJobs =newConcurrentLinkedQueue<SQLJob>();
  10. publicvoid addJob(SQLJob newJob,boolean parallExecute){
  11. if(parallExecute){
  12.       runJob(newJob);
  13. }else{
  14.       waitingJobs.offer(newJob);
  15. if(runningJobs.isEmpty()){// 若无正在执行中的任务,则从等待队列里获取任务进行执行。
  16. SQLJob job = waitingJobs.poll();
  17. if(job !=null){
  18.               runJob(job);
  19. }
  20. }
  21. }
  22. }
  23. publicboolean jobFinished(SQLJob sqlJob){
  24.    runningJobs.remove(sqlJob.getId());
  25. SQLJob job = waitingJobs.poll();
  26. if(job !=null){
  27.        runJob(job);
  28. returnfalse;
  29. }else{
  30. if(noMoreJobInput){
  31. return runningJobs.isEmpty()&& waitingJobs.isEmpty();
  32. }else{
  33. returnfalse;
  34. }
  35. }
  36. }
  • 顺序执行时,当 runningJobs 存在执行中的任务时, #addJob(...) 时,不立即执行,添加到 waitingJobs。当 SQLJob 完成时,顺序调用下一个任务。
  • 并发执行时, #addJob(...) 时,立即执行。

SQLJob SQL 异步执行任务。其 jobHandler(SQLJobHandler) 属性,在 SQL 执行有返回结果时,会进行回调,从而实现异步执行。
ShareJoin 里, SQLJobHandler 有两个实现: ShareDBJoinHandlerShareRowOutPutDataHandler。前者,左边的表执行的 SQL 回调;后者,右边的表执行的 SQL 回调。

3.4 ShareDBJoinHandler

ShareDBJoinHandler左边的表执行的 SQL 回调。流程如下:
  • #fieldEofResponse(...) :接收数据节点返回的 fields,放入内存。
  • #rowResponse(...) :接收数据节点返回的 row,放入内存。
  • #rowEofResponse(...) :接收完一个数据节点返回所有的 row。当所有数据节点都完成 SQL 执行时,提交右边的表执行的 SQL 任务,并行执行,即图中#createQryJob(...)
当 SQL 为 /*!mycat:catlet=io.mycat.catlets.ShareJoin */SELECT o.id,u.usernamefromt_order o join t_user u on o.uid=u.id; 时, sql=getChildSQL() 的返回结果为 selectid,usernamefromt_userwhereidin(1,2,3)
核心代码如下:
  1. // ShareJoin.java
  2. privatevoid createQryJob(int batchSize){
  3. int count =0;
  4. Map<String,byte[]> batchRows =newConcurrentHashMap<String,byte[]>();
  5. String theId =null;
  6. StringBuilder sb =newStringBuilder().append('(');
  7. String svalue ="";
  8. for(Map.Entry<String,String> e : ids.entrySet()){
  9.       theId = e.getKey();
  10. byte[] rowbyte = rows.remove(theId);
  11. if(rowbyte !=null){
  12.           batchRows.put(theId, rowbyte);
  13. }
  14. if(!svalue.equals(e.getValue())){
  15. if(joinKeyType ==Fields.FIELD_TYPE_VAR_STRING
  16. || joinKeyType ==Fields.FIELD_TYPE_STRING){// joinkey 为varchar
  17.               sb.append("'").append(e.getValue()).append("'").append(',');// ('digdeep','yuanfang')
  18. }else{// 默认joinkey为int/long
  19.               sb.append(e.getValue()).append(',');// (1,2,3)
  20. }
  21. }
  22.       svalue = e.getValue();
  23. if(count++> batchSize){
  24. break;
  25. }
  26. }
  27. if(count ==0){
  28. return;
  29. }
  30.   jointTableIsData =true;
  31.   sb.deleteCharAt(sb.length()-1).append(')');
  32. String sql =String.format(joinParser.getChildSQL(), sb);
  33.   getRoute(sql);
  34.   ctx.executeNativeSQLParallJob(getDataNodes(), sql,newShareRowOutPutDataHandler(this, fields, joinindex, joinParser.getJoinRkey(), batchRows, ctx.getSession()));
  35. }

3.5 ShareRowOutPutDataHandler

ShareRowOutPutDataHandler右边的表执行的 SQL 回调。流程如下:
  • #fieldEofResponse(...) :接收数据节点返回的 fields,返回 header 给 MySQL Client。
  • #rowResponse(...) :接收数据节点返回的 row,匹配左表的记录,返回合并后返回的 row 给 MySQL Client。
  • #rowEofResponse(...) :当所有 row 都返回完后,返回 eof 给 MySQL Client。
核心代码如下:
  1. // ShareRowOutPutDataHandler.java
  2. publicboolean onRowData(String dataNode,byte[] rowData){
  3. RowDataPacket rowDataPkgold =ResultSetUtil.parseRowData(rowData, bfields);
  4. //拷贝一份batchRows
  5. Map<String,byte[]> batchRowsCopy =newConcurrentHashMap<String,byte[]>();
  6.   batchRowsCopy.putAll(arows);
  7. // 获取Id字段,
  8. String id =ByteUtil.getString(rowDataPkgold.fieldValues.get(joinR));
  9. // 查找ID对应的A表的记录
  10. byte[] arow = getRow(batchRowsCopy, id, joinL);
  11. while(arow !=null){
  12. RowDataPacket rowDataPkg =ResultSetUtil.parseRowData(arow, afields);//ctx.getAllFields());
  13. for(int i =1; i < rowDataPkgold.fieldCount; i++){
  14. // 设置b.name 字段
  15. byte[] bname = rowDataPkgold.fieldValues.get(i);
  16.           rowDataPkg.add(bname);
  17.           rowDataPkg.addFieldCount(1);
  18. }
  19. // huangyiming add
  20. MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();
  21. if(null== middlerResultHandler){
  22.           ctx.writeRow(rowDataPkg);
  23. }else{
  24. if(middlerResultHandler instanceofMiddlerQueryResultHandler){
  25. byte[] columnData = rowDataPkg.fieldValues.get(0);
  26. if(columnData !=null&& columnData.length >0){
  27. String rowValue =newString(columnData);
  28.                   middlerResultHandler.add(rowValue);
  29. }
  30. //}
  31. }
  32. }
  33.       arow = getRow(batchRowsCopy, id, joinL);
  34. }
  35. returnfalse;
  36. }

4. 彩蛋

如下是本文涉及到的核心类,有兴趣的同学可以翻一翻。
ShareJoin 另外不支持的功能:
  1. 只支持 inner join,不支持 left join、right join 等等连接。
  2. 不支持 order by。
  3. 不支持 group by 以及 相关聚合函数。
  4. 即使 join 左表的字段未声明为返回 fields 也会返回。
恩,MyCAT 弱XA 源码继续走起!
继续阅读
阅读原文