This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 410f2296fee Refactor ShardingSphereStatement (#31395)
410f2296fee is described below
commit 410f2296feeac13118c7c703c100b0321d62d682
Author: Liang Zhang <[email protected]>
AuthorDate: Sun May 26 13:47:28 2024 +0800
Refactor ShardingSphereStatement (#31395)
* Refactor ShardingSphereStatement
* Refactor AbstractUnsupportedOperationStatement
* Refactor AbstractUnsupportedOperationStatement
---
.../jdbc/adapter/AbstractStatementAdapter.java | 24 +++++++
.../core/statement/ShardingSphereStatement.java | 81 ++++++++++------------
.../AbstractUnsupportedOperationStatement.java | 6 +-
.../UnsupportedOperationStatementTest.java | 10 ---
.../traffic/executor/TrafficExecutor.java | 18 ++---
5 files changed, 69 insertions(+), 70 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index 0d2ed0a4a78..007ad0dcbe6 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -59,6 +59,8 @@ public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperat
private boolean closed;
+ private boolean closeOnCompletion;
+
protected final boolean isNeedImplicitCommitTransaction(final
ShardingSphereConnection connection, final SQLStatement sqlStatement, final
boolean multiExecutionUnits) {
if (!connection.getAutoCommit()) {
return false;
@@ -229,6 +231,28 @@ public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperat
public final void clearWarnings() {
}
+ @Override
+ public void closeOnCompletion() {
+ closeOnCompletion = true;
+ }
+
+ @Override
+ public boolean isCloseOnCompletion() {
+ return closeOnCompletion;
+ }
+
+ @Override
+ public void setCursorName(final String name) throws SQLException {
+ if (isTransparent()) {
+ getRoutedStatements().iterator().next().setCursorName(name);
+ }
+ super.setCursorName(name);
+ }
+
+ private boolean isTransparent() {
+ return 1 == getRoutedStatements().size();
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public final void cancel() throws SQLException {
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index d2ec9025c90..58905a7565a 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -108,6 +108,8 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
private final MetaDataContexts metaDataContexts;
+ private String databaseName;
+
private final List<Statement> statements;
private final StatementOption statementOption;
@@ -130,12 +132,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
private ResultSet currentResultSet;
- private String trafficInstanceId;
-
- private boolean useFederation;
-
- private String databaseName;
-
public ShardingSphereStatement(final ShardingSphereConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
@@ -147,6 +143,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
public ShardingSphereStatement(final ShardingSphereConnection connection,
final int resultSetType, final int resultSetConcurrency, final int
resultSetHoldability) {
this.connection = connection;
metaDataContexts =
connection.getContextManager().getMetaDataContexts();
+ databaseName = connection.getDatabaseName();
statements = new LinkedList<>();
statementOption = new StatementOption(resultSetType,
resultSetConcurrency, resultSetHoldability);
executor = new DriverExecutor(connection);
@@ -154,7 +151,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
trafficRule =
metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);
statementManager = new StatementManager();
batchStatementExecutor = new BatchStatementExecutor(this);
- databaseName = connection.getDatabaseName();
}
@Override
@@ -166,13 +162,16 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
handleAutoCommit(queryContext);
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
- trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
+ String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
- return
executor.getTrafficExecutor().execute(createTrafficExecutionUnit(trafficInstanceId,
queryContext), Statement::executeQuery);
+ result =
executor.getTrafficExecutor().execute(createTrafficExecutionUnit(trafficInstanceId,
queryContext), Statement::executeQuery);
+ currentResultSet = result;
+ return result;
}
- useFederation = decide(queryContext,
metaDataContexts.getMetaData().getDatabase(databaseName),
metaDataContexts.getMetaData().getGlobalRuleMetaData());
- if (useFederation) {
- return executeFederationQuery(queryContext);
+ if (decide(queryContext,
metaDataContexts.getMetaData().getDatabase(databaseName),
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
+ result = executeFederationQuery(queryContext);
+ currentResultSet = result;
+ return result;
}
executionContext = createExecutionContext(queryContext);
result = doExecuteQuery(executionContext);
@@ -181,8 +180,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
// CHECKSTYLE:ON
handleExceptionInTransaction(connection, metaDataContexts);
throw SQLExceptionTransformEngine.toSQLException(ex,
metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
- } finally {
- currentResultSet = null;
}
currentResultSet = result;
return result;
@@ -320,7 +317,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
handleAutoCommit(queryContext);
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
- trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
+ String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
return executor.getTrafficExecutor().execute(executionUnit,
trafficCallback);
@@ -421,30 +418,29 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
}
private boolean execute0(final String sql, final ExecuteCallback
executeCallback, final TrafficExecutorCallback<Boolean> trafficCallback) throws
SQLException {
- try {
- QueryContext queryContext = createQueryContext(sql);
- handleAutoCommit(queryContext);
- databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
-
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
- trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
- if (null != trafficInstanceId) {
- JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
- return executor.getTrafficExecutor().execute(executionUnit,
trafficCallback);
- }
- useFederation = decide(queryContext,
metaDataContexts.getMetaData().getDatabase(databaseName),
metaDataContexts.getMetaData().getGlobalRuleMetaData());
- if (useFederation) {
- ResultSet resultSet = executeFederationQuery(queryContext);
- return null != resultSet;
- }
- executionContext = createExecutionContext(queryContext);
- if
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
{
- Collection<ExecuteResult> results =
executor.getRawExecutor().execute(createRawExecutionContext(executionContext),
executionContext.getQueryContext(), new RawSQLExecutorCallback());
- return results.iterator().next() instanceof QueryResult;
- }
- return executeWithExecutionContext(executeCallback,
executionContext);
- } finally {
- currentResultSet = null;
+ QueryContext queryContext = createQueryContext(sql);
+ handleAutoCommit(queryContext);
+ databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
+
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
+ String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
+ if (null != trafficInstanceId) {
+ JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
+ boolean result =
executor.getTrafficExecutor().execute(executionUnit, trafficCallback);
+ currentResultSet = executor.getTrafficExecutor().getResultSet();
+ return result;
+ }
+ if (decide(queryContext,
metaDataContexts.getMetaData().getDatabase(databaseName),
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
+ ResultSet resultSet = executeFederationQuery(queryContext);
+ currentResultSet = resultSet;
+ return null != resultSet;
}
+ currentResultSet = null;
+ executionContext = createExecutionContext(queryContext);
+ if
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
{
+ Collection<ExecuteResult> results =
executor.getRawExecutor().execute(createRawExecutionContext(executionContext),
executionContext.getQueryContext(), new RawSQLExecutorCallback());
+ return results.iterator().next() instanceof QueryResult;
+ }
+ return executeWithExecutionContext(executeCallback, executionContext);
}
private void handleAutoCommit(final QueryContext queryContext) throws
SQLException {
@@ -563,14 +559,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
if (null != currentResultSet) {
return currentResultSet;
}
- if (null != trafficInstanceId) {
- return executor.getTrafficExecutor().getResultSet();
- }
- if (useFederation) {
- return executor.getSqlFederationEngine().getResultSet();
- }
- if (executionContext.getSqlStatementContext() instanceof
SelectStatementContext
- || executionContext.getSqlStatementContext().getSqlStatement()
instanceof DALStatement) {
+ if (executionContext.getSqlStatementContext() instanceof
SelectStatementContext ||
executionContext.getSqlStatementContext().getSqlStatement() instanceof
DALStatement) {
List<ResultSet> resultSets = getResultSets();
if (resultSets.isEmpty()) {
return currentResultSet;
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/unsupported/AbstractUnsupportedOperationStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/unsupported/AbstractUnsupportedOperationStatement.java
index b6c10685931..9c759966002 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/unsupported/AbstractUnsupportedOperationStatement.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/unsupported/AbstractUnsupportedOperationStatement.java
@@ -29,17 +29,17 @@ import java.sql.Statement;
public abstract class AbstractUnsupportedOperationStatement extends
WrapperAdapter implements Statement {
@Override
- public final void closeOnCompletion() throws SQLException {
+ public void closeOnCompletion() throws SQLException {
throw new SQLFeatureNotSupportedException("closeOnCompletion");
}
@Override
- public final boolean isCloseOnCompletion() throws SQLException {
+ public boolean isCloseOnCompletion() throws SQLException {
throw new SQLFeatureNotSupportedException("isCloseOnCompletion");
}
@Override
- public final void setCursorName(final String name) throws SQLException {
+ public void setCursorName(final String name) throws SQLException {
throw new SQLFeatureNotSupportedException("setCursorName");
}
}
diff --git a/jdbc/src/test/java/org/apache/shardingsphere/driver/jdbc/unsupported/UnsupportedOperationStatementTest.java b/jdbc/src/test/java/org/apache/shardingsphere/driver/jdbc/unsupported/UnsupportedOperationStatementTest.java
index 44a1a0922f7..a129de2303e 100644
--- a/jdbc/src/test/java/org/apache/shardingsphere/driver/jdbc/unsupported/UnsupportedOperationStatementTest.java
+++ b/jdbc/src/test/java/org/apache/shardingsphere/driver/jdbc/unsupported/UnsupportedOperationStatementTest.java
@@ -60,16 +60,6 @@ class UnsupportedOperationStatementTest {
shardingSphereStatement = new ShardingSphereStatement(connection);
}
- @Test
- void assertCloseOnCompletion() {
- assertThrows(SQLFeatureNotSupportedException.class, () ->
shardingSphereStatement.closeOnCompletion());
- }
-
- @Test
- void assertIsCloseOnCompletion() {
- assertThrows(SQLFeatureNotSupportedException.class, () ->
shardingSphereStatement.isCloseOnCompletion());
- }
-
@Test
void assertSetCursorName() {
assertThrows(SQLFeatureNotSupportedException.class, () ->
shardingSphereStatement.setCursorName("cursorName"));
diff --git a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
index b5f1fd82ec3..b349a280cb1 100644
--- a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
+++ b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.traffic.executor;
+import lombok.Getter;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -33,6 +34,9 @@ public final class TrafficExecutor implements AutoCloseable {
private Statement statement;
+ @Getter
+ private ResultSet resultSet;
+
/**
* Execute.
*
@@ -45,7 +49,9 @@ public final class TrafficExecutor implements AutoCloseable {
public <T> T execute(final JDBCExecutionUnit executionUnit, final
TrafficExecutorCallback<T> callback) throws SQLException {
SQLUnit sqlUnit = executionUnit.getExecutionUnit().getSqlUnit();
cacheStatement(sqlUnit.getParameters(),
executionUnit.getStorageResource());
- return callback.execute(statement, sqlUnit.getSql());
+ T result = callback.execute(statement, sqlUnit.getSql());
+ resultSet = statement.getResultSet();
+ return result;
}
private void cacheStatement(final List<Object> params, final Statement
statement) throws SQLException {
@@ -63,16 +69,6 @@ public final class TrafficExecutor implements AutoCloseable {
}
}
- /**
- * Get result set.
- *
- * @return result set
- * @throws SQLException SQL exception
- */
- public ResultSet getResultSet() throws SQLException {
- return statement.getResultSet();
- }
-
@Override
public void close() throws SQLException {
if (null != statement) {