原文地址:http://www.yunai.me/MyCAT/what-is-PreparedStatement/

MyCat-Server带注释代码
地址 :https://github.com/YunaiV/Mycat-Server

😈本系列每 1-2 周更新一篇,欢迎订阅、关注、收藏 公众号
QQ :7685413

  • 1. 概述
  • 2. JDBC Client 实现
  • 3. MyCAT Server 实现
    • 3.1 创建 PreparedStatement
    • 3.2 执行 SQL
  • 4. 彩蛋

1. 概述

相信很多同学在学习 JDBC 时,都碰到 PreparedStatementStatement。究竟该使用哪个呢?最终很可能是懵里懵懂的看了各种总结,使用 PreparedStatement。那么本文,通过 MyCAT 对 PreparedStatement 的实现对大家能够重新理解下。
本文主要分成两部分:
  1. JDBC Client 如何实现 PreparedStatement
  2. MyCAT Server 如何处理 PreparedStatement
😈 Let's Go,

2. JDBC Client 实现

首先,我们来看一段大家最喜欢复制粘贴之一的代码,JDBC PreparedStatement 查询 MySQL 数据库:
  1. publicclassPreparedStatementDemo{
  2. publicstaticvoid main(String[] args)throwsClassNotFoundException,SQLException{
  3. // 1. 获得数据库连接
  4. Class.forName("com.mysql.jdbc.Driver");
  5. Connection conn =DriverManager.getConnection("jdbc:mysql://127.0.0.1:8066/dbtest?useServerPrepStmts=true","root","123456");
  6. // PreparedStatement
  7. PreparedStatement ps = conn.prepareStatement("SELECT id, username, password FROM t_user WHERE id = ?");
  8.        ps.setLong(1,Math.abs(newRandom().nextLong()));
  9. // execute
  10.        ps.executeQuery();
  11. }
  12. }
获取 MySQL 连接时, useServerPrepStmts=true非常非常非常重要的参数。如果不配置, PreparedStatement 实际是个PreparedStatement(新版本默认为 FALSE,据说部分老版本默认为 TRUE),未开启服务端级别的 SQL 预编译。
WHY ?来看下 JDBC 里面是怎么实现的。
  1. // com.mysql.jdbc.ConnectionImpl.java
  2. publicPreparedStatement prepareStatement(String sql,int resultSetType,int resultSetConcurrency)throwsSQLException{
  3. synchronized(getConnectionMutex()){
  4.       checkClosed();
  5. PreparedStatement pStmt =null;
  6. boolean canServerPrepare =true;
  7. String nativeSql = getProcessEscapeCodesForPrepStmts()? nativeSQL(sql): sql;
  8. if(this.useServerPreparedStmts && getEmulateUnsupportedPstmts()){
  9.           canServerPrepare = canHandleAsServerPreparedStatement(nativeSql);
  10. }
  11. if(this.useServerPreparedStmts && canServerPrepare){
  12. if(this.getCachePreparedStatements()){// 从缓存中获取 pStmt
  13. synchronized(this.serverSideStatementCache){
  14.                   pStmt =(com.mysql.jdbc.ServerPreparedStatement)this.serverSideStatementCache
  15. .remove(makePreparedStatementCacheKey(this.database, sql));
  16. if(pStmt !=null){
  17. ((com.mysql.jdbc.ServerPreparedStatement) pStmt).setClosed(false);
  18.                       pStmt.clearParameters();// 清理上次留下的参数
  19. }
  20. if(pStmt ==null){
  21. // .... 省略代码 :向 Server 提交 SQL 预编译。
  22. }
  23. }
  24. }else{
  25. try{
  26. // 向 Server 提交 SQL 预编译。
  27.                   pStmt =ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql,this.database, resultSetType, resultSetConcurrency);
  28.                   pStmt.setResultSetType(resultSetType);
  29.                   pStmt.setResultSetConcurrency(resultSetConcurrency);
  30. }catch(SQLException sqlEx){
  31. // Punt, if necessary
  32. if(getEmulateUnsupportedPstmts()){
  33.                       pStmt =(PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency,false);
  34. }else{
  35. throw sqlEx;
  36. }
  37. }
  38. }
  39. }else{
  40.           pStmt =(PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency,false);
  41. }
  42. return pStmt;
  43. }
  44. }
  • 【前者】当 Client 开启 useServerPreparedStmts 并且 Server 支持 ServerPrepareClient 会向 Server 提交 SQL 预编译请求
  1. if(this.useServerPreparedStmts && canServerPrepare){
  2.    pStmt =ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql,this.database, resultSetType, resultSetConcurrency);
  3. }
  • 【后者】当 Client 未开启 useServerPreparedStmts 或者 Server 不支持 ServerPrepare,Client 创建 PreparedStatement不会向 Server 提交 SQL 预编译请求
  1. pStmt =(PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency,false);
即使这样,究竟为什么性能会更好呢?
  • 【前者】返回的 PreparedStatement 对象类是 JDBC42ServerPreparedStatement.java,后续每次执行 SQL 只需将对应占位符?对应的值提交给 Server即可,减少网络传输和 SQL 解析开销。
  • 【后者】返回的 PreparedStatement 对象类是 JDBC42PreparedStatement.java,后续每次执行 SQL 需要将完整的 SQL 提交给 Server,增加了网络传输和 SQL 解析开销。
🌚:【前者】性能一定比【后者】好吗?相信你已经有了正确的答案。

3. MyCAT Server 实现

3.1 创建 PreparedStatement

该操作对应 Client conn.prepareStatement(....)
MyCAT 接收到请求后,创建 PreparedStatement,并返回 statementId 等信息。Client 发起 SQL 执行时,需要将 statementId 带给 MyCAT。核心代码如下:
  1. // ServerPrepareHandler.java
  2. @Override
  3. publicvoid prepare(String sql){
  4. LOGGER.debug("use server prepare, sql: "+ sql);
  5. PreparedStatement pstmt = pstmtForSql.get(sql);
  6. if(pstmt ==null){// 缓存中获取
  7. // 解析获取字段个数和参数个数
  8. int columnCount = getColumnCount(sql);
  9. int paramCount = getParamCount(sql);
  10.       pstmt =newPreparedStatement(++pstmtId, sql, columnCount, paramCount);
  11.       pstmtForSql.put(pstmt.getStatement(), pstmt);
  12.       pstmtForId.put(pstmt.getId(), pstmt);
  13. }
  14. PreparedStmtResponse.response(pstmt, source);
  15. }
  16. // PreparedStmtResponse.java
  17. publicstaticvoid response(PreparedStatement pstmt,FrontendConnection c){
  18. byte packetId =0;
  19. // write preparedOk packet
  20. PreparedOkPacket preparedOk =newPreparedOkPacket();
  21.   preparedOk.packetId =++packetId;
  22.   preparedOk.statementId = pstmt.getId();
  23.   preparedOk.columnsNumber = pstmt.getColumnsNumber();
  24.   preparedOk.parametersNumber = pstmt.getParametersNumber();
  25. ByteBuffer buffer = preparedOk.write(c.allocate(), c,true);
  26. // write parameter field packet
  27. int parametersNumber = preparedOk.parametersNumber;
  28. if(parametersNumber >0){
  29. for(int i =0; i < parametersNumber; i++){
  30. FieldPacket field =newFieldPacket();
  31.           field.packetId =++packetId;
  32.           buffer = field.write(buffer, c,true);
  33. }
  34. EOFPacket eof =newEOFPacket();
  35.       eof.packetId =++packetId;
  36.       buffer = eof.write(buffer, c,true);
  37. }
  38. // write column field packet
  39. int columnsNumber = preparedOk.columnsNumber;
  40. if(columnsNumber >0){
  41. for(int i =0; i < columnsNumber; i++){
  42. FieldPacket field =newFieldPacket();
  43.           field.packetId =++packetId;
  44.           buffer = field.write(buffer, c,true);
  45. }
  46. EOFPacket eof =newEOFPacket();
  47.       eof.packetId =++packetId;
  48.       buffer = eof.write(buffer, c,true);
  49. }
  50. // send buffer
  51.   c.write(buffer);
  52. }
每个连接之间,PreparedStatement 不共享,即不同连接,即使 SQL相同,对应的 PreparedStatement 不同。

3.2 执行 SQL

该操作对应 Client conn.execute(....)
MyCAT 接收到请求后,将 PreparedStatement 使用请求的参数格式化成可执行的 SQL 进行执行。伪代码如下:
  1. String sql = pstmt.sql.format(request.params);
  2. execute(sql);
核心代码如下:
  1. // ServerPrepareHandler.java
  2. @Override
  3. publicvoid execute(byte[] data){
  4. long pstmtId =ByteUtil.readUB4(data,5);
  5. PreparedStatement pstmt =null;
  6. if((pstmt = pstmtForId.get(pstmtId))==null){
  7.       source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND,"Unknown pstmtId when executing.");
  8. }else{
  9. // 参数读取
  10. ExecutePacket packet =newExecutePacket(pstmt);
  11. try{
  12.           packet.read(data, source.getCharset());
  13. }catch(UnsupportedEncodingException e){
  14.           source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, e.getMessage());
  15. return;
  16. }
  17. BindValue[] bindValues = packet.values;
  18. // 还原sql中的动态参数为实际参数值
  19. String sql = prepareStmtBindValue(pstmt, bindValues);
  20. // 执行sql
  21.       source.getSession2().setPrepared(true);
  22.       source.query(sql);
  23. }
  24. }
  25. privateString prepareStmtBindValue(PreparedStatement pstmt,BindValue[] bindValues){
  26. String sql = pstmt.getStatement();
  27. int[] paramTypes = pstmt.getParametersType();
  28. StringBuilder sb =newStringBuilder();
  29. int idx =0;
  30. for(int i =0, len = sql.length(); i < len; i++){
  31. char c = sql.charAt(i);
  32. if(c !='?'){
  33.           sb.append(c);
  34. continue;
  35. }
  36. // 处理占位符?
  37. int paramType = paramTypes[idx];
  38. BindValue bindValue = bindValues[idx];
  39.       idx++;
  40. // 处理字段为空的情况
  41. if(bindValue.isNull){
  42.           sb.append("NULL");
  43. continue;
  44. }
  45. // 非空情况, 根据字段类型获取值
  46. switch(paramType &0xff){
  47. caseFields.FIELD_TYPE_TINY:
  48.               sb.append(String.valueOf(bindValue.byteBinding));
  49. break;
  50. caseFields.FIELD_TYPE_SHORT:
  51.               sb.append(String.valueOf(bindValue.shortBinding));
  52. break;
  53. caseFields.FIELD_TYPE_LONG:
  54.               sb.append(String.valueOf(bindValue.intBinding));
  55. break;
  56. // .... 省略非核心代码
  57. }
  58. }
  59. return sb.toString();
  60. }

4. 彩蛋

💯 看到此处是不是真爱?!反正我信了。

给老铁们额外加个🍗。
细心的同学们可能已经注意到 JDBC Client 是支持缓存 PreparedStatement,无需每次都让 Server 进行创建。
当配置 MySQL 数据连接 cachePrepStmts=true 时开启 Client 级别的缓存。But,此处的缓存又和一般的缓存不一样,是使用 remove 的方式获得的,并且创建好 PreparedStatement 时也不添加到缓存。那什么时候添加缓存呢?在 pstmt.close() 时,并且pstmt 是通过缓存获取时,添加到缓存。核心代码如下:
  1. // ServerPreparedStatement.java
  2. publicvoid close()throwsSQLException{
  3. MySQLConnection locallyScopedConn =this.connection;
  4. if(locallyScopedConn ==null){
  5. return;// already closed
  6. }
  7. synchronized(locallyScopedConn.getConnectionMutex()){
  8. if(this.isCached && isPoolable()&&!this.isClosed){
  9.           clearParameters();
  10. this.isClosed =true;
  11. this.connection.recachePreparedStatement(this);
  12. return;
  13. }
  14.       realClose(true,true);
  15. }
  16. }
  17. // ConnectionImpl.java
  18. publicvoid recachePreparedStatement(ServerPreparedStatement pstmt)throwsSQLException{
  19. synchronized(getConnectionMutex()){
  20. if(getCachePreparedStatements()&& pstmt.isPoolable()){
  21. synchronized(this.serverSideStatementCache){
  22. this.serverSideStatementCache.put(makePreparedStatementCacheKey(pstmt.currentCatalog, pstmt.originalSql), pstmt);
  23. }
  24. }
  25. }
  26. }
为什么要这么实现? PreparedStatement 是有状态的变量,我们会去 setXXX(pos,value),一旦多线程共享,会导致错乱。
🗿 这个“彩蛋”还满意么?请关注我的公众号:芋艿的后端小屋。下一篇更新:《MyCAT源码解析 —— MongoDB》,极大可能就在本周噢。
另外推荐一篇文章:《JDBC PreparedStatement》。
继续阅读
阅读原文