本文主要基于 MyCAT 1.6.5 正式版
  • 1. 概述
  • 2. 接收请求,解析 SQL
  • 3. 获得路由结果
  • 4. 获得 MySQL 连接,执行 SQL
  • 5. 响应执行 SQL 结果

友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。
友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。

1. 概述

内容形态以 顺序图 + 核心代码 为主。 

如果有地方表述不错误或者不清晰,欢迎留言。 

对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。 

微信号:wangwenbin-server。
本文讲解 【单库单表】插入 所涉及到的代码。交互如下图:
单库单表插入简图
整个过程,MyCAT Server 流程如下:
  1. 接收 MySQL Client 请求,解析 SQL。
  2. 获得路由结果,进行路由。
  3. 获得 MySQL 连接,执行 SQL。
  4. 响应执行结果,发送结果给 MySQL Client。
我们逐个步骤分析,一起来看看源码。

2. 接收请求,解析 SQL

【单库单表】插入(01主流程)

【 1 - 2 】

接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。

【 3 】

不同 MySQL 命令,分发到不同的方法执行。核心代码如下:
1
:
// ⬇️⬇️⬇️【FrontendCommandHandler.java】
2
:
publicclassFrontendCommandHandlerimplementsNIOHandler
{

3
:

4
:    
@Override
5
:    
publicvoidhandle(byte[] data)
{

6
:    

7
:        
// .... 省略部分代码
8
:        
switch
(data[
4
])
//
9
:         {

10
:            
case
MySQLPacket.COM_INIT_DB:

11
:                 commands.doInitDB();

12
:                 source.initDB(data);

13
:                
break
;

14
:            
case
MySQLPacket.COM_QUERY:
// 查询命令
15
:                
// 计数查询命令
16
:                 commands.doQuery();

17
:                
// 执行查询命令
18
:                 source.query(data);

19
:                
break
;

20
:            
case
MySQLPacket.COM_PING:

21
:                 commands.doPing();

22
:                 source.ping();

23
:                
break
;

24
:            
// .... 省略部分case
25
:         }

26
:     }

27
:

28
: }

INSERT/SELECT/UPDATE/DELETE 等 SQL 归属于 MySQLPacket.COM_QUERY,详细可见:《MySQL协议分析#4.2 客户端命令请求报文(客户端 -> 服务器)》。

【 4 】

将 二进制数组 解析成 SQL。核心代码如下:
1
:
// ⬇️⬇️⬇️【FrontendConnection.java】
2
:
publicvoidquery(byte[] data)
{

3
:    
// 取得语句
4
:     String sql =
null
;      

5
:    
try
{

6
:         MySQLMessage mm =
new
MySQLMessage(data);

7
:         mm.position(
5
);

8
:         sql = mm.readString(charset);

9
:     }
catch
(UnsupportedEncodingException e) {

10
:         writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET,
"Unknown charset '"
+ charset +
"'"
);

11
:        
return
;

12
:     }      

13
:    
// 执行语句
14
:    
this
.query( sql );

15
: }

【 5 】

解析 SQL 类型。核心代码如下:
1
:
// ⬇️⬇️⬇️【ServerQueryHandler.java】
2
:
@Override
3
:
publicvoidquery(String sql)
{

4
:    
// 解析 SQL 类型
5
:    
int
rs = ServerParse.parse(sql);

6
:    
int
sqlType = rs &
0xff
;

7
:    

8
:    
switch
(sqlType) {

9
:    
//explain sql
10
:    
case
ServerParse.EXPLAIN:

11
:         ExplainHandler.handle(sql, c, rs >>>
8
);

12
:        
break
;

13
:    
// .... 省略部分case
14
:        
break
;

15
:    
case
ServerParse.SELECT:

16
:         SelectHandler.handle(sql, c, rs >>>
8
);

17
:        
break
;

18
:    
// .... 省略部分case
19
:    
default
:

20
:        
if
(readOnly){

21
:             LOGGER.warn(
new
StringBuilder().append(
"User readonly:"
).append(sql).toString());

22
:             c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY,
"User readonly"
);

23
:            
break
;

24
:         }

25
:         c.execute(sql, rs &
0xff
);

26
:     }

27
: }

28
:

29
:

30
:
// ⬇️⬇️⬇️【ServerParse.java】
31
:
publicstaticintparse(String stmt)
{

32
:    
int
length = stmt.length();

33
:    
//FIX BUG FOR SQL SUCH AS /XXXX/SQL
34
:    
int
rt = -
1
;

35
:    
for
(
int
i =
0
; i < length; ++i) {

36
:        
switch
(stmt.charAt(i)) {

37
:        
// .... 省略部分case            case 'I':
38
:        
case'i'
:

39
:             rt = insertCheck(stmt, i);

40
:            
if
(rt != OTHER) {

41
:                
return
rt;

42
:             }

43
:            
continue
;

44
:            
// .... 省略部分case
45
:        
case'S'
:

46
:        
case's'
:

47
:             rt = sCheck(stmt, i);

48
:            
if
(rt != OTHER) {

49
:                
return
rt;

50
:             }

51
:            
continue
;

52
:            
// .... 省略部分case
53
:        
default
:

54
:            
continue
;

55
:         }

56
:     }

57
:    
return
OTHER;

58
: }

【 6 】

执行 SQL,详细解析见下文,核心代码如下:
1
:
// ⬇️⬇️⬇️【ServerConnection.java】
2
:
publicclassServerConnectionextendsFrontendConnection
{

3
:    
publicvoidexecute(String sql, int type)
{

4
:        
// .... 省略代码
5
:         SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);

6
:        
if
(schema ==
null
) {

7
:             writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,

8
:                    
"Unknown MyCAT Database '"
+ db +
"'"
);

9
:            
return
;

10
:         }

11
:

12
:        
// .... 省略代码
13
:

14
:        
// 路由到后端数据库,执行 SQL
15
:         routeEndExecuteSQL(sql, type, schema);

16
:     }

17
:    

18
:    
publicvoidrouteEndExecuteSQL(String sql, finalint type, final SchemaConfig schema)
{

19
:        
// 路由计算
20
:         RouteResultset rrs =
null
;

21
:        
try
{

22
:             rrs = MycatServer

23
:                     .getInstance()

24
:                     .getRouterservice()

25
:                     .route(MycatServer.getInstance().getConfig().getSystem(),

26
:                             schema, type, sql,
this
.charset,
this
);

27
:

28
:         }
catch
(Exception e) {

29
:             StringBuilder s =
new
StringBuilder();

30
:             LOGGER.warn(s.append(
this
).append(sql).toString() +
" err:"
+ e.toString(),e);

31
:             String msg = e.getMessage();

32
:             writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg ==
null
? e.getClass().getSimpleName() : msg);

33
:            
return
;

34
:         }

35
:

36
:        
// 执行 SQL
37
:        
if
(rrs !=
null
) {

38
:            
// session执行
39
:             session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);

40
:         }

41
:        

42
:      }

43
:

44
: }

3. 获得路由结果

【单库单表】插入(02获取路由)

【 1 - 2 】【 12 】

获得路由主流程。核心代码如下:
1
:
// ⬇️⬇️⬇️【RouteService.java】
2
:
public
RouteResultset
route
(SystemConfig sysconf, SchemaConfig schema,

3
:        
int
sqlType, String stmt, String charset, ServerConnection sc)

 4:        
throws
SQLNonTransientException
{

5
:     RouteResultset rrs =
null
;

6
:    
// .... 省略代码
7
:    
int
hintLength = RouteService.isHintSql(stmt);

8
:    
if
(hintLength != -
1
){
// TODO 待读:hint
9
:        
// .... 省略代码
10
:         }

11
:     }
else
{

12
:         stmt = stmt.trim();

13
:         rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,

14
:                 charset, sc, tableId2DataNodeCache);

15
:     }

16
:

17
:    
// .... 省略代码        return rrs;
18
: }

19
:
// ⬇️⬇️⬇️【AbstractRouteStrategy.java】
20
:
@Override
21
:
public RouteResultset route
(SystemConfig sysConfig, SchemaConfig schema,
int
sqlType, String origSQL,

22
:         String charset, ServerConnection sc, LayerCachePool cachePool)
throws SQLNonTransientException
{

23
:

24
:    
// .... 省略代码
25
:

26
:    
// 处理一些路由之前的逻辑;全局序列号,父子表插入
27
:    
if
(beforeRouteProcess(schema, sqlType, origSQL, sc) ) {

28
:        
returnnull
;

29
:     }

30
:

31
:    
// .... 省略代码
32
:

33
:    
// 检查是否有分片
34
:    
if
(schema.isNoSharding() && ServerParse.SHOW != sqlType) {

35
:         rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);

36
:     }
else
{

37
:         RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);

38
:        
if
(returnedSet ==
null
) {

39
:             rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);

40
:         }

41
:     }

42
:

43
:    
return
rrs;

44
: }

路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。

【 3 - 6 】

路由前置处理。当符合如下三种情况下,进行处理:
{ 1 } 使用全局序列号
insertintotable
(
id
,
name
)
values
(
NEXTVALUEFOR
MYCATSEQ_ID,
'name'
)

{ 2 } ER 子表插入 

{ 3 } 主键使用自增 ID 插入:
insertintotable
(
name
)
values
(
'name'
)

===>

insertintotable
(
id
,
name
)
values
(
NEXTVALUEFOR
MYCATSEQ_ID,
'name'
)

情况 { 1 } { 3 } 情况类似,使用全局序列号。
核心代码如下:
1
:
// ⬇️⬇️⬇️【AbstractRouteStrategy.java】
2
:
privatebooleanbeforeRouteProcess(SchemaConfig schema, int sqlType, String origSQL, ServerConnection sc)
 3:        
throws
SQLNonTransientException
{

4
:    
return// 处理 id 使用 全局序列号
5
:             RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc)

6
:            
// 处理 ER 子表
7
:             || (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc))

8
:            
// 处理 id 自增长
9
:             || (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc));

10
: }

RouterUtil.java 处理 SQL 考虑性能,实现会比较 C-style,代码咱就不贴了,传送门:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/route/util/RouterUtil.java。 (😈该仓库从官方 Fork,逐步完善中文注释,欢迎 Star)

【 7 - 11 】

前置路由处理全局序列号时,添加到全局序列处理器(MyCATSequnceProcessor)。该处理器会异步生成 ID,替换 SQL 内的 NEXT VALUE FOR MYCATSEQ_ 正则。例如:
insertintotable
(
id
,
name
)
values
(
NEXTVALUEFOR
MYCATSEQ_ID,
'name'
)

===>

insertintotable
(
id
,
name
)
values
(
868348974560579584
,
'name'
)

异步处理完后,调用 ServerConnection#routeEndExecuteSQL(sql, type, schema) 方法重新执行 SQL。
核心代码如下:
1
:
// ⬇️⬇️⬇️【RouterUtil.java】
2
:
publicstaticvoidprocessSQL(ServerConnection sc,SchemaConfig schema,String sql,int sqlType)
{

3
:     SessionSQLPair sessionSQLPair =
new
SessionSQLPair(sc.getSession2(), schema, sql, sqlType);

4
:     MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);

5
: }

6
:
// ⬇️⬇️⬇️【MyCATSequnceProcessor.java】
7
:
publicclassMyCATSequnceProcessor
{

8
:    
private
LinkedBlockingQueue<SessionSQLPair> seqSQLQueue =
new
LinkedBlockingQueue<SessionSQLPair>();

9
:    
privatevolatileboolean
running=
true
;

10
:    

11
:    
publicvoidaddNewSql(SessionSQLPair pair)
{

12
:         seqSQLQueue.add(pair);

13
:     }

14
:

15
:    
privatevoidexecuteSeq(SessionSQLPair pair)
{

16
:        
try
{

17
:            

18
:            
// 使用Druid解析器实现sequence处理  @兵临城下
19
:             DruidSequenceHandler sequenceHandler =
new
DruidSequenceHandler(MycatServer

20
:                     .getInstance().getConfig().getSystem().getSequnceHandlerType());

21
:

22
:            
// 生成可执行 SQL :目前主要是生成 id
23
:             String charset = pair.session.getSource().getCharset();

24
:             String executeSql = sequenceHandler.getExecuteSql(pair.sql,charset ==
null
?
"utf-8"
:charset);

25
:

26
:            
// 执行 SQL
27
:             pair.session.getSource().routeEndExecuteSQL(executeSql, pair.type,pair.schema);

28
:         }
catch
(Exception e) {

29
:             LOGGER.error(
"MyCATSequenceProcessor.executeSeq(SesionSQLPair)"
,e);

30
:             pair.session.getSource().writeErrMessage(ErrorCode.ER_YES,
"mycat sequnce err."
+ e);

31
:            
return
;

32
:         }

33
:     }

34
:    

35
:    
classExecuteThreadextendsThread
{

36
:        

37
:        
publicExecuteThread()
{

38
:             setDaemon(
true
);
// 设置为后台线程,防止throw RuntimeExecption进程仍然存在的问题
39
:         }

40
:        

41
:        
publicvoidrun()
{

42
:            
while
(running) {

43
:                
try
{

44
:                     SessionSQLPair pair=seqSQLQueue.poll(
100
,TimeUnit.MILLISECONDS);

45
:                    
if
(pair!=
null
){

46
:                         executeSeq(pair);

47
:                     }

48
:                 }
catch
(Exception e) {

49
:                     LOGGER.warn(
"MyCATSequenceProcessor$ExecutorThread"
,e);

50
:                 }

51
:             }

52
:         }

53
:     }

54
: }

❓此处有个疑问:MyCATSequnceProcessor 是单线程,会不会插入性能有一定的影响?后续咱做下性能测试。

4. 获得 MySQL 连接,执行 SQL

【单库单表】插入(03执行 SQL)

【 1 - 8 】

获得 MySQL 连接。
  • PhysicalDBNode :物理数据库节点。
  • PhysicalDatasource :物理数据库数据源。

【 9 - 13 】

发送 SQL 到 MySQL Server,执行 SQL。

5. 响应执行 SQL 结果

【单库单表】插入(04执行响应)

【 1 - 4 】

处理 MySQL Server 响应数据包。

【 5 - 8 】

发送插入成功结果给 MySQL Client。
继续阅读
阅读原文