This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a9ced3d5e24 Refactor ShardingSphereStatement to remove duplicate code
(#25166)
a9ced3d5e24 is described below
commit a9ced3d5e2489b96abf537a45261dcc14c45544d
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Fri Apr 14 16:42:09 2023 +0800
Refactor ShardingSphereStatement to remove duplicate code (#25166)
---
.../command/query/PostgreSQLColumnDescription.java | 2 +-
.../core/statement/ShardingSphereStatement.java | 101 ++++++++-------------
.../MySQLDataSourcePrivilegeCheckerTest.java | 2 +-
.../general/MySQLTimeTypesMigrationE2EIT.java | 6 +-
4 files changed, 42 insertions(+), 69 deletions(-)
diff --git
a/db-protocol/postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/PostgreSQLColumnDescription.java
b/db-protocol/postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/PostgreSQLColumnDescription.java
index 6aab7192426..08654393e46 100644
---
a/db-protocol/postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/PostgreSQLColumnDescription.java
+++
b/db-protocol/postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/PostgreSQLColumnDescription.java
@@ -47,7 +47,7 @@ public final class PostgreSQLColumnDescription {
public PostgreSQLColumnDescription(final String columnName, final int
columnIndex, final int columnType, final int columnLength, final String
columnTypeName) {
this(columnName, columnIndex, columnType, columnLength,
columnTypeName, PostgreSQLValueFormat.TEXT.getCode());
}
-
+
public PostgreSQLColumnDescription(final String columnName, final int
columnIndex, final int columnType, final int columnLength, final String
columnTypeName, final int dataFormat) {
this.columnName = columnName;
this.columnIndex = columnIndex;
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index ee53eac6830..b3764fb7738 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -89,6 +89,7 @@ import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectState
import
org.apache.shardingsphere.sqlfederation.spi.SQLFederationExecutorContext;
import org.apache.shardingsphere.traffic.engine.TrafficEngine;
import
org.apache.shardingsphere.traffic.exception.metadata.EmptyTrafficExecutionUnitException;
+import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.ConnectionTransaction;
import org.apache.shardingsphere.transaction.api.TransactionType;
@@ -277,18 +278,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
@Override
public int executeUpdate(final String sql) throws SQLException {
try {
- QueryContext queryContext = createQueryContext(sql);
-
checkSameDatabaseNameInTransaction(queryContext.getSqlStatementContext(),
connection.getDatabaseName());
- trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
- if (null != trafficInstanceId) {
- JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
- return executor.getTrafficExecutor().execute(executionUnit,
Statement::executeUpdate);
- }
- executionContext = createExecutionContext(queryContext);
- if
(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback()));
- }
- return executeUpdate((actualSQL, statement) ->
statement.executeUpdate(actualSQL), executionContext.getSqlStatementContext());
+ return executeUpdate0(sql, (actualSQL, statement) ->
statement.executeUpdate(actualSQL), Statement::executeUpdate);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -305,18 +295,8 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
returnGeneratedKeys = true;
}
try {
- QueryContext queryContext = createQueryContext(sql);
-
checkSameDatabaseNameInTransaction(queryContext.getSqlStatementContext(),
connection.getDatabaseName());
- trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
- if (null != trafficInstanceId) {
- JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
- return executor.getTrafficExecutor().execute(executionUnit,
(statement, actualSQL) -> statement.executeUpdate(actualSQL,
autoGeneratedKeys));
- }
- executionContext = createExecutionContext(queryContext);
- if
(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback()));
- }
- return executeUpdate((actualSQL, statement) ->
statement.executeUpdate(actualSQL, autoGeneratedKeys),
executionContext.getSqlStatementContext());
+ return executeUpdate0(sql, (actualSQL, statement) ->
statement.executeUpdate(actualSQL, autoGeneratedKeys),
+ (statement, actualSQL) ->
statement.executeUpdate(actualSQL, autoGeneratedKeys));
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -331,18 +311,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
public int executeUpdate(final String sql, final int[] columnIndexes)
throws SQLException {
returnGeneratedKeys = true;
try {
- QueryContext queryContext = createQueryContext(sql);
-
checkSameDatabaseNameInTransaction(queryContext.getSqlStatementContext(),
connection.getDatabaseName());
- trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
- if (null != trafficInstanceId) {
- JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
- return executor.getTrafficExecutor().execute(executionUnit,
(statement, actualSQL) -> statement.executeUpdate(actualSQL, columnIndexes));
- }
- executionContext = createExecutionContext(queryContext);
- if
(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback()));
- }
- return executeUpdate((actualSQL, statement) ->
statement.executeUpdate(actualSQL, columnIndexes),
executionContext.getSqlStatementContext());
+ return executeUpdate0(sql, (actualSQL, statement) ->
statement.executeUpdate(actualSQL, columnIndexes), (statement, actualSQL) ->
statement.executeUpdate(actualSQL, columnIndexes));
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -357,18 +326,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
public int executeUpdate(final String sql, final String[] columnNames)
throws SQLException {
returnGeneratedKeys = true;
try {
- QueryContext queryContext = createQueryContext(sql);
-
checkSameDatabaseNameInTransaction(queryContext.getSqlStatementContext(),
connection.getDatabaseName());
- trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
- if (null != trafficInstanceId) {
- JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
- return executor.getTrafficExecutor().execute(executionUnit,
(statement, actualSQL) -> statement.executeUpdate(actualSQL, columnNames));
- }
- executionContext = createExecutionContext(queryContext);
- if
(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback()));
- }
- return executeUpdate((actualSQL, statement) ->
statement.executeUpdate(actualSQL, columnNames),
executionContext.getSqlStatementContext());
+ return executeUpdate0(sql, (actualSQL, statement) ->
statement.executeUpdate(actualSQL, columnNames), (statement, actualSQL) ->
statement.executeUpdate(actualSQL, columnNames));
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -379,15 +337,31 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
}
- private int executeUpdate(final ExecuteUpdateCallback updater, final
SQLStatementContext<?> sqlStatementContext) throws SQLException {
- return isNeedImplicitCommitTransaction(executionContext) ?
executeUpdateWithImplicitCommitTransaction(updater, sqlStatementContext) :
useDriverToExecuteUpdate(updater, sqlStatementContext);
+ private int executeUpdate(final ExecuteUpdateCallback updateCallback,
final SQLStatementContext<?> sqlStatementContext) throws SQLException {
+ return isNeedImplicitCommitTransaction(executionContext) ?
executeUpdateWithImplicitCommitTransaction(updateCallback, sqlStatementContext)
+ : useDriverToExecuteUpdate(updateCallback,
sqlStatementContext);
+ }
+
+ private int executeUpdate0(final String sql, final ExecuteUpdateCallback
updateCallback, final TrafficExecutorCallback<Integer> trafficCallback) throws
SQLException {
+ QueryContext queryContext = createQueryContext(sql);
+
checkSameDatabaseNameInTransaction(queryContext.getSqlStatementContext(),
connection.getDatabaseName());
+ trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
+ if (null != trafficInstanceId) {
+ JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
+ return executor.getTrafficExecutor().execute(executionUnit,
trafficCallback);
+ }
+ executionContext = createExecutionContext(queryContext);
+ if
(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
+ return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback()));
+ }
+ return executeUpdate(updateCallback,
executionContext.getSqlStatementContext());
}
- private int executeUpdateWithImplicitCommitTransaction(final
ExecuteUpdateCallback updater, final SQLStatementContext<?>
sqlStatementContext) throws SQLException {
+ private int executeUpdateWithImplicitCommitTransaction(final
ExecuteUpdateCallback updateCallback, final SQLStatementContext<?>
sqlStatementContext) throws SQLException {
int result;
try {
connection.setAutoCommit(false);
- result = useDriverToExecuteUpdate(updater, sqlStatementContext);
+ result = useDriverToExecuteUpdate(updateCallback,
sqlStatementContext);
connection.commit();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
@@ -398,22 +372,22 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return result;
}
- private int useDriverToExecuteUpdate(final ExecuteUpdateCallback updater,
final SQLStatementContext<?> sqlStatementContext) throws SQLException {
+ private int useDriverToExecuteUpdate(final ExecuteUpdateCallback
updateCallback, final SQLStatementContext<?> sqlStatementContext) throws
SQLException {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext();
cacheStatements(executionGroupContext.getInputGroups());
- JDBCExecutorCallback<Integer> callback =
createExecuteUpdateCallback(updater, sqlStatementContext);
+ JDBCExecutorCallback<Integer> callback =
createExecuteUpdateCallback(updateCallback, sqlStatementContext);
return
executor.getRegularExecutor().executeUpdate(executionGroupContext,
executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), callback);
}
- private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final
ExecuteUpdateCallback updater, final SQLStatementContext<?>
sqlStatementContext) {
+ private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final
ExecuteUpdateCallback updateCallback, final SQLStatementContext<?>
sqlStatementContext) {
boolean isExceptionThrown =
SQLExecutorExceptionHandler.isExceptionThrown();
return new
JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(),
sqlStatementContext.getSqlStatement(), isExceptionThrown) {
@Override
protected Integer executeSQL(final String sql, final Statement
statement, final ConnectionMode connectionMode, final DatabaseType storageType)
throws SQLException {
- return updater.executeUpdate(sql, statement);
+ return updateCallback.executeUpdate(sql, statement);
}
@Override
@@ -434,7 +408,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
@Override
public boolean execute(final String sql) throws SQLException {
try {
- return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL));
+ return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL), Statement::execute);
// CHECKSTYLE:OFF
} catch (final SQLException ex) {
// CHECKSTYLE:ON
@@ -449,7 +423,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
returnGeneratedKeys = true;
}
- return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL, autoGeneratedKeys));
+ return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL, autoGeneratedKeys), (statement, actualSQL) ->
statement.execute(actualSQL, autoGeneratedKeys));
// CHECKSTYLE:OFF
} catch (final SQLException ex) {
// CHECKSTYLE:ON
@@ -462,7 +436,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
public boolean execute(final String sql, final int[] columnIndexes) throws
SQLException {
try {
returnGeneratedKeys = true;
- return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL, columnIndexes));
+ return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL, columnIndexes), (statement, actualSQL) ->
statement.execute(actualSQL, columnIndexes));
// CHECKSTYLE:OFF
} catch (final SQLException ex) {
// CHECKSTYLE:ON
@@ -475,7 +449,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
public boolean execute(final String sql, final String[] columnNames)
throws SQLException {
try {
returnGeneratedKeys = true;
- return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL, columnNames));
+ return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL, columnNames), (statement, actualSQL) ->
statement.execute(actualSQL, columnNames));
// CHECKSTYLE:OFF
} catch (final SQLException ex) {
// CHECKSTYLE:ON
@@ -484,14 +458,14 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
}
- private boolean execute0(final String sql, final ExecuteCallback callback)
throws SQLException {
+ private boolean execute0(final String sql, final ExecuteCallback
executeCallback, final TrafficExecutorCallback<Boolean> trafficCallback) throws
SQLException {
try {
QueryContext queryContext = createQueryContext(sql);
checkSameDatabaseNameInTransaction(queryContext.getSqlStatementContext(),
connection.getDatabaseName());
trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
- return executor.getTrafficExecutor().execute(executionUnit,
(statement, actualSQL) -> callback.execute(actualSQL, statement));
+ return executor.getTrafficExecutor().execute(executionUnit,
trafficCallback);
}
deciderContext = decide(queryContext,
metaDataContexts.getMetaData().getGlobalRuleMetaData(),
metaDataContexts.getMetaData().getProps(),
metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()));
@@ -501,11 +475,10 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
executionContext = createExecutionContext(queryContext);
if
(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each
-> each instanceof RawExecutionRule)) {
- // TODO process getStatement
Collection<ExecuteResult> results =
executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback());
return results.iterator().next() instanceof QueryResult;
}
- return isNeedImplicitCommitTransaction(executionContext) ?
executeWithImplicitCommitTransaction(callback) : useDriverToExecute(callback);
+ return isNeedImplicitCommitTransaction(executionContext) ?
executeWithImplicitCommitTransaction(executeCallback) :
useDriverToExecute(executeCallback);
} finally {
currentResultSet = null;
}
diff --git
a/kernel/transaction/type/xa/core/src/test/java/org/apache/shardingsphere/transaction/xa/jta/datasource/checker/MySQLDataSourcePrivilegeCheckerTest.java
b/kernel/transaction/type/xa/core/src/test/java/org/apache/shardingsphere/transaction/xa/jta/datasource/checker/MySQLDataSourcePrivilegeCheckerTest.java
index 0050bfcd8f9..b7d7fd1cf0a 100644
---
a/kernel/transaction/type/xa/core/src/test/java/org/apache/shardingsphere/transaction/xa/jta/datasource/checker/MySQLDataSourcePrivilegeCheckerTest.java
+++
b/kernel/transaction/type/xa/core/src/test/java/org/apache/shardingsphere/transaction/xa/jta/datasource/checker/MySQLDataSourcePrivilegeCheckerTest.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
index f597af52cbc..74bb9061cef 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
@@ -39,7 +39,7 @@ import java.sql.SQLException;
*/
@PipelineE2ESettings(fetchSingle = true, database =
@PipelineE2ESettings.PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles
= "env/common/none.xml"))
public class MySQLTimeTypesMigrationE2EIT extends AbstractMigrationE2EIT {
-
+
@ParameterizedTest(name = "{0}")
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
@@ -59,7 +59,7 @@ public class MySQLTimeTypesMigrationE2EIT extends
AbstractMigrationE2EIT {
assertCheckMigrationSuccess(containerComposer, jobId,
"DATA_MATCH");
}
}
-
+
private void insertOneRecordWithZeroValue(final PipelineContainerComposer
containerComposer, final int id) throws SQLException {
try (Connection connection =
containerComposer.getSourceDataSource().getConnection()) {
PreparedStatement preparedStatement =
connection.prepareStatement("INSERT INTO `time_e2e`(id, t_timestamp,
t_datetime, t_date, t_year) VALUES (?, ?, ?, ?, ?)");
@@ -71,7 +71,7 @@ public class MySQLTimeTypesMigrationE2EIT extends
AbstractMigrationE2EIT {
preparedStatement.execute();
}
}
-
+
private static boolean isEnabled() {
return PipelineE2ECondition.isEnabled(new MySQLDatabaseType());
}