This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6867d040715 Optimize ShardingSpherePreparedStatement for multi
executionContext (#28796)
6867d040715 is described below
commit 6867d040715d1850bad7ee5a2c2e375b46f5c79d
Author: Chuxin Chen <[email protected]>
AuthorDate: Thu Oct 19 11:18:53 2023 +0800
Optimize ShardingSpherePreparedStatement for multi executionContext (#28796)
* Refactor ShardingSpherePreparedStatement for support multi
executionContext.
* Refactor ShardingSpherePreparedStatement for support multi
executionContext.
---
.../jdbc/adapter/AbstractStatementAdapter.java | 16 ++-
.../statement/ShardingSpherePreparedStatement.java | 8 +-
.../core/statement/ShardingSphereStatement.java | 138 +++++++++++++--------
.../driver/jdbc/adapter/StatementAdapterTest.java | 2 +-
.../proxy/backend/connector/DatabaseConnector.java | 11 +-
5 files changed, 113 insertions(+), 62 deletions(-)
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index dabd42d7d2a..3ccbc6d1404 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -57,7 +57,7 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
private boolean closed;
- protected final boolean isNeedImplicitCommitTransaction(final
ShardingSphereConnection connection, final ExecutionContext executionContext) {
+ protected final boolean isNeedImplicitCommitTransaction(final
ShardingSphereConnection connection, final Collection<ExecutionContext>
executionContexts) {
if (connection.getAutoCommit()) {
return false;
}
@@ -66,11 +66,19 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
if
(!TransactionType.isDistributedTransaction(connectionTransaction.getTransactionType())
|| isInTransaction) {
return false;
}
- return isModifiedSQL(executionContext) &&
executionContext.getExecutionUnits().size() > 1;
+ if (1 == executionContexts.size()) {
+ SQLStatement sqlStatement =
executionContexts.iterator().next().getSqlStatementContext().getSqlStatement();
+ return isWriteDMLStatement(sqlStatement) &&
executionContexts.iterator().next().getExecutionUnits().size() > 1;
+ }
+ for (ExecutionContext each : executionContexts) {
+ if
(isWriteDMLStatement(each.getSqlStatementContext().getSqlStatement())) {
+ return true;
+ }
+ }
+ return false;
}
- private boolean isModifiedSQL(final ExecutionContext executionContext) {
- SQLStatement sqlStatement =
executionContext.getSqlStatementContext().getSqlStatement();
+ private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {
return sqlStatement instanceof DMLStatement && !(sqlStatement
instanceof SelectStatement);
}
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index beae1457f78..228e07a9bce 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.driver.jdbc.core.statement;
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
-import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import org.apache.shardingsphere.driver.executor.DriverExecutor;
import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
import
org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
@@ -33,15 +32,16 @@ import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResult
import
org.apache.shardingsphere.driver.jdbc.core.statement.metadata.ShardingSphereParameterMetaData;
import
org.apache.shardingsphere.driver.jdbc.exception.syntax.EmptySQLException;
import org.apache.shardingsphere.infra.binder.context.aware.ParameterAware;
-import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
import
org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.dml.InsertStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
+import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
@@ -356,7 +356,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
Collection<ExecuteResult> executeResults =
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback());
return accumulate(executeResults);
}
- return isNeedImplicitCommitTransaction(connection,
executionContext) ? executeUpdateWithImplicitCommitTransaction() :
useDriverToExecuteUpdate();
+ return isNeedImplicitCommitTransaction(connection,
Collections.singleton(executionContext)) ?
executeUpdateWithImplicitCommitTransaction() : useDriverToExecuteUpdate();
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -426,7 +426,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
Collection<ExecuteResult> executeResults =
executor.getRawExecutor().execute(createRawExecutionGroupContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback());
return executeResults.iterator().next() instanceof QueryResult;
}
- return isNeedImplicitCommitTransaction(connection,
executionContext) ? executeWithImplicitCommitTransaction() :
useDriverToExecute();
+ return isNeedImplicitCommitTransaction(connection,
Collections.singleton(executionContext)) ?
executeWithImplicitCommitTransaction() : useDriverToExecute();
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 1533f0a93db..d787b19f559 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.driver.jdbc.core.statement;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
@@ -123,7 +124,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private boolean returnGeneratedKeys;
- private ExecutionContext executionContext;
+ private Collection<ExecutionContext> executionContexts;
private ResultSet currentResultSet;
@@ -174,12 +175,8 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
if (useFederation) {
return executeFederationQuery(queryContext);
}
- executionContext = createExecutionContext(queryContext);
- List<QueryResult> queryResults = executeQuery0();
- MergedResult mergedResult = mergeQuery(queryResults);
- boolean selectContainsEnhancedTable =
- executionContext.getSqlStatementContext() instanceof
SelectStatementContext && ((SelectStatementContext)
executionContext.getSqlStatementContext()).isContainsEnhancedTable();
- result = new ShardingSphereResultSet(getResultSets(),
mergedResult, this, selectContainsEnhancedTable, executionContext);
+ executionContexts = createExecutionContext(queryContext);
+ result = doExecuteQuery(executionContexts);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -192,6 +189,20 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return result;
}
+ private ShardingSphereResultSet doExecuteQuery(final
Collection<ExecutionContext> executionContexts) throws SQLException {
+ ShardingSphereResultSet result = null;
+ for (ExecutionContext each : executionContexts) {
+ List<QueryResult> queryResults = executeQuery0(each);
+ MergedResult mergedResult = mergeQuery(queryResults,
each.getSqlStatementContext());
+ boolean selectContainsEnhancedTable =
+ each.getSqlStatementContext() instanceof
SelectStatementContext && ((SelectStatementContext)
each.getSqlStatementContext()).isContainsEnhancedTable();
+ if (null == result) {
+ result = new ShardingSphereResultSet(getResultSets(),
mergedResult, this, selectContainsEnhancedTable, each);
+ }
+ }
+ return result;
+ }
+
private boolean decide(final QueryContext queryContext, final
ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) {
return
executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, globalRuleMetaData);
}
@@ -214,12 +225,12 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
: Optional.empty();
}
- private List<QueryResult> executeQuery0() throws SQLException {
+ private List<QueryResult> executeQuery0(final ExecutionContext
executionContext) throws SQLException {
if
(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules().stream().anyMatch(RawExecutionRule.class::isInstance))
{
return executor.getRawExecutor().execute(
- createRawExecutionContext(),
executionContext.getQueryContext(), new
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
+ createRawExecutionContext(executionContext),
executionContext.getQueryContext(), new
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
}
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext();
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(executionContext);
cacheStatements(executionGroupContext.getInputGroups());
StatementExecuteQueryCallback callback = new
StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(),
executionContext.getSqlStatementContext().getSqlStatement(),
@@ -304,9 +315,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
}
- private int executeUpdate(final ExecuteUpdateCallback updateCallback,
final SQLStatementContext sqlStatementContext) throws SQLException {
- return isNeedImplicitCommitTransaction(connection, executionContext) ?
executeUpdateWithImplicitCommitTransaction(updateCallback, sqlStatementContext)
- : useDriverToExecuteUpdate(updateCallback,
sqlStatementContext);
+ private int executeUpdate(final ExecuteUpdateCallback updateCallback,
final SQLStatementContext sqlStatementContext, final
Collection<ExecutionContext> executionContexts) throws SQLException {
+ return isNeedImplicitCommitTransaction(connection, executionContexts)
? executeUpdateWithImplicitCommitTransaction(updateCallback,
sqlStatementContext, executionContexts)
+ : useDriverToExecuteUpdate(updateCallback,
sqlStatementContext, executionContexts);
}
private int executeUpdate0(final String sql, final ExecuteUpdateCallback
updateCallback, final TrafficExecutorCallback<Integer> trafficCallback) throws
SQLException {
@@ -319,18 +330,23 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
return executor.getTrafficExecutor().execute(executionUnit,
trafficCallback);
}
- executionContext = createExecutionContext(queryContext);
+ executionContexts = createExecutionContext(queryContext);
if
(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules().stream().anyMatch(RawExecutionRule.class::isInstance))
{
- return
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback()));
+ Collection<ExecuteResult> results = new LinkedList<>();
+ for (ExecutionContext each : executionContexts) {
+
results.addAll(executor.getRawExecutor().execute(createRawExecutionContext(each),
each.getQueryContext(), new RawSQLExecutorCallback()));
+ }
+ return accumulate(results);
}
- return executeUpdate(updateCallback,
executionContext.getSqlStatementContext());
+ return executeUpdate(updateCallback,
queryContext.getSqlStatementContext(), executionContexts);
}
- private int executeUpdateWithImplicitCommitTransaction(final
ExecuteUpdateCallback updateCallback, final SQLStatementContext
sqlStatementContext) throws SQLException {
+ private int executeUpdateWithImplicitCommitTransaction(final
ExecuteUpdateCallback updateCallback, final SQLStatementContext
sqlStatementContext,
+ final
Collection<ExecutionContext> executionContexts) throws SQLException {
int result;
try {
connection.setAutoCommit(false);
- result = useDriverToExecuteUpdate(updateCallback,
sqlStatementContext);
+ result = useDriverToExecuteUpdate(updateCallback,
sqlStatementContext, executionContexts);
connection.commit();
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
@@ -343,12 +359,21 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return result;
}
- private int useDriverToExecuteUpdate(final ExecuteUpdateCallback
updateCallback, final SQLStatementContext sqlStatementContext) throws
SQLException {
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext();
- cacheStatements(executionGroupContext.getInputGroups());
- JDBCExecutorCallback<Integer> callback =
createExecuteUpdateCallback(updateCallback, sqlStatementContext);
- return
executor.getRegularExecutor().executeUpdate(executionGroupContext,
- executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), callback);
+ private int useDriverToExecuteUpdate(final ExecuteUpdateCallback
updateCallback, final SQLStatementContext sqlStatementContext,
+ final Collection<ExecutionContext>
executionContexts) throws SQLException {
+ Integer result = null;
+ Preconditions.checkArgument(!executionContexts.isEmpty());
+ for (ExecutionContext each : executionContexts) {
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(each);
+ cacheStatements(executionGroupContext.getInputGroups());
+ JDBCExecutorCallback<Integer> callback =
createExecuteUpdateCallback(updateCallback, sqlStatementContext);
+ int effectedCount =
executor.getRegularExecutor().executeUpdate(executionGroupContext,
+ each.getQueryContext(),
each.getRouteContext().getRouteUnits(), callback);
+ if (null == result) {
+ result = effectedCount;
+ }
+ }
+ return result;
}
private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final
ExecuteUpdateCallback updateCallback, final SQLStatementContext
sqlStatementContext) {
@@ -446,12 +471,16 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
ResultSet resultSet = executeFederationQuery(queryContext);
return null != resultSet;
}
- executionContext = createExecutionContext(queryContext);
+ executionContexts = createExecutionContext(queryContext);
if
(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules().stream().anyMatch(RawExecutionRule.class::isInstance))
{
- Collection<ExecuteResult> results =
executor.getRawExecutor().execute(createRawExecutionContext(),
executionContext.getQueryContext(), new RawSQLExecutorCallback());
+ Collection<ExecuteResult> results = new LinkedList<>();
+ for (ExecutionContext each : executionContexts) {
+
results.addAll(executor.getRawExecutor().execute(createRawExecutionContext(each),
each.getQueryContext(), new RawSQLExecutorCallback()));
+ }
return results.iterator().next() instanceof QueryResult;
}
- return isNeedImplicitCommitTransaction(connection,
executionContext) ? executeWithImplicitCommitTransaction(executeCallback) :
useDriverToExecute(executeCallback);
+ return isNeedImplicitCommitTransaction(connection,
executionContexts) ? executeWithImplicitCommitTransaction(executeCallback,
executionContexts)
+ : useDriverToExecute(executeCallback, executionContexts);
} finally {
currentResultSet = null;
}
@@ -507,31 +536,31 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return protocolType.getTrunkDatabaseType().orElse(protocolType);
}
- private ExecutionContext createExecutionContext(final QueryContext
queryContext) throws SQLException {
+ private Collection<ExecutionContext> createExecutionContext(final
QueryContext queryContext) throws SQLException {
clearStatements();
RuleMetaData globalRuleMetaData =
metaDataContexts.getMetaData().getGlobalRuleMetaData();
ShardingSphereDatabase currentDatabase =
metaDataContexts.getMetaData().getDatabase(databaseName);
SQLAuditEngine.audit(queryContext.getSqlStatementContext(),
queryContext.getParameters(), globalRuleMetaData, currentDatabase, null,
queryContext.getHintValueContext());
- return kernelProcessor.generateExecutionContext(queryContext,
currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(),
-
connection.getDatabaseConnectionManager().getConnectionContext());
+ return
Collections.singleton(kernelProcessor.generateExecutionContext(queryContext,
currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(),
+
connection.getDatabaseConnectionManager().getConnectionContext()));
}
- private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext() throws SQLException {
+ private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(databaseName));
}
- private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionContext() throws SQLException {
+ private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionContext(final ExecutionContext executionContext) throws
SQLException {
int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(databaseName));
}
- private boolean executeWithImplicitCommitTransaction(final ExecuteCallback
callback) throws SQLException {
+ private boolean executeWithImplicitCommitTransaction(final ExecuteCallback
callback, final Collection<ExecutionContext> executionContexts) throws
SQLException {
boolean result;
try {
connection.setAutoCommit(false);
- result = useDriverToExecute(callback);
+ result = useDriverToExecute(callback, executionContexts);
connection.commit();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
@@ -544,12 +573,20 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return result;
}
- private boolean useDriverToExecute(final ExecuteCallback callback) throws
SQLException {
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext();
- cacheStatements(executionGroupContext.getInputGroups());
- JDBCExecutorCallback<Boolean> jdbcExecutorCallback =
createExecuteCallback(callback,
executionContext.getSqlStatementContext().getSqlStatement());
- return executor.getRegularExecutor().execute(executionGroupContext,
- executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
+ private boolean useDriverToExecute(final ExecuteCallback callback, final
Collection<ExecutionContext> executionContexts) throws SQLException {
+ Boolean result = null;
+ Preconditions.checkArgument(!executionContexts.isEmpty());
+ for (ExecutionContext each : executionContexts) {
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(each);
+ cacheStatements(executionGroupContext.getInputGroups());
+ JDBCExecutorCallback<Boolean> jdbcExecutorCallback =
createExecuteCallback(callback,
each.getSqlStatementContext().getSqlStatement());
+ boolean isWrite =
executor.getRegularExecutor().execute(executionGroupContext,
+ each.getQueryContext(),
each.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
+ if (null == result) {
+ result = isWrite;
+ }
+ }
+ return result;
}
private void cacheStatements(final
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws
SQLException {
@@ -593,15 +630,16 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
if (useFederation) {
return executor.getSqlFederationEngine().getResultSet();
}
- if (executionContext.getSqlStatementContext() instanceof
SelectStatementContext ||
executionContext.getSqlStatementContext().getSqlStatement() instanceof
DALStatement) {
+ if (executionContexts.iterator().next().getSqlStatementContext()
instanceof SelectStatementContext
+ ||
executionContexts.iterator().next().getSqlStatementContext().getSqlStatement()
instanceof DALStatement) {
List<ResultSet> resultSets = getResultSets();
if (resultSets.isEmpty()) {
return currentResultSet;
}
- MergedResult mergedResult =
mergeQuery(getQueryResults(resultSets));
- boolean selectContainsEnhancedTable =
- executionContext.getSqlStatementContext() instanceof
SelectStatementContext && ((SelectStatementContext)
executionContext.getSqlStatementContext()).isContainsEnhancedTable();
- currentResultSet = new ShardingSphereResultSet(resultSets,
mergedResult, this, selectContainsEnhancedTable, executionContext);
+ SQLStatementContext sqlStatementContext =
executionContexts.iterator().next().getSqlStatementContext();
+ MergedResult mergedResult =
mergeQuery(getQueryResults(resultSets), sqlStatementContext);
+ boolean selectContainsEnhancedTable = sqlStatementContext
instanceof SelectStatementContext && ((SelectStatementContext)
sqlStatementContext).isContainsEnhancedTable();
+ currentResultSet = new ShardingSphereResultSet(resultSets,
mergedResult, this, selectContainsEnhancedTable,
executionContexts.iterator().next());
}
return currentResultSet;
}
@@ -626,10 +664,10 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return result;
}
- private MergedResult mergeQuery(final List<QueryResult> queryResults)
throws SQLException {
+ private MergedResult mergeQuery(final List<QueryResult> queryResults,
final SQLStatementContext sqlStatementContext) throws SQLException {
MergeEngine mergeEngine = new
MergeEngine(metaDataContexts.getMetaData().getDatabase(databaseName),
metaDataContexts.getMetaData().getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
- return mergeEngine.merge(queryResults,
executionContext.getSqlStatementContext());
+ return mergeEngine.merge(queryResults, sqlStatementContext);
}
@SuppressWarnings("MagicConstant")
@@ -652,7 +690,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
@Override
public boolean isAccumulate() {
return
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().findRules(DataNodeContainedRule.class).stream()
- .anyMatch(each ->
each.isNeedAccumulate(executionContext.getSqlStatementContext().getTablesContext().getTableNames()));
+ .anyMatch(each ->
each.isNeedAccumulate(executionContexts.iterator().next().getSqlStatementContext().getTablesContext().getTableNames()));
}
@Override
@@ -678,8 +716,8 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
private Optional<GeneratedKeyContext> findGeneratedKey() {
- return executionContext.getSqlStatementContext() instanceof
InsertStatementContext
- ? ((InsertStatementContext)
executionContext.getSqlStatementContext()).getGeneratedKeyContext()
+ return executionContexts.iterator().next().getSqlStatementContext()
instanceof InsertStatementContext
+ ? ((InsertStatementContext)
executionContexts.iterator().next().getSqlStatementContext()).getGeneratedKeyContext()
: Optional.empty();
}
diff --git
a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/StatementAdapterTest.java
b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/StatementAdapterTest.java
index e907a752eac..816625d8614 100644
---
a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/StatementAdapterTest.java
+++
b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/jdbc/adapter/StatementAdapterTest.java
@@ -272,6 +272,6 @@ class StatementAdapterTest {
@SneakyThrows(ReflectiveOperationException.class)
private void setExecutionContext(final ShardingSphereStatement statement,
final ExecutionContext executionContext) {
-
Plugins.getMemberAccessor().set(statement.getClass().getDeclaredField("executionContext"),
statement, executionContext);
+
Plugins.getMemberAccessor().set(statement.getClass().getDeclaredField("executionContexts"),
statement, Collections.singleton(executionContext));
}
}
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
index 6b45eff2eda..871853cec25 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.proxy.backend.connector;
import com.google.common.base.Preconditions;
-import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import
org.apache.shardingsphere.infra.binder.context.aware.CursorDefinitionAware;
import
org.apache.shardingsphere.infra.binder.context.segment.insert.keygen.GeneratedKeyContext;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
@@ -31,6 +30,8 @@ import
org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
import
org.apache.shardingsphere.infra.connection.refresher.MetaDataRefreshEngine;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -46,7 +47,6 @@ import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchemaUtils;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
import org.apache.shardingsphere.infra.session.query.QueryContext;
-import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import
org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.callback.ProxyJDBCExecutorCallback;
@@ -191,7 +191,12 @@ public final class DatabaseConnector implements
DatabaseBackendHandler {
SQLStatement sqlStatement =
executionContexts.iterator().next().getSqlStatementContext().getSqlStatement();
return isWriteDMLStatement(sqlStatement) &&
executionContexts.iterator().next().getExecutionUnits().size() > 1;
}
- return executionContexts.stream().anyMatch(each ->
isWriteDMLStatement(each.getSqlStatementContext().getSqlStatement()));
+ for (ExecutionContext each : executionContexts) {
+ if
(isWriteDMLStatement(each.getSqlStatementContext().getSqlStatement())) {
+ return true;
+ }
+ }
+ return false;
}
private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {