This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 407e320 Retrieve federated execution from jdbc statement and proxy statement. (#10720)
407e320 is described below
commit 407e320d16eba1aacbd4d9f3d0e457790bf23b8d
Author: Juan Pan(Trista) <[email protected]>
AuthorDate: Tue Jun 8 20:50:10 2021 +0800
Retrieve federated execution from jdbc statement and proxy statement. (#10720)
* Retrieve federated execution from jdbc statement and proxy statement.
* reverse
* fix close()
* get results
---
.../sql/federate/execute/FederateExecutor.java | 9 ++-
.../sql/federate/execute/FederateJDBCExecutor.java | 79 ++++++++++++++++------
.../federate/execute/raw/FederateRawExecutor.java | 6 +-
.../optimize/context/OptimizeContextFactory.java | 1 +
.../jdbc/adapter/AbstractStatementAdapter.java | 9 +--
.../statement/ShardingSpherePreparedStatement.java | 33 ++++-----
.../core/statement/ShardingSphereStatement.java | 26 +++----
.../backend/communication/ProxySQLExecutor.java | 15 ++--
8 files changed, 99 insertions(+), 79 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
index f2ab39e..58da05e 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
@@ -17,6 +17,9 @@
package org.apache.shardingsphere.infra.executor.sql.federate.execute;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import java.sql.ResultSet;
@@ -31,12 +34,12 @@ public interface FederateExecutor {
/**
* Execute query.
*
- * @param sql sql
- * @param parameters parameters
+ * @param executionContext execution context
+ * @param callback callback
* @return execute result
* @throws SQLException SQL exception
*/
- List<QueryResult> executeQuery(String sql, List<Object> parameters) throws
SQLException;
+ List<QueryResult> executeQuery(ExecutionContext executionContext,
JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException;
/**
* Close.
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
index 00f474a..bc17b25 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
@@ -19,10 +19,19 @@ package
org.apache.shardingsphere.infra.executor.sql.federate.execute;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.schema.SchemaPlus;
+import
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
-import org.apache.shardingsphere.infra.optimize.context.OptimizeContext;
+import
org.apache.shardingsphere.infra.executor.sql.federate.schema.FederateLogicSchema;
+import
org.apache.shardingsphere.infra.executor.sql.federate.schema.row.FederateRowExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
+import org.apache.shardingsphere.infra.optimize.context.OptimizeContextFactory;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -30,6 +39,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -43,9 +53,17 @@ public final class FederateJDBCExecutor implements
FederateExecutor {
public static final String DRIVER_NAME = "org.apache.calcite.jdbc.Driver";
- public static final Properties PROPERTIES = new Properties();
+ private final String schema;
- private final OptimizeContext context;
+ private final OptimizeContextFactory factory;
+
+ private final Collection<ShardingSphereRule> rules;
+
+ private final ConfigurationProperties props;
+
+ private final ExecutorJDBCManager jdbcManager;
+
+ private final JDBCExecutor jdbcExecutor;
private Statement statement;
@@ -57,23 +75,29 @@ public final class FederateJDBCExecutor implements
FederateExecutor {
}
}
- public FederateJDBCExecutor(final OptimizeContext context) {
- this.context = context;
- PROPERTIES.setProperty(CalciteConnectionProperty.LEX.camelName(),
context.getConnectionProperties().getProperty(CalciteConnectionProperty.LEX.camelName()));
-
PROPERTIES.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(),
context.getConnectionProperties().getProperty(CalciteConnectionProperty.CONFORMANCE.camelName()));
+ public FederateJDBCExecutor(final String schema, final
OptimizeContextFactory factory, final Collection<ShardingSphereRule> rules,
+ final ConfigurationProperties props, final
ExecutorJDBCManager jdbcManager, final JDBCExecutor jdbcExecutor) {
+ this.schema = schema;
+ this.factory = factory;
+ this.rules = rules;
+ this.props = props;
+ this.jdbcManager = jdbcManager;
+ this.jdbcExecutor = jdbcExecutor;
}
@Override
- public List<QueryResult> executeQuery(final String sql, final List<Object>
parameters) throws SQLException {
- QueryResult result = new JDBCStreamQueryResult(execute(sql,
parameters));
+ public List<QueryResult> executeQuery(final ExecutionContext
executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback)
throws SQLException {
+ QueryResult result = new
JDBCStreamQueryResult(execute(executionContext, callback));
return Collections.singletonList(result);
}
@Override
public void close() throws SQLException {
- Connection connection = statement.getConnection();
- connection.close();
- statement.close();
+ if (null != statement) {
+ Connection connection = statement.getConnection();
+ connection.close();
+ statement.close();
+ }
}
@Override
@@ -81,22 +105,35 @@ public final class FederateJDBCExecutor implements
FederateExecutor {
return statement.getResultSet();
}
- private ResultSet execute(final String sql, final List<Object> parameters)
throws SQLException {
- PreparedStatement statement = getConnection().prepareStatement(sql);
- setParameters(statement, parameters);
+ private ResultSet execute(final ExecutionContext executionContext, final
JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException {
+ SQLUnit sqlUnit =
executionContext.getExecutionUnits().iterator().next().getSqlUnit();
+ PreparedStatement statement = getConnection(executionContext,
callback).prepareStatement(sqlUnit.getSql());
+ setParameters(statement, sqlUnit.getParameters());
this.statement = statement;
return statement.executeQuery();
}
- private Connection getConnection() throws SQLException {
- Connection result = DriverManager.getConnection(CONNECTION_URL,
PROPERTIES);
+ private Connection getConnection(final ExecutionContext executionContext,
final JDBCExecutorCallback<? extends ExecuteResult> callback) throws
SQLException {
+ Connection result = DriverManager.getConnection(CONNECTION_URL,
getProperties());
CalciteConnection calciteConnection =
result.unwrap(CalciteConnection.class);
- SchemaPlus rootSchema = calciteConnection.getRootSchema();
- rootSchema.add(context.getSchemaName(), context.getLogicSchema());
- calciteConnection.setSchema(context.getSchemaName());
+ addSchema(calciteConnection, executionContext, callback);
return result;
}
+ private Properties getProperties() {
+ Properties result = new Properties();
+ result.setProperty(CalciteConnectionProperty.LEX.camelName(),
factory.getProperties().getProperty(CalciteConnectionProperty.LEX.camelName()));
+ result.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(),
factory.getProperties().getProperty(CalciteConnectionProperty.CONFORMANCE.camelName()));
+ return result;
+ }
+
+ private void addSchema(final CalciteConnection calciteConnection, final
ExecutionContext executionContext, final JDBCExecutorCallback<? extends
ExecuteResult> callback) throws SQLException {
+ FederateRowExecutor executor = new FederateRowExecutor(rules, props,
jdbcManager, jdbcExecutor, executionContext, callback);
+ FederateLogicSchema logicSchema = new
FederateLogicSchema(factory.getSchemaMetadatas().getDefaultSchemaMetadata(),
executor);
+ calciteConnection.getRootSchema().add(schema, logicSchema);
+ calciteConnection.setSchema(schema);
+ }
+
private void setParameters(final PreparedStatement preparedStatement,
final List<Object> parameters) throws SQLException {
int count = 1;
for (Object each : parameters) {
diff --git
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
index 128669f..112f797 100644
---
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
+++
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
@@ -24,12 +24,16 @@ import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateExecutor;
import org.apache.shardingsphere.infra.optimize.ShardingSphereOptimizer;
import org.apache.shardingsphere.infra.optimize.context.OptimizeContext;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
@@ -46,7 +50,7 @@ public final class FederateRawExecutor implements
FederateExecutor {
}
@Override
- public List<QueryResult> executeQuery(final String sql, final List<Object>
parameters) {
+ public List<QueryResult> executeQuery(final ExecutionContext
executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback)
throws SQLException {
// TODO
return Collections.emptyList();
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/OptimizeContextFactory.java
b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/OptimizeContextFactory.java
index b2f67e1..25b290b 100644
---
a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/OptimizeContextFactory.java
+++
b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/OptimizeContextFactory.java
@@ -67,6 +67,7 @@ public final class OptimizeContextFactory {
private static final String CONFORMANCE_CAMEL_NAME =
CalciteConnectionProperty.CONFORMANCE.camelName();
+ @Getter
private final Properties properties = new Properties();
private final CalciteConnectionConfig connectionConfig;
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index dc5d893..8893d7d 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -51,19 +51,12 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
closed = true;
try {
forceExecuteTemplate.execute((Collection) getRoutedStatements(),
Statement::close);
- closeFederateExecutor();
+ getFederateExecutor().close();
} finally {
getRoutedStatements().clear();
}
}
- private void closeFederateExecutor() throws SQLException {
- FederateExecutor executor = getFederateExecutor();
- if (null != executor) {
- executor.close();
- }
- }
-
@Override
public final boolean isClosed() {
return closed;
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 9b78174..bc334af 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -47,7 +47,6 @@ import
org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -62,8 +61,6 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.dr
import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateExecutor;
import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateJDBCExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.federate.schema.FederateLogicSchema;
-import
org.apache.shardingsphere.infra.executor.sql.federate.schema.row.FederateRowExecutor;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -116,9 +113,12 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
@Getter
private final ParameterMetaData parameterMetaData;
+ private final DriverJDBCExecutor driverJDBCExecutor;
+
private final RawExecutor rawExecutor;
- private final DriverJDBCExecutor driverJDBCExecutor;
+ @Getter(AccessLevel.PROTECTED)
+ private final FederateExecutor federateExecutor;
private final BatchPreparedStatementExecutor
batchPreparedStatementExecutor;
@@ -129,9 +129,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private ExecutionContext executionContext;
private ResultSet currentResultSet;
-
- @Getter(AccessLevel.PROTECTED)
- private FederateExecutor federateExecutor;
public ShardingSpherePreparedStatement(final ShardingSphereConnection
connection, final String sql) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
@@ -166,8 +163,11 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
statementOption = returnGeneratedKeys ? new StatementOption(true) :
new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
JDBCExecutor jdbcExecutor = new
JDBCExecutor(metaDataContexts.getExecutorEngine(),
connection.isHoldTransaction());
- rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(),
connection.isHoldTransaction(), metaDataContexts.getProps());
driverJDBCExecutor = new DriverJDBCExecutor(metaDataContexts,
jdbcExecutor);
+ rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(),
connection.isHoldTransaction(), metaDataContexts.getProps());
+ // TODO Consider FederateRawExecutor
+ federateExecutor = new FederateJDBCExecutor(DefaultSchema.LOGIC_NAME,
metaDataContexts.getOptimizeContextFactory(),
metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
+ metaDataContexts.getProps(), connection, jdbcExecutor);
batchPreparedStatementExecutor = new
BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor);
kernelProcessor = new KernelProcessor();
}
@@ -189,7 +189,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
}
private List<ResultSet> getResultSetsForShardingSphereResultSet() throws
SQLException {
- if (null != federateExecutor) {
+ if (executionContext.getRouteContext().isFederated()) {
return Collections.singletonList(federateExecutor.getResultSet());
}
return
statements.stream().map(this::getResultSet).collect(Collectors.toList());
@@ -213,19 +213,12 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
if (executionContext.getExecutionUnits().isEmpty()) {
return Collections.emptyList();
}
- federateExecutor = createFederateExecutor();
- SQLUnit sqlUnit =
executionContext.getExecutionUnits().iterator().next().getSqlUnit();
- return federateExecutor.executeQuery(sqlUnit.getSql(),
sqlUnit.getParameters());
- }
-
- private FederateExecutor createFederateExecutor() {
+ // TODO : Please fix me here.
+ // PreparedStatementExecuteQueryCallback callback = new
PreparedStatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
+ // sqlStatement,
SQLExecutorExceptionHandler.isExceptionThrown());
StatementExecuteQueryCallback callback = new
StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
executionContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
- FederateRowExecutor executor = new
FederateRowExecutor(metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
- metaDataContexts.getProps(), connection,
driverJDBCExecutor.getJdbcExecutor(), executionContext, callback);
- // TODO Consider FederateRawExecutor
- FederateLogicSchema logicSchema = new
FederateLogicSchema(metaDataContexts.getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(DefaultSchema.LOGIC_NAME),
executor);
- return new
FederateJDBCExecutor(metaDataContexts.getOptimizeContextFactory().create(DefaultSchema.LOGIC_NAME,
logicSchema));
+ return federateExecutor.executeQuery(executionContext, callback);
}
@Override
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 9947317..c5c489b 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -45,7 +45,6 @@ import
org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -60,8 +59,6 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.dr
import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateExecutor;
import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateJDBCExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.federate.schema.FederateLogicSchema;
-import
org.apache.shardingsphere.infra.executor.sql.federate.schema.row.FederateRowExecutor;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -108,6 +105,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private final RawExecutor rawExecutor;
+ @Getter(AccessLevel.PROTECTED)
+ private final FederateExecutor federateExecutor;
+
private final KernelProcessor kernelProcessor;
private boolean returnGeneratedKeys;
@@ -116,9 +116,6 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private ResultSet currentResultSet;
- @Getter(AccessLevel.PROTECTED)
- private FederateExecutor federateExecutor;
-
public ShardingSphereStatement(final ShardingSphereConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
@@ -136,6 +133,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
JDBCExecutor jdbcExecutor = new
JDBCExecutor(metaDataContexts.getExecutorEngine(),
connection.isHoldTransaction());
driverJDBCExecutor = new DriverJDBCExecutor(metaDataContexts,
jdbcExecutor);
rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(),
connection.isHoldTransaction(), metaDataContexts.getProps());
+ // TODO Consider FederateRawExecutor
+ federateExecutor = new FederateJDBCExecutor(DefaultSchema.LOGIC_NAME,
metaDataContexts.getOptimizeContextFactory(),
metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
+ metaDataContexts.getProps(), connection, jdbcExecutor);
kernelProcessor = new KernelProcessor();
}
@@ -158,7 +158,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
private List<ResultSet> getResultSetsForShardingSphereResultSet() throws
SQLException {
- if (null != federateExecutor) {
+ if (executionContext.getRouteContext().isFederated()) {
return Collections.singletonList(federateExecutor.getResultSet());
}
return
statements.stream().map(this::getResultSet).collect(Collectors.toList());
@@ -183,19 +183,9 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
if (executionContext.getExecutionUnits().isEmpty()) {
return Collections.emptyList();
}
- federateExecutor = createFederateExecutor();
- SQLUnit sqlUnit =
executionContext.getExecutionUnits().iterator().next().getSqlUnit();
- return federateExecutor.executeQuery(sqlUnit.getSql(),
sqlUnit.getParameters());
- }
-
- private FederateExecutor createFederateExecutor() {
StatementExecuteQueryCallback callback = new
StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
executionContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
- FederateRowExecutor executor = new
FederateRowExecutor(metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
- metaDataContexts.getProps(), connection,
driverJDBCExecutor.getJdbcExecutor(), executionContext, callback);
- // TODO Consider FederateRawExecutor
- FederateLogicSchema logicSchema = new
FederateLogicSchema(metaDataContexts.getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(DefaultSchema.LOGIC_NAME),
executor);
- return new
FederateJDBCExecutor(metaDataContexts.getOptimizeContextFactory().create(DefaultSchema.LOGIC_NAME,
logicSchema));
+ return federateExecutor.executeQuery(executionContext, callback);
}
@Override
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 988bbe0..a71eb60 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -23,7 +23,6 @@ import
org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
@@ -34,8 +33,6 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateExecutor;
import
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateJDBCExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.federate.schema.FederateLogicSchema;
-import
org.apache.shardingsphere.infra.executor.sql.federate.schema.row.FederateRowExecutor;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
@@ -74,6 +71,8 @@ public final class ProxySQLExecutor {
private final RawExecutor rawExecutor;
+ private final FederateExecutor federateExecutor;
+
public ProxySQLExecutor(final String type, final BackendConnection
backendConnection) {
this.type = type;
this.backendConnection = backendConnection;
@@ -82,6 +81,10 @@ public final class ProxySQLExecutor {
jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection, new
JDBCExecutor(executorEngine, isSerialExecute));
MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getMetaDataContexts();
rawExecutor = new RawExecutor(executorEngine, isSerialExecute,
metaDataContexts.getProps());
+ // TODO Consider FederateRawExecutor
+ federateExecutor = new
FederateJDBCExecutor(backendConnection.getSchemaName(),
metaDataContexts.getOptimizeContextFactory(),
+
metaDataContexts.getMetaData(backendConnection.getSchemaName()).getRuleMetaData().getRules(),
+ metaDataContexts.getProps(), backendConnection, new
JDBCExecutor(executorEngine, isSerialExecute));
}
/**
@@ -145,12 +148,8 @@ public final class ProxySQLExecutor {
MetaDataContexts metaData =
ProxyContext.getInstance().getMetaDataContexts();
ProxyJDBCExecutorCallback callback =
ProxyJDBCExecutorCallbackFactory.newInstance(type,
metaData.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType(),
executionContext.getSqlStatementContext().getSqlStatement(),
backendConnection, isReturnGeneratedKeys, isExceptionThrown, true);
- FederateRowExecutor executor = new FederateRowExecutor(rules,
metaData.getProps(), backendConnection, jdbcExecutor.getJdbcExecutor(),
executionContext, callback);
- FederateLogicSchema logicSchema = new
FederateLogicSchema(metaData.getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(backendConnection.getSchemaName()),
executor);
- FederateExecutor federateExecutor = new
FederateJDBCExecutor(metaData.getOptimizeContextFactory().create(backendConnection.getSchemaName(),
logicSchema));
backendConnection.setFederateExecutor(federateExecutor);
- SQLUnit sqlUnit =
executionContext.getExecutionUnits().iterator().next().getSqlUnit();
- return federateExecutor.executeQuery(sqlUnit.getSql(),
sqlUnit.getParameters()).stream().map(each -> (ExecuteResult)
each).collect(Collectors.toList());
+ return federateExecutor.executeQuery(executionContext,
callback).stream().map(each -> (ExecuteResult)
each).collect(Collectors.toList());
}
private Collection<ExecuteResult> useDriverToExecute(final
ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,