本文主要基于 MyCAT 1.6.5 正式版
  • 1. 概述
  • 2. 接收请求,解析 SQL
  • 3. 获得路由结果
  • 4. 获得 MySQL 连接,执行 SQL
  • 5. 响应执行 SQL 结果
  • 6. 其他 :更新 / 删除

友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。

1. 概述

内容形态以 顺序图 + 核心代码 为主。 

如果有地方表述不错误或者不清晰,欢迎留言。 

对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。 

微信号:wangwenbin-server。
本文讲解 【单库单表】查询 所涉及到的代码。
😂内容和 《MyCAT 源码分析 —— 【单库单表】插入》 超级相似,一方面本身流程基本相同,另外一方面文章结构没拆分好。我们使用 🚀 标记差异的逻辑。
交互如下图:
单库单表查询简图
整个过程,MyCAT Server 流程如下:
  1. 接收 MySQL Client 请求,解析 SQL。
  2. 获得路由结果,进行路由。
  3. 获得 MySQL 连接,执行 SQL。
  4. 响应执行结果,发送结果给 MySQL Client。
我们逐个步骤分析,一起来看看源码。

2. 接收请求,解析 SQL

【单库单表】查询(01主流程)

【1 - 2】

接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。

【3】

1
:
// ⬇️⬇️⬇️【FrontendCommandHandler.java】
2
:
publicclassFrontendCommandHandlerimplementsNIOHandler
{

3
:

4
:    
@Override
5
:    
publicvoidhandle(byte[] data)
{

6
:    

7
:        
// .... 省略部分代码
8
:        
switch
(data[
4
])
//
9
:         {

10
:            
case
MySQLPacket.COM_INIT_DB:

11
:                 commands.doInitDB();

12
:                 source.initDB(data);

13
:                
break
;

14
:            
case
MySQLPacket.COM_QUERY:
// 查询命令
15
:                
// 计数查询命令
16
:                 commands.doQuery();

17
:                
// 执行查询命令
18
:                 source.query(data);

19
:                
break
;

20
:            
case
MySQLPacket.COM_PING:

21
:                 commands.doPing();

22
:                 source.ping();

23
:                
break
;

24
:            
// .... 省略部分case
25
:         }

26
:     }

27
:

28
: }

INSERT/SELECT/UPDATE/DELETE 等 SQL 归属于 MySQLPacket.COM_QUERY,详细可见:《MySQL协议分析#4.2 客户端命令请求报文(客户端 -> 服务器)》。

【4】

将 二进制数组 解析成 SQL。核心代码如下:
1
:
// ⬇️⬇️⬇️【FrontendConnection.java】
2
:
publicvoidquery(byte[] data)
{

3
:    
// 取得语句
4
:     String sql =
null
;      

5
:    
try
{

6
:         MySQLMessage mm =
new
MySQLMessage(data);

7
:         mm.position(
5
);

8
:         sql = mm.readString(charset);

9
:     }
catch
(UnsupportedEncodingException e) {

10
:         writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET,
"Unknown charset '"
+ charset +
"'"
);

11
:        
return
;

12
:     }      

13
:    
// 执行语句
14
:    
this
.query( sql );

15
: }

【5】

解析 SQL 类型。核心代码如下:
1
:
// ⬇️⬇️⬇️【ServerQueryHandler.java】
2
:
@Override
3
:
publicvoidquery(String sql)
{

4
:    
// 解析 SQL 类型
5
:    
int
rs = ServerParse.parse(sql);

6
:    
int
sqlType = rs &
0xff
;

7
:    

8
:    
switch
(sqlType) {

9
:    
//explain sql
10
:    
case
ServerParse.EXPLAIN:

11
:         ExplainHandler.handle(sql, c, rs >>>
8
);

12
:        
break
;

13
:    
// .... 省略部分case
14
:        
break
;

15
:    
case
ServerParse.SELECT:

16
:         SelectHandler.handle(sql, c, rs >>>
8
);

17
:        
break
;

18
:    
// .... 省略部分case
19
:    
default
:

20
:        
if
(readOnly){

21
:             LOGGER.warn(
new
StringBuilder().append(
"User readonly:"
).append(sql).toString());

22
:             c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY,
"User readonly"
);

23
:            
break
;

24
:         }

25
:         c.execute(sql, rs &
0xff
);

26
:     }

27
: }

28
:

29
:

30
:
// ⬇️⬇️⬇️【ServerParse.java】
31
:
publicstaticintparse(String stmt)
{

32
:    
int
length = stmt.length();

33
:    
//FIX BUG FOR SQL SUCH AS /XXXX/SQL
34
:    
int
rt = -
1
;

35
:    
for
(
int
i =
0
; i < length; ++i) {

36
:        
switch
(stmt.charAt(i)) {

37
:        
// .... 省略部分case            case 'I':
38
:        
case'i'
:

39
:             rt = insertCheck(stmt, i);

40
:            
if
(rt != OTHER) {

41
:                
return
rt;

42
:             }

43
:            
continue
;

44
:            
// .... 省略部分case
45
:        
case'S'
:

46
:        
case's'
:

47
:             rt = sCheck(stmt, i);

48
:            
if
(rt != OTHER) {

49
:                
return
rt;

50
:             }

51
:            
continue
;

52
:            
// .... 省略部分case
53
:        
default
:

54
:            
continue
;

55
:         }

56
:     }

57
:    
return
OTHER;

58
: }

🚀【6】【7】

解析 Select SQL 类型,分发到对应的逻辑。核心代码如下:
1
:
// ⬇️⬇️⬇️【SelectHandler.java】
2
:
publicstaticvoidhandle(String stmt, ServerConnection c, int offs)
{

3
:    
int
offset = offs;

4
:    
switch
(ServerParseSelect.parse(stmt, offs)) {
// 解析 Select SQL 类型
5
:    
case
ServerParseSelect.VERSION_COMMENT:
// select @@VERSION_COMMENT;
6
:         SelectVersionComment.response(c);

7
:        
break
;

8
:    
case
ServerParseSelect.DATABASE:
// select DATABASE();
9
:         SelectDatabase.response(c);

10
:        
break
;

11
:    
case
ServerParseSelect.USER:
// select CURRENT_USER();
12
:         SelectUser.response(c);

13
:        
break
;

14
:    
case
ServerParseSelect.VERSION:
// select VERSION();
15
:         SelectVersion.response(c);

16
:        
break
;

17
:    
case
ServerParseSelect.SESSION_INCREMENT:
// select @@session.auto_increment_increment;
18
:         SessionIncrement.response(c);

19
:        
break
;

20
:    
case
ServerParseSelect.SESSION_ISOLATION:
// select @@session.tx_isolation;
21
:         SessionIsolation.response(c);

22
:        
break
;

23
:    
case
ServerParseSelect.LAST_INSERT_ID:
// select LAST_INSERT_ID();
24
:        
// ....省略代码
25
:        
break
;

26
:    
case
ServerParseSelect.IDENTITY:
// select @@identity
27
:        
// ....省略代码
28
:        
break
;

29
:    
case
ServerParseSelect.SELECT_VAR_ALL:
//
30
:         SelectVariables.execute(c,stmt);

31
:            
break
;

32
:    
case
ServerParseSelect.SESSION_TX_READ_ONLY:
//
33
:         SelectTxReadOnly.response(c);

34
:            
break
;

35
:    
default
:
// 其他,例如 select * from table
36
:         c.execute(stmt, ServerParse.SELECT);

37
:     }

38
: }

39
:
// ⬇️⬇️⬇️【ServerParseSelect.java】
40
:
publicstaticintparse(String stmt, int offset)
{

41
:    
int
i = offset;

42
:    
for
(; i < stmt.length(); ++i) {

43
:        
switch
(stmt.charAt(i)) {

44
:        
case' '
:

45
:            
continue
;

46
:        
case'/'
:

47
:        
case'#'
:

48
:             i = ParseUtil.comment(stmt, i);

49
:            
continue
;

50
:        
case'@'
:

51
:            
return
select2Check(stmt, i);

52
:        
case'D'
:

53
:        
case'd'
:

54
:            
return
databaseCheck(stmt, i);

55
:        
case'L'
:

56
:        
case'l'
:

57
:            
return
lastInsertCheck(stmt, i);

58
:        
case'U'
:

59
:        
case'u'
:

60
:            
return
userCheck(stmt, i);

61
:        
case'C'
:

62
:        
case'c'
:

63
:            
return
currentUserCheck(stmt, i);

64
:        
case'V'
:

65
:        
case'v'
:

66
:            
return
versionCheck(stmt, i);

67
:        
default
:

68
:            
return
OTHER;

69
:         }

70
:     }

71
:    
return
OTHER;

72
: }

【8】

执行 SQL,详细解析见下文,核心代码如下:
1
:
// ⬇️⬇️⬇️【ServerConnection.java】
2
:
publicclassServerConnectionextendsFrontendConnection
{

3
:    
publicvoidexecute(String sql, int type)
{

4
:        
// .... 省略代码
5
:         SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);

6
:        
if
(schema ==
null
) {

7
:             writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,

8
:                    
"Unknown MyCAT Database '"
+ db +
"'"
);

9
:            
return
;

10
:         }

11
:

12
:        
// .... 省略代码
13
:

14
:        
// 路由到后端数据库,执行 SQL
15
:         routeEndExecuteSQL(sql, type, schema);

16
:     }

17
:    

18
:    
publicvoidrouteEndExecuteSQL(String sql, finalint type, final SchemaConfig schema)
{

19
:        
// 路由计算
20
:         RouteResultset rrs =
null
;

21
:        
try
{

22
:             rrs = MycatServer

23
:                     .getInstance()

24
:                     .getRouterservice()

25
:                     .route(MycatServer.getInstance().getConfig().getSystem(),

26
:                             schema, type, sql,
this
.charset,
this
);

27
:

28
:         }
catch
(Exception e) {

29
:             StringBuilder s =
new
StringBuilder();

30
:             LOGGER.warn(s.append(
this
).append(sql).toString() +
" err:"
+ e.toString(),e);

31
:             String msg = e.getMessage();

32
:             writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg ==
null
? e.getClass().getSimpleName() : msg);

33
:            
return
;

34
:         }

35
:

36
:        
// 执行 SQL
37
:        
if
(rrs !=
null
) {

38
:            
// session执行
39
:             session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);

40
:         }

41
:        

42
:      }

43
:

44
: }

3. 获得路由结果

【单库单表】插入(02获取路由)

【 1 - 5 】

获得路由主流程。核心代码如下:
1
:
// ⬇️⬇️⬇️【SelectHandler.java】
2
:
public
RouteResultset
route
(SystemConfig sysconf, SchemaConfig schema,

3
:        
int
sqlType, String stmt, String charset, ServerConnection sc)

 4:        
throws
SQLNonTransientException
{

5
:     RouteResultset rrs =
null
;

6
:

7
:    
// SELECT 类型的SQL, 检测缓存是否存在
8
:    
if
(sqlType == ServerParse.SELECT) {

9
:         cacheKey = schema.getName() + stmt;        

10
:         rrs = (RouteResultset) sqlRouteCache.get(cacheKey);

11
:        
if
(rrs !=
null
) {

12
:             checkMigrateRule(schema.getName(),rrs,sqlType);

13
:            
return
rrs;

14
:             }

15
:         }

16
:     }

17
:

18
:    
// .... 省略代码
19
:    
int
hintLength = RouteService.isHintSql(stmt);

20
:    
if
(hintLength != -
1
){
// TODO 待读:hint
21
:        
// .... 省略代码
22
:         }

23
:     }
else
{

24
:         stmt = stmt.trim();

25
:         rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,

26
:                 charset, sc, tableId2DataNodeCache);

27
:     }

28
:

29
:    
// 记录查询命令路由结果缓存
30
:    
if
(rrs !=
null
&& sqlType == ServerParse.SELECT && rrs.isCacheAble()) {

31
:         sqlRouteCache.putIfAbsent(cacheKey, rrs);

32
:     }

33
:    
// .... 省略代码        return rrs;
34
: }

35
:
// ⬇️⬇️⬇️【AbstractRouteStrategy.java】
36
:
@Override
37
:
public RouteResultset route
(SystemConfig sysConfig, SchemaConfig schema,
int
sqlType, String origSQL,

38
:         String charset, ServerConnection sc, LayerCachePool cachePool)
throws SQLNonTransientException
{

39
:

40
:    
// .... 省略代码
41
:

42
:    
// 处理一些路由之前的逻辑;全局序列号,父子表插入
43
:    
if
(beforeRouteProcess(schema, sqlType, origSQL, sc) ) {

44
:        
returnnull
;

45
:     }

46
:

47
:    
// .... 省略代码
48
:

49
:    
// 检查是否有分片
50
:    
if
(schema.isNoSharding() && ServerParse.SHOW != sqlType) {

51
:         rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);

52
:     }
else
{

53
:         RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);

54
:        
if
(returnedSet ==
null
) {

55
:             rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);

56
:         }

57
:     }

58
:

59
:    
return
rrs;

60
: }

🚀【3】第 7 至 16 行 :当 Select SQL 存在路由结果缓存时,直接返回缓存。

🚀【6】第 29 至 32 行 :记录 Select SQL 路由结果到缓存。
路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。

4. 获得 MySQL 连接,执行 SQL

【单库单表】查询(03执行 SQL)

【 1 - 8 】

获得 MySQL 连接。
  • PhysicalDBNode :物理数据库节点。
  • PhysicalDatasource :物理数据库数据源。

【 9 - 13 】

发送 SQL 到 MySQL Server,执行 SQL。

🚀 5. 响应执行 SQL 结果

【单库单表】查询(04执行响应)
核心代码如下:
1
:
// ⬇️⬇️⬇️【MySQLConnectionHandler.java】
2
:
@Override
3
:
protectedvoidhandleData(byte[] data)
{

4
:    
switch
(resultStatus) {

5
:    
case
RESULT_STATUS_INIT:

6
:        
switch
(data[
4
]) {

7
:        
case
OkPacket.FIELD_COUNT:

8
:             handleOkPacket(data);

9
:            
break
;

10
:        
case
ErrorPacket.FIELD_COUNT:

11
:             handleErrorPacket(data);

12
:            
break
;

13
:        
case
RequestFilePacket.FIELD_COUNT:

14
:             handleRequestPacket(data);

15
:            
break
;

16
:        
default
:
// 初始化 header fields
17
:             resultStatus = RESULT_STATUS_HEADER;

18
:             header = data;

19
:             fields =
new
ArrayList<
byte
[]>((
int
) ByteUtil.readLength(data,

20
:                    
4
));

21
:         }

22
:        
break
;

23
:    
case
RESULT_STATUS_HEADER:

24
:        
switch
(data[
4
]) {

25
:        
case
ErrorPacket.FIELD_COUNT:

26
:             resultStatus = RESULT_STATUS_INIT;

27
:             handleErrorPacket(data);

28
:            
break
;

29
:        
case
EOFPacket.FIELD_COUNT:
// 解析 fields 结束
30
:             resultStatus = RESULT_STATUS_FIELD_EOF;

31
:             handleFieldEofPacket(data);

32
:            
break
;

33
:        
default
:
// 解析 fields
34
:             fields.add(data);

35
:         }

36
:        
break
;

37
:    
case
RESULT_STATUS_FIELD_EOF:

38
:        
switch
(data[
4
]) {

39
:        
case
ErrorPacket.FIELD_COUNT:

40
:             resultStatus = RESULT_STATUS_INIT;

41
:             handleErrorPacket(data);

42
:            
break
;

43
:        
case
EOFPacket.FIELD_COUNT:
// 解析 每行记录 结束
44
:             resultStatus = RESULT_STATUS_INIT;

45
:             handleRowEofPacket(data);

46
:            
break
;

47
:        
default
:
// 每行记录
48
:             handleRowPacket(data);

49
:         }

50
:        
break
;

51
:    
default
:

52
:        
thrownew
RuntimeException(
"unknown status!"
);

53
:     }

54
: }

6. 其他 :更新 / 删除

流程基本和 《MyCAT源码分析:【单库单表】插入》 相同。我们就不另外文章解析。
继续阅读
阅读原文