原文地址:http://www.yunai.me/MyCAT/xa-distributed-transaction/

MyCat-Server带注释代码
地址 :https://github.com/YunaiV/Mycat-Server

😈本系列每 1-1 周更新一篇,欢迎订阅、关注、收藏 公众号
QQ :7685413

  • 1. 概述
  • 2. XA 概念
  • 3. MyCAT 代码实现
    • 3.1 JDBC Demo 代码
    • 3.2 MyCAT 开启 XA 事务
    • 3.3 MyCAT 接收 SQL
    • 3.4 MySQL 接收 COMMIT
      • 3.4.1 单节点事务 or 多节点事务
      • 3.4.2 协调日志
      • 3.4.3 MultiNodeCoordinator
      • 3.5 MyCAT 启动回滚 XA事务
  • 4. MyCAT 实现缺陷
    • 4.1 协调日志写入性能
    • 4.2 数据节点未全部 PREPARE 就进行 COMMIT
    • 4.3 MyCAT 启动回滚 PREPARE 的 XA事务
    • 4.4 单节点事务未记录协调日志
    • 4.5 XA COMMIT 部分节点挂了重新恢复后,未进一步处理
  • 5. 彩蛋

1. 概述

数据库拆分后,业务上会碰到需要分布式事务的场景。MyCAT 基于 XA 实现分布式事务。国内目前另外一款很火的数据库中间件 Sharding-JDBC 准备基于 TCC 实现分布式事务。
本文内容分成三部分:
  1. XA 概念简述
  2. MyCAT 代码如何实现 XA
  3. MyCAT 在实现 XA 存在的一些缺陷

2. XA 概念

> X/Open 组织(即现在的 Open Group )定义了分布式事务处理模型。 X/Open DTP 模型( 1994 )包括:
  1. 应用程序( AP )
  2. 事务管理器( TM )
  3. 资源管理器( RM )
  4. 通信资源管理器( 
    CRM
     )

    一般,常见的事务管理器( TM )是交易中间件,常见的资源管理器( 
    RM
     )是数据库,常见的通信资源管理器( 
    CRM
    )是消息中间件,下图是X/Open DTP模型:
一般的编程方式是这样的:
  1. 配置 TM ,通过 TM 或者 RM 提供的方式,把 RM 注册到 TM。可以理解为给 TM 注册 RM 作为数据源。一个 TM 可以注册多个 RM
  2. AP
     从 
    TM
     获取资源管理器的代理(例如:使用JTA接口,从TM管理的上下文中,获取出这个TM所管理的RM的JDBC连接或JMS连接)

    AP
     向 
    TM
     发起一个全局事务。这时,
    TM
     会通知各个 
    RM
    XID
    (全局事务ID)会通知到各个RM。
  3. AP 通过 TM 中获取的连接,间接操作 RM 进行业务操作。这时,TM 在每次 AP 操作时把 XID(包括所属分支的信息)传递给 RMRM 正是通过这个 XID 关联来操作和事务的关系的。
  4. AP 结束全局事务时,TM 会通知 RM 全局事务结束。开始二段提交,也就是prepare - commit的过程。

XA协议指的是TM(事务管理器)和RM(资源管理器)之间的接口。目前主流的关系型数据库产品都是实现了XA接口的。JTA(Java Transaction API)是符合X/Open DTP模型的,事务管理器和资源管理器之间也使用了XA协议。 本质上也是借助两阶段提交协议来实现分布式事务的,下面分别来看看XA事务成功和失败的模型图:

😈 看到这里是不是有种黑人问号的感觉?淡定!我们接下来看 MyCAT 代码层面是如何实现 XA 的。另外,有兴趣对概念了解更多的,可以参看如下文章:
  1. 《XA事务处理》
  2. 《XA Transaction SQL Syntax》
  3. 《MySQL XA 事务支持调研》

3. MyCAT 代码实现

  • MyCAT :TM,协调者。
  • 数据节点 :RM,参与者。

3.1 JDBC Demo 代码

  1. publicclassMyCATXAClientDemo{
  2. publicstaticvoid main(String[] args)throwsClassNotFoundException,SQLException{
  3. // 1. 获得数据库连接
  4. Class.forName("com.mysql.jdbc.Driver");
  5. Connection conn =DriverManager.getConnection("jdbc:mysql://127.0.0.1:8066/dbtest","root","123456");
  6.        conn.setAutoCommit(false);
  7. // 2. 开启 MyCAT XA 事务
  8.        conn.prepareStatement("set xa=on").execute();
  9. // 3. 插入 SQL
  10. // 3.1 SQL1 A库
  11. long uid =Math.abs(newRandom().nextLong());
  12. String username = UUID.randomUUID().toString();
  13. String password = UUID.randomUUID().toString();
  14. String sql1 =String.format("insert into t_user(id, username, password) VALUES (%d, '%s', '%s')",
  15.                uid, username, password);
  16.        conn.prepareStatement(sql1).execute();
  17. // 3.2 SQL2 B库
  18. long orderId =Math.abs(newRandom().nextLong());
  19. String nickname = UUID.randomUUID().toString();
  20. String sql2 =String.format("insert into t_order(id, uid, nickname) VALUES(%d, %s, '%s')", orderId, uid, nickname);
  21.        conn.prepareStatement(sql2).execute();
  22. // 4. 提交 XA 事务
  23.        conn.commit();
  24. }
  25. }
  • setxa=on MyCAT 开启 XA 事务。
  • conn.commit 提交 XA 事务。

3.2 MyCAT 开启 XA 事务

当 MyCAT 接收到 setxa=on 命令时,开启 XA 事务,并生成 XA 事务编号。XA 事务编号生成算法为 UUID。核心代码如下:
  1. // SetHandler.java
  2. publicstaticvoid handle(String stmt,ServerConnection c,int offset){
  3. int rs =ServerParseSet.parse(stmt, offset);
  4. switch(rs &0xff){
  5. // ... 省略代码
  6. case XA_FLAG_ON:{
  7. if(c.isAutocommit()){
  8.                c.writeErrMessage(ErrorCode.ERR_WRONG_USED,"set xa cmd on can't used in autocommit connection ");
  9. return;
  10. }
  11.            c.getSession2().setXATXEnabled(true);
  12.            c.write(c.writeToBuffer(OkPacket.OK, c.allocate()));
  13. break;
  14. }
  15. case XA_FLAG_OFF:{
  16.            c.writeErrMessage(ErrorCode.ERR_WRONG_USED,
  17. "set xa cmd off not for external use ");
  18. return;
  19. }
  20. // ... 省略代码
  21. }
  22. }
  23. // NonBlockingSession.java
  24. publicvoid setXATXEnabled(boolean xaTXEnabled){
  25. if(xaTXEnabled){
  26. if(this.xaTXID ==null){
  27.           xaTXID = genXATXID();// 😈😈😈获得 XA 事务编号
  28. }
  29. }else{
  30. this.xaTXID =null;
  31. }
  32. }
  33. privateString genXATXID(){
  34. returnMycatServer.getInstance().getXATXIDGLOBAL();
  35. }
  36. // MycatServer.java
  37. publicString getXATXIDGLOBAL(){
  38. return"'"+ getUUID()+"'";
  39. }
  40. publicstaticString getUUID(){// 😈😈😈
  41. String s = UUID.randomUUID().toString();
  42. return s.substring(0,8)+ s.substring(9,13)+ s.substring(14,18)+ s.substring(19,23)+ s.substring(24);
  43. }

3.3 MyCAT 接收 SQL

此处 SQL 指的是 insertupdatedelete 操作。
当向某个数据节点第一次发起 SQL 时,会在 SQL 前面附加 XA START'xaTranId',并设置该数据节点连接事务状态为 TxState.TX_STARTED_STATE分布式事务状态,下文会专门整理)。核心代码如下:
  1. // MySQLConnection.java
  2. privatevoid synAndDoExecute(String xaTxID,RouteResultsetNode rrn,
  3. int clientCharSetIndex,int clientTxIsoLation,
  4. boolean clientAutoCommit){
  5. String xaCmd =null;
  6. boolean conAutoComit =this.autocommit;
  7. String conSchema =this.schema;
  8. // never executed modify sql,so auto commit
  9. boolean expectAutocommit =!modifiedSQLExecuted || isFromSlaveDB()|| clientAutoCommit;
  10. if(expectAutocommit ==false&& xaTxID !=null&& xaStatus ==TxState.TX_INITIALIZE_STATE){// 😈😈😈
  11.       xaCmd ="XA START "+ xaTxID +';';
  12. this.xaStatus =TxState.TX_STARTED_STATE;
  13. }
  14. // .... 省略代码
  15. StringBuilder sb =newStringBuilder();
  16. // .... 省略代码
  17. if(xaCmd !=null){
  18.       sb.append(xaCmd);
  19. }
  20. // and our query sql to multi command at last
  21.   sb.append(rrn.getStatement()+";");
  22. // syn and execute others
  23. this.sendQueryCmd(sb.toString());
  24. }
举个 变量 sb 的例子:
  1. SET names utf8;SET autocommit=0;XA START '1f2da7353e8846e5833b8d8dd041cfb1','db2';insert into t_user(id, username, password) VALUES (3400,'b7c5ec1f-11cc-4599-851c-06ad617fec42','d2694679-f6a2-4623-a339-48d4a868be90');

3.4 MySQL 接收 COMMIT

3.4.1 单节点事务 or 多节点事务

COMMIT 执行时,MyCAT 会判断 XA 事务里,涉及到的数据库节点数量。
  • 如果节点数量为 1,单节点事务,使用 CommitNodeHandler 处理。
  • 如果节点数量 > 1,多节点事务,使用 MultiNodeCoordinator 处理。
CommitNodeHandler 相比 MultiNodeCoordinator 来说,只有一个数据节点,不需要进行多节点协调,逻辑会相对简单,有兴趣的同学可以另外看。我们主要分析 MultiNodeCoordinator

3.4.2 协调日志

协调日志,记录协调过程中各数据节点 XA 事务状态,处理MyCAT异常奔溃或者数据节点部分XA COMMIT,另外部分 XA PREPARE下的状态恢复。
XA 事务共有种
  1. TXINITIALIZESTATE :事务初始化
  2. TXSTARTEDSTATE :事务开始完成
  3. TXPREPAREDSTATE :事务准备完成
  4. TXCOMMITEDSTATE :事务提交完成
  5. TXROLLBACKEDSTATE :事务回滚完成
状态变更流 :TXINITIALIZESTATE => TXSTARTEDSTATE => TXPREPAREDSTATE => TXCOMMITEDSTATE / TXROLLBACKEDSTATE 。
协调日志包含两个部分
  1. CoordinatorLogEntry :协调者日志
  2. ParticipantLogEntry :参与者日志。此处,数据节点扮演参与者的角色。下文中,可能会出现参与者与数据节点混用的情况,望见谅。
一次 XA 事务,对应一条 CoordinatorLogEntry。一条 CoordinatorLogEntry 包含 N条 ParticipantLogEntry。 核心代码如下:
  1. // CoordinatorLogEntry :协调者日志
  2. publicclassCoordinatorLogEntryimplementsSerializable{
  3. /**
  4.     * XA 事务编号
  5.     */
  6. publicfinalString id;
  7. /**
  8.     * 参与者日志数组
  9.     */
  10. publicfinalParticipantLogEntry[] participants;
  11. }
  12. // ParticipantLogEntry :参与者日志
  13. publicclassParticipantLogEntryimplementsSerializable{
  14. /**
  15.     * XA 事务编号
  16.     */
  17. publicString coordinatorId;
  18. /**
  19.     * 数据库 uri
  20.     */
  21. publicString uri;
  22. /**
  23.     * 过期描述
  24.     */
  25. publiclong expires;
  26. /**
  27.     * XA 事务状态
  28.     */
  29. publicint txState;
  30. /**
  31.     * 参与者名字
  32.     */
  33. publicString resourceName;
  34. }
MyCAT 记录协调日志以 JSON格式 到文件每行包含一条 CoordinatorLogEntry。举个例子:
  1. {"id":"'e827b3fe666c4d968961350d19adda31'","participants":[{"uri":"127.0.0.1","state":"3","expires":0,"resourceName":"db3"},{"uri":"127.0.0.1","state":"3","expires":0,"resourceName":"db1"}]}
  2. {"id":"'f00b61fa17cb4ec5b8264a6d82f847d0'","participants":[{"uri":"127.0.0.1","state":"3","expires":0,"resourceName":"db2"},{"uri":"127.0.0.1","state":"3","expires":0,"resourceName":"db1"}]}
实现类为:
  1. // XA 协调者日志 存储接口:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/backend/mysql/xa/recovery/Repository.java
  2. publicinterfaceRepository{}
  3. // XA 协调者日志 文件存储:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/backend/mysql/xa/recovery/impl/FileSystemRepository.java
  4. publicclassFileSystemRepositoryimplementsRepository{}
  5. // XA 协调者日志 文件存储:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/backend/mysql/xa/recovery/impl/InMemoryRepository.java
  6. publicclassInMemoryRepositoryimplementsRepository{}
目前日志文件写入的方式性能较差,这里我们不做分析,在【4. MyCAT 实现缺陷】里一起讲。

3.4.3 MultiNodeCoordinator

敲敲敲,这里是本文的重点之一噢。😈
第一阶段:发起 PREPARE。
  1. publicvoid executeBatchNodeCmd(SQLCtrlCommand cmdHandler){
  2. this.cmdHandler = cmdHandler;
  3. finalint initCount = session.getTargetCount();
  4.   runningCount.set(initCount);
  5.   nodeCount = initCount;
  6.   failed.set(false);
  7.   faileCount.set(0);
  8. //recovery nodes log
  9. ParticipantLogEntry[] participantLogEntry =newParticipantLogEntry[initCount];
  10. // 执行
  11. int started =0;
  12. for(RouteResultsetNode rrn : session.getTargetKeys()){
  13. if(rrn ==null){
  14. continue;
  15. }
  16. finalBackendConnection conn = session.getTarget(rrn);
  17. if(conn !=null){
  18.           conn.setResponseHandler(this);
  19. //process the XA_END XA_PREPARE Command
  20. MySQLConnection mysqlCon =(MySQLConnection) conn;
  21. String xaTxId =null;
  22. if(session.getXaTXID()!=null){
  23.               xaTxId = session.getXaTXID()+",'"+ mysqlCon.getSchema()+"'";
  24. }
  25. if(mysqlCon.getXaStatus()==TxState.TX_STARTED_STATE){// XA 事务
  26. //recovery Log
  27.               participantLogEntry[started]=newParticipantLogEntry(xaTxId, conn.getHost(),0, conn.getSchema(),((MySQLConnection) conn).getXaStatus());
  28. String[] cmds =newString[]{"XA END "+ xaTxId,// XA END 命令
  29. "XA PREPARE "+ xaTxId};// XA PREPARE 命令
  30.               mysqlCon.execBatchCmd(cmds);
  31. }else{// 非 XA 事务
  32. // recovery Log
  33.               participantLogEntry[started]=newParticipantLogEntry(xaTxId, conn.getHost(),0, conn.getSchema(),((MySQLConnection) conn).getXaStatus());
  34.               cmdHandler.sendCommand(session, conn);
  35. }
  36. ++started;
  37. }
  38. }
  39. // xa recovery log
  40. if(session.getXaTXID()!=null){
  41. CoordinatorLogEntry coordinatorLogEntry =newCoordinatorLogEntry(session.getXaTXID(),false, participantLogEntry);
  42.       inMemoryRepository.put(session.getXaTXID(), coordinatorLogEntry);
  43.       fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());
  44. }
  45. if(started < nodeCount){// TODO 疑问:如何触发
  46.       runningCount.set(started);
  47.       LOGGER.warn("some connection failed to execute "+(nodeCount - started));
  48. /**
  49.        * assumption: only caused by front-end connection close. <br/>
  50.        * Otherwise, packet must be returned to front-end
  51.        */
  52.       failed.set(true);
  53. }
  54. }
  • 向各数据节点发送 XAEND + XA PREPARE 指令。举个 变量 cmds 例子:
  1. XA END'4cbb18214d0b47adbdb0658598666677','db3';XA PREPARE '4cbb18214d0b47adbdb0658598666677','db3';
  • 记录协调日志。每条参与者日志状态为 TxState.TX_STARTED_STATE

第二阶段:发起 COMMIT。
  1. @Override
  2. publicvoid okResponse(byte[] ok,BackendConnection conn){
  3. // process the XA Transatcion 2pc commit
  4. if(conn instanceofMySQLConnection){
  5. MySQLConnection mysqlCon =(MySQLConnection) conn;
  6. switch(mysqlCon.getXaStatus()){
  7. caseTxState.TX_STARTED_STATE:
  8. //if there have many SQL execute wait the okResponse,will come to here one by one
  9. //should be wait all nodes ready ,then send xa commit to all nodes.
  10. if(mysqlCon.batchCmdFinished()){
  11. String xaTxId = session.getXaTXID();
  12. String cmd ="XA COMMIT "+ xaTxId +",'"+ mysqlCon.getSchema()+"'";
  13. if(LOGGER.isDebugEnabled()){
  14.                       LOGGER.debug("Start execute the cmd :"+ cmd +",current host:"+ mysqlCon.getHost()+":"+ mysqlCon.getPort());
  15. }
  16. // recovery log
  17. CoordinatorLogEntry coordinatorLogEntry = inMemoryRepository.get(xaTxId);
  18. for(int i =0; i < coordinatorLogEntry.participants.length; i++){
  19.                       LOGGER.debug("[In Memory CoordinatorLogEntry]"+ coordinatorLogEntry.participants[i]);
  20. if(coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())){
  21.                           coordinatorLogEntry.participants[i].txState =TxState.TX_PREPARED_STATE;
  22. }
  23. }
  24.                   inMemoryRepository.put(xaTxId, coordinatorLogEntry);
  25.                   fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());
  26. // send commit
  27.                   mysqlCon.setXaStatus(TxState.TX_PREPARED_STATE);
  28.                   mysqlCon.execCmd(cmd);
  29. }
  30. return;
  31. caseTxState.TX_PREPARED_STATE:{
  32. // recovery log
  33. String xaTxId = session.getXaTXID();
  34. CoordinatorLogEntry coordinatorLogEntry = inMemoryRepository.get(xaTxId);
  35. for(int i =0; i < coordinatorLogEntry.participants.length; i++){
  36. if(coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())){
  37.                       coordinatorLogEntry.participants[i].txState =TxState.TX_COMMITED_STATE;
  38. }
  39. }
  40.               inMemoryRepository.put(xaTxId, coordinatorLogEntry);
  41.               fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());
  42. // XA reset status now
  43.               mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);
  44. break;
  45. }
  46. default:
  47. }
  48. }
  49. // 释放连接
  50. if(this.cmdHandler.relaseConOnOK()){
  51.       session.releaseConnection(conn);
  52. }else{
  53.       session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(),false);
  54. }
  55. // 是否所有节点都完成commit,如果是,则返回Client 成功
  56. if(this.finished()){
  57.       cmdHandler.okResponse(session, ok);
  58. if(cmdHandler.isAutoClearSessionCons()){
  59.           session.clearResources(false);
  60. }
  61. /* 1.  事务提交后,xa 事务结束   */
  62. if(session.getXaTXID()!=null){
  63.           session.setXATXEnabled(false);
  64. }
  65. /* 2. preAcStates 为true,事务结束后,需要设置为true。preAcStates 为ac上一个状态    */
  66. if(session.getSource().isPreAcStates()){
  67.           session.getSource().setAutocommit(true);
  68. }
  69. }
  70. }
  • mysqlCon.batchCmdFinished() 每个数据节点,第一次返回的是 XAEND 成功,第二次返回的是 XA PREPARE。在 XA PREPARE 成功后,记录该数据节点的参与者日志状态为 TxState.TX_PREPARED_STATE。之后,向该数据节点发起 XA COMMIT 命令。
  • XA COMMIT 返回成功后,记录该数据节点的事务参与者日志状态为 TxState.TX_COMMITED_STATE
  • 当所有数据节点(参与者)都执行完成 XA COMMIT 返回,即 this.finished()==true,返回 MySQL Client XA 事务提交成功。
[x] XA PREPAREXA COMMIT,数据节点可能返回失败,目前暂时没模拟出来,对应方法为 #errorResponse(....)

3.5 MyCAT 启动回滚 XA事务

MyCAT 启动时,会回滚处于TxState.TXPREPAREDSTATEParticipantLogEntry 对应的数据节点的 XA 事务。代码如下:
  1. // MycatServer.java
  2. privatevoid performXARecoveryLog(){
  3. // fetch the recovery log
  4. CoordinatorLogEntry[] coordinatorLogEntries = getCoordinatorLogEntries();
  5. for(int i =0; i < coordinatorLogEntries.length; i++){
  6. CoordinatorLogEntry coordinatorLogEntry = coordinatorLogEntries[i];
  7. boolean needRollback =false;
  8. for(int j =0; j < coordinatorLogEntry.participants.length; j++){
  9. ParticipantLogEntry participantLogEntry = coordinatorLogEntry.participants[j];
  10. if(participantLogEntry.txState ==TxState.TX_PREPARED_STATE){
  11.               needRollback =true;
  12. break;
  13. }
  14. }
  15. if(needRollback){
  16. for(int j =0; j < coordinatorLogEntry.participants.length; j++){
  17. ParticipantLogEntry participantLogEntry = coordinatorLogEntry.participants[j];
  18. //XA rollback
  19. String xacmd ="XA ROLLBACK "+ coordinatorLogEntry.id +';';
  20. OneRawSQLQueryResultHandler resultHandler =newOneRawSQLQueryResultHandler(newString[0],newXARollbackCallback());
  21.               outloop:
  22. for(SchemaConfig schema :MycatServer.getInstance().getConfig().getSchemas().values()){
  23. for(TableConfig table : schema.getTables().values()){
  24. for(String dataNode : table.getDataNodes()){
  25. PhysicalDBNode dn =MycatServer.getInstance().getConfig().getDataNodes().get(dataNode);
  26. if(dn.getDbPool().getSource().getConfig().getIp().equals(participantLogEntry.uri)
  27. && dn.getDatabase().equals(participantLogEntry.resourceName)){
  28. //XA STATE ROLLBACK
  29.                               participantLogEntry.txState =TxState.TX_ROLLBACKED_STATE;
  30. SQLJob sqlJob =newSQLJob(xacmd, dn.getDatabase(), resultHandler, dn.getDbPool().getSource());
  31.                               sqlJob.run();
  32. break outloop;
  33. }
  34. }
  35. }
  36. }
  37. }
  38. }
  39. }
  40. // init into in memory cached
  41. for(int i =0; i < coordinatorLogEntries.length; i++){
  42. MultiNodeCoordinator.inMemoryRepository.put(coordinatorLogEntries[i].id, coordinatorLogEntries[i]);
  43. }
  44. // discard the recovery log
  45. MultiNodeCoordinator.fileRepository.writeCheckpoint(MultiNodeCoordinator.inMemoryRepository.getAllCoordinatorLogEntries());
  46. }

4. MyCAT 实现缺陷

MyCAT 1.6.5 版本实现弱XA事务,相对来说,笔者认为距离实际生产使用存在一些差距。下面罗列可能存在的缺陷,如有错误,麻烦指出。🙂希望 MyCAT 在分布式事务的实现上,能够越来越给力。

4.1 协调日志写入性能

1、 CoordinatorLogEntryParticipantLogEntry 在每次写入文件时,是将内存中所有的日志全部重新写入,导致写入性能随着 XA 事务次数的增加,性能会越来越糟糕,导致 XA 事务整体性能会非常差。另外,该方法是同步的,也加大了写入的延迟。
建议:先获得可写入文件的 OFFSET,写入协调日志到文件,内存维护好 XA事务编号 与 OFFSET 的映射关系,从而实现顺序写入 + 并行写入
2、内存里维护了所有的协调日志,占用内存会越来越大,并且无释放机制。即使重启,协调日志也会重新加载到内存。
建议:已完全回滚或者提交的协调日志不放入内存。另外有文件存储好 XA事务编号 与 OFFSET 的映射关系。
3、协调日志只写入单个文件。
建议:分拆协调日志文件。
PS:有兴趣的同学可以看下 RocketMQCommitLog 的存储,性能上很赞!

4.2 数据节点未全部 PREPARE 就进行 COMMIT

XA 事务定义,需要等待所有参与者全部XA PREPARE 成功完成后发起 XA COMMIT。目前 MyCAT 是某个数据节点 XA PREPARE 完成后立即进行 XA COMMIT。比如说:第一个数据节点提交了 XAEND;XA PREPARE 时,第二个数据节在进行 XAEND;XA PREAPRE; 前挂了,第一个节点依然会 XA COMMIT 成功。
建议:按照严格的 XA 事务定义。

4.3 MyCAT 启动回滚 PREPARE 的 XA事务

1、MyCAT 启动时,回滚所有的 PREPARE 的 XA 事务,可能某个 XA 事务,部分 COMMIT,部分 PREPARE。此时直接回滚,会导致数据不一致。
建议:当判断到某个 XA 事务存在 PREPARE 的参与者,同时判断该 XA 事务里其他参与者的事务状态以及数据节点里 XA 事务状态,比如参与者为 MySQL时,可以使用 XA RECOVER 查询处于 PREPARE 所有的 XA 事务。
2、回滚 PREPARE 是异步进行的,在未进行完成时已经设置文件里回滚成功。如果异步过程中失败,会导致 XA 事务状态不一致。
建议:回调成功后,更新该 XA 事务状态。

4.4 单节点事务未记录协调日志

该情况较为极端。发起 XA PREPARE完后,MyCAT 挂了。重启后,该 XA 事务在 MyCAT 里就“消失“了,参与者的该 XA 事务一直处于 PREPARE 状态。从理论上来说,需要回滚该 XA 事务。
建议:记录协调日志。

4.5 XA COMMIT 部分节点挂了重新恢复后,未进一步处理

当一部分节点 XA COMMIT 完成,另外一部分此时挂了。在管理员重启挂掉的节点,其对应的 XA 事务未进一步处理,导致数据不一致。
建议:😈木有建议。也很好奇,如果是这样的情况,如何处理较为合适。如有大大知道,烦请告知下。

5. 彩蛋

例行“彩蛋”?
  • 《Mycat源码篇 : MyCat事务管理机制分析》 来自 MyCAT Committer 的文章
  • 《MySQL · 捉虫动态 · 连接断开导致XA事务丢失》
  • 《分布式系统事务一致性解决方案》
  • 《MySQL数据库分布式事务XA优缺点与改进方案》
  • 《深入理解分布式系统的2PC和3PC》
  • 【分布式事务.xmind】 笔者拙作
  • 《RocketMQ 源码分析 —— 事务消息》 笔者拙作
继续阅读
阅读原文