摘要: 原创出处 http://www.iocoder.cn/Sharding-JDBC/jdbc-implement-and-read-write-splitting/ 「芋道源码」欢迎转载,保留摘要,谢谢!
排版已崩....建议点击原文,抱歉~~~~

本文主要基于 Sharding-JDBC 1.5.0 正式版
  • 1. 概述
  • 2. unspported 包
  • 3. adapter 包
  • 3.1 WrapperAdapter
  • 3.2 AbstractDataSourceAdapter
  • 3.3 AbstractConnectionAdapter
  • 3.4 AbstractStatementAdapter
  • 3.5 AbstractPreparedStatementAdapter
  • 3.6 AbstractResultSetAdapter
  • 4. 插入流程
  • 5. 查询流程
  • 6. 读写分离
  • 666. 彩蛋
🙂🙂🙂关注微信公众号:【芋道源码】有福利:
  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 JDBC读写分离 的实现。为什么会把这两个东西放在一起讲呢?客户端直连数据库的读写分离主要通过获取读库和写库的不同连接来实现,和 JDBC Connection 刚好放在一块。
OK,我们先来看一段 Sharding-JDBC 官方对自己的定义和定位
Sharding-JDBC定位为轻量级java框架,使用客户端直连数据库,以jar包形式提供服务,未使用中间层,无需额外部署,无其他依赖,DBA也无需改变原有的运维方式,可理解为增强版的JDBC驱动,旧代码迁移成本几乎为零。
可以看出,Sharding-JDBC 通过实现 JDBC规范,对上层提供透明化数据库分库分表的访问。😈 黑科技?实际我们使用的数据库连接池也是通过这种方式实现对上层无感知的提供连接池。甚至还可以通过这种方式实现对 Lucene、MongoDB 等等的访问。
扯远了,下面来看看 Sharding-JDBC jdbc 包的结构:
  • unsupported:声明不支持的数据操作方法
  • adapter:适配类,实现和分库分表无关的方法
  • core:核心类,实现和分库分表相关的方法
根据 core 包,可以看出分到四种我们超级熟悉的对象
  • Datasource
  • Connection
  • Statement
  • ResultSet
实现层级如下:JDBC 接口 <=(继承)== unsupported抽象类 <=(继承)== unsupported抽象类 <=(继承)== core

本文内容顺序
  1. unspported 包
  2. adapter 包
  3. 插入流程,分析的类:
    • ShardingDataSource
    • ShardingConnection
    • ShardingPreparedStatement(ShardingStatement 类似,不重复分析)
    • GeneratedKeysResultSet、GeneratedKeysResultSetMetaData
  4. 查询流程,分析的类:
    • ShardingPreparedStatement
    • ShardingResultSet
  5. 读写分离,分析的类:
    • MasterSlaveDataSource

Sharding-JDBC 正在收集使用公司名单:传送门。

🙂 你的登记,会让更多人参与和使用 Sharding-JDBC。传送门

Sharding-JDBC 也会因此,能够覆盖更多的业务场景。传送门

登记吧,骚年!传送门

2. unspported 包

unspported 包内的抽象类,声明不支持操作的数据对象,所有方法都是 thrownewSQLFeatureNotSupportedException() 方式。
  1. publicabstractclassAbstractUnsupportedGeneratedKeysResultSetextendsAbstractUnsupportedOperationResultSet{
  2. @Override
  3. publicboolean getBoolean(finalint columnIndex)throwsSQLException{
  4. thrownewSQLFeatureNotSupportedException("getBoolean");
  5. }
  6. // .... 省略其它类似方法
  7. }
  8. publicabstractclassAbstractUnsupportedOperationConnectionextendsWrapperAdapterimplementsConnection{
  9. @Override
  10. publicfinalCallableStatement prepareCall(finalString sql)throwsSQLException{
  11. thrownewSQLFeatureNotSupportedException("prepareCall");
  12. }
  13. // .... 省略其它类似方法
  14. }

3. adapter 包

adapter 包内的抽象类,实现和分库分表无关的方法。
考虑到第4、5两小节更容易理解,本小节贴的代码会相对多

3.1 WrapperAdapter

WrapperAdapter,JDBC Wrapper 适配类。
对 Wrapper 接口实现如下两个方法
  1. @Override
  2. publicfinal<T> T unwrap(finalClass<T> iface)throwsSQLException{
  3. if(isWrapperFor(iface)){
  4. return(T)this;
  5. }
  6. thrownewSQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
  7. }
  8. @Override
  9. publicfinalboolean isWrapperFor(finalClass<?> iface)throwsSQLException{
  10. return iface.isInstance(this);
  11. }
提供子类 #recordMethodInvocation() 记录方法调用, #replayMethodsInvocation() 回放记录的方法调用
  1. /**
  2. * 记录的方法数组
  3. */
  4. privatefinalCollection<JdbcMethodInvocation> jdbcMethodInvocations =newArrayList<>();
  5. /**
  6. * 记录方法调用.
  7. *
  8. * @param targetClass 目标类
  9. * @param methodName 方法名称
  10. * @param argumentTypes 参数类型
  11. * @param arguments 参数
  12. */
  13. publicfinalvoid recordMethodInvocation(finalClass<?> targetClass,finalString methodName,finalClass<?>[] argumentTypes,finalObject[] arguments){
  14. try{
  15.       jdbcMethodInvocations.add(newJdbcMethodInvocation(targetClass.getMethod(methodName, argumentTypes), arguments));
  16. }catch(finalNoSuchMethodException ex){
  17. thrownewShardingJdbcException(ex);
  18. }
  19. }
  20. /**
  21. * 回放记录的方法调用.
  22. *
  23. * @param target 目标对象
  24. */
  25. publicfinalvoid replayMethodsInvocation(finalObject target){
  26. for(JdbcMethodInvocation each : jdbcMethodInvocations){
  27.       each.invoke(target);
  28. }
  29. }
  • 这两个方法有什么用途呢?例如下文会提到的 AbstractConnectionAdapter 的 #setAutoCommit(),当它无数据库连接时,先记录;等获得到数据连接后,再回放:
    1. // AbstractConnectionAdapter.java
    2. @Override
    3. publicfinalvoid setAutoCommit(finalboolean autoCommit)throwsSQLException{
    4. this.autoCommit = autoCommit;
    5. if(getConnections().isEmpty()){// 无数据连接时,记录方法调用
    6.       recordMethodInvocation(Connection.class,"setAutoCommit",newClass[]{boolean.class},newObject[]{autoCommit});
    7. return;
    8. }
    9. for(Connection each : getConnections()){
    10.       each.setAutoCommit(autoCommit);
    11. }
    12. }
  • JdbcMethodInvocation,反射调用JDBC相关方法的工具类:
    1. publicclassJdbcMethodInvocation{
    2. /**
    3.    * 方法
    4.    */
    5. @Getter
    6. privatefinalMethod method;
    7. /**
    8.    * 方法参数
    9.    */
    10. @Getter
    11. privatefinalObject[] arguments;
    12. /**
    13.    *  调用方法.
    14.    *
    15.    * @param target 目标对象
    16.    */
    17. publicvoid invoke(finalObject target){
    18. try{
    19.           method.invoke(target, arguments);// 反射调用
    20. }catch(finalIllegalAccessException|InvocationTargetException ex){
    21. thrownewShardingJdbcException("Invoke jdbc method exception", ex);
    22. }
    23. }
    24. }
提供子类 #throwSQLExceptionIfNecessary() 抛出异常链
  1. protectedvoid throwSQLExceptionIfNecessary(finalCollection<SQLException> exceptions)throwsSQLException{
  2. if(exceptions.isEmpty()){// 为空不抛出异常
  3. return;
  4. }
  5. SQLException ex =newSQLException();
  6. for(SQLException each : exceptions){
  7.       ex.setNextException(each);// 异常链
  8. }
  9. throw ex;
  10. }

3.2 AbstractDataSourceAdapter

AbstractDataSourceAdapter,数据源适配类。
直接点击链接查看源码。

3.3 AbstractConnectionAdapter

AbstractConnectionAdapter,数据库连接适配类。
我们来瞅瞅大家最关心的事务相关方法的实现。
  1. /**
  2. * 是否自动提交
  3. */
  4. privateboolean autoCommit =true;
  5. /**
  6. * 获得链接
  7. *
  8. * @return 链接
  9. */
  10. protectedabstractCollection<Connection> getConnections();
  11. @Override
  12. publicfinalboolean getAutoCommit()throwsSQLException{
  13. return autoCommit;
  14. }
  15. @Override
  16. publicfinalvoid setAutoCommit(finalboolean autoCommit)throwsSQLException{
  17. this.autoCommit = autoCommit;
  18. if(getConnections().isEmpty()){// 无数据连接时,记录方法调用
  19.       recordMethodInvocation(Connection.class,"setAutoCommit",newClass[]{boolean.class},newObject[]{autoCommit});
  20. return;
  21. }
  22. for(Connection each : getConnections()){
  23.       each.setAutoCommit(autoCommit);
  24. }
  25. }
  • #setAutoCommit() 调用时,实际会设置其所持有的 Connection 的 autoCommit 属性
  • #getConnections() 和分库分表相关,因而仅抽象该方法,留给子类实现
  1. @Override
  2. publicfinalvoid commit()throwsSQLException{
  3. for(Connection each : getConnections()){
  4.       each.commit();
  5. }
  6. }
  7. @Override
  8. publicfinalvoid rollback()throwsSQLException{
  9. Collection<SQLException> exceptions =newLinkedList<>();
  10. for(Connection each : getConnections()){
  11. try{
  12.           each.rollback();
  13. }catch(finalSQLException ex){
  14.           exceptions.add(ex);
  15. }
  16. }
  17.   throwSQLExceptionIfNecessary(exceptions);
  18. }
  • #commit()#rollback() 调用时,实际调用其所持有的 Connection 的方法
  • 异常情况下, #commit()#rollback() 处理方式不同,笔者暂时不知道答案,求证后会进行更新
    • #commit() 处理方式需要改成和 #rollback() 一样。代码如下:
    1. @Override
    2. publicfinalvoid commit()throwsSQLException{
    3. Collection<SQLException> exceptions =newLinkedList<>();
    4. for(Connection each : getConnections()){
    5. try{
    6.           each.commit();
    7. }catch(finalSQLException ex){
    8.           exceptions.add(ex);
    9. }
    10. }
    11.   throwSQLExceptionIfNecessary(exceptions);
    12. }
事务级别和是否只读相关代码如下:
  1. /**
  2. * 只读
  3. */
  4. privateboolean readOnly =true;
  5. /**
  6. * 事务级别
  7. */
  8. privateint transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
  9. @Override
  10. publicfinalvoid setReadOnly(finalboolean readOnly)throwsSQLException{
  11. this.readOnly = readOnly;
  12. if(getConnections().isEmpty()){
  13.       recordMethodInvocation(Connection.class,"setReadOnly",newClass[]{boolean.class},newObject[]{readOnly});
  14. return;
  15. }
  16. for(Connection each : getConnections()){
  17.       each.setReadOnly(readOnly);
  18. }
  19. }
  20. @Override
  21. publicfinalvoid setTransactionIsolation(finalint level)throwsSQLException{
  22.   transactionIsolation = level;
  23. if(getConnections().isEmpty()){
  24.       recordMethodInvocation(Connection.class,"setTransactionIsolation",newClass[]{int.class},newObject[]{level});
  25. return;
  26. }
  27. for(Connection each : getConnections()){
  28.       each.setTransactionIsolation(level);
  29. }
  30. }

3.4 AbstractStatementAdapter

AbstractStatementAdapter,静态语句对象适配类。
  1. @Override
  2. publicfinalint getUpdateCount()throwsSQLException{
  3. long result =0;
  4. boolean hasResult =false;
  5. for(Statement each : getRoutedStatements()){
  6. if(each.getUpdateCount()>-1){
  7.           hasResult =true;
  8. }
  9.       result += each.getUpdateCount();
  10. }
  11. if(result >Integer.MAX_VALUE){
  12.       result =Integer.MAX_VALUE;
  13. }
  14. return hasResult ?Long.valueOf(result).intValue():-1;
  15. }
  16. /**
  17. * 获取路由的静态语句对象集合.
  18. *
  19. * @return 路由的静态语句对象集合
  20. */
  21. protectedabstractCollection<?extendsStatement> getRoutedStatements();
  • #getUpdateCount() 调用持有的 Statement 计算更新数量
  • #getRoutedStatements() 和分库分表相关,因而仅抽象该方法,留给子类实现

3.5 AbstractPreparedStatementAdapter

AbstractPreparedStatementAdapter,预编译语句对象的适配类。
#recordSetParameter()实现对占位符参数的设置
  1. /**
  2. * 记录的设置参数方法数组
  3. */
  4. privatefinalList<SetParameterMethodInvocation> setParameterMethodInvocations =newLinkedList<>();
  5. /**
  6. * 参数
  7. */
  8. @Getter
  9. privatefinalList<Object> parameters =newArrayList<>();
  10. @Override
  11. publicfinalvoid setInt(finalint parameterIndex,finalint x)throwsSQLException{
  12.   setParameter(parameterIndex, x);
  13.   recordSetParameter("setInt",newClass[]{int.class,int.class}, parameterIndex, x);
  14. }
  15. /**
  16. * 记录占位符参数
  17. *
  18. * @param parameterIndex 占位符参数位置
  19. * @param value 参数
  20. */
  21. privatevoid setParameter(finalint parameterIndex,finalObject value){
  22. if(parameters.size()== parameterIndex -1){
  23.       parameters.add(value);
  24. return;
  25. }
  26. for(int i = parameters.size(); i <= parameterIndex -1; i++){// 用 null 填充前面未设置的位置
  27.       parameters.add(null);
  28. }
  29.   parameters.set(parameterIndex -1, value);
  30. }
  31. /**
  32. * 记录设置参数方法调用
  33. *
  34. * @param methodName 方法名,例如 setInt、setLong 等
  35. * @param argumentTypes 参数类型
  36. * @param arguments 参数
  37. */
  38. privatevoid recordSetParameter(finalString methodName,finalClass[] argumentTypes,finalObject... arguments){
  39. try{
  40.       setParameterMethodInvocations.add(newSetParameterMethodInvocation(PreparedStatement.class.getMethod(methodName, argumentTypes), arguments, arguments[1]));
  41. }catch(finalNoSuchMethodException ex){
  42. thrownewShardingJdbcException(ex);
  43. }
  44. }
  45. /**
  46. * 回放记录的设置参数方法调用
  47. *
  48. * @param preparedStatement 预编译语句对象
  49. */
  50. protectedvoid replaySetParameter(finalPreparedStatement preparedStatement){
  51.   addParameters();
  52. for(SetParameterMethodInvocation each : setParameterMethodInvocations){
  53.       updateParameterValues(each, parameters.get(each.getIndex()-1));// 同一个位置多次设置,值可能不一样,需要更新下
  54.       each.invoke(preparedStatement);
  55. }
  56. }
  57. /**
  58. * 当使用分布式主键时,生成后会添加到 parameters,此时 parameters 数量多于 setParameterMethodInvocations,需要生成该分布式主键的 SetParameterMethodInvocation
  59. */
  60. privatevoid addParameters(){
  61. for(int i = setParameterMethodInvocations.size(); i < parameters.size(); i++){
  62.       recordSetParameter("setObject",newClass[]{int.class,Object.class}, i +1, parameters.get(i));
  63. }
  64. }
  65. privatevoid updateParameterValues(finalSetParameterMethodInvocation setParameterMethodInvocation,finalObject value){
  66. if(!Objects.equals(setParameterMethodInvocation.getValue(), value)){
  67.       setParameterMethodInvocation.changeValueArgument(value);// 修改占位符参数
  68. }
  69. }
  • 逻辑类似 WrapperAdapter#recordMethodInvocation()#replayMethodsInvocation(),请认真阅读代码注释
  • SetParameterMethodInvocation,继承 JdbcMethodInvocation,反射调用参数设置方法的工具类:
    1. publicfinalclassSetParameterMethodInvocationextendsJdbcMethodInvocation{
      /**
    2. * 位置
    3. */
    4. @Getter
    5. privatefinalint index;
    6. /**
    7. * 参数值
    8. */
    9. @Getter
    10. privatefinalObject value;
    11. /**
    12. * 设置参数值.
    13. *
    14. * @param value 参数值
    15. */
    16. publicvoid changeValueArgument(finalObject value){
    17.    getArguments()[1]= value;
    18. }
    19. }

3.6 AbstractResultSetAdapter

AbstractResultSetAdapter,代理结果集适配器。
  1. publicabstractclassAbstractResultSetAdapterextendsAbstractUnsupportedOperationResultSet{
  2. /**
  3.     * 结果集集合
  4.     */
  5. @Getter
  6. privatefinalList<ResultSet> resultSets;
  7. @Override
  8. // TODO should return sharding statement in future
  9. publicfinalStatement getStatement()throwsSQLException{
  10. return getResultSets().get(0).getStatement();
  11. }
  12. @Override
  13. publicfinalResultSetMetaData getMetaData()throwsSQLException{
  14. return getResultSets().get(0).getMetaData();
  15. }
  16. @Override
  17. publicint findColumn(finalString columnLabel)throwsSQLException{
  18. return getResultSets().get(0).findColumn(columnLabel);
  19. }
  20. // .... 省略其它方法
  21. }

4. 插入流程

插入使用分布式主键例子代码如下:
  1. // 代码仅仅是例子,生产环境下请注意异常处理和资源关闭
  2. String sql ="INSERT INTO t_order(uid, nickname, pid) VALUES (1, '2', ?)";
  3. DataSource dataSource =newShardingDataSource(shardingRule);
  4. Connection conn = dataSource.getConnection();
  5. PreparedStatement ps = conn.prepareStatement(sql,Statement.RETURN_GENERATED_KEYS);// 返回主键需要  Statement.RETURN_GENERATED_KEYS
  6. ps.setLong(1,100);
  7. ps.executeUpdate();
  8. ResultSet rs = ps.getGeneratedKeys();
  9. if(rs.next()){
  10. System.out.println("id:"+ rs.getLong(1));
  11. }
调用 #executeUpdate() 方法,内部过程如下
是不是对上层完全透明?!我们来看看内部是怎么实现的。
  1. // ShardingPreparedStatement.java
  2. @Override
  3. publicint executeUpdate()throwsSQLException{
  4. try{
  5. Collection<PreparedStatementUnit> preparedStatementUnits = route();
  6. returnnewPreparedStatementExecutor(
  7.               getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeUpdate();
  8. }finally{
  9.       clearBatch();
  10. }
  11. }
  • #route() 分库分表路由,获得预编译语句对象执行单元( PreparedStatementUnit )集合。
    1. publicfinalclassPreparedStatementUnitimplementsBaseStatementUnit{
    2. /**
    3.     * SQL 执行单元
    4.     */
    5. privatefinalSQLExecutionUnit sqlExecutionUnit;
    6. /**
    7.     * 预编译语句对象
    8.     */
    9. privatefinalPreparedStatement statement;
    10. }
  • #executeUpdate() 调用执行引擎并行执行多个预编译语句对象。执行时,最终调用预编译语句对象( PreparedStatement )。我们来看一个例子:
    1. // PreparedStatementExecutor.java
    2. publicint executeUpdate(){
    3. Context context =MetricsContext.start("ShardingPreparedStatement-executeUpdate");
    4. try{
    5. List<Integer> results = executorEngine.executePreparedStatement(sqlType, preparedStatementUnits, parameters,newExecuteCallback<Integer>(){
      @Override
    6. publicInteger execute(finalBaseStatementUnit baseStatementUnit)throwsException{
    7. // 调用 PreparedStatement#executeUpdate()
    8. return((PreparedStatement) baseStatementUnit.getStatement()).executeUpdate();
    9. }
    10. });
    11. return accumulate(results);
    12. }finally{
    13. MetricsContext.stop(context);
    14. }
    15. }

  1. // ShardingPreparedStatement.java
  2. privateCollection<PreparedStatementUnit> route()throwsSQLException{
  3. Collection<PreparedStatementUnit> result =newLinkedList<>();
  4. // 路由
  5.   setRouteResult(routingEngine.route(getParameters()));
  6. // 遍历 SQL 执行单元
  7. for(SQLExecutionUnit each : getRouteResult().getExecutionUnits()){
  8. SQLType sqlType = getRouteResult().getSqlStatement().getType();
  9. Collection<PreparedStatement> preparedStatements;
  10. // 创建实际的 PreparedStatement
  11. if(SQLType.DDL == sqlType){
  12.           preparedStatements = generatePreparedStatementForDDL(each);
  13. }else{
  14.           preparedStatements =Collections.singletonList(generatePreparedStatement(each));
  15. }
  16.       getRoutedStatements().addAll(preparedStatements);
  17. // 回放设置占位符参数到 PreparedStatement
  18. for(PreparedStatement preparedStatement : preparedStatements){
  19.           replaySetParameter(preparedStatement);
  20.           result.add(newPreparedStatementUnit(each, preparedStatement));
  21. }
  22. }
  23. return result;
  24. }
  25. /**
  26. * 创建 PreparedStatement
  27. *
  28. * @param sqlExecutionUnit SQL 执行单元
  29. * @return PreparedStatement
  30. * @throws SQLException 当 JDBC 操作发生异常时
  31. */
  32. privatePreparedStatement generatePreparedStatement(finalSQLExecutionUnit sqlExecutionUnit)throwsSQLException{
  33. Optional<GeneratedKey> generatedKey = getGeneratedKey();
  34. // 获得连接
  35. Connection connection = getShardingConnection().getConnection(sqlExecutionUnit.getDataSource(), getRouteResult().getSqlStatement().getType());
  36. // 声明返回主键
  37. if(isReturnGeneratedKeys()|| isReturnGeneratedKeys()&& generatedKey.isPresent()){
  38. return connection.prepareStatement(sqlExecutionUnit.getSql(), RETURN_GENERATED_KEYS);
  39. }
  40. return connection.prepareStatement(sqlExecutionUnit.getSql(), getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
  41. }
  • 调用 #generatePreparedStatement() 创建 PreparedStatement,后调用 #replaySetParameter() 回放设置占位符参数到 PreparedStatement
  • 声明返回主键 时,即 #isReturnGeneratedKeys() 返回 true 时,调用 connection.prepareStatement(sqlExecutionUnit.getSql(),RETURN_GENERATED_KEYS)。为什么该方法会返回 true?上文例子 conn.prepareStatement(sql,Statement.RETURN_GENERATED_KEYS)
    声明返回主键后,插入执行完成,我们调用 #getGeneratedKeys() 可以获得主键 :
    1. // ShardingStatement.java
    2. @Override
    3. publicResultSet getGeneratedKeys()throwsSQLException{
    4. Optional<GeneratedKey> generatedKey = getGeneratedKey();
    5. // 分布式主键
    6. if(generatedKey.isPresent()&& returnGeneratedKeys){
    7. returnnewGeneratedKeysResultSet(routeResult.getGeneratedKeys().iterator(), generatedKey.get().getColumn(),this);
    8. }
    9. // 数据库自增
    10. if(1== getRoutedStatements().size()){
    11. return getRoutedStatements().iterator().next().getGeneratedKeys();
    12. }
    13. returnnewGeneratedKeysResultSet();
    14. }
    15. // ShardingConnection.java
    16. @Override
    17. publicPreparedStatement prepareStatement(finalString sql,finalString[] columnNames)throwsSQLException{
    18. returnnewShardingPreparedStatement(this, sql,Statement.RETURN_GENERATED_KEYS);
    19. }
    20. // ShardingPreparedStatement.java
    21. publicShardingPreparedStatement(finalShardingConnection shardingConnection,finalString sql,finalint autoGeneratedKeys){
    22. this(shardingConnection, sql);
    23. if(RETURN_GENERATED_KEYS == autoGeneratedKeys){
    24.     markReturnGeneratedKeys();
    25. }
    26. }
    27. protectedfinalvoid markReturnGeneratedKeys(){
    28. returnGeneratedKeys =true;
    29. }
  • 调用 ShardingConnection#getConnection() 方法获得该 PreparedStatement 对应的真实数据库连接( Connection ):
    • 调用 #getCachedConnection() 尝试获得已缓存的数据库连接;如果缓存中不存在,获取到连接后会进行缓存
    • 从 ShardingRule 配置的 DataSourceRule 获取真实的数据源( DataSource )
    • MasterSlaveDataSource 实现主从数据源封装,我们在下小节分享
    • 调用 #replayMethodsInvocation() 回放记录的 Connection 方法
    1. // ShardingConnection.java
    2. /**
    3. * 根据数据源名称获取相应的数据库连接.
    4. *
    5. * @param dataSourceName 数据源名称
    6. * @param sqlType SQL语句类型
    7. * @return 数据库连接
    8. * @throws SQLException SQL异常
    9. */
    10. publicConnection getConnection(finalString dataSourceName,finalSQLType sqlType)throwsSQLException{
    11. // 从连接缓存中获取连接
    12. Optional<Connection> connection = getCachedConnection(dataSourceName, sqlType);
    13. if(connection.isPresent()){
    14. return connection.get();
    15. }
    16. Context metricsContext =MetricsContext.start(Joiner.on("-").join("ShardingConnection-getConnection", dataSourceName));
    17. //
    18. DataSource dataSource = shardingContext.getShardingRule().getDataSourceRule().getDataSource(dataSourceName);
    19. Preconditions.checkState(null!= dataSource,"Missing the rule of %s in DataSourceRule", dataSourceName);
    20. String realDataSourceName;
    21. if(dataSource instanceofMasterSlaveDataSource){
    22.        dataSource =((MasterSlaveDataSource) dataSource).getDataSource(sqlType);
    23.        realDataSourceName =MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);
    24. }else{
    25.        realDataSourceName = dataSourceName;
    26. }
    27. Connection result = dataSource.getConnection();
    28. MetricsContext.stop(metricsContext);
    29. // 添加到连接缓存
    30.    connectionMap.put(realDataSourceName, result);
    31. // 回放 Connection 方法
    32.    replayMethodsInvocation(result);
    33. return result;
    34. }
    35. privateOptional<Connection> getCachedConnection(finalString dataSourceName,finalSQLType sqlType){
    36. String key = connectionMap.containsKey(dataSourceName)? dataSourceName :MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);
    37. returnOptional.fromNullable(connectionMap.get(key));
    38. }
插入实现的代码基本分享完了,因为是不断代码下钻的方式分析,可以反向向上在理理,会更加清晰

5. 查询流程

单纯从 core 包里的 JDBC 实现,查询流程 #executeQuery()#execute() 基本一致,差别在于执行多结果集归并
  1. @Override
  2. publicResultSet executeQuery()throwsSQLException{
  3. ResultSet result;
  4. try{
  5. // 路由
  6. Collection<PreparedStatementUnit> preparedStatementUnits = route();
  7. // 执行
  8. List<ResultSet> resultSets =newPreparedStatementExecutor(
  9.               getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();
  10. // 结果归并
  11.       result =newShardingResultSet(resultSets,newMergeEngine(
  12.               getShardingConnection().getShardingContext().getDatabaseType(), resultSets,(SelectStatement) getRouteResult().getSqlStatement()).merge());
  13. }finally{
  14.       clearBatch();
  15. }
  16. // 设置结果集
  17.   setCurrentResultSet(result);
  18. return result;
  19. }
  • SQL执行 感兴趣的同学可以看:《Sharding-JDBC 源码分析 —— SQL 执行》
  • 结果归并 感兴趣的同学可以看:《Sharding-JDBC 源码分析 —— 结果归并》
  • 结果归并 #merge() 完后,创建分片结果集( ShardingResultSet )
    1. publicfinalclassShardingResultSetextendsAbstractResultSetAdapter{
    2. /**
    3.     * 归并结果集
    4.     */
    5. privatefinalResultSetMerger mergeResultSet;
      @Override
    6. publicint getInt(finalint columnIndex)throwsSQLException{
    7. Object result = mergeResultSet.getValue(columnIndex,int.class);
    8.    wasNull =null== result;
    9. return(int)ResultSetUtil.convertValue(result,int.class);
    10. }
    11. @Override
    12. publicint getInt(finalString columnLabel)throwsSQLException{
    13. Object result = mergeResultSet.getValue(columnLabel,int.class);
    14.    wasNull =null== result;
    15. return(int)ResultSetUtil.convertValue(result,int.class);
    16. }
    17. // .... 隐藏其他类似 getXXXX() 方法
    18. }

6. 读写分离

建议前置阅读:《官方文档 —— 读写分离》
当你有读写分离的需求时,将 ShardingRule 配置对应的数据源 从 ShardingDataSource 替换成 MasterSlaveDataSource。我们来看看 MasterSlaveDataSource 的功能和实现。
支持一主多从的读写分离配置,可配合分库分表使用
  1. // MasterSlaveDataSourceFactory.java
  2. publicfinalclassMasterSlaveDataSourceFactory{
  3. /**
  4.     * 创建读写分离数据源.
  5.     *
  6.     * @param name 读写分离数据源名称
  7.     * @param masterDataSource 主节点数据源
  8.     * @param slaveDataSource 从节点数据源
  9.     * @param otherSlaveDataSources 其他从节点数据源
  10.     * @return 读写分离数据源
  11.     */
  12. publicstaticDataSource createDataSource(finalString name,finalDataSource masterDataSource,finalDataSource slaveDataSource,finalDataSource... otherSlaveDataSources){
  13. returnnewMasterSlaveDataSource(name, masterDataSource,Lists.asList(slaveDataSource, otherSlaveDataSources));
  14. }
  15. }
  16. // MasterSlaveDataSource.java
  17. publicfinalclassMasterSlaveDataSourceextendsAbstractDataSourceAdapter{
  18. /**
  19.     * 数据源名
  20.     */
  21. privatefinalString name;
  22. /**
  23.     * 主数据源
  24.     */
  25. @Getter
  26. privatefinalDataSource masterDataSource;
  27. /**
  28.     * 从数据源集合
  29.     */
  30. @Getter
  31. privatefinalList<DataSource> slaveDataSources;
  32. }
同一线程且同一数据库连接内,如有写入操作,以后的读操作均从主库读取,用于保证数据一致性。
  1. // ShardingConnection.java
  2. publicConnection getConnection(finalString dataSourceName,finalSQLType sqlType)throwsSQLException{
  3. // .... 省略部分代码
  4. String realDataSourceName;
  5. if(dataSource instanceofMasterSlaveDataSource){// 读写分离
  6.       dataSource =((MasterSlaveDataSource) dataSource).getDataSource(sqlType);
  7.       realDataSourceName =MasterSlaveDataSource.getDataSourceName(dataSourceName, sqlType);
  8. }else{
  9.       realDataSourceName = dataSourceName;
  10. }
  11. Connection result = dataSource.getConnection();
  12. // .... 省略部分代码
  13. }
  14. // MasterSlaveDataSource.java
  15. /**
  16. * 当前线程是否是 DML 操作标识
  17. */
  18. privatestaticfinalThreadLocal<Boolean> DML_FLAG =newThreadLocal<Boolean>(){
  19. @Override
  20. protectedBoolean initialValue(){
  21. returnfalse;
  22. }
  23. };
  24. /**
  25. * 从库负载均衡策略
  26. */
  27. privatefinalSlaveLoadBalanceStrategy slaveLoadBalanceStrategy =newRoundRobinSlaveLoadBalanceStrategy();
  28. /**
  29. * 获取主或从节点的数据源.
  30. *
  31. * @param sqlType SQL类型
  32. * @return 主或从节点的数据源
  33. */
  34. publicDataSource getDataSource(finalSQLType sqlType){
  35. if(isMasterRoute(sqlType)){
  36.       DML_FLAG.set(true);
  37. return masterDataSource;
  38. }
  39. return slaveLoadBalanceStrategy.getDataSource(name, slaveDataSources);
  40. }
  41. privatestaticboolean isMasterRoute(finalSQLType sqlType){
  42. returnSQLType.DQL != sqlType || DML_FLAG.get()||HintManagerHolder.isMasterRouteOnly();
  43. }
  • ShardingConnection 获取到的数据源是 MasterSlaveDataSource 时,调用 MasterSlaveDataSource#getConnection() 方法获取真实的数据源
  • 通过 #isMasterRoute() 判断是否读取主库,以下三种情况会访问主库:
    • 非查询语句 (DQL)
    • 数据源在当前线程访问过主库:通过线程变量 DML_FLAG 实现
    • 强制主库:程序里调用 HintManager.getInstance().setMasterRouteOnly() 实现
  • 访问从库时,会通过负载均衡策略( SlaveLoadBalanceStrategy ) 选择一个从库
    • MasterSlaveDataSource 默认使用 RoundRobinSlaveLoadBalanceStrategy,暂时不支持配置
    • RoundRobinSlaveLoadBalanceStrategy,轮询负载均衡策略,每个从节点访问次数均衡,暂不支持数据源故障移除
    1. // SlaveLoadBalanceStrategy.java
    2. publicinterfaceSlaveLoadBalanceStrategy{
      /**
    3. * 根据负载均衡策略获取从库数据源.
    4. *
    5. * @param name 读写分离数据源名称
    6. * @param slaveDataSources 从库数据源列表
    7. * @return 选中的从库数据源
    8. */
    9. DataSource getDataSource(String name,List&lt;DataSource&gt; slaveDataSources);
    10. }
    11. // RoundRobinSlaveLoadBalanceStrategy.java
    12. publicfinalclassRoundRobinSlaveLoadBalanceStrategyimplementsSlaveLoadBalanceStrategy{
      privatestaticfinalConcurrentHashMap&lt;String,AtomicInteger&gt; COUNT_MAP =newConcurrentHashMap&lt;&gt;();
    13. @Override
    14. publicDataSource getDataSource(finalString name,finalList&lt;DataSource&gt; slaveDataSources){
    15. AtomicInteger count = COUNT_MAP.containsKey(name)? COUNT_MAP.get(name):newAtomicInteger(0);
    16.    COUNT_MAP.putIfAbsent(name, count);
    17.    count.compareAndSet(slaveDataSources.size(),0);
    18. return slaveDataSources.get(count.getAndIncrement()% slaveDataSources.size());
    19. }
    20. }

666. 彩蛋

没有彩蛋

没有彩

没有

下一篇,《分布式事务(一)之最大努力型》走起。老司机,赶紧上车。
道友,分享一个朋友圈可好?不然交个道姑那~~敏感词~~你。
继续阅读
阅读原文