🙂🙂🙂关注微信公众号有福利:
  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右

  • 1. 概述
  • 2. 主流程
  • 3. 查询操作
  • 4. 插入操作
  • 5. 彩蛋

1. 概述

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。

吼吼吼,让我们开启这段神奇的“旅途”。
本文主要分成四部分:
  1. 总体流程,让你有个整体的认识
  2. 查询操作
  3. 插入操作
  4. 彩蛋,😈彩蛋,🙂彩蛋
建议你看过这两篇文章(非必须):
  1. 《MyCAT 源码分析 —— 【单库单表】插入》
  2. 《MyCAT 源码分析 —— 【单库单表】查询》

2. 主流程

  1. MyCATServer 接收 MySQLClient 基于 MySQL协议 的请求,翻译 SQL 成 MongoDB操作 发送给 MongoDBServer
  2. MyCATServer 接收 MongoDBServer 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQLClient
这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。

Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。
MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。
是不是熟悉的味道。不得不说 JDBC 规范的精妙。

3. 查询操作

  1. SELECT id, name FROM user WHERE name >'' ORDER BY _id DESC;
看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。
1、查询 MongoDB
  1. // MongoSQLParser.java
  2. publicMongoData query()throwsMongoSQLException{
  3. if(!(statement instanceofSQLSelectStatement)){
  4. //return null;
  5. thrownewIllegalArgumentException("not a query sql statement");
  6. }
  7. MongoData mongo =newMongoData();
  8. DBCursor c =null;
  9. SQLSelectStatement selectStmt =(SQLSelectStatement) statement;
  10. SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();
  11. int icount =0;
  12. if(sqlSelectQuery instanceofMySqlSelectQueryBlock){
  13. MySqlSelectQueryBlock mysqlSelectQuery =(MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();
  14. BasicDBObject fields =newBasicDBObject();
  15. // 显示(返回)的字段
  16. for(SQLSelectItem item : mysqlSelectQuery.getSelectList()){
  17. //System.out.println(item.toString());
  18. if(!(item.getExpr()instanceofSQLAllColumnExpr)){
  19. if(item.getExpr()instanceofSQLAggregateExpr){
  20. SQLAggregateExpr expr =(SQLAggregateExpr) item.getExpr();
  21. if(expr.getMethodName().equals("COUNT")){// TODO 待读:count(*)
  22.                       icount =1;
  23.                       mongo.setField(getExprFieldName(expr),Types.BIGINT);
  24. }
  25.                   fields.put(getExprFieldName(expr),1);
  26. }else{
  27.                   fields.put(getFieldName(item),1);
  28. }
  29. }
  30. }
  31. // 表名
  32. SQLTableSource table = mysqlSelectQuery.getFrom();
  33. DBCollection coll =this._db.getCollection(table.toString());
  34.       mongo.setTable(table.toString());
  35. // WHERE
  36. SQLExpr expr = mysqlSelectQuery.getWhere();
  37. DBObject query = parserWhere(expr);
  38. // GROUP BY
  39. SQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();
  40. BasicDBObject gbkey =newBasicDBObject();
  41. if(groupby !=null){
  42. for(SQLExpr gbexpr : groupby.getItems()){
  43. if(gbexpr instanceofSQLIdentifierExpr){
  44. String name =((SQLIdentifierExpr) gbexpr).getName();
  45.                   gbkey.put(name,Integer.valueOf(1));
  46. }
  47. }
  48.           icount =2;
  49. }
  50. // SKIP / LIMIT
  51. int limitoff =0;
  52. int limitnum =0;
  53. if(mysqlSelectQuery.getLimit()!=null){
  54.           limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());
  55.           limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());
  56. }
  57. if(icount ==1){// COUNT(*)
  58.           mongo.setCount(coll.count(query));
  59. }elseif(icount ==2){// MapReduce
  60. BasicDBObject initial =newBasicDBObject();
  61.           initial.put("num",0);
  62. String reduce ="function (obj, prev) { "+"  prev.num++}";
  63.           mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));
  64. }else{
  65. if((limitoff >0)||(limitnum >0)){
  66.               c = coll.find(query, fields).skip(limitoff).limit(limitnum);
  67. }else{
  68.               c = coll.find(query, fields);
  69. }
  70. // order by
  71. SQLOrderByorderby= mysqlSelectQuery.getOrderBy();
  72. if(orderby!=null){
  73. BasicDBObject order =newBasicDBObject();
  74. for(int i =0; i <orderby.getItems().size(); i++){
  75. SQLSelectOrderByItem orderitem =orderby.getItems().get(i);
  76.                   order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));
  77. }
  78.               c.sort(order);
  79. // System.out.println(order);
  80. }
  81. }
  82.       mongo.setCursor(c);
  83. }
  84. return mongo;
  85. }
2、查询条件
  1. // MongoSQLParser.java
  2. privatevoid parserWhere(SQLExpr aexpr,BasicDBObject o){
  3. if(aexpr instanceofSQLBinaryOpExpr){
  4. SQLBinaryOpExpr expr =(SQLBinaryOpExpr) aexpr;
  5. SQLExpr exprL = expr.getLeft();
  6. if(!(exprL instanceofSQLBinaryOpExpr)){
  7. if(expr.getOperator().getName().equals("=")){
  8.               o.put(exprL.toString(), getExpValue(expr.getRight()));
  9. }else{
  10. String op ="";
  11. if(expr.getOperator().getName().equals("<")){
  12.                   op ="$lt";
  13. }elseif(expr.getOperator().getName().equals("<=")){
  14.                   op ="$lte";
  15. }elseif(expr.getOperator().getName().equals(">")){
  16.                   op ="$gt";
  17. }elseif(expr.getOperator().getName().equals(">=")){
  18.                   op ="$gte";
  19. }elseif(expr.getOperator().getName().equals("!=")){
  20.                   op ="$ne";
  21. }elseif(expr.getOperator().getName().equals("<>")){
  22.                   op ="$ne";
  23. }
  24.               parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));
  25. }
  26. }else{
  27. if(expr.getOperator().getName().equals("AND")){
  28.               parserWhere(exprL, o);
  29.               parserWhere(expr.getRight(), o);
  30. }elseif(expr.getOperator().getName().equals("OR")){
  31.               orWhere(exprL, expr.getRight(), o);
  32. }else{
  33. thrownewRuntimeException("Can't identify the operation of  of where");
  34. }
  35. }
  36. }
  37. }
  38. privatevoid orWhere(SQLExpr exprL,SQLExpr exprR,BasicDBObject ob){
  39. BasicDBObject xo =newBasicDBObject();
  40. BasicDBObject yo =newBasicDBObject();
  41.   parserWhere(exprL, xo);
  42.   parserWhere(exprR, yo);
  43.   ob.put("$or",newObject[]{xo, yo});
  44. }
3、解析 MongoDB 数据
  1. // MongoResultSet.java
  2. publicMongoResultSet(MongoData mongo,String schema)throwsSQLException{
  3. this._cursor = mongo.getCursor();
  4. this._schema = schema;
  5. this._table = mongo.getTable();
  6. this.isSum = mongo.getCount()>0;
  7. this._sum = mongo.getCount();
  8. this.isGroupBy = mongo.getType();
  9. if(this.isGroupBy){
  10.       dblist = mongo.getGrouyBys();
  11. this.isSum =true;
  12. }
  13. if(this._cursor !=null){
  14. select= _cursor.getKeysWanted().keySet().toArray(newString[0]);
  15. // 解析 fields
  16. if(this._cursor.hasNext()){
  17.           _cur = _cursor.next();
  18. if(_cur !=null){
  19. if(select.length ==0){
  20. SetFields(_cur.keySet());
  21. }
  22.               _row =1;
  23. }
  24. }
  25. // 设置 fields 类型
  26. if(select.length ==0){
  27. select=newString[]{"_id"};
  28. SetFieldType(true);
  29. }else{
  30. SetFieldType(false);
  31. }
  32. }else{
  33. SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};
  34. SetFieldType(mongo.getFields());
  35. }
  36. }
  • 当使用 SELECT* 查询字段时,fields 使用第一条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。
4、返回数据给 MySQL Client
  1. // JDBCConnection.java
  2. privatevoid ouputResultSet(ServerConnection sc,String sql)
  3. throwsSQLException{
  4. ResultSet rs =null;
  5. Statement stmt =null;
  6. try{
  7.       stmt = con.createStatement();
  8.       rs = stmt.executeQuery(sql);
  9. // header
  10. List<FieldPacket> fieldPks =newLinkedList<>();
  11. ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs,this.isSpark);
  12. int colunmCount = fieldPks.size();
  13. ByteBuffer byteBuf = sc.allocate();
  14. ResultSetHeaderPacket headerPkg =newResultSetHeaderPacket();
  15.       headerPkg.fieldCount = fieldPks.size();
  16.       headerPkg.packetId =++packetId;
  17.       byteBuf = headerPkg.write(byteBuf, sc,true);
  18.       byteBuf.flip();
  19. byte[] header =newbyte[byteBuf.limit()];
  20.       byteBuf.get(header);
  21.       byteBuf.clear();
  22. List<byte[]> fields =newArrayList<byte[]>(fieldPks.size());
  23. for(FieldPacket curField : fieldPks){
  24.           curField.packetId =++packetId;
  25.           byteBuf = curField.write(byteBuf, sc,false);
  26.           byteBuf.flip();
  27. byte[] field =newbyte[byteBuf.limit()];
  28.           byteBuf.get(field);
  29.           byteBuf.clear();
  30.           fields.add(field);
  31. }
  32. // header eof
  33. EOFPacket eofPckg =newEOFPacket();
  34.       eofPckg.packetId =++packetId;
  35.       byteBuf = eofPckg.write(byteBuf, sc,false);
  36.       byteBuf.flip();
  37. byte[] eof =newbyte[byteBuf.limit()];
  38.       byteBuf.get(eof);
  39.       byteBuf.clear();
  40. this.respHandler.fieldEofResponse(header, fields, eof,this);
  41. // row
  42. while(rs.next()){
  43. RowDataPacket curRow =newRowDataPacket(colunmCount);
  44. for(int i =0; i < colunmCount; i++){
  45. int j = i +1;
  46. if(MysqlDefs.isBianry((byte) fieldPks.get(i).type)){
  47.                   curRow.add(rs.getBytes(j));
  48. }elseif(fieldPks.get(i).type ==MysqlDefs.FIELD_TYPE_DECIMAL ||
  49.                       fieldPks.get(i).type ==(MysqlDefs.FIELD_TYPE_NEW_DECIMAL -256)){// field type is unsigned byte
  50. // ensure that do not use scientific notation format
  51. BigDecimal val = rs.getBigDecimal(j);
  52.                   curRow.add(StringUtil.encode(val !=null? val.toPlainString():null, sc.getCharset()));
  53. }else{
  54.                   curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));
  55. }
  56. }
  57.           curRow.packetId =++packetId;
  58.           byteBuf = curRow.write(byteBuf, sc,false);
  59.           byteBuf.flip();
  60. byte[] row =newbyte[byteBuf.limit()];
  61.           byteBuf.get(row);
  62.           byteBuf.clear();
  63. this.respHandler.rowResponse(row,this);
  64. }
  65.       fieldPks.clear();
  66. // row eof
  67.       eofPckg =newEOFPacket();
  68.       eofPckg.packetId =++packetId;
  69.       byteBuf = eofPckg.write(byteBuf, sc,false);
  70.       byteBuf.flip();
  71.       eof =newbyte[byteBuf.limit()];
  72.       byteBuf.get(eof);
  73.       sc.recycle(byteBuf);
  74. this.respHandler.rowEofResponse(eof,this);
  75. }finally{
  76. if(rs !=null){
  77. try{
  78.               rs.close();
  79. }catch(SQLException e){
  80. }
  81. }
  82. if(stmt !=null){
  83. try{
  84.               stmt.close();
  85. }catch(SQLException e){
  86. }
  87. }
  88. }
  89. }
  90. // MongoResultSet.java
  91. @Override
  92. publicString getString(String columnLabel)throwsSQLException{
  93. Object x = getObject(columnLabel);
  94. if(x ==null){
  95. returnnull;
  96. }
  97. return x.toString();
  98. }
  • 当返回字段值是 Object 时,返回该对象.toString()。例如:
  1. mysql>select*from user order by _id asc;
  2. +--------------------------+------+-------------------------------+
  3. | _id                      | name | profile                       |
  4. +--------------------------+------+-------------------------------+
  5. |1|123|{"age":1,"height":100}|

4. 插入操作

  1. // MongoSQLParser.java
  2. publicint executeUpdate()throwsMongoSQLException{
  3. if(statement instanceofSQLInsertStatement){
  4. returnInsertData((SQLInsertStatement) statement);
  5. }
  6. if(statement instanceofSQLUpdateStatement){
  7. returnUpData((SQLUpdateStatement) statement);
  8. }
  9. if(statement instanceofSQLDropTableStatement){
  10. return dropTable((SQLDropTableStatement) statement);
  11. }
  12. if(statement instanceofSQLDeleteStatement){
  13. returnDeleteDate((SQLDeleteStatement) statement);
  14. }
  15. if(statement instanceofSQLCreateTableStatement){
  16. return1;
  17. }
  18. return1;
  19. }
  20. privateintInsertData(SQLInsertStatement state){
  21. if(state.getValues().getValues().size()==0){
  22. thrownewRuntimeException("number of  columns error");
  23. }
  24. if(state.getValues().getValues().size()!= state.getColumns().size()){
  25. thrownewRuntimeException("number of values and columns have to match");
  26. }
  27. SQLTableSource table = state.getTableSource();
  28. BasicDBObject o =newBasicDBObject();
  29. int i =0;
  30. for(SQLExpr col : state.getColumns()){
  31.       o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));
  32.       i++;
  33. }
  34. DBCollection coll =this._db.getCollection(table.toString());
  35.   coll.insert(o);
  36. return1;
  37. }

5. 彩蛋

老铁,看到这里,来一波微信公众号关注吧?!
1、支持多 MongoDB ,并使用 MyCAT 进行分片。
MyCAT 配置:multi_mongodb
2、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。
查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。
MyCAT 配置:singlemongodbmysql
3、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。
MyCAT 配置:single_mongodb
继续阅读
阅读原文