摘要: 原创出处 http://www.iocoder.cn/Sharding-JDBC/sql-execute/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Sharding-JDBC 1.5.0 正式版
  • 1. 概述
  • 2. ExecutorEngine
    • 2.1 ListeningExecutorService
    • 2.2 关闭
    • 2.3 执行 SQL 任务
  • 3. Executor
    • 3.1 StatementExecutor
    • 3.2 PreparedStatementExecutor
    • 3.3 BatchPreparedStatementExecutor
  • 4. ExecutionEvent
    • 4.1 EventBus
    • 4.2 BestEffortsDeliveryListener
  • 666. 彩蛋

🙂🙂🙂关注微信公众号:【芋道源码】有福利:
  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

越过千山万水(SQL 解析、SQL 路由、SQL 改写),我们终于来到了 SQL 执行。开森不开森?!
本文主要分享SQL 执行的过程,不包括结果聚合。《结果聚合》 东半球第二良心笔者会更新,关注微信公众号【芋道源码】完稿后第一时间通知您哟。
绿框部分 SQL 执行主流程。

Sharding-JDBC 正在收集使用公司名单:传送门。

🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门

Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门

登记吧,骚年!传送门

2. ExecutorEngine

ExecutorEngine,SQL执行引擎。
分表分库,需要执行的 SQL 数量从单条变成了多条,此时有两种方式执行:
  • 串行执行 SQL
  • 并行执行 SQL
前者,编码容易,性能较差,总耗时是多条 SQL 执行时间累加。

后者,编码复杂,性能较好,总耗时约等于执行时间最长的 SQL。
👼 ExecutorEngine 当然采用的是后者,并行执行 SQL。

2.1 ListeningExecutorService

Guava( Java 工具库 ) 提供的继承自 ExecutorService 的线程服务接口,提供创建 ListenableFuture 功能。ListenableFuture 接口,继承 Future 接口,有如下好处:
我们强烈地建议你在代码中多使用ListenableFuture来代替JDK的 Future, 因为:
  • 大多数Futures 方法中需要它。
  • 转到ListenableFuture 编程比较容易。
  • Guava提供的通用公共类封装了公共的操作方方法,不需要提供Future和ListenableFuture的扩展方法。
传统JDK中的Future通过异步的方式计算返回结果:在多线程运算中可能或者可能在没有结束返回结果,Future是运行中的多线程的一个引用句柄,确保在服务执行返回一个Result。
ListenableFuture可以允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后立即执行。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在JDK concurrent中的Future是不支持的。
如上内容来自《Google Guava包的ListenableFuture解析 》,文章写的很棒。下文你会看到 Sharding-JDBC 是如何通过 ListenableFuture 简化并发编程的
下面看看 ExecutorEngine 如何初始化 ListeningExecutorService
  1. // ShardingDataSource.java
  2. publicShardingDataSource(finalShardingRule shardingRule,finalProperties props){
  3. // .... 省略部分代码
  4.   shardingProperties =newShardingProperties(props);
  5. int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
  6.   executorEngine =newExecutorEngine(executorSize);
  7. // .... 省略部分代码
  8. }
  9. // ExecutorEngine
  10. publicExecutorEngine(finalint executorSize){
  11.   executorService =MoreExecutors.listeningDecorator(newThreadPoolExecutor(
  12.           executorSize, executorSize,0,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>(),
  13. newThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));
  14. MoreExecutors.addDelayedShutdownHook(executorService,60,TimeUnit.SECONDS);
  15. }
  • 一个分片数据源( ShardingDataSource ) 独占 一个 SQL执行引擎( ExecutorEngine )。
  • MoreExecutors#listeningDecorator() 创建 ListeningExecutorService,这样 #submit(), #invokeAll() 可以返回 ListenableFuture。
  • 默认情况下,线程池大小为 8。可以根据实际业务需要,设置 ShardingProperties 进行调整。
  • #setNameFormat() 并发编程时,一定要对线程名字做下定义,这样排查问题会方便很多。
  • MoreExecutors#addDelayedShutdownHook()应用关闭时,等待所有任务全部完成再关闭。默认配置等待时间为 60 秒,建议将等待时间做成可配的。

2.2 关闭

数据源关闭时,会调用 ExecutorEngine 也进行关闭。
  1. // ShardingDataSource.java
  2. @Override
  3. publicvoid close(){
  4.   executorEngine.close();
  5. }
  6. // ExecutorEngine
  7. @Override
  8. publicvoid close(){
  9.   executorService.shutdownNow();
  10. try{
  11.       executorService.awaitTermination(5,TimeUnit.SECONDS);
  12. }catch(finalInterruptedException ignored){
  13. }
  14. if(!executorService.isTerminated()){
  15. thrownewShardingJdbcException("ExecutorEngine can not been terminated");
  16. }
  17. }
  • #shutdownNow() 尝试使用 Thread.interrupt() 打断正在执行中的任务,未执行的任务不再执行。建议打印下哪些任务未执行,因为 SQL 未执行,可能数据未能持久化。
  • #awaitTermination() 因为 #shutdownNow() 打断不是立即结束,需要一个过程,因此这里等待了 5 秒。
  • 等待 5 秒后,线程池不一定已经关闭,此时抛出异常给上层。建议打印下日志,记录出现这个情况。

2.3 执行 SQL 任务

ExecutorEngine 对外暴露 #executeStatement()#executePreparedStatement()#executeBatch()
三个方法分别提供给 StatementExecutor、PreparedStatementExecutor、BatchPreparedStatementExecutor 调用。而这三个方法,内部调用的都是 #execute() 私有方法。
  1. // ExecutorEngine.java
  2. /**
  3. * 执行Statement.
  4. * @param sqlType SQL类型
  5. * @param statementUnits 语句对象执行单元集合
  6. * @param executeCallback 执行回调函数
  7. * @param <T> 返回值类型
  8. * @return 执行结果
  9. */
  10. public<T>List<T> executeStatement(finalSQLType sqlType,finalCollection<StatementUnit> statementUnits,finalExecuteCallback<T> executeCallback){
  11. return execute(sqlType, statementUnits,Collections.<List<Object>>emptyList(), executeCallback);
  12. }
  13. /**
  14. * 执行PreparedStatement.
  15. * @param sqlType SQL类型
  16. * @param preparedStatementUnits 语句对象执行单元集合
  17. * @param parameters 参数列表
  18. * @param executeCallback 执行回调函数
  19. * @param <T> 返回值类型
  20. * @return 执行结果
  21. */
  22. public<T>List<T> executePreparedStatement(
  23. finalSQLType sqlType,finalCollection<PreparedStatementUnit> preparedStatementUnits,finalList<Object> parameters,finalExecuteCallback<T> executeCallback){
  24. return execute(sqlType, preparedStatementUnits,Collections.singletonList(parameters), executeCallback);
  25. }
  26. /**
  27. * 执行Batch.
  28. * @param sqlType SQL类型
  29. * @param batchPreparedStatementUnits 语句对象执行单元集合
  30. * @param parameterSets 参数列表集
  31. * @param executeCallback 执行回调函数
  32. * @return 执行结果
  33. */
  34. publicList<int[]> executeBatch(
  35. finalSQLType sqlType,finalCollection<BatchPreparedStatementUnit> batchPreparedStatementUnits,finalList<List<Object>> parameterSets,finalExecuteCallback<int[]> executeCallback){
  36. return execute(sqlType, batchPreparedStatementUnits, parameterSets, executeCallback);
  37. }
#execute() 执行过程大体流程如下图:
  1. /**
  2. * 执行
  3. *
  4. * @param sqlType SQL 类型
  5. * @param baseStatementUnits 语句对象执行单元集合
  6. * @param parameterSets 参数列表集
  7. * @param executeCallback 执行回调函数
  8. * @param <T> 返回值类型
  9. * @return 执行结果
  10. */
  11. private<T>List<T> execute(
  12. finalSQLType sqlType,finalCollection<?extendsBaseStatementUnit> baseStatementUnits,finalList<List<Object>> parameterSets,finalExecuteCallback<T> executeCallback){
  13. if(baseStatementUnits.isEmpty()){
  14. returnCollections.emptyList();
  15. }
  16. Iterator<?extendsBaseStatementUnit> iterator = baseStatementUnits.iterator();
  17. BaseStatementUnit firstInput = iterator.next();
  18. // 第二个任务开始所有 SQL任务 提交线程池【异步】执行任务
  19. ListenableFuture<List<T>> restFutures = asyncExecute(sqlType,Lists.newArrayList(iterator), parameterSets, executeCallback);
  20.   T firstOutput;
  21. List<T> restOutputs;
  22. try{
  23. // 第一个任务【同步】执行任务
  24.       firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
  25. // 等待第二个任务开始所有 SQL任务完成
  26.       restOutputs = restFutures.get();
  27. //CHECKSTYLE:OFF
  28. }catch(finalException ex){
  29. //CHECKSTYLE:ON
  30. ExecutorExceptionHandler.handleException(ex);
  31. returnnull;
  32. }
  33. // 返回结果
  34. List<T> result =Lists.newLinkedList(restOutputs);
  35.   result.add(0, firstOutput);
  36. return result;
  37. }
  • 第一个任务【同步】调用 #executeInternal() 执行任务。
  1. private<T> T syncExecute(finalSQLType sqlType,finalBaseStatementUnit baseStatementUnit,finalList<List<Object>> parameterSets,finalExecuteCallback<T> executeCallback)throwsException{
  2. // 【同步】执行任务
  3. return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback,ExecutorExceptionHandler.isExceptionThrown(),ExecutorDataMap.getDataMap());
  4. }
  • 第二个开始的任务提交线程池异步调用 #executeInternal() 执行任务。
  1. private<T>ListenableFuture<List<T>> asyncExecute(
  2. finalSQLType sqlType,finalCollection<BaseStatementUnit> baseStatementUnits,finalList<List<Object>> parameterSets,finalExecuteCallback<T> executeCallback){
  3. List<ListenableFuture<T>> result =newArrayList<>(baseStatementUnits.size());
  4. finalboolean isExceptionThrown =ExecutorExceptionHandler.isExceptionThrown();
  5. finalMap<String,Object> dataMap =ExecutorDataMap.getDataMap();
  6. for(finalBaseStatementUnit each : baseStatementUnits){
  7. // 提交线程池【异步】执行任务
  8.       result.add(executorService.submit(newCallable<T>(){
  9. @Override
  10. public T call()throwsException{
  11. return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
  12. }
  13. }));
  14. }
  15. // 返回 ListenableFuture
  16. returnFutures.allAsList(result);
  17. }
  • 我们注意下 Futures.allAsList(result); 和 restOutputs=restFutures.get();。神器 Guava 简化并发编程 的好处就提现出来了。 ListenableFuture#get() 当所有任务都成功时,返回所有任务执行结果;当任何一个任务失败时,马上抛出异常,无需等待其他任务执行完成。
😮 Guava 真她喵神器,公众号:【芋道源码】会更新 Guava 源码分享的一个系列哟!老司机还不赶紧上车?
  • 为什么会分同步执行和异步执行呢?猜测,当SQL 执行是单表时,只要进行第一个任务的同步调用,性能更加优秀。等跟张亮大神请教确认原因后,咱会进行更新。
  1. // ExecutorEngine.java
  2. private<T> T executeInternal(finalSQLType sqlType,finalBaseStatementUnit baseStatementUnit,finalList<List<Object>> parameterSets,finalExecuteCallback<T> executeCallback,
  3. finalboolean isExceptionThrown,finalMap<String,Object> dataMap)throwsException{
  4. synchronized(baseStatementUnit.getStatement().getConnection()){
  5.       T result;
  6. ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
  7. ExecutorDataMap.setDataMap(dataMap);
  8. List<AbstractExecutionEvent> events =newLinkedList<>();
  9. // 生成 Event
  10. if(parameterSets.isEmpty()){
  11.           events.add(getExecutionEvent(sqlType, baseStatementUnit,Collections.emptyList()));
  12. }else{
  13. for(List<Object> each : parameterSets){
  14.               events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
  15. }
  16. }
  17. // EventBus 发布 EventExecutionType.BEFORE_EXECUTE
  18. for(AbstractExecutionEventevent: events){
  19. EventBusInstance.getInstance().post(event);
  20. }
  21. try{
  22. // 执行回调函数
  23.           result = executeCallback.execute(baseStatementUnit);
  24. }catch(finalSQLException ex){
  25. // EventBus 发布 EventExecutionType.EXECUTE_FAILURE
  26. for(AbstractExecutionEvent each : events){
  27.               each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
  28.               each.setException(Optional.of(ex));
  29. EventBusInstance.getInstance().post(each);
  30. ExecutorExceptionHandler.handleException(ex);
  31. }
  32. returnnull;
  33. }
  34. // EventBus 发布 EventExecutionType.EXECUTE_SUCCESS
  35. for(AbstractExecutionEvent each : events){
  36.           each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
  37. EventBusInstance.getInstance().post(each);
  38. }
  39. return result;
  40. }
  41. }
  • result=executeCallback.execute(baseStatementUnit); 执行回调函数。StatementExecutor,PreparedStatementExecutor,BatchPreparedStatementExecutor 通过传递执行回调函数( ExecuteCallback )实现给 ExecutorEngine 实现并行执行。
  1. publicinterfaceExecuteCallback<T>{
  2. /**
  3.     * 执行任务.
  4.     *
  5.     * @param baseStatementUnit 语句对象执行单元
  6.     * @return 处理结果
  7.     * @throws Exception 执行期异常
  8.     */
  9.    T execute(BaseStatementUnit baseStatementUnit)throwsException;
  10. }
  • synchronized(baseStatementUnit.getStatement().getConnection()) 原以为 Connection 非线程安全,因此需要用同步,后翻查资料《数据库连接池为什么要建立多个连接》,Connection 是线程安全的。等跟张亮大神请教确认原因后,咱会进行更新。
    FROM https://github.com/dangdangdotcom/sharding-jdbc/issues/166

    druid的数据源的stat这种filter在并发使用同一个connection链接时没有考虑线程安全的问题,故造成多个线程修改filter中的状态异常。 改造这个问题时,考虑到mysql驱动在执行statement时对同一个connection是线程安全的。也就是说同一个数据库链接的会话是串行执行的。故在sjdbc的executor对于多线程执行的情况也进行了针对数据库链接级别的同步。故该方案不会降低sjdbc的性能。 同时jdk1.7版本的同步采用了锁升级技术,在碰撞较低的情况下开销也是很小的。
    • 解答:MySQL、Oracle 的 Connection 实现是线程安全的。数据库连接池实现的 Connection 不一定是线程安全,例如 Druid 的线程池 Connection 非线程安全
  • ExecutionEvent 这里先不解释,在本文第四节【EventBus】分享。
  • ExecutorExceptionHandler、ExecutorDataMap 和 柔性事务 ( AbstractSoftTransaction ),放在《柔性事务》分享。

3. Executor

Executor,执行器,目前一共有三个执行器。不同的执行器对应不同的执行单元 (BaseStatementUnit)。
执行器类执行器名执行单元
StatementExecutor静态语句对象执行单元StatementUnit
PreparedStatementExecutor预编译语句对象请求的执行器PreparedStatementUnit
BatchPreparedStatementExecutor批量预编译语句对象请求的执行器BatchPreparedStatementUnit
  • 执行器提供的方法不同,因此不存在公用接口或者抽象类。
  • 执行单元继承自 BaseStatementUnit

3.1 StatementExecutor

StatementExecutor,多线程执行静态语句对象请求的执行器,一共有三类方法:
  • #executeQuery()
  1. // StatementExecutor.java
  2. /**
  3. * 执行SQL查询.
  4. * @return 结果集列表
  5. */
  6. publicList<ResultSet> executeQuery(){
  7. Context context =MetricsContext.start("ShardingStatement-executeQuery");
  8. List<ResultSet> result;
  9. try{
  10.       result = executorEngine.executeStatement(sqlType, statementUnits,newExecuteCallback<ResultSet>(){
  11. @Override
  12. publicResultSet execute(finalBaseStatementUnit baseStatementUnit)throwsException{
  13. return baseStatementUnit.getStatement().executeQuery(baseStatementUnit.getSqlExecutionUnit().getSql());
  14. }
  15. });
  16. }finally{
  17. MetricsContext.stop(context);
  18. }
  19. return result;
  20. }
  • #executeUpdate() 因为有四个不同情况的 #executeUpdate(),所以抽象了 Updater 接口,从而达到逻辑重用。
  1. // StatementExecutor.java
  2. /**
  3. * 执行SQL更新.
  4. * @return 更新数量
  5. */
  6. publicint executeUpdate(){
  7. return executeUpdate(newUpdater(){
  8. @Override
  9. publicint executeUpdate(finalStatement statement,finalString sql)throwsSQLException{
  10. return statement.executeUpdate(sql);
  11. }
  12. });
  13. }
  14. privateint executeUpdate(finalUpdater updater){
  15. Context context =MetricsContext.start("ShardingStatement-executeUpdate");
  16. try{
  17. List<Integer> results = executorEngine.executeStatement(sqlType, statementUnits,newExecuteCallback<Integer>(){
  18. @Override
  19. publicInteger execute(finalBaseStatementUnit baseStatementUnit)throwsException{
  20. return updater.executeUpdate(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());
  21. }
  22. });
  23. return accumulate(results);
  24. }finally{
  25. MetricsContext.stop(context);
  26. }
  27. }
  28. /**
  29. * 计算总的更新数量
  30. * @param results 更新数量数组
  31. * @return 更新数量
  32. */
  33. privateint accumulate(finalList<Integer> results){
  34. int result =0;
  35. for(Integer each : results){
  36.       result +=null== each ?0: each;
  37. }
  38. return result;
  39. }
  • #execute() 因为有四个不同情况的 #execute(),所以抽象了 Executor 接口,从而达到逻辑重用。
  1. /**
  2. * 执行SQL请求.
  3. * @return true表示执行DQL语句, false表示执行的DML语句
  4. */
  5. publicboolean execute(){
  6. return execute(newExecutor(){
  7. @Override
  8. publicboolean execute(finalStatement statement,finalString sql)throwsSQLException{
  9. return statement.execute(sql);
  10. }
  11. });
  12. }
  13. privateboolean execute(finalExecutor executor){
  14. Context context =MetricsContext.start("ShardingStatement-execute");
  15. try{
  16. List<Boolean> result = executorEngine.executeStatement(sqlType, statementUnits,newExecuteCallback<Boolean>(){
  17. @Override
  18. publicBoolean execute(finalBaseStatementUnit baseStatementUnit)throwsException{
  19. return executor.execute(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());
  20. }
  21. });
  22. if(null== result || result.isEmpty()||null== result.get(0)){
  23. returnfalse;
  24. }
  25. return result.get(0);
  26. }finally{
  27. MetricsContext.stop(context);
  28. }
  29. }

3.2 PreparedStatementExecutor

PreparedStatementExecutor,多线程执行预编译语句对象请求的执行器。比 StatementExecutor 多了 parameters 参数,方法逻辑上基本一致,就不重复分享啦。

3.3 BatchPreparedStatementExecutor

BatchPreparedStatementExecutor,多线程执行批量预编译语句对象请求的执行器。
  1. // BatchPreparedStatementExecutor.java
  2. /**
  3. * 执行批量SQL.
  4. *
  5. * @return 执行结果
  6. */
  7. publicint[] executeBatch(){
  8. Context context =MetricsContext.start("ShardingPreparedStatement-executeBatch");
  9. try{
  10. return accumulate(executorEngine.executeBatch(sqlType, batchPreparedStatementUnits, parameterSets,newExecuteCallback<int[]>(){
  11. @Override
  12. publicint[] execute(finalBaseStatementUnit baseStatementUnit)throwsException{
  13. return baseStatementUnit.getStatement().executeBatch();
  14. }
  15. }));
  16. }finally{
  17. MetricsContext.stop(context);
  18. }
  19. }
  20. /**
  21. * 计算每个语句的更新数量
  22. *
  23. * @param results 每条 SQL 更新数量
  24. * @return 每个语句的更新数量
  25. */
  26. privateint[] accumulate(finalList<int[]> results){
  27. int[] result =newint[parameterSets.size()];
  28. int count =0;
  29. // 每个语句按照顺序,读取到其对应的每个分片SQL影响的行数进行累加
  30. for(BatchPreparedStatementUnit each : batchPreparedStatementUnits){
  31. for(Map.Entry<Integer,Integer> entry : each.getJdbcAndActualAddBatchCallTimesMap().entrySet()){
  32.           result[entry.getKey()]+=null== results.get(count)?0: results.get(count)[entry.getValue()];
  33. }
  34.       count++;
  35. }
  36. return result;
  37. }
眼尖的同学会发现,为什么有 BatchPreparedStatementExecutor,而没有 BatchStatementExecutor 呢?目前 Sharding-JDBC 不支持 Statement 批量操作,只能进行 PreparedStatement 的批操作。
  1. // PreparedStatement 批量操作,不会报错
  2. PreparedStatement ps = conn.prepareStatement(sql)
  3. ps.addBatch();
  4. ps.addBatch();
  5. // Statement 批量操作,会报错
  6. ps.addBatch(sql);// 报错:at com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationStatement.addBatch

4. ExecutionEvent

AbstractExecutionEvent,SQL 执行事件抽象接口。
  1. publicabstractclassAbstractExecutionEvent{
  2. /**
  3.     * 事件编号
  4.     */
  5. privatefinalString id;
  6. /**
  7.     * 数据源
  8.     */
  9. privatefinalString dataSource;
  10. /**
  11.     * SQL
  12.     */
  13. privatefinalString sql;
  14. /**
  15.     * 参数
  16.     */
  17. privatefinalList<Object> parameters;
  18. /**
  19.     * 事件类型
  20.     */
  21. privateEventExecutionType eventExecutionType;
  22. /**
  23.     * 异常
  24.     */
  25. privateOptional<SQLException> exception;
  26. }
AbstractExecutionEvent 有两个实现子类:
  • DMLExecutionEvent:DML类SQL执行时事件
  • DQLExecutionEvent:DQL类SQL执行时事件
EventExecutionType,事件触发类型。
  • BEFORE_EXECUTE:执行前
  • EXECUTE_SUCCESS:执行成功
  • EXECUTE_FAILURE:执行失败

4.1 EventBus

那究竟有什么用途呢? Sharding-JDBC 使用 Guava(没错,又是它)的 EventBus 实现了事件的发布和订阅。从上文 ExecutorEngine#executeInternal() 我们可以看到每个分片 SQL 执行的过程中会发布相应事件:
  • 执行 SQL 前:发布类型类型为 BEFORE_EXECUTE 的事件
  • 执行 SQL 成功:发布类型类型为 EXECUTE_SUCCESS 的事件
  • 执行 SQL 失败:发布类型类型为 EXECUTE_FAILURE 的事件
怎么订阅事件呢?非常简单,例子如下:
  1. EventBusInstance.getInstance().register(newRunnable(){
  2. @Override
  3. publicvoid run(){
  4. }
  5. @Subscribe// 订阅
  6. @AllowConcurrentEvents// 是否允许并发执行,即线程安全
  7. publicvoid listen(finalDMLExecutionEventevent){// DMLExecutionEvent
  8. System.out.println("DMLExecutionEvent:"+event.getSql()+"\t"+event.getEventExecutionType());
  9. }
  10. @Subscribe// 订阅
  11. @AllowConcurrentEvents// 是否允许并发执行,即线程安全
  12. publicvoid listen2(finalDQLExecutionEventevent){//DQLExecutionEvent
  13. System.out.println("DQLExecutionEvent:"+event.getSql()+"\t"+event.getEventExecutionType());
  14. }
  15. });
  • #register() 任何类都可以,并非一定需要使用 Runnable 类。此处例子单纯因为方便
  • @Subscribe 注解在方法上,实现对事件的订阅
  • @AllowConcurrentEvents 注解在方法上,表示线程安全,允许并发执行
  • 方法上的参数对应的类即是订阅的事件。例如, #listen() 订阅了 DMLExecutionEvent 事件
  • EventBus#post() 发布事件,同步调用订阅逻辑
  • 推荐阅读文章:《Guava学习笔记:EventBus》
Sharding-JDBC 正在收集使用公司名单:传送门。

🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门

Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门

登记吧,骚年!传送门

4.2 BestEffortsDeliveryListener

BestEffortsDeliveryListener,最大努力送达型事务监听器。
本文暂时暂时不分析其实现,仅仅作为另外一个订阅者的例子。我们会在《柔性事务》进行分享。
  1. publicfinalclassBestEffortsDeliveryListener{
  2. @Subscribe
  3. @AllowConcurrentEvents
  4. publicvoid listen(finalDMLExecutionEventevent){
  5. if(!isProcessContinuously()){
  6. return;
  7. }
  8. SoftTransactionConfiguration transactionConfig =SoftTransactionManager.getCurrentTransactionConfiguration().get();
  9. TransactionLogStorage transactionLogStorage =TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
  10. BEDSoftTransaction bedSoftTransaction =(BEDSoftTransaction)SoftTransactionManager.getCurrentTransaction().get();
  11. switch(event.getEventExecutionType()){
  12. case BEFORE_EXECUTE:
  13. //TODO 对于批量执行的SQL需要解析成两层列表
  14.                transactionLogStorage.add(newTransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
  15. event.getDataSource(),event.getSql(),event.getParameters(),System.currentTimeMillis(),0));
  16. return;
  17. case EXECUTE_SUCCESS:
  18.                transactionLogStorage.remove(event.getId());
  19. return;
  20. case EXECUTE_FAILURE:
  21. boolean deliverySuccess =false;
  22. for(int i =0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++){
  23. if(deliverySuccess){
  24. return;
  25. }
  26. boolean isNewConnection =false;
  27. Connection conn =null;
  28. PreparedStatement preparedStatement =null;
  29. try{
  30.                        conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(),SQLType.UPDATE);
  31. if(!isValidConnection(conn)){
  32.                            bedSoftTransaction.getConnection().release(conn);
  33.                            conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(),SQLType.UPDATE);
  34.                            isNewConnection =true;
  35. }
  36.                        preparedStatement = conn.prepareStatement(event.getSql());
  37. //TODO 对于批量事件需要解析成两层列表
  38. for(int parameterIndex =0; parameterIndex <event.getParameters().size(); parameterIndex++){
  39.                            preparedStatement.setObject(parameterIndex +1,event.getParameters().get(parameterIndex));
  40. }
  41.                        preparedStatement.executeUpdate();
  42.                        deliverySuccess =true;
  43.                        transactionLogStorage.remove(event.getId());
  44. }catch(finalSQLException ex){
  45.                        log.error(String.format("Delivery times %s error, max try times is %s", i +1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
  46. }finally{
  47.                        close(isNewConnection, conn, preparedStatement);
  48. }
  49. }
  50. return;
  51. default:
  52. thrownewUnsupportedOperationException(event.getEventExecutionType().toString());
  53. }
  54. }
  55. }

666. 彩蛋

本文完,但也未完。
跨分片事务问题。例如:
  1. UPDATE t_order SET nickname =? WHERE user_id =?
A 节点
connection.commit()
时,应用突然挂了!B节点
connection.commit()
还来不及执行。

我们一起去《柔性事务》寻找答案。
道友,分享一波朋友圈可好?
继续阅读
阅读原文