This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 70d27b2  Add StatementManager and cache statements for ShardingSpherePreparedStatement (#15934)
70d27b2 is described below

commit 70d27b2eef06479b278498bd0b42f17321bb2bea
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Thu Mar 10 09:57:28 2022 +0800

    Add StatementManager and cache statements for ShardingSpherePreparedStatement (#15934)
---
 .../driver/DriverExecutionPrepareEngine.java       | 15 ++--
 ...Builder.java => ExecutorConnectionManager.java} | 27 +++++--
 ...rManager.java => ExecutorStatementManager.java} | 22 ++----
 .../prepare/driver/SQLExecutionUnitBuilder.java    |  2 +-
 ...ger.java => ExecutorJDBCConnectionManager.java} |  7 +-
 ...ager.java => ExecutorJDBCStatementManager.java} |  6 +-
 .../jdbc/builder/JDBCExecutionUnitBuilder.java     |  4 +-
 .../PreparedStatementExecutionUnitBuilder.java     | 11 ++-
 .../builder/StatementExecutionUnitBuilder.java     | 10 +--
 .../ExecutorVertxConnectionManager.java}           | 13 ++--
 ...ger.java => ExecutorVertxStatementManager.java} |  6 +-
 .../builder/PreparedQueryExecutionUnitBuilder.java |  6 +-
 .../vertx/builder/VertxExecutionUnitBuilder.java   |  4 +-
 .../jdbc/adapter/AbstractStatementAdapter.java     |  6 ++
 .../jdbc/core/connection/ConnectionManager.java    | 21 +----
 .../statement/ShardingSpherePreparedStatement.java | 23 +++---
 .../core/statement/ShardingSphereStatement.java    | 10 ++-
 .../jdbc/core/statement/StatementManager.java      | 79 +++++++++++++++++++
 .../statement/CircuitBreakerPreparedStatement.java |  6 ++
 .../backend/communication/ProxySQLExecutor.java    |  4 +-
 .../communication/ReactiveProxySQLExecutor.java    |  4 +-
 .../jdbc/JDBCDatabaseCommunicationEngine.java      |  8 +-
 .../jdbc/connection/JDBCBackendConnection.java     | 51 +------------
 .../jdbc/statement/JDBCBackendStatement.java       | 89 ++++++++++++++++++++++
 .../vertx/VertxBackendConnection.java              | 19 +----
 .../communication/vertx/VertxBackendStatement.java | 24 +++---
 .../proxy/backend/session/ConnectionSession.java   | 14 ++++
 .../ral/advanced/PreviewDistSQLBackendHandler.java |  5 +-
 .../jdbc/connection/JDBCBackendConnectionTest.java | 18 ++---
 .../bind/OpenGaussComBatchBindExecutorTest.java    |  7 +-
 .../extended/PostgreSQLBatchedInsertsExecutor.java |  3 +-
 ...ggregatedBatchedInsertsCommandExecutorTest.java |  8 +-
 32 files changed, 329 insertions(+), 203 deletions(-)

diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
index d900995..61b7b48 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
@@ -46,7 +46,9 @@ public final class DriverExecutionPrepareEngine<T extends 
DriverExecutionUnit<?>
     @SuppressWarnings("rawtypes")
     private static final Map<String, SQLExecutionUnitBuilder> 
TYPE_TO_BUILDER_MAP = new ConcurrentHashMap<>(8, 1);
     
-    private final ExecutorDriverManager<C, ?, ?> executorDriverManager;
+    private final ExecutorConnectionManager<C> connectionManager;
+    
+    private final ExecutorStatementManager<C, ?, ?> statementManager;
     
     private final StorageResourceOption option;
     
@@ -57,10 +59,11 @@ public final class DriverExecutionPrepareEngine<T extends 
DriverExecutionUnit<?>
         ShardingSphereServiceLoader.register(SQLExecutionUnitBuilder.class);
     }
     
-    public DriverExecutionPrepareEngine(final String type, final int 
maxConnectionsSizePerQuery, final ExecutorDriverManager<C, ?, ?> 
executorDriverManager, 
-                                        final StorageResourceOption option, 
final Collection<ShardingSphereRule> rules) {
+    public DriverExecutionPrepareEngine(final String type, final int 
maxConnectionsSizePerQuery, final ExecutorConnectionManager<C> 
connectionManager, 
+                                        final ExecutorStatementManager<C, ?, 
?> statementManager, final StorageResourceOption option, final 
Collection<ShardingSphereRule> rules) {
         super(maxConnectionsSizePerQuery, rules);
-        this.executorDriverManager = executorDriverManager;
+        this.connectionManager = connectionManager;
+        this.statementManager = statementManager;
         this.option = option;
         sqlExecutionUnitBuilder = getCachedSqlExecutionUnitBuilder(type);
     }
@@ -83,7 +86,7 @@ public final class DriverExecutionPrepareEngine<T extends 
DriverExecutionUnit<?>
     @Override
     protected List<ExecutionGroup<T>> group(final String dataSourceName, final 
List<List<SQLUnit>> sqlUnitGroups, final ConnectionMode connectionMode) throws 
SQLException {
         List<ExecutionGroup<T>> result = new LinkedList<>();
-        List<C> connections = 
executorDriverManager.getConnections(dataSourceName, sqlUnitGroups.size(), 
connectionMode);
+        List<C> connections = connectionManager.getConnections(dataSourceName, 
sqlUnitGroups.size(), connectionMode);
         int count = 0;
         for (List<SQLUnit> each : sqlUnitGroups) {
             result.add(createExecutionGroup(dataSourceName, each, 
connections.get(count++), connectionMode));
@@ -95,7 +98,7 @@ public final class DriverExecutionPrepareEngine<T extends 
DriverExecutionUnit<?>
     private ExecutionGroup<T> createExecutionGroup(final String 
dataSourceName, final List<SQLUnit> sqlUnits, final C connection, final 
ConnectionMode connectionMode) throws SQLException {
         List<T> result = new LinkedList<>();
         for (SQLUnit each : sqlUnits) {
-            result.add((T) sqlExecutionUnitBuilder.build(new 
ExecutionUnit(dataSourceName, each), executorDriverManager, connection, 
connectionMode, option));
+            result.add((T) sqlExecutionUnitBuilder.build(new 
ExecutionUnit(dataSourceName, each), statementManager, connection, 
connectionMode, option));
         }
         return new ExecutionGroup<>(result);
     }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorConnectionManager.java
similarity index 61%
copy from 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
copy to 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorConnectionManager.java
index 5f97b45..6bf5600 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorConnectionManager.java
@@ -15,17 +15,28 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.builder;
+package org.apache.shardingsphere.infra.executor.sql.prepare.driver;
 
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.SQLExecutionUnitBuilder;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 
-import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
 
 /**
- * JDBC execution unit builder.
+ * Executor connection manager.
+ * 
+ * @param <C> type of resource connection
  */
-public interface JDBCExecutionUnitBuilder extends 
SQLExecutionUnitBuilder<JDBCExecutionUnit, ExecutorJDBCManager, Connection, 
StatementOption> {
+public interface ExecutorConnectionManager<C> {
+    
+    /**
+     * Get connections.
+     *
+     * @param dataSourceName data source name
+     * @param connectionSize connection size
+     * @param connectionMode connection mode
+     * @return connections
+     * @throws SQLException SQL exception
+     */
+    List<C> getConnections(String dataSourceName, int connectionSize, 
ConnectionMode connectionMode) throws SQLException;
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorDriverManager.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java
similarity index 71%
rename from 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorDriverManager.java
rename to 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java
index 344775a..e3483e5 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorDriverManager.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java
@@ -17,30 +17,19 @@
 
 package org.apache.shardingsphere.infra.executor.sql.prepare.driver;
 
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 
 import java.sql.SQLException;
-import java.util.List;
 
 /**
- * Executor driver manager.
+ * Executor statement manager.
  * 
  * @param <C> type of resource connection
  * @param <R> type of storage resource
  * @param <O> type of storage resource option
  */
-public interface ExecutorDriverManager<C, R, O extends StorageResourceOption> {
-    
-    /**
-     * Get connections.
-     *
-     * @param dataSourceName data source name
-     * @param connectionSize connection size
-     * @param connectionMode connection mode
-     * @return connections
-     * @throws SQLException SQL exception
-     */
-    List<C> getConnections(String dataSourceName, int connectionSize, 
ConnectionMode connectionMode) throws SQLException;
+public interface ExecutorStatementManager<C, R, O extends 
StorageResourceOption> {
     
     /**
      * Create storage resource.
@@ -56,13 +45,12 @@ public interface ExecutorDriverManager<C, R, O extends 
StorageResourceOption> {
     /**
      * Create storage resource.
      *
-     * @param sql SQL
-     * @param parameters SQL parameters
+     * @param executionUnit execution unit
      * @param connection connection
      * @param connectionMode connection mode
      * @param option storage resource option
      * @return storage resource
      * @throws SQLException SQL exception
      */
-    R createStorageResource(String sql, List<Object> parameters, C connection, 
ConnectionMode connectionMode, O option) throws SQLException;
+    R createStorageResource(ExecutionUnit executionUnit, C connection, 
ConnectionMode connectionMode, O option) throws SQLException;
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java
index 3ead2a1..38828f5 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java
@@ -32,7 +32,7 @@ import java.sql.SQLException;
  * @param <C> type of resource connection
  * @param <O> type of storage resource option
  */
-public interface SQLExecutionUnitBuilder<T extends DriverExecutionUnit<?>, M 
extends ExecutorDriverManager<C, ?, O>, C, O extends StorageResourceOption> 
extends TypedSPI {
+public interface SQLExecutionUnitBuilder<T extends DriverExecutionUnit<?>, M 
extends ExecutorStatementManager<C, ?, O>, C, O extends StorageResourceOption> 
extends TypedSPI {
     
     /**
      * Build SQL execution unit.
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCConnectionManager.java
similarity index 83%
copy from 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
copy to 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCConnectionManager.java
index 37b2056..5f0bbee 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCConnectionManager.java
@@ -17,13 +17,12 @@
 
 package org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc;
 
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorDriverManager;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorConnectionManager;
 
 import java.sql.Connection;
-import java.sql.Statement;
 
 /**
- * Executor JDBC driver manager.
+ * Executor JDBC connection manager.
  */
-public interface ExecutorJDBCManager extends ExecutorDriverManager<Connection, 
Statement, StatementOption> {
+public interface ExecutorJDBCConnectionManager extends 
ExecutorConnectionManager<Connection> {
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCStatementManager.java
similarity index 84%
copy from 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
copy to 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCStatementManager.java
index 37b2056..b5b907d 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCStatementManager.java
@@ -17,13 +17,13 @@
 
 package org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc;
 
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorDriverManager;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorStatementManager;
 
 import java.sql.Connection;
 import java.sql.Statement;
 
 /**
- * Executor JDBC driver manager.
+ * Executor JDBC statement manager.
  */
-public interface ExecutorJDBCManager extends ExecutorDriverManager<Connection, 
Statement, StatementOption> {
+public interface ExecutorJDBCStatementManager extends 
ExecutorStatementManager<Connection, Statement, StatementOption> {
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
index 5f97b45..387585a 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.builder
 
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.SQLExecutionUnitBuilder;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCStatementManager;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 
 import java.sql.Connection;
@@ -27,5 +27,5 @@ import java.sql.Connection;
 /**
  * JDBC execution unit builder.
  */
-public interface JDBCExecutionUnitBuilder extends 
SQLExecutionUnitBuilder<JDBCExecutionUnit, ExecutorJDBCManager, Connection, 
StatementOption> {
+public interface JDBCExecutionUnitBuilder extends 
SQLExecutionUnitBuilder<JDBCExecutionUnit, ExecutorJDBCStatementManager, 
Connection, StatementOption> {
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java
index 3fbe911..4a82a8e 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java
@@ -20,14 +20,13 @@ package 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.builder
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCStatementManager;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.List;
 
 /**
  * JDBC prepared statement execution unit builder.
@@ -35,16 +34,16 @@ import java.util.List;
 public final class PreparedStatementExecutionUnitBuilder implements 
JDBCExecutionUnitBuilder {
     
     @Override
-    public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final 
ExecutorJDBCManager executorManager,
+    public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final 
ExecutorJDBCStatementManager statementManager,
                                    final Connection connection, final 
ConnectionMode connectionMode, final StatementOption option) throws 
SQLException {
         PreparedStatement preparedStatement = createPreparedStatement(
-                executionUnit.getSqlUnit().getSql(), 
executionUnit.getSqlUnit().getParameters(), executorManager, connection, 
connectionMode, option);
+                executionUnit, statementManager, connection, connectionMode, 
option);
         return new JDBCExecutionUnit(executionUnit, connectionMode, 
preparedStatement);
     }
     
-    private PreparedStatement createPreparedStatement(final String sql, final 
List<Object> parameters, final ExecutorJDBCManager executorJDBCManager, final 
Connection connection,
+    private PreparedStatement createPreparedStatement(final ExecutionUnit 
executionUnit, final ExecutorJDBCStatementManager statementManager, final 
Connection connection,
                                                       final ConnectionMode 
connectionMode, final StatementOption option) throws SQLException {
-        return (PreparedStatement) 
executorJDBCManager.createStorageResource(sql, parameters, connection, 
connectionMode, option);
+        return (PreparedStatement) 
statementManager.createStorageResource(executionUnit, connection, 
connectionMode, option);
     }
     
     @Override
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java
index a551a93..7af040c 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.builder
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCStatementManager;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 
@@ -34,14 +34,14 @@ import java.sql.Statement;
 public final class StatementExecutionUnitBuilder implements 
JDBCExecutionUnitBuilder {
     
     @Override
-    public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final 
ExecutorJDBCManager executorManager,
+    public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final 
ExecutorJDBCStatementManager statementManager,
                                    final Connection connection, final 
ConnectionMode connectionMode, final StatementOption option) throws 
SQLException {
-        return new JDBCExecutionUnit(executionUnit, connectionMode, 
createStatement(executorManager, connection, connectionMode, option));
+        return new JDBCExecutionUnit(executionUnit, connectionMode, 
createStatement(statementManager, connection, connectionMode, option));
     }
     
-    private Statement createStatement(final ExecutorJDBCManager 
executorJDBCManager, final Connection connection,
+    private Statement createStatement(final ExecutorJDBCStatementManager 
statementManager, final Connection connection,
                                       final ConnectionMode connectionMode, 
final StatementOption option) throws SQLException {
-        return executorJDBCManager.createStorageResource(connection, 
connectionMode, option);
+        return statementManager.createStorageResource(connection, 
connectionMode, option);
     }
     
     @Override
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxConnectionManager.java
similarity index 78%
rename from 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
rename to 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxConnectionManager.java
index 37b2056..b917d84 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxConnectionManager.java
@@ -15,15 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc;
+package org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx;
 
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorDriverManager;
-
-import java.sql.Connection;
-import java.sql.Statement;
+import io.vertx.core.Future;
+import io.vertx.sqlclient.SqlClient;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorConnectionManager;
 
 /**
- * Executor JDBC driver manager.
+ * Executor connection manager for Vert.x.
  */
-public interface ExecutorJDBCManager extends ExecutorDriverManager<Connection, 
Statement, StatementOption> {
+public interface ExecutorVertxConnectionManager extends 
ExecutorConnectionManager<Future<? extends SqlClient>> {
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxManager.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxStatementManager.java
similarity index 82%
rename from 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxManager.java
rename to 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxStatementManager.java
index 500c8a1..16fbc80 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxManager.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxStatementManager.java
@@ -22,10 +22,10 @@ import io.vertx.sqlclient.Query;
 import io.vertx.sqlclient.Row;
 import io.vertx.sqlclient.RowSet;
 import io.vertx.sqlclient.SqlClient;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorDriverManager;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorStatementManager;
 
 /**
- * Executor manager for Vert.x.
+ * Executor statement manager for Vert.x.
  */
-public interface ExecutorVertxManager extends ExecutorDriverManager<Future<? 
extends SqlClient>, Future<Query<RowSet<Row>>>, VertxExecutionContext> {
+public interface ExecutorVertxStatementManager extends 
ExecutorStatementManager<Future<? extends SqlClient>, 
Future<Query<RowSet<Row>>>, VertxExecutionContext> {
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
index df0c038..8e9e2ab 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
@@ -22,7 +22,7 @@ import io.vertx.sqlclient.SqlClient;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.vertx.VertxExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxManager;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxStatementManager;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.VertxExecutionContext;
 
 import java.sql.SQLException;
@@ -33,8 +33,8 @@ import java.sql.SQLException;
 public final class PreparedQueryExecutionUnitBuilder implements 
VertxExecutionUnitBuilder {
     
     @Override
-    public VertxExecutionUnit build(final ExecutionUnit executionUnit, final 
ExecutorVertxManager executorManager, final Future<? extends SqlClient> 
connection, final ConnectionMode connectionMode,
-                                    final VertxExecutionContext option) throws 
SQLException {
+    public VertxExecutionUnit build(final ExecutionUnit executionUnit, final 
ExecutorVertxStatementManager statementManager, 
+                                    final Future<? extends SqlClient> 
connection, final ConnectionMode connectionMode, final VertxExecutionContext 
option) throws SQLException {
         return new VertxExecutionUnit(executionUnit, connectionMode, 
connection.compose(sqlClient -> 
Future.succeededFuture(sqlClient.preparedQuery(executionUnit.getSqlUnit().getSql()))));
     }
     
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/VertxExecutionUnitBuilder.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/VertxExecutionUnitBuilder.java
index 95a8f2f..80944d7 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/VertxExecutionUnitBuilder.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/VertxExecutionUnitBuilder.java
@@ -21,11 +21,11 @@ import io.vertx.core.Future;
 import io.vertx.sqlclient.SqlClient;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.vertx.VertxExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.SQLExecutionUnitBuilder;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxManager;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxStatementManager;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.VertxExecutionContext;
 
 /**
  * Execution unit builder for Vert.x.
  */
-public interface VertxExecutionUnitBuilder extends 
SQLExecutionUnitBuilder<VertxExecutionUnit, ExecutorVertxManager, Future<? 
extends SqlClient>, VertxExecutionContext> {
+public interface VertxExecutionUnitBuilder extends 
SQLExecutionUnitBuilder<VertxExecutionUnit, ExecutorVertxStatementManager, 
Future<? extends SqlClient>, VertxExecutionContext> {
 }
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index ec22fde..a97982d 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.driver.executor.DriverExecutor;
 import 
org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
 import 
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationStatement;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
@@ -189,6 +190,9 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
             if (null != getExecutor()) {
                 getExecutor().close();
             }
+            if (null != getStatementManager()) {
+                getStatementManager().close();
+            }
         } finally {
             getRoutedStatements().clear();
         }
@@ -208,4 +212,6 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
     protected abstract Collection<? extends Statement> getRoutedStatements();
     
     protected abstract DriverExecutor getExecutor();
+    
+    protected abstract StatementManager getStatementManager();
 }
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
index 24b13b5..1efb82a 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
@@ -28,8 +28,7 @@ import 
org.apache.shardingsphere.driver.jdbc.adapter.invocation.MethodInvocation
 import 
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCConnectionManager;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.definition.InstanceId;
 import org.apache.shardingsphere.infra.instance.definition.InstanceType;
@@ -45,9 +44,7 @@ import 
org.apache.shardingsphere.transaction.rule.TransactionRule;
 import javax.sql.DataSource;
 import java.security.SecureRandom;
 import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -60,7 +57,7 @@ import java.util.Random;
 /**
  * Connection manager.
  */
-public final class ConnectionManager implements ExecutorJDBCManager, 
AutoCloseable {
+public final class ConnectionManager implements ExecutorJDBCConnectionManager, 
AutoCloseable {
     
     private final Map<String, DataSource> dataSourceMap = new 
LinkedHashMap<>();
     
@@ -312,20 +309,6 @@ public final class ConnectionManager implements 
ExecutorJDBCManager, AutoCloseab
         return physicalDataSourceMap.containsKey(dataSourceName);
     }
     
-    @SuppressWarnings("MagicConstant")
-    @Override
-    public Statement createStorageResource(final Connection connection, final 
ConnectionMode connectionMode, final StatementOption option) throws 
SQLException {
-        return connection.createStatement(option.getResultSetType(), 
option.getResultSetConcurrency(), option.getResultSetHoldability());
-    }
-    
-    @SuppressWarnings("MagicConstant")
-    @Override
-    public PreparedStatement createStorageResource(final String sql, final 
List<Object> parameters,
-                                                   final Connection 
connection, final ConnectionMode connectionMode, final StatementOption option) 
throws SQLException {
-        return option.isReturnGeneratedKeys() ? 
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
-                : connection.prepareStatement(sql, option.getResultSetType(), 
option.getResultSetConcurrency(), option.getResultSetHoldability());
-    }
-    
     @Override
     public void close() throws SQLException {
         try {
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 74c260d..3036dd0 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -133,14 +133,17 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
     
     private final boolean statementsCacheable;
     
+    private final TrafficRule trafficRule;
+    
+    @Getter(AccessLevel.PROTECTED)
+    private final StatementManager statementManager;
+    
     private ExecutionContext executionContext;
     
     private ResultSet currentResultSet;
     
     private TrafficContext trafficContext;
     
-    private TrafficRule trafficRule;
-    
     public ShardingSpherePreparedStatement(final ShardingSphereConnection 
connection, final String sql) throws SQLException {
         this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
     }
@@ -182,6 +185,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         kernelProcessor = new KernelProcessor();
         statementsCacheable = 
isStatementsCacheable(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getConfigurations());
         trafficRule = 
metaDataContexts.getGlobalRuleMetaData().findSingleRule(TrafficRule.class).orElse(null);
+        statementManager = new StatementManager();
     }
     
     private boolean isStatementsCacheable(final Collection<RuleConfiguration> 
configurations) {
@@ -282,7 +286,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine() {
         int maxConnectionsSizePerQuery = 
metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        return new 
DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, 
maxConnectionsSizePerQuery, connection.getConnectionManager(), 
+        return new 
DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, 
maxConnectionsSizePerQuery, connection.getConnectionManager(), 
statementManager,  
                 statementOption, 
metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
     }
     
@@ -504,8 +508,8 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         }
     }
     
-    private void clearPrevious() throws SQLException {
-        clearStatements();
+    private void clearPrevious() {
+        statements.clear();
         parameterSets.clear();
     }
     
@@ -561,7 +565,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     private void initBatchPreparedStatementExecutor() throws SQLException {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = new DriverExecutionPrepareEngine<>(
                 JDBCDriverType.PREPARED_STATEMENT, 
metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY),
-                connection.getConnectionManager(), statementOption, 
metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
+                connection.getConnectionManager(), statementManager, 
statementOption, 
metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
         List<ExecutionUnit> executionUnits = new 
ArrayList<>(batchPreparedStatementExecutor.getBatchExecutionUnits().size());
         for (BatchExecutionUnit each : 
batchPreparedStatementExecutor.getBatchExecutionUnits()) {
             ExecutionUnit executionUnit = each.getExecutionUnit();
@@ -615,11 +619,4 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     public Collection<PreparedStatement> getRoutedStatements() {
         return statements;
     }
-    
-    private void clearStatements() throws SQLException {
-        for (Statement each : statements) {
-            each.close();
-        }
-        statements.clear();
-    }
 }
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index d3cdb60..2f0c3f1 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -109,6 +109,11 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     
     private final KernelProcessor kernelProcessor;
     
+    private final TrafficRule trafficRule;
+    
+    @Getter(AccessLevel.PROTECTED)
+    private final StatementManager statementManager;
+    
     private boolean returnGeneratedKeys;
     
     private ExecutionContext executionContext;
@@ -117,8 +122,6 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     
     private TrafficContext trafficContext;
     
-    private TrafficRule trafficRule;
-    
     public ShardingSphereStatement(final ShardingSphereConnection connection) {
         this(connection, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
     }
@@ -136,6 +139,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         executor = new DriverExecutor(connection);
         kernelProcessor = new KernelProcessor();
         trafficRule = 
metaDataContexts.getGlobalRuleMetaData().findSingleRule(TrafficRule.class).orElse(null);
+        statementManager = new StatementManager();
     }
     
     @Override
@@ -209,7 +213,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine() {
         int maxConnectionsSizePerQuery = 
metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, 
maxConnectionsSizePerQuery, connection.getConnectionManager(), 
+        return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, 
maxConnectionsSizePerQuery, connection.getConnectionManager(), statementManager,
                 statementOption, 
metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
     }
     
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java
new file mode 100644
index 0000000..53c1149
--- /dev/null
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.jdbc.core.statement;
+
+import lombok.EqualsAndHashCode;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCStatementManager;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Statement manager.
+ */
+public final class StatementManager implements ExecutorJDBCStatementManager, 
AutoCloseable {
+    
+    private final Map<CacheKey, Statement> cachedStatements = new 
ConcurrentHashMap<>();
+    
+    private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new 
ForceExecuteTemplate<>();
+    
+    @SuppressWarnings("MagicConstant")
+    @Override
+    public Statement createStorageResource(final Connection connection, final 
ConnectionMode connectionMode, final StatementOption option) throws 
SQLException {
+        return connection.createStatement(option.getResultSetType(), 
option.getResultSetConcurrency(), option.getResultSetHoldability());
+    }
+    
+    @SuppressWarnings("MagicConstant")
+    @Override
+    public Statement createStorageResource(final ExecutionUnit executionUnit, 
final Connection connection, final ConnectionMode connectionMode, final 
StatementOption option) throws SQLException {
+        Statement result = cachedStatements.get(new CacheKey(executionUnit, 
connectionMode));
+        if (null == result) {
+            String sql = executionUnit.getSqlUnit().getSql();
+            result = option.isReturnGeneratedKeys() ? 
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) 
+                    : connection.prepareStatement(sql, 
option.getResultSetType(), option.getResultSetConcurrency(), 
option.getResultSetHoldability());
+            cachedStatements.put(new CacheKey(executionUnit, connectionMode), 
result);
+        }
+        return result;
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        try {
+            forceExecuteTemplate.execute(cachedStatements.values(), 
Statement::close);
+        } finally {
+            cachedStatements.clear();
+        }
+    }
+    
+    @RequiredArgsConstructor
+    @EqualsAndHashCode
+    private static final class CacheKey {
+    
+        private final ExecutionUnit executionUnit;
+        
+        private final ConnectionMode connectionMode;
+    }
+}
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
index 86e5ee7..0948738 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.driver.state.circuit.statement;
 
 import lombok.Getter;
 import org.apache.shardingsphere.driver.executor.DriverExecutor;
+import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
 import 
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement;
 import 
org.apache.shardingsphere.driver.state.circuit.connection.CircuitBreakerConnection;
 import 
org.apache.shardingsphere.driver.state.circuit.resultset.CircuitBreakerResultSet;
@@ -281,6 +282,11 @@ public final class CircuitBreakerPreparedStatement extends 
AbstractUnsupportedOp
     }
     
     @Override
+    protected StatementManager getStatementManager() {
+        return null;
+    }
+    
+    @Override
     public ResultSet executeQuery() {
         return new CircuitBreakerResultSet();
     }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 8fb29ae..a86df4f 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -39,6 +39,7 @@ import 
org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.ProxyJDBCExecutor;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactionException;
@@ -149,8 +150,9 @@ public final class ProxySQLExecutor {
     
     private List<ExecuteResult> useDriverToExecute(final ExecutionContext 
executionContext, final Collection<ShardingSphereRule> rules,
                                                    final int 
maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean 
isExceptionThrown) throws SQLException {
+        JDBCBackendStatement statementManager = (JDBCBackendStatement) 
backendConnection.getConnectionSession().getStatementManager();
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = new DriverExecutionPrepareEngine<>(
-                type, maxConnectionsSizePerQuery, backendConnection, new 
StatementOption(isReturnGeneratedKeys), rules);
+                type, maxConnectionsSizePerQuery, backendConnection, 
statementManager, new StatementOption(isReturnGeneratedKeys), rules);
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
         try {
             executionGroupContext = 
prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits());
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
index 00e1764..a5b5ff3 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
@@ -31,6 +31,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecuti
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.VertxExecutionContext;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import 
org.apache.shardingsphere.proxy.backend.communication.vertx.VertxBackendConnection;
+import 
org.apache.shardingsphere.proxy.backend.communication.vertx.VertxBackendStatement;
 import 
org.apache.shardingsphere.proxy.backend.communication.vertx.executor.ProxyReactiveExecutor;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -103,8 +104,9 @@ public final class ReactiveProxySQLExecutor {
     
     private Future<List<ExecuteResult>> useDriverToExecute(final 
ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
                                                                  final int 
maxConnectionsSizePerQuery) throws SQLException {
+        VertxBackendStatement statementManager = (VertxBackendStatement) 
backendConnection.getConnectionSession().getStatementManager();
         DriverExecutionPrepareEngine<VertxExecutionUnit, Future<? extends 
SqlClient>> prepareEngine = new DriverExecutionPrepareEngine<>(
-                TYPE, maxConnectionsSizePerQuery, backendConnection, new 
VertxExecutionContext(), rules);
+                TYPE, maxConnectionsSizePerQuery, backendConnection, 
statementManager, new VertxExecutionContext(), rules);
         ExecutionGroupContext<VertxExecutionUnit> executionGroupContext;
         try {
             executionGroupContext = 
prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits());
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index 4ba4bbd..c003eb4 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -42,12 +42,13 @@ import 
org.apache.shardingsphere.proxy.backend.communication.ProxySQLExecutor;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilder;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilderFactory;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
-import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilder;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 import 
org.apache.shardingsphere.sharding.merge.dql.iterator.IteratorStreamMergedResult;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
@@ -147,8 +148,9 @@ public final class JDBCDatabaseCommunicationEngine extends 
DatabaseCommunication
     
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine(final boolean isReturnGeneratedKeys, final 
MetaDataContexts metaData) {
         int maxConnectionsSizePerQuery = 
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        return new DriverExecutionPrepareEngine<>(getDriverType(), 
maxConnectionsSizePerQuery, backendConnection, new 
StatementOption(isReturnGeneratedKeys),
-                
metaData.getMetaData(backendConnection.getConnectionSession().getSchemaName()).getRuleMetaData().getRules());
+        JDBCBackendStatement statementManager = (JDBCBackendStatement) 
backendConnection.getConnectionSession().getStatementManager();
+        return new DriverExecutionPrepareEngine<>(getDriverType(), 
maxConnectionsSizePerQuery, backendConnection, statementManager, 
+                new StatementOption(isReturnGeneratedKeys), 
metaData.getMetaData(backendConnection.getConnectionSession().getSchemaName()).getRuleMetaData().getRules());
     }
     
     private ResponseHeader processExecuteFederation(final ResultSet resultSet, 
final MetaDataContexts metaDataContexts) throws SQLException {
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
index 9110961..e09223f 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
@@ -22,34 +22,25 @@ import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Multimap;
 import lombok.Getter;
 import lombok.Setter;
-import 
org.apache.shardingsphere.db.protocol.parameter.TypeUnspecifiedSQLParameter;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCConnectionManager;
 import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
 import org.apache.shardingsphere.proxy.backend.communication.BackendConnection;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
-import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.StatementMemoryStrictlyFetchSizeSetter;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.JDBCBackendTransactionManager;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.backend.util.TransactionUtil;
-import org.apache.shardingsphere.spi.singleton.SingletonSPIRegistry;
 import org.apache.shardingsphere.transaction.core.TransactionType;
 
 import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -57,7 +48,7 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 @Getter
 @Setter
-public final class JDBCBackendConnection implements BackendConnection<Void>, 
ExecutorJDBCManager {
+public final class JDBCBackendConnection implements BackendConnection<Void>, 
ExecutorJDBCConnectionManager {
     
     private final ConnectionSession connectionSession;
     
@@ -75,11 +66,8 @@ public final class JDBCBackendConnection implements 
BackendConnection<Void>, Exe
     
     private final ConnectionStatus connectionStatus = new ConnectionStatus();
     
-    private final Map<String, StatementMemoryStrictlyFetchSizeSetter> 
fetchSizeSetters;
-    
     public JDBCBackendConnection(final ConnectionSession connectionSession) {
         this.connectionSession = connectionSession;
-        fetchSizeSetters = 
SingletonSPIRegistry.getTypedSingletonInstancesMap(StatementMemoryStrictlyFetchSizeSetter.class);
     }
     
     @Override
@@ -140,41 +128,6 @@ public final class JDBCBackendConnection implements 
BackendConnection<Void>, Exe
         }
     }
     
-    @Override
-    public Statement createStorageResource(final Connection connection, final 
ConnectionMode connectionMode, final StatementOption option) throws 
SQLException {
-        Statement result = connection.createStatement();
-        if (ConnectionMode.MEMORY_STRICTLY == connectionMode) {
-            setFetchSize(result);
-        }
-        return result;
-    }
-    
-    @Override
-    public PreparedStatement createStorageResource(final String sql, final 
List<Object> parameters, 
-                                                   final Connection 
connection, final ConnectionMode connectionMode, final StatementOption option) 
throws SQLException {
-        PreparedStatement result = option.isReturnGeneratedKeys()
-                ? connection.prepareStatement(sql, 
Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(sql);
-        for (int i = 0; i < parameters.size(); i++) {
-            Object parameter = parameters.get(i);
-            if (parameter instanceof TypeUnspecifiedSQLParameter) {
-                result.setObject(i + 1, parameter, Types.OTHER);
-            } else {
-                result.setObject(i + 1, parameter);
-            }
-        }
-        if (ConnectionMode.MEMORY_STRICTLY == connectionMode) {
-            setFetchSize(result);
-        }
-        return result;
-    }
-    
-    private void setFetchSize(final Statement statement) throws SQLException {
-        DatabaseType databaseType = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData(connectionSession.getSchemaName()).getResource().getDatabaseType();
-        if (fetchSizeSetters.containsKey(databaseType.getName())) {
-            
fetchSizeSetters.get(databaseType.getName()).setFetchSize(statement);
-        }
-    }
-    
     /**
      * Whether execute SQL serial or not.
      *
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/statement/JDBCBackendStatement.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/statement/JDBCBackendStatement.java
new file mode 100644
index 0000000..acd9e4b
--- /dev/null
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/statement/JDBCBackendStatement.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.communication.jdbc.statement;
+
+import lombok.Getter;
+import lombok.Setter;
+import 
org.apache.shardingsphere.db.protocol.parameter.TypeUnspecifiedSQLParameter;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCStatementManager;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.spi.singleton.SingletonSPIRegistry;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JDBC backend statement.
+ *
+ * <p>Creates JDBC {@link Statement} and {@link PreparedStatement} instances on a given connection
+ * and applies a database specific fetch size when the connection mode is {@link ConnectionMode#MEMORY_STRICTLY}.</p>
+ *
+ * <p>Accessors for the fields are generated by Lombok ({@code @Getter} / {@code @Setter}).</p>
+ */
+@Getter
+@Setter
+public final class JDBCBackendStatement implements ExecutorJDBCStatementManager {
+    
+    // Fetch size setters loaded from SPI; looked up by database type name in setFetchSize.
+    private final Map<String, StatementMemoryStrictlyFetchSizeSetter> fetchSizeSetters;
+    
+    // Schema name used to resolve the database type when a fetch size has to be set.
+    private String schemaName;
+    
+    public JDBCBackendStatement() {
+        fetchSizeSetters = SingletonSPIRegistry.getTypedSingletonInstancesMap(StatementMemoryStrictlyFetchSizeSetter.class);
+    }
+    
+    /**
+     * Create a plain statement on the connection.
+     *
+     * @param connection connection used to create the statement
+     * @param connectionMode connection mode, fetch size is only set for {@code MEMORY_STRICTLY}
+     * @param option statement option, not read by this method
+     * @return created statement
+     * @throws SQLException SQL exception
+     */
+    @Override
+    public Statement createStorageResource(final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
+        Statement result = connection.createStatement();
+        if (ConnectionMode.MEMORY_STRICTLY == connectionMode) {
+            setFetchSize(result);
+        }
+        return result;
+    }
+    
+    /**
+     * Create a prepared statement for the SQL of the execution unit and bind its parameters.
+     *
+     * @param executionUnit execution unit providing the SQL and its parameters
+     * @param connection connection used to prepare the statement
+     * @param connectionMode connection mode, fetch size is only set for {@code MEMORY_STRICTLY}
+     * @param option statement option deciding whether generated keys are requested
+     * @return prepared statement with all parameters set
+     * @throws SQLException SQL exception
+     */
+    @Override
+    public Statement createStorageResource(final ExecutionUnit executionUnit, final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
+        String sql = executionUnit.getSqlUnit().getSql();
+        List<Object> parameters = executionUnit.getSqlUnit().getParameters();
+        // Both branches prepare the same SQL text; only the generated keys flag differs.
+        PreparedStatement result = option.isReturnGeneratedKeys() ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(sql);
+        for (int i = 0; i < parameters.size(); i++) {
+            Object parameter = parameters.get(i);
+            // JDBC parameter indexes are 1-based; parameters without a specified type are passed as Types.OTHER.
+            if (parameter instanceof TypeUnspecifiedSQLParameter) {
+                result.setObject(i + 1, parameter, Types.OTHER);
+            } else {
+                result.setObject(i + 1, parameter);
+            }
+        }
+        if (ConnectionMode.MEMORY_STRICTLY == connectionMode) {
+            setFetchSize(result);
+        }
+        return result;
+    }
+    
+    // Apply the fetch size setter registered for the database type of the current schema, if there is one.
+    private void setFetchSize(final Statement statement) throws SQLException {
+        DatabaseType databaseType = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData(schemaName).getResource().getDatabaseType();
+        StatementMemoryStrictlyFetchSizeSetter fetchSizeSetter = fetchSizeSetters.get(databaseType.getName());
+        if (null != fetchSizeSetter) {
+            fetchSizeSetter.setFetchSize(statement);
+        }
+    }
+}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendConnection.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendConnection.java
index 7e09d3f..c12ff55 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendConnection.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendConnection.java
@@ -22,15 +22,11 @@ import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Multimap;
 import io.vertx.core.CompositeFuture;
 import io.vertx.core.Future;
-import io.vertx.sqlclient.Query;
-import io.vertx.sqlclient.Row;
-import io.vertx.sqlclient.RowSet;
 import io.vertx.sqlclient.SqlClient;
 import io.vertx.sqlclient.SqlConnection;
 import lombok.Getter;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxManager;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.VertxExecutionContext;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxConnectionManager;
 import org.apache.shardingsphere.proxy.backend.communication.BackendConnection;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.ConnectionPostProcessor;
 import 
org.apache.shardingsphere.proxy.backend.communication.vertx.transaction.VertxLocalTransactionManager;
@@ -48,7 +44,7 @@ import java.util.List;
  * Vert.x backend connection.
  */
 @Getter
-public final class VertxBackendConnection implements 
BackendConnection<Future<Void>>, ExecutorVertxManager {
+public final class VertxBackendConnection implements 
BackendConnection<Future<Void>>, ExecutorVertxConnectionManager {
     
     private final ConnectionSession connectionSession;
     
@@ -117,17 +113,6 @@ public final class VertxBackendConnection implements 
BackendConnection<Future<Vo
     }
     
     @Override
-    public Future<Query<RowSet<Row>>> createStorageResource(final Future<? 
extends SqlClient> connection, final ConnectionMode connectionMode, final 
VertxExecutionContext option) {
-        return Future.failedFuture(new UnsupportedOperationException("Vert.x 
query is not like JDBC statement."));
-    }
-    
-    @Override
-    public Future<Query<RowSet<Row>>> createStorageResource(final String sql, 
final List<Object> parameters, final Future<? extends SqlClient> connection, 
final ConnectionMode connectionMode,
-                                                            final 
VertxExecutionContext ignored) {
-        return Future.failedFuture(new UnsupportedOperationException("Vert.x 
prepared query is not like JDBC prepared statement."));
-    }
-    
-    @Override
     public Future<Void> prepareForTaskExecution() {
         if (!connectionSession.isAutoCommit() && 
!connectionSession.getTransactionStatus().isInTransaction()) {
             VertxLocalTransactionManager transactionManager = new 
VertxLocalTransactionManager(this);
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendStatement.java
similarity index 53%
copy from 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
copy to 
shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendStatement.java
index df0c038..52f1b15 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendStatement.java
@@ -15,31 +15,35 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.builder;
+package org.apache.shardingsphere.proxy.backend.communication.vertx;
 
 import io.vertx.core.Future;
+import io.vertx.sqlclient.Query;
+import io.vertx.sqlclient.Row;
+import io.vertx.sqlclient.RowSet;
 import io.vertx.sqlclient.SqlClient;
+import lombok.Getter;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.vertx.VertxExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxManager;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxStatementManager;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.VertxExecutionContext;
 
 import java.sql.SQLException;
 
 /**
- * Execution unit builder using Vert.x prepared query.
+ * Vert.x backend statement.
  */
-public final class PreparedQueryExecutionUnitBuilder implements 
VertxExecutionUnitBuilder {
+@Getter
+public final class VertxBackendStatement implements 
ExecutorVertxStatementManager {
     
     @Override
-    public VertxExecutionUnit build(final ExecutionUnit executionUnit, final 
ExecutorVertxManager executorManager, final Future<? extends SqlClient> 
connection, final ConnectionMode connectionMode,
-                                    final VertxExecutionContext option) throws 
SQLException {
-        return new VertxExecutionUnit(executionUnit, connectionMode, 
connection.compose(sqlClient -> 
Future.succeededFuture(sqlClient.preparedQuery(executionUnit.getSqlUnit().getSql()))));
+    public Future<Query<RowSet<Row>>> createStorageResource(final Future<? 
extends SqlClient> connection, final ConnectionMode connectionMode, final 
VertxExecutionContext option) throws SQLException {
+        return Future.failedFuture(new UnsupportedOperationException("Vert.x 
query is not like JDBC statement."));
     }
     
     @Override
-    public String getType() {
-        return "Vert.x";
+    public Future<Query<RowSet<Row>>> createStorageResource(final 
ExecutionUnit executionUnit, final Future<? extends SqlClient> connection, 
+                                                            final 
ConnectionMode connectionMode, final VertxExecutionContext option) throws 
SQLException {
+        return Future.failedFuture(new UnsupportedOperationException("Vert.x 
prepared query is not like JDBC prepared statement."));
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
index 81fc944..cb7bcb7 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
@@ -23,11 +23,14 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.exception.ShardingSphereException;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorStatementManager;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.proxy.backend.communication.BackendConnection;
 import 
org.apache.shardingsphere.proxy.backend.communication.SQLStatementSchemaHolder;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
 import 
org.apache.shardingsphere.proxy.backend.communication.vertx.VertxBackendConnection;
+import 
org.apache.shardingsphere.proxy.backend.communication.vertx.VertxBackendStatement;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
 import 
org.apache.shardingsphere.sql.parser.sql.common.constant.TransactionIsolationLevel;
@@ -65,10 +68,13 @@ public final class ConnectionSession {
 
     private final BackendConnection backendConnection;
     
+    private final ExecutorStatementManager statementManager;
+    
     public ConnectionSession(final TransactionType initialTransactionType, 
final AttributeMap attributeMap) {
         transactionStatus = new TransactionStatus(initialTransactionType);
         this.attributeMap = attributeMap;
         backendConnection = determineBackendConnection();
+        statementManager = determineStatementManager();
     }
     
     private BackendConnection determineBackendConnection() {
@@ -76,6 +82,11 @@ public final class ConnectionSession {
         return "ExperimentalVertx".equals(proxyBackendDriverType) ? new 
VertxBackendConnection(this) : new JDBCBackendConnection(this);
     }
     
+    /**
+     * Choose the statement manager implementation from the configured proxy backend driver type.
+     *
+     * @return Vert.x statement manager when the driver type is {@code ExperimentalVertx}, otherwise the JDBC one
+     */
+    private ExecutorStatementManager determineStatementManager() {
+        String driverType = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps().getValue(ConfigurationPropertyKey.PROXY_BACKEND_DRIVER_TYPE);
+        if ("ExperimentalVertx".equals(driverType)) {
+            return new VertxBackendStatement();
+        }
+        return new JDBCBackendStatement();
+    }
+    
     /**
      * Change schema of current channel.
      *
@@ -88,6 +99,9 @@ public final class ConnectionSession {
         if (transactionStatus.isInTransaction()) {
             throw new ShardingSphereException("Failed to switch schema, please 
terminate current transaction.");
         }
+        if (statementManager instanceof JDBCBackendStatement) {
+            ((JDBCBackendStatement) 
statementManager).setSchemaName(schemaName);
+        }
         this.schemaName = schemaName;
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/advanced/PreviewDistSQLBackendHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/advanced/PreviewDistSQLBackendHandler.java
index fccb9fd..e96f2ac 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/advanced/PreviewDistSQLBackendHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/advanced/PreviewDistSQLBackendHandler.java
@@ -50,6 +50,7 @@ import 
org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import 
org.apache.shardingsphere.proxy.backend.communication.SQLStatementSchemaHolder;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.backend.exception.NoDatabaseSelectedException;
@@ -150,8 +151,8 @@ public final class PreviewDistSQLBackendHandler extends 
QueryableRALBackendHandl
     
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine(final boolean isReturnGeneratedKeys, final 
MetaDataContexts metaData) {
         int maxConnectionsSizePerQuery = 
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, 
maxConnectionsSizePerQuery, (JDBCBackendConnection) 
connectionSession.getBackendConnection(), 
-                new StatementOption(isReturnGeneratedKeys), 
metaData.getMetaData(getSchemaName()).getRuleMetaData().getRules());
+        return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, 
maxConnectionsSizePerQuery, (JDBCBackendConnection) 
connectionSession.getBackendConnection(),
+                (JDBCBackendStatement) 
connectionSession.getStatementManager(), new 
StatementOption(isReturnGeneratedKeys), 
metaData.getMetaData(getSchemaName()).getRuleMetaData().getRules());
     }
     
     private String getSchemaName() {
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
index 026b202..7f3d93e 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
@@ -31,6 +31,7 @@ import 
org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.datasource.JDBCBackendDataSource;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -47,8 +48,6 @@ import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -97,6 +96,9 @@ public final class JDBCBackendConnectionTest {
         backendConnection = spy(new JDBCBackendConnection(connectionSession));
         
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
         when(connectionSession.getTransactionStatus()).thenReturn(new 
TransactionStatus(TransactionType.LOCAL));
+        JDBCBackendStatement backendStatement = new JDBCBackendStatement();
+        backendStatement.setSchemaName(connectionSession.getSchemaName());
+        
when(connectionSession.getStatementManager()).thenReturn(backendStatement);
     }
     
     private void setContextManager() throws ReflectiveOperationException {
@@ -248,15 +250,6 @@ public final class JDBCBackendConnectionTest {
     }
     
     @Test
-    public void assertSetFetchSizeAsExpected() throws 
InvocationTargetException, IllegalAccessException, NoSuchMethodException, 
SQLException {
-        Statement statement = mock(Statement.class);
-        Method setFetchSizeMethod = 
JDBCBackendConnection.class.getDeclaredMethod("setFetchSize", Statement.class);
-        setFetchSizeMethod.setAccessible(true);
-        setFetchSizeMethod.invoke(backendConnection, statement);
-        verify(statement, times(1)).setFetchSize(Integer.MIN_VALUE);
-    }
-    
-    @Test
     public void assertCloseConnectionsCorrectlyWhenNotForceRollback() throws 
NoSuchFieldException, IllegalAccessException, SQLException {
         Field field = 
JDBCBackendConnection.class.getDeclaredField("cachedConnections");
         field.setAccessible(true);
@@ -306,7 +299,8 @@ public final class JDBCBackendConnectionTest {
         Connection connection = mock(Connection.class);
         Statement statement = mock(Statement.class);
         when(connection.createStatement()).thenReturn(statement);
-        assertThat(backendConnection.createStorageResource(connection, 
ConnectionMode.MEMORY_STRICTLY, null), is(statement));
+        JDBCBackendStatement backendStatement = (JDBCBackendStatement) 
connectionSession.getStatementManager();
+        assertThat(backendStatement.createStorageResource(connection, 
ConnectionMode.MEMORY_STRICTLY, null), is(statement));
         verify(connection, times(1)).createStatement();
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
index 3bffd38..961585e 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.ext
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.bind.PostgreSQLBindCompletePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
@@ -30,6 +31,7 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import 
org.apache.shardingsphere.parser.rule.builder.DefaultSQLParserRuleConfigurationBuilder;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -50,7 +52,6 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
@@ -86,7 +87,9 @@ public final class OpenGaussComBatchBindExecutorTest {
         when(backendConnection.getConnections(nullable(String.class), 
anyInt(), 
any(ConnectionMode.class))).thenReturn(Collections.singletonList(connection));
         PreparedStatement preparedStatement = mock(PreparedStatement.class);
         when(preparedStatement.getConnection()).thenReturn(connection);
-        when(backendConnection.createStorageResource(anyString(), anyList(), 
any(Connection.class), any(ConnectionMode.class), 
any(StatementOption.class))).thenReturn(preparedStatement);
+        JDBCBackendStatement backendStatement = 
mock(JDBCBackendStatement.class);
+        when(backendStatement.createStorageResource(any(ExecutionUnit.class), 
any(Connection.class), any(ConnectionMode.class), 
any(StatementOption.class))).thenReturn(preparedStatement);
+        
when(connectionSession.getStatementManager()).thenReturn(backendStatement);
         
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
         
PostgreSQLPreparedStatementRegistry.getInstance().register(connectionId);
         SQLStatement sqlStatement = SQL_PARSER_ENGINE.parse("insert into bmsql 
(id) values (?)", false);
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedInsertsExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedInsertsExecutor.java
index 4672f76..914da0a 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedInsertsExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedInsertsExecutor.java
@@ -41,6 +41,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.Statemen
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -123,7 +124,7 @@ public final class PostgreSQLBatchedInsertsExecutor {
         Collection<ShardingSphereRule> rules = 
metaDataContexts.getMetaData(connectionSession.getSchemaName()).getRuleMetaData().getRules();
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = new DriverExecutionPrepareEngine<>(
                 JDBCDriverType.PREPARED_STATEMENT, 
metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY),
-                (JDBCBackendConnection) 
connectionSession.getBackendConnection(), new StatementOption(false), rules);
+                (JDBCBackendConnection) 
connectionSession.getBackendConnection(), (JDBCBackendStatement) 
connectionSession.getStatementManager(), new StatementOption(false), rules);
         executionGroupContext = 
prepareEngine.prepare(anyExecutionContext.getRouteContext(), 
executionUnitParameters.keySet());
         for (ExecutionGroup<JDBCExecutionUnit> eachGroup : 
executionGroupContext.getInputGroups()) {
             for (JDBCExecutionUnit each : eachGroup.getInputs()) {
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedInsertsCommandExecutorTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedInsertsCommandExecutorTest.java
index 4410970..f493fca 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedInsertsCommandExecutorTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedInsertsCommandExecutorTest.java
@@ -28,6 +28,7 @@ import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.ext
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.execute.PostgreSQLComExecutePacket;
 import 
org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
@@ -35,6 +36,7 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import 
org.apache.shardingsphere.parser.rule.builder.DefaultSQLParserRuleConfigurationBuilder;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.junit.After;
@@ -53,8 +55,6 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
@@ -96,7 +96,9 @@ public final class 
PostgreSQLAggregatedBatchedInsertsCommandExecutorTest {
         when(backendConnection.getConnections(nullable(String.class), 
anyInt(), 
any(ConnectionMode.class))).thenReturn(Collections.singletonList(connection));
         PreparedStatement preparedStatement = mock(PreparedStatement.class);
         when(preparedStatement.getConnection()).thenReturn(connection);
-        when(backendConnection.createStorageResource(anyString(), anyList(), 
any(Connection.class), any(ConnectionMode.class), 
any(StatementOption.class))).thenReturn(preparedStatement);
+        JDBCBackendStatement backendStatement = 
mock(JDBCBackendStatement.class);
+        when(backendStatement.createStorageResource(any(ExecutionUnit.class), 
any(Connection.class), any(ConnectionMode.class), 
any(StatementOption.class))).thenReturn(preparedStatement);
+        
when(connectionSession.getStatementManager()).thenReturn(backendStatement);
         
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
         PostgreSQLAggregatedBatchedInsertsCommandExecutor 
batchedInsertsCommandExecutor = new 
PostgreSQLAggregatedBatchedInsertsCommandExecutor(connectionSession, 
preparePackets());
         List<DatabasePacket<?>> actualPackets = new 
ArrayList<>(batchedInsertsCommandExecutor.execute());

Reply via email to