数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 执行
摘要: 原创出处 http://www.iocoder.cn/Sharding-JDBC/sql-execute/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Sharding-JDBC 1.5.0 正式版
- 1. 概述
- 2. ExecutorEngine
- 2.1 ListeningExecutorService
- 2.2 关闭
- 2.3 执行 SQL 任务
- 3. Executor
- 3.1 StatementExecutor
- 3.2 PreparedStatementExecutor
- 3.3 BatchPreparedStatementExecutor
- 4. ExecutionEvent
- 4.1 EventBus
- 4.2 BestEffortsDeliveryListener
- 666. 彩蛋
🙂🙂🙂关注微信公众号:【芋道源码】有福利:
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问每条留言都将得到认真回复。甚至不知道如何读源码也可以请教噢。 新的源码解析文章实时收到通知。每周更新一篇左右。 认真的源码交流微信群。
1. 概述
越过千山万水(SQL 解析、SQL 路由、SQL 改写),我们终于来到了 SQL 执行。开森不开森?!
本文主要分享SQL 执行的过程,不包括结果聚合。《结果聚合》 东半球第二良心笔者会更新,关注微信公众号【芋道源码】完稿后第一时间通知您哟。
绿框部分 SQL 执行主流程。
Sharding-JDBC 正在收集使用公司名单:传送门。🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门登记吧,骚年!传送门
2. ExecutorEngine
ExecutorEngine,SQL执行引擎。
分表分库,需要执行的 SQL 数量从单条变成了多条,此时有两种方式执行:
- 串行执行 SQL
- 并行执行 SQL
前者,编码容易,性能较差,总耗时是多条 SQL 执行时间累加。
后者,编码复杂,性能较好,总耗时约等于执行时间最长的 SQL。
👼 ExecutorEngine 当然采用的是后者,并行执行 SQL。
2.1 ListeningExecutorService
Guava( Java 工具库 ) 提供的继承自 ExecutorService 的线程服务接口,提供创建 ListenableFuture 功能。ListenableFuture 接口,继承 Future 接口,有如下好处:
我们强烈地建议你在代码中多使用ListenableFuture来代替JDK的 Future, 因为:
大多数Futures 方法中需要它。 转到ListenableFuture 编程比较容易。 Guava提供的通用公共类封装了公共的操作方方法,不需要提供Future和ListenableFuture的扩展方法。传统JDK中的Future通过异步的方式计算返回结果:在多线程运算中可能或者可能在没有结束返回结果,Future是运行中的多线程的一个引用句柄,确保在服务执行返回一个Result。ListenableFuture可以允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用, 或者在运算(多线程执行)完成后立即执行。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在JDK concurrent中的Future是不支持的。
如上内容来自《Google Guava包的ListenableFuture解析
》,文章写的很棒。下文你会看到 Sharding-JDBC 是如何通过 ListenableFuture 简化并发编程的。
下面看看 ExecutorEngine 如何初始化 ListeningExecutorService
// ShardingDataSource.java
publicShardingDataSource(finalShardingRule shardingRule,finalProperties props){
// .... 省略部分代码
shardingProperties =newShardingProperties(props);
int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
executorEngine =newExecutorEngine(executorSize);
// .... 省略部分代码
}
// ExecutorEngine
publicExecutorEngine(finalint executorSize){
executorService =MoreExecutors.listeningDecorator(newThreadPoolExecutor(
executorSize, executorSize,0,TimeUnit.MILLISECONDS,newLinkedBlockingQueue<Runnable>(),
newThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));
MoreExecutors.addDelayedShutdownHook(executorService,60,TimeUnit.SECONDS);
}
- 一个分片数据源( ShardingDataSource ) 独占 一个 SQL执行引擎( ExecutorEngine )。
MoreExecutors#listeningDecorator()
创建 ListeningExecutorService,这样#submit()
,#invokeAll()
可以返回 ListenableFuture。- 默认情况下,线程池大小为 8。可以根据实际业务需要,设置 ShardingProperties 进行调整。
#setNameFormat()
并发编程时,一定要对线程名字做下定义,这样排查问题会方便很多。MoreExecutors#addDelayedShutdownHook()
,应用关闭时,等待所有任务全部完成再关闭。默认配置等待时间为 60 秒,建议将等待时间做成可配的。
2.2 关闭
数据源关闭时,会调用 ExecutorEngine 也进行关闭。
// ShardingDataSource.java
@Override
publicvoid close(){
executorEngine.close();
}
// ExecutorEngine
@Override
publicvoid close(){
executorService.shutdownNow();
try{
executorService.awaitTermination(5,TimeUnit.SECONDS);
}catch(finalInterruptedException ignored){
}
if(!executorService.isTerminated()){
thrownewShardingJdbcException("ExecutorEngine can not been terminated");
}
}
#shutdownNow()
尝试使用Thread.interrupt()
打断正在执行中的任务,未执行的任务不再执行。建议打印下哪些任务未执行,因为 SQL 未执行,可能数据未能持久化。#awaitTermination()
因为#shutdownNow()
打断不是立即结束,需要一个过程,因此这里等待了 5 秒。- 等待 5 秒后,线程池不一定已经关闭,此时抛出异常给上层。建议打印下日志,记录出现这个情况。
2.3 执行 SQL 任务
ExecutorEngine 对外暴露
#executeStatement()
, #executePreparedStatement()
, #executeBatch()
三个方法分别提供给 StatementExecutor、PreparedStatementExecutor、BatchPreparedStatementExecutor 调用。而这三个方法,内部调用的都是
#execute()
私有方法。// ExecutorEngine.java
/**
* 执行Statement.
* @param sqlType SQL类型
* @param statementUnits 语句对象执行单元集合
* @param executeCallback 执行回调函数
* @param <T> 返回值类型
* @return 执行结果
*/
public<T>List<T> executeStatement(finalSQLType sqlType,finalCollection<StatementUnit> statementUnits,finalExecuteCallback<T> executeCallback){
return execute(sqlType, statementUnits,Collections.<List<Object>>emptyList(), executeCallback);
}
/**
* 执行PreparedStatement.
* @param sqlType SQL类型
* @param preparedStatementUnits 语句对象执行单元集合
* @param parameters 参数列表
* @param executeCallback 执行回调函数
* @param <T> 返回值类型
* @return 执行结果
*/
public<T>List<T> executePreparedStatement(
finalSQLType sqlType,finalCollection<PreparedStatementUnit> preparedStatementUnits,finalList<Object> parameters,finalExecuteCallback<T> executeCallback){
return execute(sqlType, preparedStatementUnits,Collections.singletonList(parameters), executeCallback);
}
/**
* 执行Batch.
* @param sqlType SQL类型
* @param batchPreparedStatementUnits 语句对象执行单元集合
* @param parameterSets 参数列表集
* @param executeCallback 执行回调函数
* @return 执行结果
*/
publicList<int[]> executeBatch(
finalSQLType sqlType,finalCollection<BatchPreparedStatementUnit> batchPreparedStatementUnits,finalList<List<Object>> parameterSets,finalExecuteCallback<int[]> executeCallback){
return execute(sqlType, batchPreparedStatementUnits, parameterSets, executeCallback);
}
#execute()
执行过程大体流程如下图:/**
* 执行
*
* @param sqlType SQL 类型
* @param baseStatementUnits 语句对象执行单元集合
* @param parameterSets 参数列表集
* @param executeCallback 执行回调函数
* @param <T> 返回值类型
* @return 执行结果
*/
private<T>List<T> execute(
finalSQLType sqlType,finalCollection<?extendsBaseStatementUnit> baseStatementUnits,finalList<List<Object>> parameterSets,finalExecuteCallback<T> executeCallback){
if(baseStatementUnits.isEmpty()){
returnCollections.emptyList();
}
Iterator<?extendsBaseStatementUnit> iterator = baseStatementUnits.iterator();
BaseStatementUnit firstInput = iterator.next();
// 第二个任务开始所有 SQL任务 提交线程池【异步】执行任务
ListenableFuture<List<T>> restFutures = asyncExecute(sqlType,Lists.newArrayList(iterator), parameterSets, executeCallback);
T firstOutput;
List<T> restOutputs;
try{
// 第一个任务【同步】执行任务
firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
// 等待第二个任务开始所有 SQL任务完成
restOutputs = restFutures.get();
//CHECKSTYLE:OFF
}catch(finalException ex){
//CHECKSTYLE:ON
ExecutorExceptionHandler.handleException(ex);
returnnull;
}
// 返回结果
List<T> result =Lists.newLinkedList(restOutputs);
result.add(0, firstOutput);
return result;
}
- 第一个任务【同步】调用
#executeInternal()
执行任务。
private<T> T syncExecute(finalSQLType sqlType,finalBaseStatementUnit baseStatementUnit,finalList<List<Object>> parameterSets,finalExecuteCallback<T> executeCallback)throwsException{
// 【同步】执行任务
return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback,ExecutorExceptionHandler.isExceptionThrown(),ExecutorDataMap.getDataMap());
}
- 第二个开始的任务提交线程池异步调用
#executeInternal()
执行任务。
private<T>ListenableFuture<List<T>> asyncExecute(
finalSQLType sqlType,finalCollection<BaseStatementUnit> baseStatementUnits,finalList<List<Object>> parameterSets,finalExecuteCallback<T> executeCallback){
List<ListenableFuture<T>> result =newArrayList<>(baseStatementUnits.size());
finalboolean isExceptionThrown =ExecutorExceptionHandler.isExceptionThrown();
finalMap<String,Object> dataMap =ExecutorDataMap.getDataMap();
for(finalBaseStatementUnit each : baseStatementUnits){
// 提交线程池【异步】执行任务
result.add(executorService.submit(newCallable<T>(){
@Override
public T call()throwsException{
return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
}
}));
}
// 返回 ListenableFuture
returnFutures.allAsList(result);
}
- 我们注意下
Futures.allAsList(result);
和restOutputs=restFutures.get();
。神器 Guava 简化并发编程 的好处就提现出来了。ListenableFuture#get()
当所有任务都成功时,返回所有任务执行结果;当任何一个任务失败时,马上抛出异常,无需等待其他任务执行完成。
😮 Guava 真她喵神器,公众号:【芋道源码】会更新 Guava 源码分享的一个系列哟!老司机还不赶紧上车?
- 为什么会分同步执行和异步执行呢?猜测,当SQL 执行是单表时,只要进行第一个任务的同步调用,性能更加优秀。等跟张亮大神请教确认原因后,咱会进行更新。
// ExecutorEngine.java
private<T> T executeInternal(finalSQLType sqlType,finalBaseStatementUnit baseStatementUnit,finalList<List<Object>> parameterSets,finalExecuteCallback<T> executeCallback,
finalboolean isExceptionThrown,finalMap<String,Object> dataMap)throwsException{
synchronized(baseStatementUnit.getStatement().getConnection()){
T result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
List<AbstractExecutionEvent> events =newLinkedList<>();
// 生成 Event
if(parameterSets.isEmpty()){
events.add(getExecutionEvent(sqlType, baseStatementUnit,Collections.emptyList()));
}else{
for(List<Object> each : parameterSets){
events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
}
}
// EventBus 发布 EventExecutionType.BEFORE_EXECUTE
for(AbstractExecutionEventevent: events){
EventBusInstance.getInstance().post(event);
}
try{
// 执行回调函数
result = executeCallback.execute(baseStatementUnit);
}catch(finalSQLException ex){
// EventBus 发布 EventExecutionType.EXECUTE_FAILURE
for(AbstractExecutionEvent each : events){
each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
each.setException(Optional.of(ex));
EventBusInstance.getInstance().post(each);
ExecutorExceptionHandler.handleException(ex);
}
returnnull;
}
// EventBus 发布 EventExecutionType.EXECUTE_SUCCESS
for(AbstractExecutionEvent each : events){
each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
EventBusInstance.getInstance().post(each);
}
return result;
}
}
result=executeCallback.execute(baseStatementUnit);
执行回调函数。StatementExecutor,PreparedStatementExecutor,BatchPreparedStatementExecutor 通过传递执行回调函数( ExecuteCallback )实现给 ExecutorEngine 实现并行执行。
publicinterfaceExecuteCallback<T>{
/**
* 执行任务.
*
* @param baseStatementUnit 语句对象执行单元
* @return 处理结果
* @throws Exception 执行期异常
*/
T execute(BaseStatementUnit baseStatementUnit)throwsException;
}
synchronized(baseStatementUnit.getStatement().getConnection())
原以为 Connection 非线程安全,因此需要用同步,后翻查资料《数据库连接池为什么要建立多个连接》,Connection 是线程安全的。等跟张亮大神请教确认原因后,咱会进行更新。FROM https://github.com/dangdangdotcom/sharding-jdbc/issues/166druid的数据源的stat这种filter在并发使用同一个connection链接时没有考虑线程安全的问题,故造成多个线程修改filter中的状态异常。 改造这个问题时,考虑到mysql驱动在执行statement时对同一个connection是线程安全的。也就是说同一个数据库链接的会话是串行执行的。故在sjdbc的executor对于多线程执行的情况也进行了针对数据库链接级别的同步。故该方案不会降低sjdbc的性能。 同时jdk1.7版本的同步采用了锁升级技术,在碰撞较低的情况下开销也是很小的。- 解答:MySQL、Oracle 的 Connection 实现是线程安全的。数据库连接池实现的 Connection 不一定是线程安全,例如 Druid 的线程池 Connection 非线程安全
- ExecutionEvent 这里先不解释,在本文第四节【EventBus】分享。
- ExecutorExceptionHandler、ExecutorDataMap 和 柔性事务 ( AbstractSoftTransaction ),放在《柔性事务》分享。
3. Executor
Executor,执行器,目前一共有三个执行器。不同的执行器对应不同的执行单元 (BaseStatementUnit)。
执行器类 | 执行器名 | 执行单元 |
---|---|---|
StatementExecutor | 静态语句对象执行单元 | StatementUnit |
PreparedStatementExecutor | 预编译语句对象请求的执行器 | PreparedStatementUnit |
BatchPreparedStatementExecutor | 批量预编译语句对象请求的执行器 | BatchPreparedStatementUnit |
| ||
| ||
3.1 StatementExecutor
StatementExecutor,多线程执行静态语句对象请求的执行器,一共有三类方法:
- #executeQuery()
// StatementExecutor.java
/**
* 执行SQL查询.
* @return 结果集列表
*/
publicList<ResultSet> executeQuery(){
Context context =MetricsContext.start("ShardingStatement-executeQuery");
List<ResultSet> result;
try{
result = executorEngine.executeStatement(sqlType, statementUnits,newExecuteCallback<ResultSet>(){
@Override
publicResultSet execute(finalBaseStatementUnit baseStatementUnit)throwsException{
return baseStatementUnit.getStatement().executeQuery(baseStatementUnit.getSqlExecutionUnit().getSql());
}
});
}finally{
MetricsContext.stop(context);
}
return result;
}
#executeUpdate()
因为有四个不同情况的#executeUpdate()
,所以抽象了 Updater 接口,从而达到逻辑重用。
// StatementExecutor.java
/**
* 执行SQL更新.
* @return 更新数量
*/
publicint executeUpdate(){
return executeUpdate(newUpdater(){
@Override
publicint executeUpdate(finalStatement statement,finalString sql)throwsSQLException{
return statement.executeUpdate(sql);
}
});
}
privateint executeUpdate(finalUpdater updater){
Context context =MetricsContext.start("ShardingStatement-executeUpdate");
try{
List<Integer> results = executorEngine.executeStatement(sqlType, statementUnits,newExecuteCallback<Integer>(){
@Override
publicInteger execute(finalBaseStatementUnit baseStatementUnit)throwsException{
return updater.executeUpdate(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());
}
});
return accumulate(results);
}finally{
MetricsContext.stop(context);
}
}
/**
* 计算总的更新数量
* @param results 更新数量数组
* @return 更新数量
*/
privateint accumulate(finalList<Integer> results){
int result =0;
for(Integer each : results){
result +=null== each ?0: each;
}
return result;
}
#execute()
因为有四个不同情况的#execute()
,所以抽象了 Executor 接口,从而达到逻辑重用。
/**
* 执行SQL请求.
* @return true表示执行DQL语句, false表示执行的DML语句
*/
publicboolean execute(){
return execute(newExecutor(){
@Override
publicboolean execute(finalStatement statement,finalString sql)throwsSQLException{
return statement.execute(sql);
}
});
}
privateboolean execute(finalExecutor executor){
Context context =MetricsContext.start("ShardingStatement-execute");
try{
List<Boolean> result = executorEngine.executeStatement(sqlType, statementUnits,newExecuteCallback<Boolean>(){
@Override
publicBoolean execute(finalBaseStatementUnit baseStatementUnit)throwsException{
return executor.execute(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());
}
});
if(null== result || result.isEmpty()||null== result.get(0)){
returnfalse;
}
return result.get(0);
}finally{
MetricsContext.stop(context);
}
}
3.2 PreparedStatementExecutor
PreparedStatementExecutor,多线程执行预编译语句对象请求的执行器。比 StatementExecutor 多了
parameters
参数,方法逻辑上基本一致,就不重复分享啦。3.3 BatchPreparedStatementExecutor
BatchPreparedStatementExecutor,多线程执行批量预编译语句对象请求的执行器。
// BatchPreparedStatementExecutor.java
/**
* 执行批量SQL.
*
* @return 执行结果
*/
publicint[] executeBatch(){
Context context =MetricsContext.start("ShardingPreparedStatement-executeBatch");
try{
return accumulate(executorEngine.executeBatch(sqlType, batchPreparedStatementUnits, parameterSets,newExecuteCallback<int[]>(){
@Override
publicint[] execute(finalBaseStatementUnit baseStatementUnit)throwsException{
return baseStatementUnit.getStatement().executeBatch();
}
}));
}finally{
MetricsContext.stop(context);
}
}
/**
* 计算每个语句的更新数量
*
* @param results 每条 SQL 更新数量
* @return 每个语句的更新数量
*/
privateint[] accumulate(finalList<int[]> results){
int[] result =newint[parameterSets.size()];
int count =0;
// 每个语句按照顺序,读取到其对应的每个分片SQL影响的行数进行累加
for(BatchPreparedStatementUnit each : batchPreparedStatementUnits){
for(Map.Entry<Integer,Integer> entry : each.getJdbcAndActualAddBatchCallTimesMap().entrySet()){
result[entry.getKey()]+=null== results.get(count)?0: results.get(count)[entry.getValue()];
}
count++;
}
return result;
}
眼尖的同学会发现,为什么有 BatchPreparedStatementExecutor,而没有 BatchStatementExecutor 呢?目前 Sharding-JDBC 不支持 Statement 批量操作,只能进行 PreparedStatement 的批操作。
// PreparedStatement 批量操作,不会报错
PreparedStatement ps = conn.prepareStatement(sql)
ps.addBatch();
ps.addBatch();
// Statement 批量操作,会报错
ps.addBatch(sql);// 报错:at com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationStatement.addBatch
4. ExecutionEvent
AbstractExecutionEvent,SQL 执行事件抽象接口。
publicabstractclassAbstractExecutionEvent{
/**
* 事件编号
*/
privatefinalString id;
/**
* 数据源
*/
privatefinalString dataSource;
/**
* SQL
*/
privatefinalString sql;
/**
* 参数
*/
privatefinalList<Object> parameters;
/**
* 事件类型
*/
privateEventExecutionType eventExecutionType;
/**
* 异常
*/
privateOptional<SQLException> exception;
}
AbstractExecutionEvent 有两个实现子类:
- DMLExecutionEvent:DML类SQL执行时事件
- DQLExecutionEvent:DQL类SQL执行时事件
EventExecutionType,事件触发类型。
- BEFORE_EXECUTE:执行前
- EXECUTE_SUCCESS:执行成功
- EXECUTE_FAILURE:执行失败
4.1 EventBus
那究竟有什么用途呢? Sharding-JDBC 使用 Guava(没错,又是它)的 EventBus 实现了事件的发布和订阅。从上文
ExecutorEngine#executeInternal()
我们可以看到每个分片 SQL 执行的过程中会发布相应事件:- 执行 SQL 前:发布类型类型为 BEFORE_EXECUTE 的事件
- 执行 SQL 成功:发布类型类型为 EXECUTE_SUCCESS 的事件
- 执行 SQL 失败:发布类型类型为 EXECUTE_FAILURE 的事件
怎么订阅事件呢?非常简单,例子如下:
EventBusInstance.getInstance().register(newRunnable(){
@Override
publicvoid run(){
}
@Subscribe// 订阅
@AllowConcurrentEvents// 是否允许并发执行,即线程安全
publicvoid listen(finalDMLExecutionEventevent){// DMLExecutionEvent
System.out.println("DMLExecutionEvent:"+event.getSql()+"\t"+event.getEventExecutionType());
}
@Subscribe// 订阅
@AllowConcurrentEvents// 是否允许并发执行,即线程安全
publicvoid listen2(finalDQLExecutionEventevent){//DQLExecutionEvent
System.out.println("DQLExecutionEvent:"+event.getSql()+"\t"+event.getEventExecutionType());
}
});
#register()
任何类都可以,并非一定需要使用 Runnable 类。此处例子单纯因为方便@Subscribe
注解在方法上,实现对事件的订阅@AllowConcurrentEvents
注解在方法上,表示线程安全,允许并发执行- 方法上的参数对应的类即是订阅的事件。例如,
#listen()
订阅了 DMLExecutionEvent 事件 EventBus#post()
发布事件,同步调用订阅逻辑
- 推荐阅读文章:《Guava学习笔记:EventBus》
Sharding-JDBC 正在收集使用公司名单:传送门。🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门登记吧,骚年!传送门
4.2 BestEffortsDeliveryListener
BestEffortsDeliveryListener,最大努力送达型事务监听器。
本文暂时暂时不分析其实现,仅仅作为另外一个订阅者的例子。我们会在《柔性事务》进行分享。
publicfinalclassBestEffortsDeliveryListener{
@Subscribe
@AllowConcurrentEvents
publicvoid listen(finalDMLExecutionEventevent){
if(!isProcessContinuously()){
return;
}
SoftTransactionConfiguration transactionConfig =SoftTransactionManager.getCurrentTransactionConfiguration().get();
TransactionLogStorage transactionLogStorage =TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
BEDSoftTransaction bedSoftTransaction =(BEDSoftTransaction)SoftTransactionManager.getCurrentTransaction().get();
switch(event.getEventExecutionType()){
case BEFORE_EXECUTE:
//TODO 对于批量执行的SQL需要解析成两层列表
transactionLogStorage.add(newTransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
event.getDataSource(),event.getSql(),event.getParameters(),System.currentTimeMillis(),0));
return;
case EXECUTE_SUCCESS:
transactionLogStorage.remove(event.getId());
return;
case EXECUTE_FAILURE:
boolean deliverySuccess =false;
for(int i =0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++){
if(deliverySuccess){
return;
}
boolean isNewConnection =false;
Connection conn =null;
PreparedStatement preparedStatement =null;
try{
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(),SQLType.UPDATE);
if(!isValidConnection(conn)){
bedSoftTransaction.getConnection().release(conn);
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(),SQLType.UPDATE);
isNewConnection =true;
}
preparedStatement = conn.prepareStatement(event.getSql());
//TODO 对于批量事件需要解析成两层列表
for(int parameterIndex =0; parameterIndex <event.getParameters().size(); parameterIndex++){
preparedStatement.setObject(parameterIndex +1,event.getParameters().get(parameterIndex));
}
preparedStatement.executeUpdate();
deliverySuccess =true;
transactionLogStorage.remove(event.getId());
}catch(finalSQLException ex){
log.error(String.format("Delivery times %s error, max try times is %s", i +1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
}finally{
close(isNewConnection, conn, preparedStatement);
}
}
return;
default:
thrownewUnsupportedOperationException(event.getEventExecutionType().toString());
}
}
}
666. 彩蛋
本文完,但也未完。
跨分片事务问题。例如:
UPDATE t_order SET nickname =? WHERE user_id =?
A 节点
connection.commit()
时,应用突然挂了!B节点
connection.commit()
还来不及执行。
我们一起去《柔性事务》寻找答案。
道友,分享一波朋友圈可好?
阅读原文 最新评论
推荐文章
作者最新文章
你可能感兴趣的文章
Copyright Disclaimer: The copyright of contents (including texts, images, videos and audios) posted above belong to the User who shared or the third-party website which the User shared from. If you found your copyright have been infringed, please send a DMCA takedown notice to [email protected]. For more detail of the source, please click on the button "Read Original Post" below. For other communications, please send to [email protected].
版权声明:以上内容为用户推荐收藏至CareerEngine平台,其内容(含文字、图片、视频、音频等)及知识版权均属用户或用户转发自的第三方网站,如涉嫌侵权,请通知[email protected]进行信息删除。如需查看信息来源,请点击“查看原文”。如需洽谈其它事宜,请联系[email protected]。
版权声明:以上内容为用户推荐收藏至CareerEngine平台,其内容(含文字、图片、视频、音频等)及知识版权均属用户或用户转发自的第三方网站,如涉嫌侵权,请通知[email protected]进行信息删除。如需查看信息来源,请点击“查看原文”。如需洽谈其它事宜,请联系[email protected]。