This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 407e320  Retrieve federated execution from jdbc statement and proxy statement. (#10720)
407e320 is described below

commit 407e320d16eba1aacbd4d9f3d0e457790bf23b8d
Author: Juan Pan(Trista) <[email protected]>
AuthorDate: Tue Jun 8 20:50:10 2021 +0800

    Retrieve federated execution from jdbc statement and proxy statement. (#10720)
    
    * Retrieve federated execution from jdbc statement and proxy statement.
    
    * reverse
    
    * fix close()
    
    * get results
---
 .../sql/federate/execute/FederateExecutor.java     |  9 ++-
 .../sql/federate/execute/FederateJDBCExecutor.java | 79 ++++++++++++++++------
 .../federate/execute/raw/FederateRawExecutor.java  |  6 +-
 .../optimize/context/OptimizeContextFactory.java   |  1 +
 .../jdbc/adapter/AbstractStatementAdapter.java     |  9 +--
 .../statement/ShardingSpherePreparedStatement.java | 33 ++++-----
 .../core/statement/ShardingSphereStatement.java    | 26 +++----
 .../backend/communication/ProxySQLExecutor.java    | 15 ++--
 8 files changed, 99 insertions(+), 79 deletions(-)

diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
index f2ab39e..58da05e 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
@@ -17,6 +17,9 @@
 
 package org.apache.shardingsphere.infra.executor.sql.federate.execute;
 
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 
 import java.sql.ResultSet;
@@ -31,12 +34,12 @@ public interface FederateExecutor {
     /**
      * Execute query.
      *
-     * @param sql sql
-     * @param parameters parameters
+     * @param executionContext execution context
+     * @param callback callback
      * @return execute result
      * @throws SQLException SQL exception
      */
-    List<QueryResult> executeQuery(String sql, List<Object> parameters) throws 
SQLException;
+    List<QueryResult> executeQuery(ExecutionContext executionContext, 
JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException;
     
     /**
      * Close.
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
index 00f474a..bc17b25 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
@@ -19,10 +19,19 @@ package 
org.apache.shardingsphere.infra.executor.sql.federate.execute;
 
 import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.schema.SchemaPlus;
+import 
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
-import org.apache.shardingsphere.infra.optimize.context.OptimizeContext;
+import 
org.apache.shardingsphere.infra.executor.sql.federate.schema.FederateLogicSchema;
+import 
org.apache.shardingsphere.infra.executor.sql.federate.schema.row.FederateRowExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
+import org.apache.shardingsphere.infra.optimize.context.OptimizeContextFactory;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -30,6 +39,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -43,9 +53,17 @@ public final class FederateJDBCExecutor implements 
FederateExecutor {
     
     public static final String DRIVER_NAME = "org.apache.calcite.jdbc.Driver";
     
-    public static final Properties PROPERTIES = new Properties();
+    private final String schema;
     
-    private final OptimizeContext context;
+    private final OptimizeContextFactory factory;
+    
+    private final Collection<ShardingSphereRule> rules;
+    
+    private final ConfigurationProperties props;
+    
+    private final ExecutorJDBCManager jdbcManager;
+    
+    private final JDBCExecutor jdbcExecutor;
     
     private Statement statement;
     
@@ -57,23 +75,29 @@ public final class FederateJDBCExecutor implements 
FederateExecutor {
         }
     }
     
-    public FederateJDBCExecutor(final OptimizeContext context) {
-        this.context = context;
-        PROPERTIES.setProperty(CalciteConnectionProperty.LEX.camelName(), 
context.getConnectionProperties().getProperty(CalciteConnectionProperty.LEX.camelName()));
-        
PROPERTIES.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(), 
context.getConnectionProperties().getProperty(CalciteConnectionProperty.CONFORMANCE.camelName()));
+    public FederateJDBCExecutor(final String schema, final 
OptimizeContextFactory factory, final Collection<ShardingSphereRule> rules, 
+                                final ConfigurationProperties props, final 
ExecutorJDBCManager jdbcManager, final JDBCExecutor jdbcExecutor) {
+        this.schema = schema;
+        this.factory = factory;
+        this.rules = rules;
+        this.props = props;
+        this.jdbcManager = jdbcManager;
+        this.jdbcExecutor = jdbcExecutor;
     }
     
     @Override
-    public List<QueryResult> executeQuery(final String sql, final List<Object> 
parameters) throws SQLException {
-        QueryResult result = new JDBCStreamQueryResult(execute(sql, 
parameters));
+    public List<QueryResult> executeQuery(final ExecutionContext 
executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback) 
throws SQLException {
+        QueryResult result = new 
JDBCStreamQueryResult(execute(executionContext, callback));
         return Collections.singletonList(result);
     }
     
     @Override
     public void close() throws SQLException {
-        Connection connection = statement.getConnection();
-        connection.close();
-        statement.close();
+        if (null != statement) {
+            Connection connection = statement.getConnection();
+            connection.close();
+            statement.close();
+        }
     }
     
     @Override
@@ -81,22 +105,35 @@ public final class FederateJDBCExecutor implements 
FederateExecutor {
         return statement.getResultSet();
     }
     
-    private ResultSet execute(final String sql, final List<Object> parameters) 
throws SQLException {
-        PreparedStatement statement = getConnection().prepareStatement(sql);
-        setParameters(statement, parameters);
+    private ResultSet execute(final ExecutionContext executionContext, final 
JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException {
+        SQLUnit sqlUnit = 
executionContext.getExecutionUnits().iterator().next().getSqlUnit();
+        PreparedStatement statement = getConnection(executionContext, 
callback).prepareStatement(sqlUnit.getSql());
+        setParameters(statement, sqlUnit.getParameters());
         this.statement = statement;
         return statement.executeQuery();
     }
     
-    private Connection getConnection() throws SQLException {
-        Connection result = DriverManager.getConnection(CONNECTION_URL, 
PROPERTIES);
+    private Connection getConnection(final ExecutionContext executionContext, 
final JDBCExecutorCallback<? extends ExecuteResult> callback) throws 
SQLException {
+        Connection result = DriverManager.getConnection(CONNECTION_URL, 
getProperties());
         CalciteConnection calciteConnection = 
result.unwrap(CalciteConnection.class);
-        SchemaPlus rootSchema = calciteConnection.getRootSchema();
-        rootSchema.add(context.getSchemaName(), context.getLogicSchema());
-        calciteConnection.setSchema(context.getSchemaName());
+        addSchema(calciteConnection, executionContext, callback);
         return result;
     }
     
+    private Properties getProperties() {
+        Properties result = new Properties();
+        result.setProperty(CalciteConnectionProperty.LEX.camelName(), 
factory.getProperties().getProperty(CalciteConnectionProperty.LEX.camelName()));
+        result.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(), 
factory.getProperties().getProperty(CalciteConnectionProperty.CONFORMANCE.camelName()));
+        return result;
+    }
+    
+    private void addSchema(final CalciteConnection calciteConnection, final 
ExecutionContext executionContext, final JDBCExecutorCallback<? extends 
ExecuteResult> callback) throws SQLException {
+        FederateRowExecutor executor = new FederateRowExecutor(rules, props, 
jdbcManager, jdbcExecutor, executionContext, callback);
+        FederateLogicSchema logicSchema = new 
FederateLogicSchema(factory.getSchemaMetadatas().getDefaultSchemaMetadata(), 
executor);
+        calciteConnection.getRootSchema().add(schema, logicSchema);
+        calciteConnection.setSchema(schema);
+    }
+    
     private void setParameters(final PreparedStatement preparedStatement, 
final List<Object> parameters) throws SQLException {
         int count = 1;
         for (Object each : parameters) {
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
index 128669f..112f797 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
@@ -24,12 +24,16 @@ import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateExecutor;
 import org.apache.shardingsphere.infra.optimize.ShardingSphereOptimizer;
 import org.apache.shardingsphere.infra.optimize.context.OptimizeContext;
 
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 
@@ -46,7 +50,7 @@ public final class FederateRawExecutor implements 
FederateExecutor {
     }
     
     @Override
-    public List<QueryResult> executeQuery(final String sql, final List<Object> 
parameters) {
+    public List<QueryResult> executeQuery(final ExecutionContext 
executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback) 
throws SQLException {
         // TODO
         return Collections.emptyList();
     }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/OptimizeContextFactory.java
 
b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/OptimizeContextFactory.java
index b2f67e1..25b290b 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/OptimizeContextFactory.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/context/OptimizeContextFactory.java
@@ -67,6 +67,7 @@ public final class OptimizeContextFactory {
     
     private static final String CONFORMANCE_CAMEL_NAME = 
CalciteConnectionProperty.CONFORMANCE.camelName();
     
+    @Getter
     private final Properties properties = new Properties();
     
     private final CalciteConnectionConfig connectionConfig;
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index dc5d893..8893d7d 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -51,19 +51,12 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
         closed = true;
         try {
             forceExecuteTemplate.execute((Collection) getRoutedStatements(), 
Statement::close);
-            closeFederateExecutor();
+            getFederateExecutor().close();
         } finally {
             getRoutedStatements().clear();
         }
     }
     
-    private void closeFederateExecutor() throws SQLException {
-        FederateExecutor executor = getFederateExecutor();
-        if (null != executor) {
-            executor.close();
-        }
-    }
-    
     @Override
     public final boolean isClosed() {
         return closed;
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 9b78174..bc334af 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -47,7 +47,6 @@ import 
org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -62,8 +61,6 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.dr
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
 import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateJDBCExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.federate.schema.FederateLogicSchema;
-import 
org.apache.shardingsphere.infra.executor.sql.federate.schema.row.FederateRowExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -116,9 +113,12 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     @Getter
     private final ParameterMetaData parameterMetaData;
     
+    private final DriverJDBCExecutor driverJDBCExecutor;
+    
     private final RawExecutor rawExecutor;
     
-    private final DriverJDBCExecutor driverJDBCExecutor;
+    @Getter(AccessLevel.PROTECTED)
+    private final FederateExecutor federateExecutor;
     
     private final BatchPreparedStatementExecutor 
batchPreparedStatementExecutor;
     
@@ -129,9 +129,6 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     private ExecutionContext executionContext;
     
     private ResultSet currentResultSet;
-    
-    @Getter(AccessLevel.PROTECTED)
-    private FederateExecutor federateExecutor;
 
     public ShardingSpherePreparedStatement(final ShardingSphereConnection 
connection, final String sql) throws SQLException {
         this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
@@ -166,8 +163,11 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
         statementOption = returnGeneratedKeys ? new StatementOption(true) : 
new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
         JDBCExecutor jdbcExecutor = new 
JDBCExecutor(metaDataContexts.getExecutorEngine(), 
connection.isHoldTransaction());
-        rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), 
connection.isHoldTransaction(), metaDataContexts.getProps());
         driverJDBCExecutor = new DriverJDBCExecutor(metaDataContexts, 
jdbcExecutor);
+        rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), 
connection.isHoldTransaction(), metaDataContexts.getProps());
+        // TODO Consider FederateRawExecutor
+        federateExecutor = new FederateJDBCExecutor(DefaultSchema.LOGIC_NAME, 
metaDataContexts.getOptimizeContextFactory(), 
metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
+                metaDataContexts.getProps(), connection, jdbcExecutor);
         batchPreparedStatementExecutor = new 
BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor);
         kernelProcessor = new KernelProcessor();
     }
@@ -189,7 +189,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     }
     
     private List<ResultSet> getResultSetsForShardingSphereResultSet() throws 
SQLException {
-        if (null != federateExecutor) {
+        if (executionContext.getRouteContext().isFederated()) {
             return Collections.singletonList(federateExecutor.getResultSet());
         }
         return 
statements.stream().map(this::getResultSet).collect(Collectors.toList());
@@ -213,19 +213,12 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
         if (executionContext.getExecutionUnits().isEmpty()) {
             return Collections.emptyList();
         }
-        federateExecutor = createFederateExecutor();
-        SQLUnit sqlUnit = 
executionContext.getExecutionUnits().iterator().next().getSqlUnit();
-        return federateExecutor.executeQuery(sqlUnit.getSql(), 
sqlUnit.getParameters());
-    }
-    
-    private FederateExecutor createFederateExecutor() {
+        // TODO : Please fix me here.
+        // PreparedStatementExecuteQueryCallback callback = new 
PreparedStatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
 
+        //         sqlStatement, 
SQLExecutorExceptionHandler.isExceptionThrown());
         StatementExecuteQueryCallback callback = new 
StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
                 executionContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
-        FederateRowExecutor executor = new 
FederateRowExecutor(metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
-                metaDataContexts.getProps(), connection, 
driverJDBCExecutor.getJdbcExecutor(), executionContext, callback);
-        // TODO Consider FederateRawExecutor
-        FederateLogicSchema logicSchema = new 
FederateLogicSchema(metaDataContexts.getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(DefaultSchema.LOGIC_NAME),
 executor);
-        return new 
FederateJDBCExecutor(metaDataContexts.getOptimizeContextFactory().create(DefaultSchema.LOGIC_NAME,
 logicSchema));
+        return federateExecutor.executeQuery(executionContext, callback);
     }
     
     @Override
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 9947317..c5c489b 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -45,7 +45,6 @@ import 
org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -60,8 +59,6 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.dr
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
 import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateJDBCExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.federate.schema.FederateLogicSchema;
-import 
org.apache.shardingsphere.infra.executor.sql.federate.schema.row.FederateRowExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -108,6 +105,9 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     
     private final RawExecutor rawExecutor;
     
+    @Getter(AccessLevel.PROTECTED)
+    private final FederateExecutor federateExecutor;
+    
     private final KernelProcessor kernelProcessor;
     
     private boolean returnGeneratedKeys;
@@ -116,9 +116,6 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     
     private ResultSet currentResultSet;
     
-    @Getter(AccessLevel.PROTECTED)
-    private FederateExecutor federateExecutor;
-    
     public ShardingSphereStatement(final ShardingSphereConnection connection) {
         this(connection, ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
     }
@@ -136,6 +133,9 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         JDBCExecutor jdbcExecutor = new 
JDBCExecutor(metaDataContexts.getExecutorEngine(), 
connection.isHoldTransaction());
         driverJDBCExecutor = new DriverJDBCExecutor(metaDataContexts, 
jdbcExecutor);
         rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), 
connection.isHoldTransaction(), metaDataContexts.getProps());
+        // TODO Consider FederateRawExecutor
+        federateExecutor = new FederateJDBCExecutor(DefaultSchema.LOGIC_NAME, 
metaDataContexts.getOptimizeContextFactory(), 
metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
+                metaDataContexts.getProps(), connection, jdbcExecutor);
         kernelProcessor = new KernelProcessor();
     }
     
@@ -158,7 +158,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     }
     
     private List<ResultSet> getResultSetsForShardingSphereResultSet() throws 
SQLException {
-        if (null != federateExecutor) {
+        if (executionContext.getRouteContext().isFederated()) {
             return Collections.singletonList(federateExecutor.getResultSet());
         }
         return 
statements.stream().map(this::getResultSet).collect(Collectors.toList());
@@ -183,19 +183,9 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         if (executionContext.getExecutionUnits().isEmpty()) {
             return Collections.emptyList();
         }
-        federateExecutor = createFederateExecutor();
-        SQLUnit sqlUnit = 
executionContext.getExecutionUnits().iterator().next().getSqlUnit();
-        return federateExecutor.executeQuery(sqlUnit.getSql(), 
sqlUnit.getParameters());
-    }
-    
-    private FederateExecutor createFederateExecutor() {
         StatementExecuteQueryCallback callback = new 
StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
                 executionContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
-        FederateRowExecutor executor = new 
FederateRowExecutor(metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
-                metaDataContexts.getProps(), connection, 
driverJDBCExecutor.getJdbcExecutor(), executionContext, callback);
-        // TODO Consider FederateRawExecutor
-        FederateLogicSchema logicSchema = new 
FederateLogicSchema(metaDataContexts.getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(DefaultSchema.LOGIC_NAME),
 executor);
-        return new 
FederateJDBCExecutor(metaDataContexts.getOptimizeContextFactory().create(DefaultSchema.LOGIC_NAME,
 logicSchema));
+        return federateExecutor.executeQuery(executionContext, callback);
     }
     
     @Override
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 988bbe0..a71eb60 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -23,7 +23,6 @@ import 
org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
@@ -34,8 +33,6 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateJDBCExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.federate.schema.FederateLogicSchema;
-import 
org.apache.shardingsphere.infra.executor.sql.federate.schema.row.FederateRowExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
@@ -74,6 +71,8 @@ public final class ProxySQLExecutor {
     
     private final RawExecutor rawExecutor;
     
+    private final FederateExecutor federateExecutor;
+    
     public ProxySQLExecutor(final String type, final BackendConnection 
backendConnection) {
         this.type = type;
         this.backendConnection = backendConnection;
@@ -82,6 +81,10 @@ public final class ProxySQLExecutor {
         jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection, new 
JDBCExecutor(executorEngine, isSerialExecute));
         MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getMetaDataContexts();
         rawExecutor = new RawExecutor(executorEngine, isSerialExecute, 
metaDataContexts.getProps());
+        // TODO Consider FederateRawExecutor
+        federateExecutor = new 
FederateJDBCExecutor(backendConnection.getSchemaName(), 
metaDataContexts.getOptimizeContextFactory(), 
+                
metaDataContexts.getMetaData(backendConnection.getSchemaName()).getRuleMetaData().getRules(),
 
+                metaDataContexts.getProps(), backendConnection, new 
JDBCExecutor(executorEngine, isSerialExecute));
     }
     
     /**
@@ -145,12 +148,8 @@ public final class ProxySQLExecutor {
         MetaDataContexts metaData = 
ProxyContext.getInstance().getMetaDataContexts();
         ProxyJDBCExecutorCallback callback = 
ProxyJDBCExecutorCallbackFactory.newInstance(type, 
metaData.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType(),
 
                 executionContext.getSqlStatementContext().getSqlStatement(), 
backendConnection, isReturnGeneratedKeys, isExceptionThrown, true);
-        FederateRowExecutor executor = new FederateRowExecutor(rules, 
metaData.getProps(), backendConnection, jdbcExecutor.getJdbcExecutor(), 
executionContext, callback);
-        FederateLogicSchema logicSchema = new 
FederateLogicSchema(metaData.getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(backendConnection.getSchemaName()),
 executor);
-        FederateExecutor federateExecutor = new 
FederateJDBCExecutor(metaData.getOptimizeContextFactory().create(backendConnection.getSchemaName(),
 logicSchema));
         backendConnection.setFederateExecutor(federateExecutor);
-        SQLUnit sqlUnit = 
executionContext.getExecutionUnits().iterator().next().getSqlUnit();
-        return federateExecutor.executeQuery(sqlUnit.getSql(), 
sqlUnit.getParameters()).stream().map(each -> (ExecuteResult) 
each).collect(Collectors.toList());
+        return federateExecutor.executeQuery(executionContext, 
callback).stream().map(each -> (ExecuteResult) 
each).collect(Collectors.toList());
     }
     
     private Collection<ExecuteResult> useDriverToExecute(final 
ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, 

Reply via email to