摘要: 原创出处 http://www.iocoder.cn/Sharding-JDBC/transaction-bed/ 「芋道源码」欢迎转载,保留摘要,谢谢!
微信排版崩崩的,建议使用 PC 点击【阅读原文】。
本文主要基于 Sharding-JDBC 1.5.0 正式版
  • 1. 概述
  • 2. 最大努力送达型
  • 3. 柔性事务管理器
    • 3.3.1 创建柔性事务
    • 3.1 概念
    • 3.2 柔性事务配置
    • 3.3 柔性事务
  • 4. 事务日志存储器
    • 4.1 #add()
    • 4.2 #remove()
    • 4.3 #findEligibleTransactionLogs()
    • 4.4 #increaseAsyncDeliveryTryTimes()
    • 4.5 #processData()
  • 5. 最大努力送达型事务监听器
  • 6. 最大努力送达型异步作业
    • 6.1 BestEffortsDeliveryJob
    • 6.2 AsyncSoftTransactionJobConfiguration
    • 6.3 Elastic-Job 是否必须?
  • 7. 适用场景
  • 8. 开发指南 & 开发示例
  • 666. 彩蛋
🙂🙂🙂关注微信公众号:【芋道源码】有福利:
  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

数据库表分库后,业务场景下的单库本地事务可能变成跨库分布式事务。虽然我们可以通过合适的分库规则让操作的数据在同库下,继续保证单库本地事务,这也是非常推崇的,但不是所有场景下都能适用。如果这些场景对事务的一致性有要求,我们就不得不解决分布式事务的“麻烦”。
分布式事务是个很大的话题,我们来看看 Sharding-JDBC 对她的权衡:
Sharding-JDBC由于性能方面的考量,决定不支持强一致性分布式事务。我们已明确规划线路图,未来会支持最终一致性的柔性事务。
Sharding-JDBC 提供了两种 柔性事务
  • 最大努力送达型 BED :已经实现
  • 事务补偿型 TCC :计划中
本文分享 最大努力送达型 的实现。建议前置阅读:《Sharding-JDBC 源码分析 —— SQL 执行》。
Sharding-JDBC 正在收集使用公司名单:传送门。

🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门

Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门

登记吧,骚年!传送门

2. 最大努力送达型

概念
在分布式数据库的场景下,相信对于该数据库的操作最终一定可以成功,所以通过最大努力反复尝试送达操作。
从概念看,可能不是很直白的理解是什么意思,本文会最大努力让你干净理解。
架构图
执行过程有 四种 情况:
  1. 【红线】执行成功
  2. 【棕线】执行失败,同步重试成功
  3. 【粉线】执行失败,同步重试失败,异步重试成功
  4. 【绿线】执行失败,同步重试失败,异步重试失败,事务日志保留
整体成漏斗倒三角,上一个阶段失败,交给下一个阶段重试:
整个过程通过如下 组件 完成:
  • 柔性事务管理器
  • 最大努力送达型柔性事务
  • 最大努力送达型事务监听器
  • 事务日志存储器
  • 最大努力送达型异步作业
下面,我们逐节分享每个组件。

3. 柔性事务管理器

3.1 概念

柔性事务管理器,SoftTransactionManager 实现,负责对柔性事务配置( SoftTransactionConfiguration ) 、柔性事务( AbstractSoftTransaction )的管理。

3.2 柔性事务配置

调用 #init() 初始化柔性管理器:
  1. // SoftTransactionManager.java
  2. /**
  3. * 柔性事务配置对象
  4. */
  5. @Getter
  6. privatefinalSoftTransactionConfiguration transactionConfig;
  7. // SoftTransactionManager.java
  8. /**
  9. * 初始化事务管理器.
  10. */
  11. publicvoid init()throwsSQLException{
  12. // 初始化 最大努力送达型事务监听器
  13. EventBusInstance.getInstance().register(newBestEffortsDeliveryListener());
  14. // 初始化 事务日志数据库存储表
  15. if(TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()){
  16. Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource());
  17.       createTable();
  18. }
  19. // 初始化 内嵌的最大努力送达型异步作业
  20. if(transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()){
  21. newNestedBestEffortsDeliveryJobFactory(transactionConfig).init();
  22. }
  23. }
  • 将最大努力送达型事务监听器( BestEffortsDeliveryListener )注册到事务总线 ( EventBus )。在『最大努力送达型事务监听器』小节会详细分享
  • 当使用数据库存储事务日志( TransactionLog ) 时,若事务日志表( transaction_log )不存在则进行创建。在『事务日志存储器』小节会详细分享
  • 当配置使用内嵌的最大努力送达型异步作业( NestedBestEffortsDeliveryJob ) 时,进行初始化。在『最大努力送达型异步作业』小节会详细分享
SoftTransactionConfiguration
SoftTransactionConfiguration,柔性事务配置对象。
  1. publicclassSoftTransactionConfiguration{
  2. /**
  3.     * 事务管理器管理的数据源.
  4.     */
  5. @Getter(AccessLevel.NONE)
  6. privatefinalDataSource targetDataSource;
  7. /**
  8.     * 同步的事务送达的最大尝试次数.
  9.     */
  10. privateint syncMaxDeliveryTryTimes =3;
  11. /**
  12.     * 事务日志存储类型.
  13.     */
  14. privateTransactionLogDataSourceType storageType = RDB;
  15. /**
  16.     * 存储事务日志的数据源.
  17.     */
  18. privateDataSource transactionLogDataSource;
  19. /**
  20.     * 内嵌的最大努力送达型异步作业配置对象.
  21.     */
  22. privateOptional<NestedBestEffortsDeliveryJobConfiguration> bestEffortsDeliveryJobConfiguration =Optional.absent();
  23. }

3.3 柔性事务

在 Sharding-JDBC 里,目前柔性事务分成两种:
  • BEDSoftTransaction :最大努力送达型柔性事务
  • TCCSoftTransaction :TCC型柔性事务
继承 AbstractSoftTransaction
  1. publicabstractclassAbstractSoftTransaction{
  2. /**
  3.     * 分片连接原自动提交状态
  4.     */
  5. privateboolean previousAutoCommit;
  6. /**
  7.     * 分片连接
  8.     */
  9. @Getter
  10. privateShardingConnection connection;
  11. /**
  12.     * 事务类型
  13.     */
  14. @Getter
  15. privateSoftTransactionType transactionType;
  16. /**
  17.     * 事务编号
  18.     */
  19. @Getter
  20. privateString transactionId;
  21. }
AbstractSoftTransaction 实现了开启柔性事务、关闭柔性事务两个方法提供给子类调用:
  • #beginInternal()
    • 对异常处理的代码:ExecutorExceptionHandler#setExceptionThrown()
    • 对于其他 SQL,不会因为 SQL 错误不执行,会继续执行
    • 对于上层业务,不会因为 SQL 错误终止逻辑,会继续执行。这里有一点要注意下,上层业务不能对该 SQL 执行结果有强依赖,因为 SQL 错误需要重试达到数据最终一致性
    • 对于最大努力型事务( TCC暂未实现 ),会对执行错误的 SQL 进行重试
    • 调用 ExecutorExceptionHandler.setExceptionThrown(false) 设置执行 SQL 错误时,也不抛出异常。
    • 调用 connection.setAutoCommit(true);,设置执行自动提交。使用最大努力型事务时,上层业务执行 SQL 会马上提交,即使调用 Connection#rollback() 也是无法回滚的,这点一定要注意。
    1. /**
    2. * 开启柔性
    3. *
    4. * @param conn 分片连接
    5. * @param type 事务类型
    6. * @throws SQLException
    7. */
    8. protectedfinalvoid beginInternal(finalConnection conn,finalSoftTransactionType type)throwsSQLException{
    9. // TODO 判断如果在传统事务中,则抛异常
    10. Preconditions.checkArgument(conn instanceofShardingConnection,"Only ShardingConnection can support eventual consistency transaction.");
    11. // 设置执行错误,不抛出异常
    12. ExecutorExceptionHandler.setExceptionThrown(false);
    13.   connection =(ShardingConnection) conn;
    14.   transactionType = type;
    15. // 设置自动提交状态
    16.   previousAutoCommit = connection.getAutoCommit();
    17.   connection.setAutoCommit(true);
    18. // 生成事务编号
    19. // TODO 替换UUID为更有效率的id生成器
    20.   transactionId = UUID.randomUUID().toString();
    21. }
  • #end()
    • 事务结束后,一定要记得调用 #end() 清理线程变量。否则,下次请求使用到该线程,会继续在这个柔性事务内。
    1. /**
    2. * 结束柔性事务.
    3. */
    4. publicfinalvoidend()throwsSQLException{
    5. if(connection !=null){
    6. ExecutorExceptionHandler.setExceptionThrown(true);
    7.      connection.setAutoCommit(previousAutoCommit);
    8. SoftTransactionManager.closeCurrentTransactionManager();
    9. }
    10. }
    11. // SoftTransactionManager.java
    12. /**
    13. * 关闭当前的柔性事务管理器.
    14. */
    15. staticvoid closeCurrentTransactionManager(){
    16. ExecutorDataMap.getDataMap().put(TRANSACTION,null);
    17. ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG,null);
    18. }
BEDSoftTransaction
BEDSoftTransaction,最大努力送达型柔性事务。
  1. publicclassBEDSoftTransactionextendsAbstractSoftTransaction{
  2. /**
  3.     * 开启柔性事务.
  4.     *
  5.     * @param connection 数据库连接对象
  6.     */
  7. publicvoidbegin(finalConnection connection)throwsSQLException{
  8.        beginInternal(connection,SoftTransactionType.BestEffortsDelivery);
  9. }
  10. }
TCCSoftTransaction
TCCSoftTransaction,TCC 型柔性事务,暂未实现。实现后,会更新到 《Sharding-JDBC 源码分析 —— 分布式事务(二)之事务补偿型》。

3.3.1 创建柔性事务

通过调用 SoftTransactionManager#getTransaction() 创建柔性事务对象:
  1. /**
  2. * {@link ExecutorDataMap#dataMap} 柔性事务对象 key
  3. */
  4. privatestaticfinalString TRANSACTION ="transaction";
  5. /**
  6. * {@link ExecutorDataMap#dataMap} 柔性事务配置 key
  7. */
  8. privatestaticfinalString TRANSACTION_CONFIG ="transactionConfig";
  9. // SoftTransactionManager.java
  10. /**
  11. * 创建柔性事务.
  12. *
  13. * @param type 柔性事务类型
  14. * @return 柔性事务
  15. */
  16. publicAbstractSoftTransaction getTransaction(finalSoftTransactionType type){
  17. AbstractSoftTransaction result;
  18. switch(type){
  19. caseBestEffortsDelivery:
  20.           result =newBEDSoftTransaction();
  21. break;
  22. caseTryConfirmCancel:
  23.           result =newTCCSoftTransaction();
  24. break;
  25. default:
  26. thrownewUnsupportedOperationException(type.toString());
  27. }
  28. // TODO 目前使用不支持嵌套事务,以后这里需要可配置
  29. if(getCurrentTransaction().isPresent()){
  30. thrownewUnsupportedOperationException("Cannot support nested transaction.");
  31. }
  32. ExecutorDataMap.getDataMap().put(TRANSACTION, result);
  33. ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig);
  34. return result;
  35. }
  • 后续可以从 ExecutorDataMap 中获取当前线程的柔性事务和柔性事务配置:
    1. // SoftTransactionManager.java
    2. /**
    3. * 获取当前线程的柔性事务配置.
    4. *
    5. * @return 当前线程的柔性事务配置
    6. */
    7. publicstaticOptional<SoftTransactionConfiguration> getCurrentTransactionConfiguration(){
    8. Object transactionConfig =ExecutorDataMap.getDataMap().get(TRANSACTION_CONFIG);
    9. return(null== transactionConfig)
    10. ?Optional.<SoftTransactionConfiguration>absent()
    11. :Optional.of((SoftTransactionConfiguration) transactionConfig);
    12. }
    13. /**
    14. * 获取当前的柔性事务.
    15. *
    16. * @return 当前的柔性事务
    17. */
    18. publicstaticOptional<AbstractSoftTransaction> getCurrentTransaction(){
    19. Object transaction =ExecutorDataMap.getDataMap().get(TRANSACTION);
    20. return(null== transaction)
    21. ?Optional.<AbstractSoftTransaction>absent()
    22. :Optional.of((AbstractSoftTransaction) transaction);
    23. }

4. 事务日志存储器

柔性事务执行过程中,会通过事务日志( TransactionLog ) 记录每条 SQL 执行状态:
  • SQL 执行前,记录一条事务日志
  • SQL 执行成功,移除对应的事务日志
通过实现事务日志存储器接口( TransactionLogStorage ),提供存储功能。目前有两种实现:
  • MemoryTransactionLogStorage :基于内存的事务日志存储器。主要用于开发测试,生产环境下不要使用
  • RdbTransactionLogStorage :基于数据库的事务日志存储器。
本节只分析 RdbTransactionLogStorage。对 MemoryTransactionLogStorage 感兴趣的同学可以点击链接传送到达。
TransactionLogStorage 有五个接口方法,下文每个小标题都是一个方法。

4.1 #add()

  1. // TransactionLogStorage.java
  2. /**
  3. * 存储事务日志.
  4. *
  5. * @param transactionLog 事务日志
  6. */
  7. void add(TransactionLog transactionLog);
  8. // RdbTransactionLogStorage.java
  9. @Override
  10. publicvoid add(finalTransactionLog transactionLog){
  11. String sql ="INSERT INTO `transaction_log` (`id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?);";
  12. try(
  13. // ... 省略你熟悉的代码
  14. }catch(finalSQLException ex){
  15. thrownewTransactionLogStorageException(ex);
  16. }
  17. }
  • 注意:如果插入事务日志失败,SQL 会继续执行,如果此时 SQL 执行失败,则该 SQL 会不见了。建议: #add() 和下文的 #remove() 异常时,都打印下异常日志都文件系统
TransactionLog (transaction_log) 数据库表结构如下:
字段名字数据库类型备注
id事件编号VARCHAR(40)EventBus 事件编号,非事务编号
transaction_type柔性事务类型VARCHAR(30)
data_source真实数据源名VARCHAR(255)
sql执行 SQLTEXT已经改写过的 SQL
parameters占位符参数TEXTJSON 字符串存储
creation_time记录时间LONG
asyncdeliverytry_times已异步重试次数INT

4.2 #remove()

  1. // TransactionLogStorage.java
  2. /**
  3. * 根据主键删除事务日志.
  4. *
  5. * @param id 事务日志主键
  6. */
  7. void remove(String id);
  8. // RdbTransactionLogStorage.java    
  9. @Override
  10. publicvoid remove(finalString id){
  11. String sql ="DELETE FROM `transaction_log` WHERE `id`=?;";
  12. try(
  13. // ... 省略你熟悉的代码
  14. }catch(finalSQLException ex){
  15. thrownewTransactionLogStorageException(ex);
  16. }
  17. }

4.3 #findEligibleTransactionLogs()

  1. // TransactionLogStorage.java
  2. /**
  3. * 读取需要处理的事务日志.
  4. *
  5. * <p>需要处理的事务日志为: </p>
  6. * <p>1. 异步处理次数小于最大处理次数.</p>
  7. * <p>2. 异步处理的事务日志早于异步处理的间隔时间.</p>
  8. *
  9. * @param size 获取日志的数量
  10. * @param maxDeliveryTryTimes 事务送达的最大尝试次数
  11. * @param maxDeliveryTryDelayMillis 执行送达事务的延迟毫秒数.
  12. */
  13. List<TransactionLog> findEligibleTransactionLogs(int size,int maxDeliveryTryTimes,long maxDeliveryTryDelayMillis);
  14. // RdbTransactionLogStorage.java
  15. @Override
  16. publicList<TransactionLog> findEligibleTransactionLogs(finalint size,finalint maxDeliveryTryTimes,finallong maxDeliveryTryDelayMillis){
  17. List<TransactionLog> result =newArrayList<>(size);
  18. String sql ="SELECT `id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`, `async_delivery_try_times` "
  19. +"FROM `transaction_log` WHERE `async_delivery_try_times`<? AND `transaction_type`=? AND `creation_time`<? LIMIT ?;";
  20. try(Connection conn = dataSource.getConnection()){
  21. // ... 省略你熟悉的代码
  22. }catch(finalSQLException ex){
  23. thrownewTransactionLogStorageException(ex);
  24. }
  25. return result;
  26. }

4.4 #increaseAsyncDeliveryTryTimes()

  1. // TransactionLogStorage.java
  2. /**
  3. * 增加事务日志异步重试次数.
  4. *
  5. * @param id 事务主键
  6. */
  7. void increaseAsyncDeliveryTryTimes(String id);
  8. // RdbTransactionLogStorage.java
  9. @Override
  10. publicvoid increaseAsyncDeliveryTryTimes(finalString id){
  11. String sql ="UPDATE `transaction_log` SET `async_delivery_try_times`=`async_delivery_try_times`+1 WHERE `id`=?;";
  12. try(
  13. // ... 省略你熟悉的代码
  14. }catch(finalSQLException ex){
  15. thrownewTransactionLogStorageException(ex);
  16. }
  17. }

4.5 #processData()

  1. // TransactionLogStorage.java
  2. /**
  3. * 处理事务数据.
  4. *
  5. * @param connection 业务数据库连接
  6. * @param transactionLog 事务日志
  7. * @param maxDeliveryTryTimes 事务送达的最大尝试次数
  8. */
  9. boolean processData(Connection connection,TransactionLog transactionLog,int maxDeliveryTryTimes);
  10. // RdbTransactionLogStorage.java
  11. @Override
  12. publicboolean processData(finalConnection connection,finalTransactionLog transactionLog,finalint maxDeliveryTryTimes){
  13. // 重试执行失败 SQL
  14. try(
  15. Connection conn = connection;
  16. PreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())){
  17. for(int parameterIndex =0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++){
  18.           preparedStatement.setObject(parameterIndex +1, transactionLog.getParameters().get(parameterIndex));
  19. }
  20.       preparedStatement.executeUpdate();
  21. }catch(finalSQLException ex){
  22. // 重试失败,更新事务日志,增加已异步重试次数
  23.       increaseAsyncDeliveryTryTimes(transactionLog.getId());
  24. thrownewTransactionCompensationException(ex);
  25. }
  26. // 移除重试执行成功 SQL 对应的事务日志
  27.   remove(transactionLog.getId());
  28. returntrue;
  29. }
  • 不同于前四个增删改查接口方法的实现, #processData() 是带有一些逻辑的。根据事务日志( TransactionLog )重试执行失败的 SQL,若成功,移除事务日志;若失败,更新事务日志,增加已异步重试次数
  • 该方法会被最大努力送达型异步作业调用到

5. 最大努力送达型事务监听器

最大努力送达型事务监听器,BestEffortsDeliveryListener,负责记录事务日志、同步重试执行失败 SQL。
  1. // BestEffortsDeliveryListener.java
  2. @Subscribe
  3. @AllowConcurrentEvents
  4. publicvoid listen(finalDMLExecutionEventevent){
  5. if(!isProcessContinuously()){
  6. return;
  7. }
  8. SoftTransactionConfiguration transactionConfig =SoftTransactionManager.getCurrentTransactionConfiguration().get();
  9. TransactionLogStorage transactionLogStorage =TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
  10. BEDSoftTransaction bedSoftTransaction =(BEDSoftTransaction)SoftTransactionManager.getCurrentTransaction().get();
  11. switch(event.getEventExecutionType()){
  12. case BEFORE_EXECUTE:// 执行前,插入事务日志
  13. //TODO 对于批量执行的SQL需要解析成两层列表
  14.           transactionLogStorage.add(newTransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
  15. event.getDataSource(),event.getSql(),event.getParameters(),System.currentTimeMillis(),0));
  16. return;
  17. case EXECUTE_SUCCESS:// 执行成功,移除事务日志
  18.           transactionLogStorage.remove(event.getId());
  19. return;
  20. case EXECUTE_FAILURE:// 执行失败,同步重试
  21. boolean deliverySuccess =false;
  22. for(int i =0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++){// 同步【多次】重试
  23. if(deliverySuccess){
  24. return;
  25. }
  26. boolean isNewConnection =false;
  27. Connection conn =null;
  28. PreparedStatement preparedStatement =null;
  29. try{
  30. // 获得数据库连接
  31.                   conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(),SQLType.DML);
  32. if(!isValidConnection(conn)){// 因为可能执行失败是数据库连接异常,所以判断一次,如果无效,重新获取数据库连接
  33.                       bedSoftTransaction.getConnection().release(conn);
  34.                       conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(),SQLType.DML);
  35.                       isNewConnection =true;
  36. }
  37.                   preparedStatement = conn.prepareStatement(event.getSql());
  38. // 同步重试
  39. //TODO 对于批量事件需要解析成两层列表
  40. for(int parameterIndex =0; parameterIndex <event.getParameters().size(); parameterIndex++){
  41.                       preparedStatement.setObject(parameterIndex +1,event.getParameters().get(parameterIndex));
  42. }
  43.                   preparedStatement.executeUpdate();
  44.                   deliverySuccess =true;
  45. // 同步重试成功,移除事务日志
  46.                   transactionLogStorage.remove(event.getId());
  47. }catch(finalSQLException ex){
  48.                   log.error(String.format("Delivery times %s error, max try times is %s", i +1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
  49. }finally{
  50.                   close(isNewConnection, conn, preparedStatement);
  51. }
  52. }
  53. return;
  54. default:
  55. thrownewUnsupportedOperationException(event.getEventExecutionType().toString());
  56. }
  57. }
  • BestEffortsDeliveryListener 通过 EventBus 实现监听 SQL 的执行。Sharding-JDBC 如何实现 EventBus 的,请看《Sharding-JDBC 源码分析 —— SQL 执行》
  • 调用 #isProcessContinuously() 方法判断是否处于最大努力送达型事务中,当且仅当处于该状态才进行监听事件处理
  • SQL 执行,插入事务日志
  • SQL 执行成功,移除事务日志
  • SQL 执行失败,根据柔性事务配置( SoftTransactionConfiguration )同步的事务送达的最大尝试次数( syncMaxDeliveryTryTimes )进行多次重试直到成功。总体逻辑和 RdbTransactionLogStorage#processData() 方法逻辑类似,区别在于获取分片数据库连接的特殊处理:此处调用失败,数据库连接可能是异常无效的,因此调用了 #isValidConnection() 判断连接的有效性。若无效,则重新获取分片数据库连接。另外,若是重新获取分片数据库连接,需要进行关闭释放 ( Connection#close()):
    1. // BestEffortsDeliveryListener.java
    2. /**
    3. * 通过 SELECT 1 校验数据库连接是否有效
    4. *
    5. * @param conn 数据库连接
    6. * @return 是否有效
    7. */
    8. privateboolean isValidConnection(finalConnection conn){
    9. try(PreparedStatement preparedStatement = conn.prepareStatement("SELECT 1")){
    10. try(ResultSet rs = preparedStatement.executeQuery()){
    11. return rs.next()&&1== rs.getInt("1");
    12. }
    13. }catch(finalSQLException ex){
    14. returnfalse;
    15. }
    16. }
    17. /**
    18. * 关闭释放预编译SQL对象和数据库连接
    19. *
    20. * @param isNewConnection 是否新创建的数据库连接,是的情况下才释放
    21. * @param conn 数据库连接
    22. * @param preparedStatement 预编译SQL
    23. */
    24. privatevoid close(finalboolean isNewConnection,finalConnection conn,finalPreparedStatement preparedStatement){
    25. if(null!= preparedStatement){
    26. try{
    27.           preparedStatement.close();
    28. }catch(finalSQLException ex){
    29.           log.error("PreparedStatement closed error:", ex);
    30. }
    31. }
    32. if(isNewConnection &&null!= conn){
    33. try{
    34.           conn.close();
    35. }catch(finalSQLException ex){
    36.           log.error("Connection closed error:", ex);
    37. }
    38. }
    39. }

6. 最大努力送达型异步作业

当最大努力送达型事务监听器( BestEffortsDeliveryListener )多次同步重试失败后,交给最大努力送达型异步作业进行多次异步重试,并且多次执行有固定间隔
Sharding-JDBC 提供了两个最大努力送达型异步作业实现:
  • NestedBestEffortsDeliveryJob :内嵌的最大努力送达型异步作业
  • BestEffortsDeliveryJob :最大努力送达型异步作业
两者实现代码逻辑基本一致。前者相比后者,用于开发测试,去除对 Zookeeper 依赖,无法实现高可用,因此生产环境下不适合使用

6.1 BestEffortsDeliveryJob

BestEffortsDeliveryJob 所在 Maven 项目为 sharding-jdbc-transaction-async-job,基于当当开源的 Elastic-Job 实现。如下是官方对该 Maven 项目的简要说明:
由于柔性事务采用异步尝试,需要部署独立的作业和Zookeeper。sharding-jdbc-transaction采用elastic-job实现的sharding-jdbc-transaction-async-job,通过简单配置即可启动高可用作业异步送达柔性事务,启动脚本为start.sh。
BestEffortsDeliveryJob
  1. publicclassBestEffortsDeliveryJobextendsAbstractIndividualThroughputDataFlowElasticJob<TransactionLog>{
  2. /**
  3.     * 最大努力送达型异步作业配置对象
  4.     */
  5. @Setter
  6. privateBestEffortsDeliveryConfiguration bedConfig;
  7. /**
  8.     * 事务日志存储器对象
  9.     */
  10. @Setter
  11. privateTransactionLogStorage transactionLogStorage;
  12. @Override
  13. publicList<TransactionLog> fetchData(finalJobExecutionMultipleShardingContext context){
  14. return transactionLogStorage.findEligibleTransactionLogs(context.getFetchDataCount(),
  15.            bedConfig.getJobConfig().getMaxDeliveryTryTimes(), bedConfig.getJobConfig().getMaxDeliveryTryDelayMillis());
  16. }
  17. @Override
  18. publicboolean processData(finalJobExecutionMultipleShardingContext context,finalTransactionLog data){
  19. try(
  20. Connection conn = bedConfig.getTargetDataSource(data.getDataSource()).getConnection()){
  21.            transactionLogStorage.processData(conn, data, bedConfig.getJobConfig().getMaxDeliveryTryTimes());
  22. }catch(finalSQLException|TransactionCompensationException ex){
  23.            log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", data.getAsyncDeliveryTryTimes()+1,
  24.                bedConfig.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage()));
  25. returnfalse;
  26. }
  27. returntrue;
  28. }
  29. @Override
  30. publicboolean isStreamingProcess(){
  31. returnfalse;
  32. }
  33. }
  • 调用 #fetchData() 方法获取需要处理的事务日志 (TransactionLog),内部调用了 TransactionLogStorage#findEligibleTransactionLogs() 方法
  • 调用 #processData() 方法处理事务日志,重试执行失败的 SQL,内部调用了 TransactionLogStorage#processData()
  • #fetchData() 和 #processData() 调用是 Elastic-Job 控制的。每一轮定时调度,每条事务日志只执行一次。当超过最大异步调用次数后,该条事务日志不再处理,所以生产使用时,最好增加下相应监控超过最大异步重试次数的事务日志

6.2 AsyncSoftTransactionJobConfiguration

AsyncSoftTransactionJobConfiguration,异步柔性事务作业配置对象。
  1. publicclassAsyncSoftTransactionJobConfiguration{
  2. /**
  3.     * 作业名称.
  4.     */
  5. privateString name ="bestEffortsDeliveryJob";
  6. /**
  7.     * 触发作业的cron表达式.
  8.     */
  9. privateString cron ="0/5 * * * * ?";
  10. /**
  11.     * 每次作业获取的事务日志最大数量.
  12.     */
  13. privateint transactionLogFetchDataCount =100;
  14. /**
  15.     * 事务送达的最大尝试次数.
  16.     */
  17. privateint maxDeliveryTryTimes =3;
  18. /**
  19.     * 执行事务的延迟毫秒数.
  20.     *
  21.     * <p>早于此间隔时间的入库事务才会被作业执行.</p>
  22.     */
  23. privatelong maxDeliveryTryDelayMillis =60*1000L;
  24. }

6.3 Elastic-Job 是否必须?

Sharding-JDBC 提供的最大努力送达型异步作业实现( BestEffortsDeliveryJob ),通过与 Elastic-Job 集成,可以很便捷并且有质量保证的高可用高性能使用。一部分团队,可能已经引入或自研了类似 Elastic-Job 的分布式作业中间件解决方案,每多一个中间件,就是多一个学习与运维成本。那么是否可以使用自己的分布式作业解决方案?答案是,可以的。参考 BestEffortsDeliveryJob 的实现,通过调用 TransactionLogStorage 来实现:
  1. // 伪代码(不考虑性能、异常)
  2. List<TransactionLog> transactionLogs = transactionLogStorage.findEligibleTransactionLogs(....);
  3. for(TransactionLog transactionLog : transactionLogs){
  4.       transactionLogStorage.processData(conn, log, maxDeliveryTryTimes);
  5. }
当然,个人还是很推荐 Elastic-Job。
😈 笔者要开始写《Elastic-Job 源码分析》

另外,如果有支持事务消息的分布式队列系统,可以通过 TransactionLogStorage 实现存储事务消息存储成消息。为什么要支持事务消息?如果 SQL 执行是成功的,需要回滚(删除)事务消息。

7. 适用场景

见《官方文档 - 事务支持》。

8. 开发指南 & 开发示例

见《官方文档 - 事务支持》。

666. 彩蛋

哈哈哈
算是坚持把这个系列写完了,给自己 32 個赞。
满足!
《Elastic-Job 源码分析》 走起!不 High 不结束!
继续阅读
阅读原文