This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2da211c153f Refactor DriverExecutor (#31442)
2da211c153f is described below
commit 2da211c153f365524e69c7a952d687c6d089224b
Author: Liang Zhang <[email protected]>
AuthorDate: Thu May 30 00:43:59 2024 +0800
Refactor DriverExecutor (#31442)
* Refactor DriverExecutor
* Refactor DriverExecutor
* Refactor DriverExecutor
* Refactor DriverExecutor
---
.../driver/executor/DriverExecutor.java | 162 +++++++++++++++++++--
.../core/resultset/ShardingSphereResultSet.java | 2 +
.../statement/ShardingSpherePreparedStatement.java | 53 +++----
.../core/statement/ShardingSphereStatement.java | 39 +----
.../core/statement/StatementReplayCallback.java | 39 +++++
5 files changed, 212 insertions(+), 83 deletions(-)
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
index 5c6136bf646..04a7a4e0108 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -22,16 +22,37 @@ import
org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
import
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
import
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
+import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet;
+import
org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSetUtils;
+import
org.apache.shardingsphere.driver.jdbc.core.statement.StatementReplayCallback;
+import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
+import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.merge.MergeEngine;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
@@ -45,7 +66,13 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
/**
* Driver executor.
@@ -66,6 +93,14 @@ public final class DriverExecutor implements AutoCloseable {
private ExecuteType executeType = ExecuteType.REGULAR;
+ private final KernelProcessor kernelProcessor;
+
+ @Getter
+ private final List<Statement> statements = new ArrayList<>();
+
+ @Getter
+ private final List<List<Object>> parameterSets = new ArrayList<>();
+
public DriverExecutor(final ShardingSphereConnection connection) {
this.connection = connection;
MetaDataContexts metaDataContexts =
connection.getContextManager().getMetaDataContexts();
@@ -73,10 +108,10 @@ public final class DriverExecutor implements AutoCloseable
{
JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine,
connection.getDatabaseConnectionManager().getConnectionContext());
regularExecutor = new DriverJDBCExecutor(connection.getDatabaseName(),
connection.getContextManager(), jdbcExecutor);
rawExecutor = new RawExecutor(executorEngine,
connection.getDatabaseConnectionManager().getConnectionContext());
- ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName());
- String schemaName = new
DatabaseTypeRegistry(database.getProtocolType()).getDefaultSchemaName(connection.getDatabaseName());
+ String schemaName = new
DatabaseTypeRegistry(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType()).getDefaultSchemaName(connection.getDatabaseName());
trafficExecutor = new TrafficExecutor();
sqlFederationEngine = new
SQLFederationEngine(connection.getDatabaseName(), schemaName,
metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor);
+ kernelProcessor = new KernelProcessor();
}
/**
@@ -86,36 +121,129 @@ public final class DriverExecutor implements
AutoCloseable {
* @param database database
* @param queryContext query context
* @param prepareEngine prepare engine
+ * @param statement statement
+ * @param columnLabelAndIndexMap column label and index map
+ * @param statementReplayCallback statement replay callback
* @return result set
* @throws SQLException SQL exception
*/
- public Optional<ResultSet> executeAdvanceQuery(final
ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final
QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine)
throws SQLException {
+ public ResultSet executeAdvanceQuery(final ShardingSphereMetaData
metaData, final ShardingSphereDatabase database, final QueryContext
queryContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final Statement statement,
+ final Map<String, Integer>
columnLabelAndIndexMap, final StatementReplayCallback statementReplayCallback)
throws SQLException {
Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
if (trafficInstanceId.isPresent()) {
- return Optional.of(trafficExecutor.execute(
- connection.getProcessId(), database.getName(),
trafficInstanceId.get(), queryContext, prepareEngine,
getTrafficExecutorCallback(prepareEngine)));
+ return trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
getTrafficExecutorCallback(prepareEngine));
}
if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
- return Optional.of(sqlFederationEngine.executeQuery(
- prepareEngine, getSQLFederationCallback(database,
queryContext, prepareEngine), new SQLFederationContext(false, queryContext,
metaData, connection.getProcessId())));
+ return sqlFederationEngine.executeQuery(
+ prepareEngine, getExecuteQueryCallback(database,
queryContext, prepareEngine.getType()), new SQLFederationContext(false,
queryContext, metaData, connection.getProcessId()));
}
- return Optional.empty();
+ return doExecuteQuery(metaData, database, queryContext, prepareEngine,
statement, columnLabelAndIndexMap, statementReplayCallback);
}
private TrafficExecutorCallback<ResultSet>
getTrafficExecutorCallback(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) {
return JDBCDriverType.STATEMENT.equals(prepareEngine.getType()) ?
Statement::executeQuery : ((statement, sql) -> ((PreparedStatement)
statement).executeQuery());
}
- private ExecuteQueryCallback getSQLFederationCallback(final
ShardingSphereDatabase database, final QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) {
- return JDBCDriverType.STATEMENT.equals(prepareEngine.getType())
+ private ExecuteQueryCallback getExecuteQueryCallback(final
ShardingSphereDatabase database, final QueryContext queryContext, final String
jdbcDriverType) {
+ return JDBCDriverType.STATEMENT.equals(jdbcDriverType)
? new
StatementExecuteQueryCallback(database.getProtocolType(),
database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown())
: new
PreparedStatementExecuteQueryCallback(database.getProtocolType(),
database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
}
+ private ShardingSphereResultSet doExecuteQuery(final
ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final
QueryContext queryContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final Statement statement,
+ final Map<String, Integer>
columnLabelAndIndexMap, final StatementReplayCallback statementReplayCallback)
throws SQLException {
+ List<QueryResult> queryResults = executeQuery0(metaData, database,
queryContext, prepareEngine, statementReplayCallback);
+ MergedResult mergedResult = mergeQuery(metaData, database,
queryResults, queryContext.getSqlStatementContext());
+ boolean selectContainsEnhancedTable =
queryContext.getSqlStatementContext() instanceof SelectStatementContext
+ && ((SelectStatementContext)
queryContext.getSqlStatementContext()).isContainsEnhancedTable();
+ List<ResultSet> resultSets = getResultSets();
+ return new ShardingSphereResultSet(resultSets, mergedResult,
statement, selectContainsEnhancedTable, queryContext.getSqlStatementContext(),
+ null == columnLabelAndIndexMap
+ ?
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(queryContext.getSqlStatementContext(),
selectContainsEnhancedTable, resultSets.get(0).getMetaData())
+ : columnLabelAndIndexMap);
+ }
+
+ private List<QueryResult> executeQuery0(final ShardingSphereMetaData
metaData, final ShardingSphereDatabase database, final QueryContext
queryContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+ final StatementReplayCallback
statementReplayCallback) throws SQLException {
+ ExecutionContext executionContext = createExecutionContext(metaData,
database, queryContext);
+ if (hasRawExecutionRule(database)) {
+ return
rawExecutor.execute(createRawExecutionGroupContext(metaData, database,
executionContext),
+ queryContext, new
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
+ }
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
+ new ExecutionGroupReportContext(connection.getProcessId(),
database.getName(), new Grantee("", "")));
+ for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
+ statements.addAll(getStatements(each));
+ if
(JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
+ parameterSets.addAll(getParameterSets(each));
+ }
+ }
+ statementReplayCallback.replay(statements, parameterSets);
+ return regularExecutor.executeQuery(executionGroupContext,
queryContext, getExecuteQueryCallback(database, queryContext,
prepareEngine.getType()));
+ }
+
+ private Collection<Statement> getStatements(final
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
+ Collection<Statement> result = new LinkedList<>();
+ for (JDBCExecutionUnit each : executionGroup.getInputs()) {
+ result.add(each.getStorageResource());
+ }
+ return result;
+ }
+
+ private Collection<List<Object>> getParameterSets(final
ExecutionGroup<JDBCExecutionUnit> executionGroup) {
+ Collection<List<Object>> result = new LinkedList<>();
+ for (JDBCExecutionUnit each : executionGroup.getInputs()) {
+ result.add(each.getExecutionUnit().getSqlUnit().getParameters());
+ }
+ return result;
+ }
+
+ private ExecutionContext createExecutionContext(final
ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final
QueryContext queryContext) throws SQLException {
+ clearStatements();
+ RuleMetaData globalRuleMetaData = metaData.getGlobalRuleMetaData();
+ SQLAuditEngine.audit(queryContext.getSqlStatementContext(),
queryContext.getParameters(), globalRuleMetaData, database, null,
queryContext.getHintValueContext());
+ return kernelProcessor.generateExecutionContext(queryContext,
database, globalRuleMetaData, metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
+ }
+
+ private void clearStatements() throws SQLException {
+ for (Statement each : statements) {
+ each.close();
+ }
+ statements.clear();
+ }
+
+ private boolean hasRawExecutionRule(final ShardingSphereDatabase database)
{
+ return
!database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
+ }
+
+ private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ShardingSphereMetaData metaData,
+
final ShardingSphereDatabase database, final ExecutionContext
executionContext) throws SQLException {
+ int maxConnectionsSizePerQuery =
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
database.getRuleMetaData().getRules()).prepare(
+ executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), database.getName(), new
Grantee("", "")));
+ }
+
+ private MergedResult mergeQuery(final ShardingSphereMetaData metaData,
final ShardingSphereDatabase database,
+ final List<QueryResult> queryResults,
final SQLStatementContext sqlStatementContext) throws SQLException {
+ MergeEngine mergeEngine = new
MergeEngine(metaData.getGlobalRuleMetaData(), database, metaData.getProps(),
connection.getDatabaseConnectionManager().getConnectionContext());
+ return mergeEngine.merge(queryResults, sqlStatementContext);
+ }
+
+ private List<ResultSet> getResultSets() throws SQLException {
+ List<ResultSet> result = new ArrayList<>(statements.size());
+ for (Statement each : statements) {
+ if (null != each.getResultSet()) {
+ result.add(each.getResultSet());
+ }
+ }
+ return result;
+ }
+
/**
* Execute advance update.
*
@@ -159,7 +287,7 @@ public final class DriverExecutor implements AutoCloseable {
if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
executeType = ExecuteType.FEDERATION;
ResultSet resultSet = sqlFederationEngine.executeQuery(
- prepareEngine, getSQLFederationCallback(database,
queryContext, prepareEngine), new SQLFederationContext(false, queryContext,
metaData, connection.getProcessId()));
+ prepareEngine, getExecuteQueryCallback(database,
queryContext, prepareEngine.getType()), new SQLFederationContext(false,
queryContext, metaData, connection.getProcessId()));
return Optional.of(null != resultSet);
}
return Optional.empty();
@@ -181,6 +309,14 @@ public final class DriverExecutor implements AutoCloseable
{
}
}
+ /**
+ * Clear cached statements and parameter sets.
+ */
+ public void clear() {
+ statements.clear();
+ parameterSets.clear();
+ }
+
@Override
public void close() throws SQLException {
sqlFederationEngine.close();
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ShardingSphereResultSet.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ShardingSphereResultSet.java
index d293eac5992..cf23ec8d66a 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ShardingSphereResultSet.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ShardingSphereResultSet.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.driver.jdbc.core.resultset;
+import lombok.Getter;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractResultSetAdapter;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
@@ -62,6 +63,7 @@ public final class ShardingSphereResultSet extends
AbstractResultSetAdapter {
private final MergedResult mergeResultSet;
+ @Getter
private final Map<String, Integer> columnLabelAndIndexMap;
public ShardingSphereResultSet(final List<ResultSet> resultSets, final
MergedResult mergeResultSet, final Statement statement, final boolean
selectContainsEnhancedTable,
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 2326ca199fd..5f5d9d8443f 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -23,7 +23,6 @@ import lombok.Getter;
import org.apache.shardingsphere.driver.executor.DriverExecutor;
import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
import
org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
-import
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
import
org.apache.shardingsphere.driver.jdbc.adapter.AbstractPreparedStatementAdapter;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
@@ -99,7 +98,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
* ShardingSphere prepared statement.
@@ -224,13 +222,16 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- Optional<ResultSet> advancedResultSet =
executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database,
queryContext, createDriverExecutionPrepareEngine(database));
- if (advancedResultSet.isPresent()) {
- currentResultSet = advancedResultSet.get();
- return currentResultSet;
+ findGeneratedKey().ifPresent(optional ->
generatedValues.addAll(optional.getGeneratedValues()));
+ currentResultSet =
executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database,
queryContext, createDriverExecutionPrepareEngine(database), this,
columnLabelAndIndexMap,
+ (StatementReplayCallback<PreparedStatement>) this::replay);
+ if (currentResultSet instanceof ShardingSphereResultSet) {
+ columnLabelAndIndexMap = ((ShardingSphereResultSet)
currentResultSet).getColumnLabelAndIndexMap();
}
- ExecutionContext executionContext =
createExecutionContext(queryContext);
- currentResultSet = doExecuteQuery(executionContext);
+ for (Statement each : executor.getStatements()) {
+ statements.add((PreparedStatement) each);
+ }
+ parameterSets.addAll(executor.getParameterSets());
return currentResultSet;
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
@@ -243,28 +244,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
}
}
- private ShardingSphereResultSet doExecuteQuery(final ExecutionContext
executionContext) throws SQLException {
- List<QueryResult> queryResults = executeQuery0(executionContext);
- MergedResult mergedResult = mergeQuery(queryResults,
sqlStatementContext);
- List<ResultSet> resultSets = getResultSets();
- if (null == columnLabelAndIndexMap) {
- columnLabelAndIndexMap =
ShardingSphereResultSetUtils.createColumnLabelAndIndexMap(sqlStatementContext,
selectContainsEnhancedTable, resultSets.get(0).getMetaData());
- }
- return new ShardingSphereResultSet(resultSets, mergedResult, this,
selectContainsEnhancedTable, sqlStatementContext, columnLabelAndIndexMap);
- }
-
- private List<QueryResult> executeQuery0(final ExecutionContext
executionContext) throws SQLException {
- if (hasRawExecutionRule()) {
- return
executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext),
- executionContext.getQueryContext(), new
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
- }
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(executionContext);
- cacheStatements(executionGroupContext.getInputGroups());
- PreparedStatementExecuteQueryCallback callback = new
PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
-
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(),
sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown());
- return
executor.getRegularExecutor().executeQuery(executionGroupContext,
executionContext.getQueryContext(), callback);
- }
-
private boolean hasRawExecutionRule() {
return
!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
}
@@ -278,7 +257,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private void resetParameters() throws SQLException {
parameterSets.clear();
parameterSets.add(getParameters());
- replaySetParameter();
+ replaySetParameter(statements, parameterSets);
}
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) {
@@ -513,17 +492,17 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
parameterSets.add(eachInput.getExecutionUnit().getSqlUnit().getParameters());
});
}
- replay();
+ replay(statements, parameterSets);
}
- private void replay() throws SQLException {
- replaySetParameter();
+ private void replay(final List<PreparedStatement> statements, final
List<List<Object>> parameterSets) throws SQLException {
+ replaySetParameter(statements, parameterSets);
for (Statement each : statements) {
getMethodInvocationRecorder().replay(each);
}
}
- private void replaySetParameter() throws SQLException {
+ private void replaySetParameter(final List<PreparedStatement> statements,
final List<List<Object>> parameterSets) throws SQLException {
for (int i = 0; i < statements.size(); i++) {
replaySetParameter(statements.get(i), parameterSets.get(i));
}
@@ -534,6 +513,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
statements.clear();
parameterSets.clear();
generatedValues.clear();
+ executor.clear();
}
private Optional<GeneratedKeyContext> findGeneratedKey() {
@@ -567,7 +547,8 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
public void addBatch() {
try {
QueryContext queryContext = createQueryContext();
- executionContext = connection.getTrafficInstanceId(trafficRule,
queryContext)
+ Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(trafficRule, queryContext);
+ executionContext = trafficInstanceId
.map(optional -> createExecutionContext(queryContext,
optional)).orElseGet(() -> createExecutionContext(queryContext));
batchPreparedStatementExecutor.addBatchForExecutionUnits(executionContext.getExecutionUnits());
} finally {
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index af0cbc38445..bda16fec1d4 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -23,7 +23,6 @@ import
org.apache.shardingsphere.driver.executor.DriverExecutor;
import org.apache.shardingsphere.driver.executor.batch.BatchStatementExecutor;
import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
import
org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
-import
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractStatementAdapter;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import
org.apache.shardingsphere.driver.jdbc.core.resultset.GeneratedKeysResultSet;
@@ -152,13 +151,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
sqlStatementContext = queryContext.getSqlStatementContext();
- Optional<ResultSet> advancedResultSet =
executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database,
queryContext, createDriverExecutionPrepareEngine(database));
- if (advancedResultSet.isPresent()) {
- currentResultSet = advancedResultSet.get();
- return currentResultSet;
- }
- ExecutionContext executionContext =
createExecutionContext(queryContext);
- currentResultSet = doExecuteQuery(executionContext);
+ currentResultSet =
executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database,
queryContext, createDriverExecutionPrepareEngine(database), this, null,
+ (StatementReplayCallback<Statement>) (statements,
parameterSets) -> replay(statements));
+ statements.addAll(executor.getStatements());
return currentResultSet;
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
@@ -169,30 +164,6 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
}
- private ShardingSphereResultSet doExecuteQuery(final ExecutionContext
executionContext) throws SQLException {
- List<QueryResult> queryResults = executeQuery0(executionContext);
- MergedResult mergedResult = mergeQuery(queryResults,
sqlStatementContext);
- boolean selectContainsEnhancedTable = sqlStatementContext instanceof
SelectStatementContext && ((SelectStatementContext)
sqlStatementContext).isContainsEnhancedTable();
- return new ShardingSphereResultSet(getResultSets(), mergedResult,
this, selectContainsEnhancedTable, sqlStatementContext);
- }
-
- private List<QueryResult> executeQuery0(final ExecutionContext
executionContext) throws SQLException {
- if (hasRawExecutionRule()) {
- return
executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext),
- executionContext.getQueryContext(), new
RawSQLExecutorCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
- }
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(executionContext);
- cacheStatements(executionGroupContext.getInputGroups());
- StatementExecuteQueryCallback callback = new
StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
-
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(),
sqlStatementContext.getSqlStatement(),
- SQLExecutorExceptionHandler.isExceptionThrown());
- return
executor.getRegularExecutor().executeQuery(executionGroupContext,
executionContext.getQueryContext(), callback);
- }
-
- private boolean hasRawExecutionRule() {
- return
!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
- }
-
private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) {
int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT,
maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(),
statementManager, statementOption,
@@ -470,7 +441,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
statements.addAll(each.getInputs().stream().map(JDBCExecutionUnit::getStorageResource).collect(Collectors.toList()));
}
- replay();
+ replay(statements);
}
private JDBCExecutorCallback<Boolean> createExecuteCallback(final
ExecuteCallback executeCallback, final SQLStatement sqlStatement) {
@@ -490,7 +461,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
};
}
- private void replay() throws SQLException {
+ private void replay(final List<Statement> statements) throws SQLException {
for (Statement each : statements) {
getMethodInvocationRecorder().replay(each);
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
new file mode 100644
index 00000000000..0ad5b4456c0
--- /dev/null
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementReplayCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.jdbc.core.statement;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+/**
+ * Statement replay callback.
+ *
+ * @param <T> type of statement
+ */
+public interface StatementReplayCallback<T extends Statement> {
+
+ /**
+ * Replay statement.
+ *
+ * @param statements statements
+ * @param parameterSets parameter sets
+ * @throws SQLException SQL exception
+ */
+ void replay(List<T> statements, List<List<Object>> parameterSets) throws
SQLException;
+}