数据库中间件 MyCAT 源码分析 —— SQL ON MongoDB
🙂🙂🙂关注微信公众号有福利:
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问每条留言都将得到认真回复。甚至不知道如何读源码也可以请教噢。 新的源码解析文章实时收到通知。每周更新一篇左右。
- 1. 概述
- 2. 主流程
- 3. 查询操作
- 4. 插入操作
- 5. 彩蛋
1. 概述
可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。
吼吼吼,让我们开启这段神奇的“旅途”。
本文主要分成四部分:
- 总体流程,让你有个整体的认识
- 查询操作
- 插入操作
- 彩蛋,😈彩蛋,🙂彩蛋
建议你看过这两篇文章(非必须):
- 《MyCAT 源码分析 —— 【单库单表】插入》
- 《MyCAT 源码分析 —— 【单库单表】查询》
2. 主流程
MyCATServer
接收MySQLClient
基于 MySQL协议 的请求,翻译 SQL 成 MongoDB操作 发送给MongoDBServer
。MyCATServer
接收MongoDBServer
返回的 MongoDB数据,翻译成MySQL数据结果
返回给MySQLClient
。
这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。
Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。
MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。
是不是熟悉的味道。不得不说 JDBC 规范的精妙。
3. 查询操作
SELECT id, name FROM user WHERE name >'' ORDER BY _id DESC;
看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。
1、查询 MongoDB
// MongoSQLParser.java
publicMongoData query()throwsMongoSQLException{
if(!(statement instanceofSQLSelectStatement)){
//return null;
thrownewIllegalArgumentException("not a query sql statement");
}
MongoData mongo =newMongoData();
DBCursor c =null;
SQLSelectStatement selectStmt =(SQLSelectStatement) statement;
SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();
int icount =0;
if(sqlSelectQuery instanceofMySqlSelectQueryBlock){
MySqlSelectQueryBlock mysqlSelectQuery =(MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();
BasicDBObject fields =newBasicDBObject();
// 显示(返回)的字段
for(SQLSelectItem item : mysqlSelectQuery.getSelectList()){
//System.out.println(item.toString());
if(!(item.getExpr()instanceofSQLAllColumnExpr)){
if(item.getExpr()instanceofSQLAggregateExpr){
SQLAggregateExpr expr =(SQLAggregateExpr) item.getExpr();
if(expr.getMethodName().equals("COUNT")){// TODO 待读:count(*)
icount =1;
mongo.setField(getExprFieldName(expr),Types.BIGINT);
}
fields.put(getExprFieldName(expr),1);
}else{
fields.put(getFieldName(item),1);
}
}
}
// 表名
SQLTableSource table = mysqlSelectQuery.getFrom();
DBCollection coll =this._db.getCollection(table.toString());
mongo.setTable(table.toString());
// WHERE
SQLExpr expr = mysqlSelectQuery.getWhere();
DBObject query = parserWhere(expr);
// GROUP BY
SQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();
BasicDBObject gbkey =newBasicDBObject();
if(groupby !=null){
for(SQLExpr gbexpr : groupby.getItems()){
if(gbexpr instanceofSQLIdentifierExpr){
String name =((SQLIdentifierExpr) gbexpr).getName();
gbkey.put(name,Integer.valueOf(1));
}
}
icount =2;
}
// SKIP / LIMIT
int limitoff =0;
int limitnum =0;
if(mysqlSelectQuery.getLimit()!=null){
limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());
limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());
}
if(icount ==1){// COUNT(*)
mongo.setCount(coll.count(query));
}elseif(icount ==2){// MapReduce
BasicDBObject initial =newBasicDBObject();
initial.put("num",0);
String reduce ="function (obj, prev) { "+" prev.num++}";
mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));
}else{
if((limitoff >0)||(limitnum >0)){
c = coll.find(query, fields).skip(limitoff).limit(limitnum);
}else{
c = coll.find(query, fields);
}
// order by
SQLOrderByorderby= mysqlSelectQuery.getOrderBy();
if(orderby!=null){
BasicDBObject order =newBasicDBObject();
for(int i =0; i <orderby.getItems().size(); i++){
SQLSelectOrderByItem orderitem =orderby.getItems().get(i);
order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));
}
c.sort(order);
// System.out.println(order);
}
}
mongo.setCursor(c);
}
return mongo;
}
2、查询条件
// MongoSQLParser.java
privatevoid parserWhere(SQLExpr aexpr,BasicDBObject o){
if(aexpr instanceofSQLBinaryOpExpr){
SQLBinaryOpExpr expr =(SQLBinaryOpExpr) aexpr;
SQLExpr exprL = expr.getLeft();
if(!(exprL instanceofSQLBinaryOpExpr)){
if(expr.getOperator().getName().equals("=")){
o.put(exprL.toString(), getExpValue(expr.getRight()));
}else{
String op ="";
if(expr.getOperator().getName().equals("<")){
op ="$lt";
}elseif(expr.getOperator().getName().equals("<=")){
op ="$lte";
}elseif(expr.getOperator().getName().equals(">")){
op ="$gt";
}elseif(expr.getOperator().getName().equals(">=")){
op ="$gte";
}elseif(expr.getOperator().getName().equals("!=")){
op ="$ne";
}elseif(expr.getOperator().getName().equals("<>")){
op ="$ne";
}
parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));
}
}else{
if(expr.getOperator().getName().equals("AND")){
parserWhere(exprL, o);
parserWhere(expr.getRight(), o);
}elseif(expr.getOperator().getName().equals("OR")){
orWhere(exprL, expr.getRight(), o);
}else{
thrownewRuntimeException("Can't identify the operation of of where");
}
}
}
}
privatevoid orWhere(SQLExpr exprL,SQLExpr exprR,BasicDBObject ob){
BasicDBObject xo =newBasicDBObject();
BasicDBObject yo =newBasicDBObject();
parserWhere(exprL, xo);
parserWhere(exprR, yo);
ob.put("$or",newObject[]{xo, yo});
}
3、解析 MongoDB 数据
// MongoResultSet.java
publicMongoResultSet(MongoData mongo,String schema)throwsSQLException{
this._cursor = mongo.getCursor();
this._schema = schema;
this._table = mongo.getTable();
this.isSum = mongo.getCount()>0;
this._sum = mongo.getCount();
this.isGroupBy = mongo.getType();
if(this.isGroupBy){
dblist = mongo.getGrouyBys();
this.isSum =true;
}
if(this._cursor !=null){
select= _cursor.getKeysWanted().keySet().toArray(newString[0]);
// 解析 fields
if(this._cursor.hasNext()){
_cur = _cursor.next();
if(_cur !=null){
if(select.length ==0){
SetFields(_cur.keySet());
}
_row =1;
}
}
// 设置 fields 类型
if(select.length ==0){
select=newString[]{"_id"};
SetFieldType(true);
}else{
SetFieldType(false);
}
}else{
SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};
SetFieldType(mongo.getFields());
}
}
- 当使用
SELECT*
查询字段时,fields 使用第一条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。
4、返回数据给 MySQL Client
// JDBCConnection.java
privatevoid ouputResultSet(ServerConnection sc,String sql)
throwsSQLException{
ResultSet rs =null;
Statement stmt =null;
try{
stmt = con.createStatement();
rs = stmt.executeQuery(sql);
// header
List<FieldPacket> fieldPks =newLinkedList<>();
ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs,this.isSpark);
int colunmCount = fieldPks.size();
ByteBuffer byteBuf = sc.allocate();
ResultSetHeaderPacket headerPkg =newResultSetHeaderPacket();
headerPkg.fieldCount = fieldPks.size();
headerPkg.packetId =++packetId;
byteBuf = headerPkg.write(byteBuf, sc,true);
byteBuf.flip();
byte[] header =newbyte[byteBuf.limit()];
byteBuf.get(header);
byteBuf.clear();
List<byte[]> fields =newArrayList<byte[]>(fieldPks.size());
for(FieldPacket curField : fieldPks){
curField.packetId =++packetId;
byteBuf = curField.write(byteBuf, sc,false);
byteBuf.flip();
byte[] field =newbyte[byteBuf.limit()];
byteBuf.get(field);
byteBuf.clear();
fields.add(field);
}
// header eof
EOFPacket eofPckg =newEOFPacket();
eofPckg.packetId =++packetId;
byteBuf = eofPckg.write(byteBuf, sc,false);
byteBuf.flip();
byte[] eof =newbyte[byteBuf.limit()];
byteBuf.get(eof);
byteBuf.clear();
this.respHandler.fieldEofResponse(header, fields, eof,this);
// row
while(rs.next()){
RowDataPacket curRow =newRowDataPacket(colunmCount);
for(int i =0; i < colunmCount; i++){
int j = i +1;
if(MysqlDefs.isBianry((byte) fieldPks.get(i).type)){
curRow.add(rs.getBytes(j));
}elseif(fieldPks.get(i).type ==MysqlDefs.FIELD_TYPE_DECIMAL ||
fieldPks.get(i).type ==(MysqlDefs.FIELD_TYPE_NEW_DECIMAL -256)){// field type is unsigned byte
// ensure that do not use scientific notation format
BigDecimal val = rs.getBigDecimal(j);
curRow.add(StringUtil.encode(val !=null? val.toPlainString():null, sc.getCharset()));
}else{
curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));
}
}
curRow.packetId =++packetId;
byteBuf = curRow.write(byteBuf, sc,false);
byteBuf.flip();
byte[] row =newbyte[byteBuf.limit()];
byteBuf.get(row);
byteBuf.clear();
this.respHandler.rowResponse(row,this);
}
fieldPks.clear();
// row eof
eofPckg =newEOFPacket();
eofPckg.packetId =++packetId;
byteBuf = eofPckg.write(byteBuf, sc,false);
byteBuf.flip();
eof =newbyte[byteBuf.limit()];
byteBuf.get(eof);
sc.recycle(byteBuf);
this.respHandler.rowEofResponse(eof,this);
}finally{
if(rs !=null){
try{
rs.close();
}catch(SQLException e){
}
}
if(stmt !=null){
try{
stmt.close();
}catch(SQLException e){
}
}
}
}
// MongoResultSet.java
@Override
publicString getString(String columnLabel)throwsSQLException{
Object x = getObject(columnLabel);
if(x ==null){
returnnull;
}
return x.toString();
}
- 当返回字段值是 Object 时,返回该对象.toString()。例如:
mysql>select*from user order by _id asc;
+--------------------------+------+-------------------------------+
| _id | name | profile |
+--------------------------+------+-------------------------------+
|1|123|{"age":1,"height":100}|
4. 插入操作
// MongoSQLParser.java
publicint executeUpdate()throwsMongoSQLException{
if(statement instanceofSQLInsertStatement){
returnInsertData((SQLInsertStatement) statement);
}
if(statement instanceofSQLUpdateStatement){
returnUpData((SQLUpdateStatement) statement);
}
if(statement instanceofSQLDropTableStatement){
return dropTable((SQLDropTableStatement) statement);
}
if(statement instanceofSQLDeleteStatement){
returnDeleteDate((SQLDeleteStatement) statement);
}
if(statement instanceofSQLCreateTableStatement){
return1;
}
return1;
}
privateintInsertData(SQLInsertStatement state){
if(state.getValues().getValues().size()==0){
thrownewRuntimeException("number of columns error");
}
if(state.getValues().getValues().size()!= state.getColumns().size()){
thrownewRuntimeException("number of values and columns have to match");
}
SQLTableSource table = state.getTableSource();
BasicDBObject o =newBasicDBObject();
int i =0;
for(SQLExpr col : state.getColumns()){
o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));
i++;
}
DBCollection coll =this._db.getCollection(table.toString());
coll.insert(o);
return1;
}
5. 彩蛋
老铁,看到这里,来一波微信公众号关注吧?!
1、支持多 MongoDB ,并使用 MyCAT 进行分片。
MyCAT 配置:multi_mongodb
2、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。
查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。
MyCAT 配置:singlemongodbmysql
3、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。
MyCAT 配置:single_mongodb
阅读原文 最新评论
推荐文章
作者最新文章
你可能感兴趣的文章
Copyright Disclaimer: The copyright of contents (including texts, images, videos and audios) posted above belong to the User who shared or the third-party website which the User shared from. If you found your copyright have been infringed, please send a DMCA takedown notice to [email protected]. For more detail of the source, please click on the button "Read Original Post" below. For other communications, please send to [email protected].
版权声明:以上内容为用户推荐收藏至CareerEngine平台,其内容(含文字、图片、视频、音频等)及知识版权均属用户或用户转发自的第三方网站,如涉嫌侵权,请通知[email protected]进行信息删除。如需查看信息来源,请点击“查看原文”。如需洽谈其它事宜,请联系[email protected]。
版权声明:以上内容为用户推荐收藏至CareerEngine平台,其内容(含文字、图片、视频、音频等)及知识版权均属用户或用户转发自的第三方网站,如涉嫌侵权,请通知[email protected]进行信息删除。如需查看信息来源,请点击“查看原文”。如需洽谈其它事宜,请联系[email protected]。