This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 70d27b2 Add StatementManager and cache statements for
ShardingSpherePreparedStatement (#15934)
70d27b2 is described below
commit 70d27b2eef06479b278498bd0b42f17321bb2bea
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Thu Mar 10 09:57:28 2022 +0800
Add StatementManager and cache statements for
ShardingSpherePreparedStatement (#15934)
---
.../driver/DriverExecutionPrepareEngine.java | 15 ++--
...Builder.java => ExecutorConnectionManager.java} | 27 +++++--
...rManager.java => ExecutorStatementManager.java} | 22 ++----
.../prepare/driver/SQLExecutionUnitBuilder.java | 2 +-
...ger.java => ExecutorJDBCConnectionManager.java} | 7 +-
...ager.java => ExecutorJDBCStatementManager.java} | 6 +-
.../jdbc/builder/JDBCExecutionUnitBuilder.java | 4 +-
.../PreparedStatementExecutionUnitBuilder.java | 11 ++-
.../builder/StatementExecutionUnitBuilder.java | 10 +--
.../ExecutorVertxConnectionManager.java} | 13 ++--
...ger.java => ExecutorVertxStatementManager.java} | 6 +-
.../builder/PreparedQueryExecutionUnitBuilder.java | 6 +-
.../vertx/builder/VertxExecutionUnitBuilder.java | 4 +-
.../jdbc/adapter/AbstractStatementAdapter.java | 6 ++
.../jdbc/core/connection/ConnectionManager.java | 21 +----
.../statement/ShardingSpherePreparedStatement.java | 23 +++---
.../core/statement/ShardingSphereStatement.java | 10 ++-
.../jdbc/core/statement/StatementManager.java | 79 +++++++++++++++++++
.../statement/CircuitBreakerPreparedStatement.java | 6 ++
.../backend/communication/ProxySQLExecutor.java | 4 +-
.../communication/ReactiveProxySQLExecutor.java | 4 +-
.../jdbc/JDBCDatabaseCommunicationEngine.java | 8 +-
.../jdbc/connection/JDBCBackendConnection.java | 51 +------------
.../jdbc/statement/JDBCBackendStatement.java | 89 ++++++++++++++++++++++
.../vertx/VertxBackendConnection.java | 19 +----
.../communication/vertx/VertxBackendStatement.java | 24 +++---
.../proxy/backend/session/ConnectionSession.java | 14 ++++
.../ral/advanced/PreviewDistSQLBackendHandler.java | 5 +-
.../jdbc/connection/JDBCBackendConnectionTest.java | 18 ++---
.../bind/OpenGaussComBatchBindExecutorTest.java | 7 +-
.../extended/PostgreSQLBatchedInsertsExecutor.java | 3 +-
...ggregatedBatchedInsertsCommandExecutorTest.java | 8 +-
32 files changed, 329 insertions(+), 203 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
index d900995..61b7b48 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
@@ -46,7 +46,9 @@ public final class DriverExecutionPrepareEngine<T extends
DriverExecutionUnit<?>
@SuppressWarnings("rawtypes")
private static final Map<String, SQLExecutionUnitBuilder>
TYPE_TO_BUILDER_MAP = new ConcurrentHashMap<>(8, 1);
- private final ExecutorDriverManager<C, ?, ?> executorDriverManager;
+ private final ExecutorConnectionManager<C> connectionManager;
+
+ private final ExecutorStatementManager<C, ?, ?> statementManager;
private final StorageResourceOption option;
@@ -57,10 +59,11 @@ public final class DriverExecutionPrepareEngine<T extends
DriverExecutionUnit<?>
ShardingSphereServiceLoader.register(SQLExecutionUnitBuilder.class);
}
- public DriverExecutionPrepareEngine(final String type, final int
maxConnectionsSizePerQuery, final ExecutorDriverManager<C, ?, ?>
executorDriverManager,
- final StorageResourceOption option,
final Collection<ShardingSphereRule> rules) {
+ public DriverExecutionPrepareEngine(final String type, final int
maxConnectionsSizePerQuery, final ExecutorConnectionManager<C>
connectionManager,
+ final ExecutorStatementManager<C, ?,
?> statementManager, final StorageResourceOption option, final
Collection<ShardingSphereRule> rules) {
super(maxConnectionsSizePerQuery, rules);
- this.executorDriverManager = executorDriverManager;
+ this.connectionManager = connectionManager;
+ this.statementManager = statementManager;
this.option = option;
sqlExecutionUnitBuilder = getCachedSqlExecutionUnitBuilder(type);
}
@@ -83,7 +86,7 @@ public final class DriverExecutionPrepareEngine<T extends
DriverExecutionUnit<?>
@Override
protected List<ExecutionGroup<T>> group(final String dataSourceName, final
List<List<SQLUnit>> sqlUnitGroups, final ConnectionMode connectionMode) throws
SQLException {
List<ExecutionGroup<T>> result = new LinkedList<>();
- List<C> connections =
executorDriverManager.getConnections(dataSourceName, sqlUnitGroups.size(),
connectionMode);
+ List<C> connections = connectionManager.getConnections(dataSourceName,
sqlUnitGroups.size(), connectionMode);
int count = 0;
for (List<SQLUnit> each : sqlUnitGroups) {
result.add(createExecutionGroup(dataSourceName, each,
connections.get(count++), connectionMode));
@@ -95,7 +98,7 @@ public final class DriverExecutionPrepareEngine<T extends
DriverExecutionUnit<?>
private ExecutionGroup<T> createExecutionGroup(final String
dataSourceName, final List<SQLUnit> sqlUnits, final C connection, final
ConnectionMode connectionMode) throws SQLException {
List<T> result = new LinkedList<>();
for (SQLUnit each : sqlUnits) {
- result.add((T) sqlExecutionUnitBuilder.build(new
ExecutionUnit(dataSourceName, each), executorDriverManager, connection,
connectionMode, option));
+ result.add((T) sqlExecutionUnitBuilder.build(new
ExecutionUnit(dataSourceName, each), statementManager, connection,
connectionMode, option));
}
return new ExecutionGroup<>(result);
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorConnectionManager.java
similarity index 61%
copy from
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
copy to
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorConnectionManager.java
index 5f97b45..6bf5600 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorConnectionManager.java
@@ -15,17 +15,28 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.builder;
+package org.apache.shardingsphere.infra.executor.sql.prepare.driver;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.SQLExecutionUnitBuilder;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
/**
- * JDBC execution unit builder.
+ * Executor connection manager.
+ *
+ * @param <C> type of resource connection
*/
-public interface JDBCExecutionUnitBuilder extends
SQLExecutionUnitBuilder<JDBCExecutionUnit, ExecutorJDBCManager, Connection,
StatementOption> {
+public interface ExecutorConnectionManager<C> {
+
+ /**
+ * Get connections.
+ *
+ * @param dataSourceName data source name
+ * @param connectionSize connection size
+ * @param connectionMode connection mode
+ * @return connections
+ * @throws SQLException SQL exception
+ */
+ List<C> getConnections(String dataSourceName, int connectionSize,
ConnectionMode connectionMode) throws SQLException;
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorDriverManager.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java
similarity index 71%
rename from
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorDriverManager.java
rename to
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java
index 344775a..e3483e5 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorDriverManager.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java
@@ -17,30 +17,19 @@
package org.apache.shardingsphere.infra.executor.sql.prepare.driver;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import java.sql.SQLException;
-import java.util.List;
/**
- * Executor driver manager.
+ * Executor statement manager.
*
* @param <C> type of resource connection
* @param <R> type of storage resource
* @param <O> type of storage resource option
*/
-public interface ExecutorDriverManager<C, R, O extends StorageResourceOption> {
-
- /**
- * Get connections.
- *
- * @param dataSourceName data source name
- * @param connectionSize connection size
- * @param connectionMode connection mode
- * @return connections
- * @throws SQLException SQL exception
- */
- List<C> getConnections(String dataSourceName, int connectionSize,
ConnectionMode connectionMode) throws SQLException;
+public interface ExecutorStatementManager<C, R, O extends
StorageResourceOption> {
/**
* Create storage resource.
@@ -56,13 +45,12 @@ public interface ExecutorDriverManager<C, R, O extends
StorageResourceOption> {
/**
* Create storage resource.
*
- * @param sql SQL
- * @param parameters SQL parameters
+ * @param executionUnit execution unit
* @param connection connection
* @param connectionMode connection mode
* @param option storage resource option
* @return storage resource
* @throws SQLException SQL exception
*/
- R createStorageResource(String sql, List<Object> parameters, C connection,
ConnectionMode connectionMode, O option) throws SQLException;
+ R createStorageResource(ExecutionUnit executionUnit, C connection,
ConnectionMode connectionMode, O option) throws SQLException;
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java
index 3ead2a1..38828f5 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java
@@ -32,7 +32,7 @@ import java.sql.SQLException;
* @param <C> type of resource connection
* @param <O> type of storage resource option
*/
-public interface SQLExecutionUnitBuilder<T extends DriverExecutionUnit<?>, M
extends ExecutorDriverManager<C, ?, O>, C, O extends StorageResourceOption>
extends TypedSPI {
+public interface SQLExecutionUnitBuilder<T extends DriverExecutionUnit<?>, M
extends ExecutorStatementManager<C, ?, O>, C, O extends StorageResourceOption>
extends TypedSPI {
/**
* Build SQL execution unit.
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCConnectionManager.java
similarity index 83%
copy from
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
copy to
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCConnectionManager.java
index 37b2056..5f0bbee 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCConnectionManager.java
@@ -17,13 +17,12 @@
package org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorDriverManager;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorConnectionManager;
import java.sql.Connection;
-import java.sql.Statement;
/**
- * Executor JDBC driver manager.
+ * Executor JDBC connection manager.
*/
-public interface ExecutorJDBCManager extends ExecutorDriverManager<Connection,
Statement, StatementOption> {
+public interface ExecutorJDBCConnectionManager extends
ExecutorConnectionManager<Connection> {
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCStatementManager.java
similarity index 84%
copy from
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
copy to
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCStatementManager.java
index 37b2056..b5b907d 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCStatementManager.java
@@ -17,13 +17,13 @@
package org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorDriverManager;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorStatementManager;
import java.sql.Connection;
import java.sql.Statement;
/**
- * Executor JDBC driver manager.
+ * Executor JDBC statement manager.
*/
-public interface ExecutorJDBCManager extends ExecutorDriverManager<Connection,
Statement, StatementOption> {
+public interface ExecutorJDBCStatementManager extends
ExecutorStatementManager<Connection, Statement, StatementOption> {
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
index 5f97b45..387585a 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/JDBCExecutionUnitBuilder.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.builder
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.SQLExecutionUnitBuilder;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCStatementManager;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import java.sql.Connection;
@@ -27,5 +27,5 @@ import java.sql.Connection;
/**
* JDBC execution unit builder.
*/
-public interface JDBCExecutionUnitBuilder extends
SQLExecutionUnitBuilder<JDBCExecutionUnit, ExecutorJDBCManager, Connection,
StatementOption> {
+public interface JDBCExecutionUnitBuilder extends
SQLExecutionUnitBuilder<JDBCExecutionUnit, ExecutorJDBCStatementManager,
Connection, StatementOption> {
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java
index 3fbe911..4a82a8e 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java
@@ -20,14 +20,13 @@ package
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.builder
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCStatementManager;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.List;
/**
* JDBC prepared statement execution unit builder.
@@ -35,16 +34,16 @@ import java.util.List;
public final class PreparedStatementExecutionUnitBuilder implements
JDBCExecutionUnitBuilder {
@Override
- public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final
ExecutorJDBCManager executorManager,
+ public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final
ExecutorJDBCStatementManager statementManager,
final Connection connection, final
ConnectionMode connectionMode, final StatementOption option) throws
SQLException {
PreparedStatement preparedStatement = createPreparedStatement(
- executionUnit.getSqlUnit().getSql(),
executionUnit.getSqlUnit().getParameters(), executorManager, connection,
connectionMode, option);
+ executionUnit, statementManager, connection, connectionMode,
option);
return new JDBCExecutionUnit(executionUnit, connectionMode,
preparedStatement);
}
- private PreparedStatement createPreparedStatement(final String sql, final
List<Object> parameters, final ExecutorJDBCManager executorJDBCManager, final
Connection connection,
+ private PreparedStatement createPreparedStatement(final ExecutionUnit
executionUnit, final ExecutorJDBCStatementManager statementManager, final
Connection connection,
final ConnectionMode
connectionMode, final StatementOption option) throws SQLException {
- return (PreparedStatement)
executorJDBCManager.createStorageResource(sql, parameters, connection,
connectionMode, option);
+ return (PreparedStatement)
statementManager.createStorageResource(executionUnit, connection,
connectionMode, option);
}
@Override
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java
index a551a93..7af040c 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.builder
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCStatementManager;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -34,14 +34,14 @@ import java.sql.Statement;
public final class StatementExecutionUnitBuilder implements
JDBCExecutionUnitBuilder {
@Override
- public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final
ExecutorJDBCManager executorManager,
+ public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final
ExecutorJDBCStatementManager statementManager,
final Connection connection, final
ConnectionMode connectionMode, final StatementOption option) throws
SQLException {
- return new JDBCExecutionUnit(executionUnit, connectionMode,
createStatement(executorManager, connection, connectionMode, option));
+ return new JDBCExecutionUnit(executionUnit, connectionMode,
createStatement(statementManager, connection, connectionMode, option));
}
- private Statement createStatement(final ExecutorJDBCManager
executorJDBCManager, final Connection connection,
+ private Statement createStatement(final ExecutorJDBCStatementManager
statementManager, final Connection connection,
final ConnectionMode connectionMode,
final StatementOption option) throws SQLException {
- return executorJDBCManager.createStorageResource(connection,
connectionMode, option);
+ return statementManager.createStorageResource(connection,
connectionMode, option);
}
@Override
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxConnectionManager.java
similarity index 78%
rename from
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
rename to
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxConnectionManager.java
index 37b2056..b917d84 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/ExecutorJDBCManager.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxConnectionManager.java
@@ -15,15 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc;
+package org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorDriverManager;
-
-import java.sql.Connection;
-import java.sql.Statement;
+import io.vertx.core.Future;
+import io.vertx.sqlclient.SqlClient;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorConnectionManager;
/**
- * Executor JDBC driver manager.
+ * Executor connection manager for Vert.x.
*/
-public interface ExecutorJDBCManager extends ExecutorDriverManager<Connection,
Statement, StatementOption> {
+public interface ExecutorVertxConnectionManager extends
ExecutorConnectionManager<Future<? extends SqlClient>> {
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxManager.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxStatementManager.java
similarity index 82%
rename from
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxManager.java
rename to
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxStatementManager.java
index 500c8a1..16fbc80 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxManager.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/ExecutorVertxStatementManager.java
@@ -22,10 +22,10 @@ import io.vertx.sqlclient.Query;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlClient;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorDriverManager;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorStatementManager;
/**
- * Executor manager for Vert.x.
+ * Executor statement manager for Vert.x.
*/
-public interface ExecutorVertxManager extends ExecutorDriverManager<Future<?
extends SqlClient>, Future<Query<RowSet<Row>>>, VertxExecutionContext> {
+public interface ExecutorVertxStatementManager extends
ExecutorStatementManager<Future<? extends SqlClient>,
Future<Query<RowSet<Row>>>, VertxExecutionContext> {
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
index df0c038..8e9e2ab 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
@@ -22,7 +22,7 @@ import io.vertx.sqlclient.SqlClient;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.vertx.VertxExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxManager;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxStatementManager;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.VertxExecutionContext;
import java.sql.SQLException;
@@ -33,8 +33,8 @@ import java.sql.SQLException;
public final class PreparedQueryExecutionUnitBuilder implements
VertxExecutionUnitBuilder {
@Override
- public VertxExecutionUnit build(final ExecutionUnit executionUnit, final
ExecutorVertxManager executorManager, final Future<? extends SqlClient>
connection, final ConnectionMode connectionMode,
- final VertxExecutionContext option) throws
SQLException {
+ public VertxExecutionUnit build(final ExecutionUnit executionUnit, final
ExecutorVertxStatementManager statementManager,
+ final Future<? extends SqlClient>
connection, final ConnectionMode connectionMode, final VertxExecutionContext
option) throws SQLException {
return new VertxExecutionUnit(executionUnit, connectionMode,
connection.compose(sqlClient ->
Future.succeededFuture(sqlClient.preparedQuery(executionUnit.getSqlUnit().getSql()))));
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/VertxExecutionUnitBuilder.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/VertxExecutionUnitBuilder.java
index 95a8f2f..80944d7 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/VertxExecutionUnitBuilder.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/VertxExecutionUnitBuilder.java
@@ -21,11 +21,11 @@ import io.vertx.core.Future;
import io.vertx.sqlclient.SqlClient;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.vertx.VertxExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.SQLExecutionUnitBuilder;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxManager;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxStatementManager;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.VertxExecutionContext;
/**
* Execution unit builder for Vert.x.
*/
-public interface VertxExecutionUnitBuilder extends
SQLExecutionUnitBuilder<VertxExecutionUnit, ExecutorVertxManager, Future<?
extends SqlClient>, VertxExecutionContext> {
+public interface VertxExecutionUnitBuilder extends
SQLExecutionUnitBuilder<VertxExecutionUnit, ExecutorVertxStatementManager,
Future<? extends SqlClient>, VertxExecutionContext> {
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index ec22fde..a97982d 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.driver.executor.DriverExecutor;
import
org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
import
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationStatement;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
@@ -189,6 +190,9 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
if (null != getExecutor()) {
getExecutor().close();
}
+ if (null != getStatementManager()) {
+ getStatementManager().close();
+ }
} finally {
getRoutedStatements().clear();
}
@@ -208,4 +212,6 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
protected abstract Collection<? extends Statement> getRoutedStatements();
protected abstract DriverExecutor getExecutor();
+
+ protected abstract StatementManager getStatementManager();
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
index 24b13b5..1efb82a 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
@@ -28,8 +28,7 @@ import
org.apache.shardingsphere.driver.jdbc.adapter.invocation.MethodInvocation
import
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCConnectionManager;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.definition.InstanceId;
import org.apache.shardingsphere.infra.instance.definition.InstanceType;
@@ -45,9 +44,7 @@ import
org.apache.shardingsphere.transaction.rule.TransactionRule;
import javax.sql.DataSource;
import java.security.SecureRandom;
import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -60,7 +57,7 @@ import java.util.Random;
/**
* Connection manager.
*/
-public final class ConnectionManager implements ExecutorJDBCManager,
AutoCloseable {
+public final class ConnectionManager implements ExecutorJDBCConnectionManager,
AutoCloseable {
private final Map<String, DataSource> dataSourceMap = new
LinkedHashMap<>();
@@ -312,20 +309,6 @@ public final class ConnectionManager implements
ExecutorJDBCManager, AutoCloseab
return physicalDataSourceMap.containsKey(dataSourceName);
}
- @SuppressWarnings("MagicConstant")
- @Override
- public Statement createStorageResource(final Connection connection, final
ConnectionMode connectionMode, final StatementOption option) throws
SQLException {
- return connection.createStatement(option.getResultSetType(),
option.getResultSetConcurrency(), option.getResultSetHoldability());
- }
-
- @SuppressWarnings("MagicConstant")
- @Override
- public PreparedStatement createStorageResource(final String sql, final
List<Object> parameters,
- final Connection
connection, final ConnectionMode connectionMode, final StatementOption option)
throws SQLException {
- return option.isReturnGeneratedKeys() ?
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
- : connection.prepareStatement(sql, option.getResultSetType(),
option.getResultSetConcurrency(), option.getResultSetHoldability());
- }
-
@Override
public void close() throws SQLException {
try {
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 74c260d..3036dd0 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -133,14 +133,17 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
private final boolean statementsCacheable;
+ private final TrafficRule trafficRule;
+
+ @Getter(AccessLevel.PROTECTED)
+ private final StatementManager statementManager;
+
private ExecutionContext executionContext;
private ResultSet currentResultSet;
private TrafficContext trafficContext;
- private TrafficRule trafficRule;
-
public ShardingSpherePreparedStatement(final ShardingSphereConnection
connection, final String sql) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
}
@@ -182,6 +185,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
kernelProcessor = new KernelProcessor();
statementsCacheable =
isStatementsCacheable(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getConfigurations());
trafficRule =
metaDataContexts.getGlobalRuleMetaData().findSingleRule(TrafficRule.class).orElse(null);
+ statementManager = new StatementManager();
}
private boolean isStatementsCacheable(final Collection<RuleConfiguration>
configurations) {
@@ -282,7 +286,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine() {
int maxConnectionsSizePerQuery =
metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new
DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT,
maxConnectionsSizePerQuery, connection.getConnectionManager(),
+ return new
DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT,
maxConnectionsSizePerQuery, connection.getConnectionManager(),
statementManager,
statementOption,
metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
}
@@ -504,8 +508,8 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
}
}
- private void clearPrevious() throws SQLException {
- clearStatements();
+ private void clearPrevious() {
+ statements.clear();
parameterSets.clear();
}
@@ -561,7 +565,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private void initBatchPreparedStatementExecutor() throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = new DriverExecutionPrepareEngine<>(
JDBCDriverType.PREPARED_STATEMENT,
metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY),
- connection.getConnectionManager(), statementOption,
metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
+ connection.getConnectionManager(), statementManager,
statementOption,
metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
List<ExecutionUnit> executionUnits = new
ArrayList<>(batchPreparedStatementExecutor.getBatchExecutionUnits().size());
for (BatchExecutionUnit each :
batchPreparedStatementExecutor.getBatchExecutionUnits()) {
ExecutionUnit executionUnit = each.getExecutionUnit();
@@ -615,11 +619,4 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
public Collection<PreparedStatement> getRoutedStatements() {
return statements;
}
-
- private void clearStatements() throws SQLException {
- for (Statement each : statements) {
- each.close();
- }
- statements.clear();
- }
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index d3cdb60..2f0c3f1 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -109,6 +109,11 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private final KernelProcessor kernelProcessor;
+ private final TrafficRule trafficRule;
+
+ @Getter(AccessLevel.PROTECTED)
+ private final StatementManager statementManager;
+
private boolean returnGeneratedKeys;
private ExecutionContext executionContext;
@@ -117,8 +122,6 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private TrafficContext trafficContext;
- private TrafficRule trafficRule;
-
public ShardingSphereStatement(final ShardingSphereConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
@@ -136,6 +139,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
executor = new DriverExecutor(connection);
kernelProcessor = new KernelProcessor();
trafficRule =
metaDataContexts.getGlobalRuleMetaData().findSingleRule(TrafficRule.class).orElse(null);
+ statementManager = new StatementManager();
}
@Override
@@ -209,7 +213,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine() {
int maxConnectionsSizePerQuery =
metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT,
maxConnectionsSizePerQuery, connection.getConnectionManager(),
+ return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT,
maxConnectionsSizePerQuery, connection.getConnectionManager(), statementManager,
statementOption,
metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules());
}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java
new file mode 100644
index 0000000..53c1149
--- /dev/null
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.jdbc.core.statement;
+
+import lombok.EqualsAndHashCode;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCStatementManager;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Statement manager.
+ */
+public final class StatementManager implements ExecutorJDBCStatementManager,
AutoCloseable {
+
+ private final Map<CacheKey, Statement> cachedStatements = new
ConcurrentHashMap<>();
+
+ private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new
ForceExecuteTemplate<>();
+
+ @SuppressWarnings("MagicConstant")
+ @Override
+ public Statement createStorageResource(final Connection connection, final
ConnectionMode connectionMode, final StatementOption option) throws
SQLException {
+ return connection.createStatement(option.getResultSetType(),
option.getResultSetConcurrency(), option.getResultSetHoldability());
+ }
+
+ @SuppressWarnings("MagicConstant")
+ @Override
+ public Statement createStorageResource(final ExecutionUnit executionUnit,
final Connection connection, final ConnectionMode connectionMode, final
StatementOption option) throws SQLException {
+ Statement result = cachedStatements.get(new CacheKey(executionUnit,
connectionMode));
+ if (null == result) {
+ String sql = executionUnit.getSqlUnit().getSql();
+ result = option.isReturnGeneratedKeys() ?
connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
+ : connection.prepareStatement(sql,
option.getResultSetType(), option.getResultSetConcurrency(),
option.getResultSetHoldability());
+ cachedStatements.put(new CacheKey(executionUnit, connectionMode),
result);
+ }
+ return result;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ try {
+ forceExecuteTemplate.execute(cachedStatements.values(),
Statement::close);
+ } finally {
+ cachedStatements.clear();
+ }
+ }
+
+ @RequiredArgsConstructor
+ @EqualsAndHashCode
+ private static final class CacheKey {
+
+ private final ExecutionUnit executionUnit;
+
+ private final ConnectionMode connectionMode;
+ }
+}
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
index 86e5ee7..0948738 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.driver.state.circuit.statement;
import lombok.Getter;
import org.apache.shardingsphere.driver.executor.DriverExecutor;
+import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
import
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement;
import
org.apache.shardingsphere.driver.state.circuit.connection.CircuitBreakerConnection;
import
org.apache.shardingsphere.driver.state.circuit.resultset.CircuitBreakerResultSet;
@@ -281,6 +282,11 @@ public final class CircuitBreakerPreparedStatement extends
AbstractUnsupportedOp
}
@Override
+ protected StatementManager getStatementManager() {
+ return null;
+ }
+
+ @Override
public ResultSet executeQuery() {
return new CircuitBreakerResultSet();
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 8fb29ae..a86df4f 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -39,6 +39,7 @@ import
org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.ProxyJDBCExecutor;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactionException;
@@ -149,8 +150,9 @@ public final class ProxySQLExecutor {
private List<ExecuteResult> useDriverToExecute(final ExecutionContext
executionContext, final Collection<ShardingSphereRule> rules,
final int
maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean
isExceptionThrown) throws SQLException {
+ JDBCBackendStatement statementManager = (JDBCBackendStatement)
backendConnection.getConnectionSession().getStatementManager();
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = new DriverExecutionPrepareEngine<>(
- type, maxConnectionsSizePerQuery, backendConnection, new
StatementOption(isReturnGeneratedKeys), rules);
+ type, maxConnectionsSizePerQuery, backendConnection,
statementManager, new StatementOption(isReturnGeneratedKeys), rules);
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
try {
executionGroupContext =
prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits());
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
index 00e1764..a5b5ff3 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
@@ -31,6 +31,7 @@ import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecuti
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.VertxExecutionContext;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import
org.apache.shardingsphere.proxy.backend.communication.vertx.VertxBackendConnection;
+import
org.apache.shardingsphere.proxy.backend.communication.vertx.VertxBackendStatement;
import
org.apache.shardingsphere.proxy.backend.communication.vertx.executor.ProxyReactiveExecutor;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -103,8 +104,9 @@ public final class ReactiveProxySQLExecutor {
private Future<List<ExecuteResult>> useDriverToExecute(final
ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
final int
maxConnectionsSizePerQuery) throws SQLException {
+ VertxBackendStatement statementManager = (VertxBackendStatement)
backendConnection.getConnectionSession().getStatementManager();
DriverExecutionPrepareEngine<VertxExecutionUnit, Future<? extends
SqlClient>> prepareEngine = new DriverExecutionPrepareEngine<>(
- TYPE, maxConnectionsSizePerQuery, backendConnection, new
VertxExecutionContext(), rules);
+ TYPE, maxConnectionsSizePerQuery, backendConnection,
statementManager, new VertxExecutionContext(), rules);
ExecutionGroupContext<VertxExecutionUnit> executionGroupContext;
try {
executionGroupContext =
prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits());
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index 4ba4bbd..c003eb4 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -42,12 +42,13 @@ import
org.apache.shardingsphere.proxy.backend.communication.ProxySQLExecutor;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilder;
import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilderFactory;
import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
-import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilder;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import
org.apache.shardingsphere.sharding.merge.dql.iterator.IteratorStreamMergedResult;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
@@ -147,8 +148,9 @@ public final class JDBCDatabaseCommunicationEngine extends
DatabaseCommunication
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine(final boolean isReturnGeneratedKeys, final
MetaDataContexts metaData) {
int maxConnectionsSizePerQuery =
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new DriverExecutionPrepareEngine<>(getDriverType(),
maxConnectionsSizePerQuery, backendConnection, new
StatementOption(isReturnGeneratedKeys),
-
metaData.getMetaData(backendConnection.getConnectionSession().getSchemaName()).getRuleMetaData().getRules());
+ JDBCBackendStatement statementManager = (JDBCBackendStatement)
backendConnection.getConnectionSession().getStatementManager();
+ return new DriverExecutionPrepareEngine<>(getDriverType(),
maxConnectionsSizePerQuery, backendConnection, statementManager,
+ new StatementOption(isReturnGeneratedKeys),
metaData.getMetaData(backendConnection.getConnectionSession().getSchemaName()).getRuleMetaData().getRules());
}
private ResponseHeader processExecuteFederation(final ResultSet resultSet,
final MetaDataContexts metaDataContexts) throws SQLException {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
index 9110961..e09223f 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
@@ -22,34 +22,25 @@ import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import lombok.Getter;
import lombok.Setter;
-import
org.apache.shardingsphere.db.protocol.parameter.TypeUnspecifiedSQLParameter;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCConnectionManager;
import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
import org.apache.shardingsphere.proxy.backend.communication.BackendConnection;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
-import
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.StatementMemoryStrictlyFetchSizeSetter;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.JDBCBackendTransactionManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.backend.util.TransactionUtil;
-import org.apache.shardingsphere.spi.singleton.SingletonSPIRegistry;
import org.apache.shardingsphere.transaction.core.TransactionType;
import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -57,7 +48,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
@Getter
@Setter
-public final class JDBCBackendConnection implements BackendConnection<Void>,
ExecutorJDBCManager {
+public final class JDBCBackendConnection implements BackendConnection<Void>,
ExecutorJDBCConnectionManager {
private final ConnectionSession connectionSession;
@@ -75,11 +66,8 @@ public final class JDBCBackendConnection implements
BackendConnection<Void>, Exe
private final ConnectionStatus connectionStatus = new ConnectionStatus();
- private final Map<String, StatementMemoryStrictlyFetchSizeSetter>
fetchSizeSetters;
-
public JDBCBackendConnection(final ConnectionSession connectionSession) {
this.connectionSession = connectionSession;
- fetchSizeSetters =
SingletonSPIRegistry.getTypedSingletonInstancesMap(StatementMemoryStrictlyFetchSizeSetter.class);
}
@Override
@@ -140,41 +128,6 @@ public final class JDBCBackendConnection implements
BackendConnection<Void>, Exe
}
}
- @Override
- public Statement createStorageResource(final Connection connection, final
ConnectionMode connectionMode, final StatementOption option) throws
SQLException {
- Statement result = connection.createStatement();
- if (ConnectionMode.MEMORY_STRICTLY == connectionMode) {
- setFetchSize(result);
- }
- return result;
- }
-
- @Override
- public PreparedStatement createStorageResource(final String sql, final
List<Object> parameters,
- final Connection
connection, final ConnectionMode connectionMode, final StatementOption option)
throws SQLException {
- PreparedStatement result = option.isReturnGeneratedKeys()
- ? connection.prepareStatement(sql,
Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(sql);
- for (int i = 0; i < parameters.size(); i++) {
- Object parameter = parameters.get(i);
- if (parameter instanceof TypeUnspecifiedSQLParameter) {
- result.setObject(i + 1, parameter, Types.OTHER);
- } else {
- result.setObject(i + 1, parameter);
- }
- }
- if (ConnectionMode.MEMORY_STRICTLY == connectionMode) {
- setFetchSize(result);
- }
- return result;
- }
-
- private void setFetchSize(final Statement statement) throws SQLException {
- DatabaseType databaseType =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData(connectionSession.getSchemaName()).getResource().getDatabaseType();
- if (fetchSizeSetters.containsKey(databaseType.getName())) {
-
fetchSizeSetters.get(databaseType.getName()).setFetchSize(statement);
- }
- }
-
/**
* Whether execute SQL serial or not.
*
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/statement/JDBCBackendStatement.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/statement/JDBCBackendStatement.java
new file mode 100644
index 0000000..acd9e4b
--- /dev/null
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/statement/JDBCBackendStatement.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.communication.jdbc.statement;
+
+import lombok.Getter;
+import lombok.Setter;
+import
org.apache.shardingsphere.db.protocol.parameter.TypeUnspecifiedSQLParameter;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCStatementManager;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.spi.singleton.SingletonSPIRegistry;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JDBC backend statement.
+ */
+@Getter
+@Setter
+public final class JDBCBackendStatement implements
ExecutorJDBCStatementManager {
+
+ private final Map<String, StatementMemoryStrictlyFetchSizeSetter>
fetchSizeSetters;
+
+ private String schemaName;
+
+ public JDBCBackendStatement() {
+ fetchSizeSetters =
SingletonSPIRegistry.getTypedSingletonInstancesMap(StatementMemoryStrictlyFetchSizeSetter.class);
+ }
+
+ @Override
+ public Statement createStorageResource(final Connection connection, final
ConnectionMode connectionMode, final StatementOption option) throws
SQLException {
+ Statement result = connection.createStatement();
+ if (ConnectionMode.MEMORY_STRICTLY == connectionMode) {
+ setFetchSize(result);
+ }
+ return result;
+ }
+
+ @Override
+ public Statement createStorageResource(final ExecutionUnit executionUnit,
final Connection connection, final ConnectionMode connectionMode, final
StatementOption option) throws SQLException {
+ String sql = executionUnit.getSqlUnit().getSql();
+ List<Object> parameters = executionUnit.getSqlUnit().getParameters();
+ PreparedStatement result = option.isReturnGeneratedKeys()
+ ?
connection.prepareStatement(executionUnit.getSqlUnit().getSql(),
Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(sql);
+ for (int i = 0; i < parameters.size(); i++) {
+ Object parameter = parameters.get(i);
+ if (parameter instanceof TypeUnspecifiedSQLParameter) {
+ result.setObject(i + 1, parameter, Types.OTHER);
+ } else {
+ result.setObject(i + 1, parameter);
+ }
+ }
+ if (ConnectionMode.MEMORY_STRICTLY == connectionMode) {
+ setFetchSize(result);
+ }
+ return result;
+ }
+
+ private void setFetchSize(final Statement statement) throws SQLException {
+ DatabaseType databaseType =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData(schemaName).getResource().getDatabaseType();
+ if (fetchSizeSetters.containsKey(databaseType.getName())) {
+
fetchSizeSetters.get(databaseType.getName()).setFetchSize(statement);
+ }
+ }
+}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendConnection.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendConnection.java
index 7e09d3f..c12ff55 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendConnection.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendConnection.java
@@ -22,15 +22,11 @@ import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
-import io.vertx.sqlclient.Query;
-import io.vertx.sqlclient.Row;
-import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlClient;
import io.vertx.sqlclient.SqlConnection;
import lombok.Getter;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxManager;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.VertxExecutionContext;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxConnectionManager;
import org.apache.shardingsphere.proxy.backend.communication.BackendConnection;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.ConnectionPostProcessor;
import
org.apache.shardingsphere.proxy.backend.communication.vertx.transaction.VertxLocalTransactionManager;
@@ -48,7 +44,7 @@ import java.util.List;
* Vert.x backend connection.
*/
@Getter
-public final class VertxBackendConnection implements
BackendConnection<Future<Void>>, ExecutorVertxManager {
+public final class VertxBackendConnection implements
BackendConnection<Future<Void>>, ExecutorVertxConnectionManager {
private final ConnectionSession connectionSession;
@@ -117,17 +113,6 @@ public final class VertxBackendConnection implements
BackendConnection<Future<Vo
}
@Override
- public Future<Query<RowSet<Row>>> createStorageResource(final Future<?
extends SqlClient> connection, final ConnectionMode connectionMode, final
VertxExecutionContext option) {
- return Future.failedFuture(new UnsupportedOperationException("Vert.x
query is not like JDBC statement."));
- }
-
- @Override
- public Future<Query<RowSet<Row>>> createStorageResource(final String sql,
final List<Object> parameters, final Future<? extends SqlClient> connection,
final ConnectionMode connectionMode,
- final
VertxExecutionContext ignored) {
- return Future.failedFuture(new UnsupportedOperationException("Vert.x
prepared query is not like JDBC prepared statement."));
- }
-
- @Override
public Future<Void> prepareForTaskExecution() {
if (!connectionSession.isAutoCommit() &&
!connectionSession.getTransactionStatus().isInTransaction()) {
VertxLocalTransactionManager transactionManager = new
VertxLocalTransactionManager(this);
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendStatement.java
similarity index 53%
copy from
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
copy to
shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendStatement.java
index df0c038..52f1b15 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/vertx/builder/PreparedQueryExecutionUnitBuilder.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxBackendStatement.java
@@ -15,31 +15,35 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.builder;
+package org.apache.shardingsphere.proxy.backend.communication.vertx;
import io.vertx.core.Future;
+import io.vertx.sqlclient.Query;
+import io.vertx.sqlclient.Row;
+import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlClient;
+import lombok.Getter;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.vertx.VertxExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxManager;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.ExecutorVertxStatementManager;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.vertx.VertxExecutionContext;
import java.sql.SQLException;
/**
- * Execution unit builder using Vert.x prepared query.
+ * Vert.x backend statement.
*/
-public final class PreparedQueryExecutionUnitBuilder implements
VertxExecutionUnitBuilder {
+@Getter
+public final class VertxBackendStatement implements
ExecutorVertxStatementManager {
@Override
- public VertxExecutionUnit build(final ExecutionUnit executionUnit, final
ExecutorVertxManager executorManager, final Future<? extends SqlClient>
connection, final ConnectionMode connectionMode,
- final VertxExecutionContext option) throws
SQLException {
- return new VertxExecutionUnit(executionUnit, connectionMode,
connection.compose(sqlClient ->
Future.succeededFuture(sqlClient.preparedQuery(executionUnit.getSqlUnit().getSql()))));
+ public Future<Query<RowSet<Row>>> createStorageResource(final Future<?
extends SqlClient> connection, final ConnectionMode connectionMode, final
VertxExecutionContext option) throws SQLException {
+ return Future.failedFuture(new UnsupportedOperationException("Vert.x
query is not like JDBC statement."));
}
@Override
- public String getType() {
- return "Vert.x";
+ public Future<Query<RowSet<Row>>> createStorageResource(final
ExecutionUnit executionUnit, final Future<? extends SqlClient> connection,
+ final
ConnectionMode connectionMode, final VertxExecutionContext option) throws
SQLException {
+ return Future.failedFuture(new UnsupportedOperationException("Vert.x
prepared query is not like JDBC prepared statement."));
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
index 81fc944..cb7bcb7 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
@@ -23,11 +23,14 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorStatementManager;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.proxy.backend.communication.BackendConnection;
import
org.apache.shardingsphere.proxy.backend.communication.SQLStatementSchemaHolder;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
import
org.apache.shardingsphere.proxy.backend.communication.vertx.VertxBackendConnection;
+import
org.apache.shardingsphere.proxy.backend.communication.vertx.VertxBackendStatement;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
import
org.apache.shardingsphere.sql.parser.sql.common.constant.TransactionIsolationLevel;
@@ -65,10 +68,13 @@ public final class ConnectionSession {
private final BackendConnection backendConnection;
+ private final ExecutorStatementManager statementManager;
+
public ConnectionSession(final TransactionType initialTransactionType,
final AttributeMap attributeMap) {
transactionStatus = new TransactionStatus(initialTransactionType);
this.attributeMap = attributeMap;
backendConnection = determineBackendConnection();
+ statementManager = determineStatementManager();
}
private BackendConnection determineBackendConnection() {
@@ -76,6 +82,11 @@ public final class ConnectionSession {
return "ExperimentalVertx".equals(proxyBackendDriverType) ? new
VertxBackendConnection(this) : new JDBCBackendConnection(this);
}
+ private ExecutorStatementManager determineStatementManager() {
+ String proxyBackendDriverType =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps().getValue(ConfigurationPropertyKey.PROXY_BACKEND_DRIVER_TYPE);
+ return "ExperimentalVertx".equals(proxyBackendDriverType) ? new
VertxBackendStatement() : new JDBCBackendStatement();
+ }
+
/**
* Change schema of current channel.
*
@@ -88,6 +99,9 @@ public final class ConnectionSession {
if (transactionStatus.isInTransaction()) {
throw new ShardingSphereException("Failed to switch schema, please
terminate current transaction.");
}
+ if (statementManager instanceof JDBCBackendStatement) {
+ ((JDBCBackendStatement)
statementManager).setSchemaName(schemaName);
+ }
this.schemaName = schemaName;
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/advanced/PreviewDistSQLBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/advanced/PreviewDistSQLBackendHandler.java
index fccb9fd..e96f2ac 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/advanced/PreviewDistSQLBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/advanced/PreviewDistSQLBackendHandler.java
@@ -50,6 +50,7 @@ import
org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import
org.apache.shardingsphere.proxy.backend.communication.SQLStatementSchemaHolder;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.exception.NoDatabaseSelectedException;
@@ -150,8 +151,8 @@ public final class PreviewDistSQLBackendHandler extends
QueryableRALBackendHandl
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine(final boolean isReturnGeneratedKeys, final
MetaDataContexts metaData) {
int maxConnectionsSizePerQuery =
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT,
maxConnectionsSizePerQuery, (JDBCBackendConnection)
connectionSession.getBackendConnection(),
- new StatementOption(isReturnGeneratedKeys),
metaData.getMetaData(getSchemaName()).getRuleMetaData().getRules());
+ return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT,
maxConnectionsSizePerQuery, (JDBCBackendConnection)
connectionSession.getBackendConnection(),
+ (JDBCBackendStatement)
connectionSession.getStatementManager(), new
StatementOption(isReturnGeneratedKeys),
metaData.getMetaData(getSchemaName()).getRuleMetaData().getRules());
}
private String getSchemaName() {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
index 026b202..7f3d93e 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
@@ -31,6 +31,7 @@ import
org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.datasource.JDBCBackendDataSource;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -47,8 +48,6 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@@ -97,6 +96,9 @@ public final class JDBCBackendConnectionTest {
backendConnection = spy(new JDBCBackendConnection(connectionSession));
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
when(connectionSession.getTransactionStatus()).thenReturn(new
TransactionStatus(TransactionType.LOCAL));
+ JDBCBackendStatement backendStatement = new JDBCBackendStatement();
+ backendStatement.setSchemaName(connectionSession.getSchemaName());
+
when(connectionSession.getStatementManager()).thenReturn(backendStatement);
}
private void setContextManager() throws ReflectiveOperationException {
@@ -248,15 +250,6 @@ public final class JDBCBackendConnectionTest {
}
@Test
- public void assertSetFetchSizeAsExpected() throws
InvocationTargetException, IllegalAccessException, NoSuchMethodException,
SQLException {
- Statement statement = mock(Statement.class);
- Method setFetchSizeMethod =
JDBCBackendConnection.class.getDeclaredMethod("setFetchSize", Statement.class);
- setFetchSizeMethod.setAccessible(true);
- setFetchSizeMethod.invoke(backendConnection, statement);
- verify(statement, times(1)).setFetchSize(Integer.MIN_VALUE);
- }
-
- @Test
public void assertCloseConnectionsCorrectlyWhenNotForceRollback() throws
NoSuchFieldException, IllegalAccessException, SQLException {
Field field =
JDBCBackendConnection.class.getDeclaredField("cachedConnections");
field.setAccessible(true);
@@ -306,7 +299,8 @@ public final class JDBCBackendConnectionTest {
Connection connection = mock(Connection.class);
Statement statement = mock(Statement.class);
when(connection.createStatement()).thenReturn(statement);
- assertThat(backendConnection.createStorageResource(connection,
ConnectionMode.MEMORY_STRICTLY, null), is(statement));
+ JDBCBackendStatement backendStatement = (JDBCBackendStatement)
connectionSession.getStatementManager();
+ assertThat(backendStatement.createStorageResource(connection,
ConnectionMode.MEMORY_STRICTLY, null), is(statement));
verify(connection, times(1)).createStatement();
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
index 3bffd38..961585e 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.ext
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.bind.PostgreSQLBindCompletePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
@@ -30,6 +31,7 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import
org.apache.shardingsphere.parser.rule.builder.DefaultSQLParserRuleConfigurationBuilder;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -50,7 +52,6 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
@@ -86,7 +87,9 @@ public final class OpenGaussComBatchBindExecutorTest {
when(backendConnection.getConnections(nullable(String.class),
anyInt(),
any(ConnectionMode.class))).thenReturn(Collections.singletonList(connection));
PreparedStatement preparedStatement = mock(PreparedStatement.class);
when(preparedStatement.getConnection()).thenReturn(connection);
- when(backendConnection.createStorageResource(anyString(), anyList(),
any(Connection.class), any(ConnectionMode.class),
any(StatementOption.class))).thenReturn(preparedStatement);
+ JDBCBackendStatement backendStatement =
mock(JDBCBackendStatement.class);
+ when(backendStatement.createStorageResource(any(ExecutionUnit.class),
any(Connection.class), any(ConnectionMode.class),
any(StatementOption.class))).thenReturn(preparedStatement);
+
when(connectionSession.getStatementManager()).thenReturn(backendStatement);
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
PostgreSQLPreparedStatementRegistry.getInstance().register(connectionId);
SQLStatement sqlStatement = SQL_PARSER_ENGINE.parse("insert into bmsql
(id) values (?)", false);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedInsertsExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedInsertsExecutor.java
index 4672f76..914da0a 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedInsertsExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedInsertsExecutor.java
@@ -41,6 +41,7 @@ import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.Statemen
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -123,7 +124,7 @@ public final class PostgreSQLBatchedInsertsExecutor {
Collection<ShardingSphereRule> rules =
metaDataContexts.getMetaData(connectionSession.getSchemaName()).getRuleMetaData().getRules();
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = new DriverExecutionPrepareEngine<>(
JDBCDriverType.PREPARED_STATEMENT,
metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY),
- (JDBCBackendConnection)
connectionSession.getBackendConnection(), new StatementOption(false), rules);
+ (JDBCBackendConnection)
connectionSession.getBackendConnection(), (JDBCBackendStatement)
connectionSession.getStatementManager(), new StatementOption(false), rules);
executionGroupContext =
prepareEngine.prepare(anyExecutionContext.getRouteContext(),
executionUnitParameters.keySet());
for (ExecutionGroup<JDBCExecutionUnit> eachGroup :
executionGroupContext.getInputGroups()) {
for (JDBCExecutionUnit each : eachGroup.getInputs()) {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedInsertsCommandExecutorTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedInsertsCommandExecutorTest.java
index 4410970..f493fca 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedInsertsCommandExecutorTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedInsertsCommandExecutorTest.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.ext
import
org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.execute.PostgreSQLComExecutePacket;
import
org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
@@ -35,6 +36,7 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import
org.apache.shardingsphere.parser.rule.builder.DefaultSQLParserRuleConfigurationBuilder;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.JDBCBackendStatement;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.junit.After;
@@ -53,8 +55,6 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
@@ -96,7 +96,9 @@ public final class
PostgreSQLAggregatedBatchedInsertsCommandExecutorTest {
when(backendConnection.getConnections(nullable(String.class),
anyInt(),
any(ConnectionMode.class))).thenReturn(Collections.singletonList(connection));
PreparedStatement preparedStatement = mock(PreparedStatement.class);
when(preparedStatement.getConnection()).thenReturn(connection);
- when(backendConnection.createStorageResource(anyString(), anyList(),
any(Connection.class), any(ConnectionMode.class),
any(StatementOption.class))).thenReturn(preparedStatement);
+ JDBCBackendStatement backendStatement =
mock(JDBCBackendStatement.class);
+ when(backendStatement.createStorageResource(any(ExecutionUnit.class),
any(Connection.class), any(ConnectionMode.class),
any(StatementOption.class))).thenReturn(preparedStatement);
+
when(connectionSession.getStatementManager()).thenReturn(backendStatement);
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
PostgreSQLAggregatedBatchedInsertsCommandExecutor
batchedInsertsCommandExecutor = new
PostgreSQLAggregatedBatchedInsertsCommandExecutor(connectionSession,
preparePackets());
List<DatabasePacket<?>> actualPackets = new
ArrayList<>(batchedInsertsCommandExecutor.execute());