This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a80f057  refactor federation executor scan table logic (#13988)
a80f057 is described below

commit a80f057f2eafcdaa34b6fb7ed15f4ede0bb470c4
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Thu Dec 9 09:39:00 2021 +0800

    refactor federation executor scan table logic (#13988)
    
    * refactor federation executor scan table logic
    
    * remove useless class
    
    * adjust adaptor execute logic
    
    * optimize logic
    
    * resolve code conflict
    
    * fix checkstyle
    
    * optimize logic
    
    * optimize logic
    
    * optimize logic
    
    * optimize Javadoc
    
    * remove useless method
    
    * fix integration test
    
    * fix integration test
    
    * setParameters for jdbc statement
    
    * fix integration test
    
    * fix integration test
    
    * fix integration test
    
    * optimize binding table judge logic
    
    * optimize rel to sql converter init logic
    
    * optimize binding table judge logic when join condition is in where segment
---
 .../shardingsphere/sharding/rule/ShardingRule.java |  18 ++-
 .../pom.xml                                        |  10 ++
 .../federation/executor/FederationExecutor.java    |  12 +-
 .../customized/CustomizedFilterableExecutor.java   |  11 +-
 .../original/OriginalFilterableExecutor.java       |  32 ++--
 .../original/row/FilterableRowEnumerator.java      |  45 ++----
 .../sql/FilterableExecutionContextGenerator.java   |  76 ----------
 .../original/sql/FilterableSQLGenerator.java       |  63 --------
 .../executor/original/sql/RouteContextFilter.java  |  57 -------
 .../executor/original/table/FilterableTable.java   |  14 +-
 .../table/FilterableTableScanExecutor.java         | 163 +++++++++++++++++++--
 .../optimizer/context/OptimizerContext.java        |   8 +-
 .../optimizer/context/OptimizerContextFactory.java |   8 +-
 .../planner/OptimizerPlannerContextFactory.java    |   3 +-
 .../optimizer/ShardingSphereOptimizerTest.java     |  15 +-
 .../driver/executor/DriverJDBCExecutor.java        |   2 +-
 .../statement/ShardingSpherePreparedStatement.java |  43 +++---
 .../core/statement/ShardingSphereStatement.java    |  47 +++---
 .../mode/manager/ContextManager.java               |  18 +--
 .../mode/metadata/MetaDataContexts.java            |   8 +-
 .../mode/metadata/MetaDataContextsBuilder.java     |  12 +-
 .../communication/DatabaseCommunicationEngine.java |  63 +++++++-
 .../backend/communication/ProxySQLExecutor.java    |  36 +----
 .../select_join_encrypt.xml                        |  12 +-
 .../cases/dql/dql-integration-test-cases.xml       |   2 +-
 25 files changed, 364 insertions(+), 414 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rule/ShardingRule.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rule/ShardingRule.java
index e6bf2a7..9aefc06 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rule/ShardingRule.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rule/ShardingRule.java
@@ -55,6 +55,7 @@ import 
org.apache.shardingsphere.sql.parser.sql.common.util.WhereExtractUtil;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -288,7 +289,13 @@ public final class ShardingRule implements SchemaRule, 
DataNodeContainedRule, Ta
         if (!(sqlStatementContext instanceof SelectStatementContext && 
((SelectStatementContext) sqlStatementContext).isContainsJoinQuery())) {
             return isAllBindingTables(logicTableNames);
         }
-        return isAllBindingTables(logicTableNames) && 
isJoinConditionContainsShardingColumns(schema, (SelectStatementContext) 
sqlStatementContext, logicTableNames);
+        if (!isAllBindingTables(logicTableNames)) {
+            return false;
+        }
+        SelectStatementContext select = (SelectStatementContext) 
sqlStatementContext;
+        Collection<WhereSegment> joinSegments = 
WhereExtractUtil.getJoinWhereSegments(select.getSqlStatement());
+        Collection<WhereSegment> whereSegments = select.getWhere().isPresent() 
? Collections.singletonList(select.getWhere().get()) : Collections.emptyList();
+        return isJoinConditionContainsShardingColumns(schema, select, 
logicTableNames, joinSegments) || 
isJoinConditionContainsShardingColumns(schema, select, logicTableNames, 
whereSegments);
     }
     
     private Optional<BindingTableRule> findBindingTableRule(final 
Collection<String> logicTableNames) {
@@ -554,10 +561,11 @@ public final class ShardingRule implements SchemaRule, 
DataNodeContainedRule, Ta
         return ShardingRule.class.getSimpleName();
     }
     
-    private boolean isJoinConditionContainsShardingColumns(final 
ShardingSphereSchema schema, final SelectStatementContext select, final 
Collection<String> tableNames) {
+    private boolean isJoinConditionContainsShardingColumns(final 
ShardingSphereSchema schema, final SelectStatementContext select, 
+                                                           final 
Collection<String> tableNames, final Collection<WhereSegment> whereSegments) {
         Collection<String> databaseJoinConditionTables = new 
HashSet<>(tableNames.size());
         Collection<String> tableJoinConditionTables = new 
HashSet<>(tableNames.size());
-        for (WhereSegment each : 
WhereExtractUtil.getJoinWhereSegments(select.getSqlStatement())) {
+        for (WhereSegment each : whereSegments) {
             Collection<AndPredicate> andPredicates = 
ExpressionExtractUtil.getAndPredicates(each.getExpr());
             if (andPredicates.size() > 1) {
                 return false;
@@ -568,9 +576,9 @@ public final class ShardingRule implements SchemaRule, 
DataNodeContainedRule, Ta
             }
         }
         TableRule tableRule = getTableRule(tableNames.iterator().next());
-        boolean containsDatabaseShardingColumns = 
!(tableRule.getDatabaseShardingStrategyConfig() instanceof 
StandardShardingStrategyConfiguration)
+        boolean containsDatabaseShardingColumns = 
!(getDatabaseShardingStrategyConfiguration(tableRule) instanceof 
StandardShardingStrategyConfiguration) 
                 || databaseJoinConditionTables.containsAll(tableNames);
-        boolean containsTableShardingColumns = 
!(tableRule.getTableShardingStrategyConfig() instanceof 
StandardShardingStrategyConfiguration) || 
tableJoinConditionTables.containsAll(tableNames);
+        boolean containsTableShardingColumns = 
!(getTableShardingStrategyConfiguration(tableRule) instanceof 
StandardShardingStrategyConfiguration) || 
tableJoinConditionTables.containsAll(tableNames);
         return containsDatabaseShardingColumns && containsTableShardingColumns;
     }
     
diff --git 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/pom.xml
 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/pom.xml
index 707751a..7e79a9b 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/pom.xml
+++ 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/pom.xml
@@ -38,5 +38,15 @@
             <artifactId>shardingsphere-infra-executor</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-infra-context</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-infra-merge</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/FederationExecutor.java
 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/FederationExecutor.java
index 53df685..b3f74c1 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/FederationExecutor.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/FederationExecutor.java
@@ -17,17 +17,15 @@
 
 package org.apache.shardingsphere.infra.federation.executor;
 
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.binder.LogicSQL;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.List;
 
 /**
  * Federation executor.
@@ -39,12 +37,12 @@ public interface FederationExecutor extends AutoCloseable {
      *
      * @param prepareEngine prepare engine
      * @param callback callback
-     * @param executionContext execution context
-     * @return query results
+     * @param logicSQL logic SQL
+     * @return result set
      * @throws SQLException SQL exception
      */
-    List<QueryResult> 
executeQuery(DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine, 
-                                   JDBCExecutorCallback<? extends 
ExecuteResult> callback, ExecutionContext executionContext) throws SQLException;
+    ResultSet executeQuery(DriverExecutionPrepareEngine<JDBCExecutionUnit, 
Connection> prepareEngine, 
+                           JDBCExecutorCallback<? extends ExecuteResult> 
callback, LogicSQL logicSQL) throws SQLException;
     
     /**
      * Get result set.
diff --git 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/customized/CustomizedFilterableExecutor.java
 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/customized/CustomizedFilterableExecutor.java
index eb7bec2..558491b 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/customized/CustomizedFilterableExecutor.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/customized/CustomizedFilterableExecutor.java
@@ -23,11 +23,10 @@ import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.binder.LogicSQL;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
 import 
org.apache.shardingsphere.infra.federation.optimizer.ShardingSphereOptimizer;
@@ -37,8 +36,6 @@ import 
org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
 
 /**
  * Customized filterable executor.
@@ -55,10 +52,10 @@ public final class CustomizedFilterableExecutor implements 
FederationExecutor {
     }
     
     @Override
-    public List<QueryResult> executeQuery(final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
-                                          final JDBCExecutorCallback<? extends 
ExecuteResult> callback, final ExecutionContext executionContext) throws 
SQLException {
+    public ResultSet executeQuery(final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
+                                  final JDBCExecutorCallback<? extends 
ExecuteResult> callback, final LogicSQL logicSQL) throws SQLException {
         // TODO
-        return Collections.emptyList();
+        return null;
     }
     
     @Override
diff --git 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/OriginalFilterableExecutor.java
 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/OriginalFilterableExecutor.java
index 01a1a57..70e8506 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/OriginalFilterableExecutor.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/OriginalFilterableExecutor.java
@@ -19,15 +19,12 @@ package 
org.apache.shardingsphere.infra.federation.executor.original;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.shardingsphere.infra.binder.LogicSQL;
 import 
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
 import 
org.apache.shardingsphere.infra.federation.executor.original.table.FilterableTableScanExecutor;
@@ -40,7 +37,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -72,33 +68,25 @@ public final class OriginalFilterableExecutor implements 
FederationExecutor {
     }
     
     @Override
-    public List<QueryResult> executeQuery(final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
-                                          final JDBCExecutorCallback<? extends 
ExecuteResult> callback, final ExecutionContext executionContext) throws 
SQLException {
-        ResultSet resultSet = execute(prepareEngine, callback, 
executionContext);
-        return Collections.singletonList(new JDBCStreamQueryResult(resultSet));
-    }
-    
-    private ResultSet execute(final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
-                              final JDBCExecutorCallback<? extends 
ExecuteResult> callback, final ExecutionContext executionContext) throws 
SQLException {
-        SQLUnit sqlUnit = 
executionContext.getExecutionUnits().iterator().next().getSqlUnit();
-        PreparedStatement preparedStatement = createConnection(prepareEngine, 
callback, 
executionContext).prepareStatement(SQLUtil.trimSemicolon(sqlUnit.getSql()));
-        setParameters(preparedStatement, sqlUnit.getParameters());
+    public ResultSet executeQuery(final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
+                                  final JDBCExecutorCallback<? extends 
ExecuteResult> callback, final LogicSQL logicSQL) throws SQLException {
+        PreparedStatement preparedStatement = createConnection(prepareEngine, 
callback, 
logicSQL.getParameters()).prepareStatement(SQLUtil.trimSemicolon(logicSQL.getSql()));
+        setParameters(preparedStatement, logicSQL.getParameters());
         this.statement = preparedStatement;
         return preparedStatement.executeQuery();
     }
     
     private Connection createConnection(final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
-                                        final JDBCExecutorCallback<? extends 
ExecuteResult> callback, final ExecutionContext executionContext) throws 
SQLException {
+                                        final JDBCExecutorCallback<? extends 
ExecuteResult> callback, final List<Object> parameters) throws SQLException {
         Connection result = DriverManager.getConnection(CONNECTION_URL, 
optimizerContext.getParserContexts().get(schemaName).getDialectProps());
-        addSchema(result.unwrap(CalciteConnection.class), prepareEngine, 
callback, executionContext);
+        addSchema(result.unwrap(CalciteConnection.class), prepareEngine, 
callback, parameters);
         return result;
     }
     
     private void addSchema(final CalciteConnection connection, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
-                           final JDBCExecutorCallback<? extends ExecuteResult> 
callback, final ExecutionContext executionContext) throws SQLException {
-        FilterableTableScanExecutor executor = new FilterableTableScanExecutor(
-                prepareEngine, jdbcExecutor, callback, props, 
executionContext, 
optimizerContext.getParserContexts().get(schemaName).getDatabaseType().getQuoteCharacter());
-        FilterableSchema schema = new 
FilterableSchema(optimizerContext.getMetaData().getSchemas().get(schemaName), 
executor);
+                           final JDBCExecutorCallback<? extends ExecuteResult> 
callback, final List<Object> parameters) throws SQLException {
+        FilterableTableScanExecutor executor = new 
FilterableTableScanExecutor(prepareEngine, jdbcExecutor, callback, props, 
optimizerContext, schemaName, parameters);
+        FilterableSchema schema = new 
FilterableSchema(optimizerContext.getFederationMetaData().getSchemas().get(schemaName),
 executor);
         connection.getRootSchema().add(schemaName, schema);
         connection.setSchema(schemaName);
     }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/row/FilterableRowEnumerator.java
 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/row/FilterableRowEnumerator.java
index 3b55e96..83ee68f 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/row/FilterableRowEnumerator.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/row/FilterableRowEnumerator.java
@@ -18,31 +18,25 @@
 package org.apache.shardingsphere.infra.federation.executor.original.row;
 
 import org.apache.calcite.linq4j.Enumerator;
-import org.apache.shardingsphere.infra.exception.ShardingSphereException;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
 
 import java.sql.SQLException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
 
 /**
  * Filterable row enumerator.
  */
 public final class FilterableRowEnumerator implements Enumerator<Object[]> {
     
-    private final Collection<QueryResult> queryResults = new LinkedList<>();
+    private final MergedResult result;
     
-    private final Iterator<QueryResult> iterator;
-    
-    private QueryResult currentResultSet;
+    private final QueryResultMetaData metaData;
     
     private Object[] currentRow;
     
-    public FilterableRowEnumerator(final Collection<QueryResult> queryResults) 
{
-        this.queryResults.addAll(queryResults);
-        iterator = this.queryResults.iterator();
-        currentResultSet = iterator.next();
+    public FilterableRowEnumerator(final MergedResult queryResult, final 
QueryResultMetaData metaData) {
+        this.result = queryResult;
+        this.metaData = metaData;
     }
     
     @Override
@@ -60,23 +54,17 @@ public final class FilterableRowEnumerator implements 
Enumerator<Object[]> {
     }
     
     private boolean moveNext0() throws SQLException {
-        if (currentResultSet.next()) {
+        if (result.next()) {
             setCurrentRow();
             return true;
         }
-        if (!iterator.hasNext()) {
-            currentRow = null;
-            return false;
-        }
-        currentResultSet = iterator.next();
-        return moveNext0();
+        return false;
     }
     
     private void setCurrentRow() throws SQLException {
-        int columnCount = currentResultSet.getMetaData().getColumnCount();
-        currentRow = new Object[columnCount];
-        for (int i = 0; i < columnCount; i++) {
-            currentRow[i] = currentResultSet.getValue(i + 1, Object.class);
+        currentRow = new Object[metaData.getColumnCount()];
+        for (int i = 0; i < metaData.getColumnCount(); i++) {
+            currentRow[i] = result.getValue(i + 1, Object.class);
         }
     }
     
@@ -86,13 +74,6 @@ public final class FilterableRowEnumerator implements 
Enumerator<Object[]> {
     
     @Override
     public void close() {
-        try {
-            for (QueryResult each : queryResults) {
-                each.close();
-            }
-            currentRow = null;
-        } catch (final SQLException ex) {
-            throw new ShardingSphereException(ex);
-        }
+        currentRow = null;
     }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/FilterableExecutionContextGenerator.java
 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/FilterableExecutionContextGenerator.java
deleted file mode 100644
index 758e4b2..0000000
--- 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/FilterableExecutionContextGenerator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.federation.executor.original.sql;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
-import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
-import 
org.apache.shardingsphere.infra.federation.executor.original.table.FilterableTableScanContext;
-import 
org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationTableMetaData;
-import org.apache.shardingsphere.infra.route.context.RouteContext;
-import org.apache.shardingsphere.infra.route.context.RouteMapper;
-import org.apache.shardingsphere.infra.route.context.RouteUnit;
-import org.apache.shardingsphere.sql.parser.sql.common.constant.QuoteCharacter;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.stream.Collectors;
-
-/**
- * Filterable execution context generator.
- */
-@RequiredArgsConstructor
-public final class FilterableExecutionContextGenerator {
-    
-    private final ExecutionContext routeExecutionContext;
-    
-    private final QuoteCharacter quoteCharacter;
-    
-    /**
-     * Generate execution context.
-     * 
-     * @param tableMetaData table meta data
-     * @param scanContext filterable table scan context
-     * @return generated execution context
-     */
-    public ExecutionContext generate(final FederationTableMetaData 
tableMetaData, final FilterableTableScanContext scanContext) {
-        RouteContext filteredRouteContext = new 
RouteContextFilter().filter(tableMetaData.getName(), 
routeExecutionContext.getRouteContext());
-        return new ExecutionContext(routeExecutionContext.getLogicSQL(), 
generate(filteredRouteContext.getRouteUnits(), tableMetaData, scanContext, 
quoteCharacter), filteredRouteContext);
-    }
-    
-    private Collection<ExecutionUnit> generate(final Collection<RouteUnit> 
routeUnits,
-                                               final FederationTableMetaData 
tableMetaData, final FilterableTableScanContext scanContext, final 
QuoteCharacter quoteCharacter) {
-        Collection<ExecutionUnit> result = new LinkedHashSet<>();
-        FilterableSQLGenerator sqlGenerator = new 
FilterableSQLGenerator(tableMetaData, scanContext, quoteCharacter);
-        for (RouteUnit each: routeUnits) {
-            result.addAll(generate(each, sqlGenerator));
-        }
-        return result;
-    }
-    
-    private Collection<ExecutionUnit> generate(final RouteUnit routeUnit, 
final FilterableSQLGenerator sqlGenerator) {
-        return routeUnit.getTableMappers().stream().map(each -> 
generate(routeUnit, each, sqlGenerator)).collect(Collectors.toList());
-    }
-    
-    private ExecutionUnit generate(final RouteUnit routeUnit, final 
RouteMapper tableMapper, final FilterableSQLGenerator sqlGenerator) {
-        String sql = sqlGenerator.generate(tableMapper.getActualName());
-        return new 
ExecutionUnit(routeUnit.getDataSourceMapper().getActualName(), new SQLUnit(sql, 
Collections.emptyList(), Collections.singletonList(tableMapper)));
-    }
-}
diff --git 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/FilterableSQLGenerator.java
 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/FilterableSQLGenerator.java
deleted file mode 100644
index 0a4a00e..0000000
--- 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/FilterableSQLGenerator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.federation.executor.original.sql;
-
-import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.infra.federation.executor.original.table.FilterableTableScanContext;
-import 
org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationTableMetaData;
-import org.apache.shardingsphere.sql.parser.sql.common.constant.QuoteCharacter;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.stream.Collectors;
-
-/**
- * Filterable SQL generator.
- */
-@RequiredArgsConstructor
-public final class FilterableSQLGenerator {
-    
-    private final FederationTableMetaData tableMetaData;
-    
-    private final FilterableTableScanContext scanContext;
-    
-    private final QuoteCharacter quoteCharacter;
-    
-    /**
-     * Generate federation SQL.
-     * 
-     * @param actualTableName actual table name
-     * @return generated federation SQL
-     */
-    public String generate(final String actualTableName) {
-        String projections = getQuotedProjections(tableMetaData, scanContext, 
quoteCharacter);
-        String table = getQuotedTable(actualTableName, quoteCharacter);
-        // TODO generate SQL with filters
-        return String.format("SELECT %s FROM %s", projections, table);
-    }
-    
-    private String getQuotedProjections(final FederationTableMetaData 
tableMetaData, final FilterableTableScanContext scanContext, final 
QuoteCharacter quoteCharacter) {
-        Collection<String> actualColumnNames = null == 
scanContext.getProjects()
-                ? tableMetaData.getColumnNames() : 
Arrays.stream(scanContext.getProjects()).mapToObj(tableMetaData.getColumnNames()::get).collect(Collectors.toList());
-        return 
actualColumnNames.stream().map(quoteCharacter::wrap).collect(Collectors.joining(",
 "));
-    }
-    
-    private String getQuotedTable(final String actualTableName, final 
QuoteCharacter quoteCharacter) {
-        return quoteCharacter.wrap(actualTableName);
-    }
-}
diff --git 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/RouteContextFilter.java
 
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/RouteContextFilter.java
deleted file mode 100644
index 580e214..0000000
--- 
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/sql/RouteContextFilter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.federation.executor.original.sql;
-
-import org.apache.shardingsphere.infra.route.context.RouteContext;
-import org.apache.shardingsphere.infra.route.context.RouteMapper;
-import org.apache.shardingsphere.infra.route.context.RouteUnit;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Route context filter.
- */
-public final class RouteContextFilter {
-    
-    /**
-     * Filter route context.
-     * 
-     * @param toBeKeptTableName to be kept table name
-     * @param routeContext route context
-     * @return filtered route context
-     */
-    public RouteContext filter(final String toBeKeptTableName, final RouteContext routeContext) {
-        RouteContext result = new RouteContext();
-        result.getRouteUnits().addAll(filterRouteUnits(toBeKeptTableName, routeContext));
-        return result;
-    }
-    
-    private Collection<RouteUnit> filterRouteUnits(final String toBeKeptTableName, final RouteContext routeContext) {
-        return routeContext.getRouteUnits().stream().map(each -> filterRouteUnit(toBeKeptTableName, each)).filter(each -> !each.getTableMappers().isEmpty()).collect(Collectors.toList());
-    }
-    
-    private RouteUnit filterRouteUnit(final String toBeKeptTableName, final RouteUnit routeUnit) {
-        return new RouteUnit(routeUnit.getDataSourceMapper(), filterTableMappers(toBeKeptTableName, routeUnit));
-    }
-    
-    private List<RouteMapper> filterTableMappers(final String toBeKeptTableName, final RouteUnit routeUnit) {
-        return routeUnit.getTableMappers().stream().filter(each -> each.getLogicName().equalsIgnoreCase(toBeKeptTableName)).collect(Collectors.toList());
-    }
-}
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTable.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTable.java
index 886cafa..a1e165c 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTable.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTable.java
@@ -19,21 +19,16 @@ package org.apache.shardingsphere.infra.federation.executor.original.table;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.AbstractEnumerable;
 import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.ProjectableFilterableTable;
 import org.apache.calcite.schema.Statistic;
 import org.apache.calcite.schema.impl.AbstractTable;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.federation.executor.original.FederationTableStatistic;
-import org.apache.shardingsphere.infra.federation.executor.original.row.FilterableRowEnumerator;
 import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationTableMetaData;
 
-import java.util.Collection;
 import java.util.List;
 
 /**
@@ -55,14 +50,7 @@ public final class FilterableTable extends AbstractTable implements ProjectableF
     
     @Override
     public Enumerable<Object[]> scan(final DataContext root, final List<RexNode> filters, final int[] projects) {
-        Collection<QueryResult> queryResults = executor.execute(metaData, new FilterableTableScanContext(root, filters, projects));
-        return new AbstractEnumerable<Object[]>() {
-            
-            @Override
-            public Enumerator<Object[]> enumerator() {
-                return new FilterableRowEnumerator(queryResults);
-            }
-        };
+        return executor.execute(metaData, new FilterableTableScanContext(root, filters, projects));
     }
     
     @Override
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTableScanExecutor.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTableScanExecutor.java
index f988190..8c5ffc6 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTableScanExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/table/FilterableTableScanExecutor.java
@@ -17,8 +17,40 @@
 
 package org.apache.shardingsphere.infra.federation.executor.original.table;
 
+import com.google.common.collect.ImmutableList;
+import lombok.SneakyThrows;
+import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.dialect.MssqlSqlDialect;
+import org.apache.calcite.sql.dialect.MysqlSqlDialect;
+import org.apache.calcite.sql.dialect.OracleSqlDialect;
+import org.apache.calcite.sql.dialect.PostgresqlSqlDialect;
+import org.apache.calcite.sql.util.SqlString;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.shardingsphere.infra.binder.LogicSQL;
+import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
+import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MariaDBDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OracleDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.SQLServerDatabaseType;
 import org.apache.shardingsphere.infra.exception.ShardingSphereException;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -26,15 +58,28 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
-import org.apache.shardingsphere.infra.federation.executor.original.sql.FilterableExecutionContextGenerator;
+import org.apache.shardingsphere.infra.federation.executor.original.row.FilterableRowEnumerator;
+import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
 import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationTableMetaData;
-import org.apache.shardingsphere.sql.parser.sql.common.constant.QuoteCharacter;
+import org.apache.shardingsphere.infra.merge.MergeEngine;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.parser.sql.SQLStatementParserEngine;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -42,6 +87,18 @@ import java.util.stream.Collectors;
  */
 public final class FilterableTableScanExecutor {
     
+    private static final Map<Class<? extends DatabaseType>, SqlDialect> SQL_DIALECTS = new HashMap<>();
+    
+    static {
+        SQL_DIALECTS.put(H2DatabaseType.class, MysqlSqlDialect.DEFAULT);
+        SQL_DIALECTS.put(MySQLDatabaseType.class, MysqlSqlDialect.DEFAULT);
+        SQL_DIALECTS.put(MariaDBDatabaseType.class, MysqlSqlDialect.DEFAULT);
+        SQL_DIALECTS.put(OracleDatabaseType.class, OracleSqlDialect.DEFAULT);
+        SQL_DIALECTS.put(SQLServerDatabaseType.class, MssqlSqlDialect.DEFAULT);
+        SQL_DIALECTS.put(PostgreSQLDatabaseType.class, PostgresqlSqlDialect.DEFAULT);
+        SQL_DIALECTS.put(OpenGaussDatabaseType.class, PostgresqlSqlDialect.DEFAULT);
+    }
+    
     private final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine;
     
     private final JDBCExecutor jdbcExecutor;
@@ -50,16 +107,22 @@ public final class FilterableTableScanExecutor {
     
     private final ConfigurationProperties props;
     
-    private final FilterableExecutionContextGenerator executionContextGenerator;
+    private final OptimizerContext optimizerContext;
+    
+    private final String schemaName;
     
-    public FilterableTableScanExecutor(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                       final JDBCExecutor jdbcExecutor, final JDBCExecutorCallback<? extends ExecuteResult> callback,
-                                       final ConfigurationProperties props, final ExecutionContext routeExecutionContext, final QuoteCharacter quoteCharacter) {
+    private final List<Object> parameters;
+    
+    public FilterableTableScanExecutor(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final JDBCExecutor jdbcExecutor, 
+                                       final JDBCExecutorCallback<? extends ExecuteResult> callback, final ConfigurationProperties props, 
+                                       final OptimizerContext optimizerContext, final String schemaName, final List<Object> parameters) {
         this.jdbcExecutor = jdbcExecutor;
         this.callback = callback;
         this.prepareEngine = prepareEngine;
         this.props = props;
-        executionContextGenerator = new FilterableExecutionContextGenerator(routeExecutionContext, quoteCharacter);
+        this.optimizerContext = optimizerContext;
+        this.schemaName = schemaName;
+        this.parameters = parameters;
     }
     
     /**
@@ -69,18 +132,96 @@ public final class FilterableTableScanExecutor {
      * @param scanContext filterable table scan context
      * @return query results
      */
-    public Collection<QueryResult> execute(final FederationTableMetaData tableMetaData, final FilterableTableScanContext scanContext) {
-        ExecutionContext context = executionContextGenerator.generate(tableMetaData, scanContext);
+    public Enumerable<Object[]> execute(final FederationTableMetaData tableMetaData, final FilterableTableScanContext scanContext) {
+        DatabaseType databaseType = DatabaseTypeRegistry.getTrunkDatabaseType(optimizerContext.getParserContexts().get(schemaName).getDatabaseType().getName());
+        SqlString sqlString = createSQLString(tableMetaData, scanContext, databaseType);
+        // TODO replace sql parse with sql convert
+        SQLStatement sqlStatement = new SQLStatementParserEngine(databaseType.getName(), optimizerContext.getSqlParserRule()).parse(sqlString.getSql(), false);
+        LogicSQL logicSQL = createLogicSQL(optimizerContext.getMetaDataMap(), sqlString.getSql(), getParameters(sqlString.getDynamicParameters()), sqlStatement);
+        ShardingSphereMetaData metaData = optimizerContext.getMetaDataMap().get(schemaName);
+        ExecutionContext context = new KernelProcessor().generateExecutionContext(logicSQL, metaData, props);
         try {
             ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits());
+            setParameters(executionGroupContext.getInputGroups(), logicSQL.getParameters());
             ExecuteProcessEngine.initialize(context.getLogicSQL(), executionGroupContext, props);
-            Collection<QueryResult> result = jdbcExecutor.execute(executionGroupContext, callback).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
+            List<QueryResult> result = jdbcExecutor.execute(executionGroupContext, callback).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
             ExecuteProcessEngine.finish(executionGroupContext.getExecutionID());
-            return result;
+            MergeEngine mergeEngine = new MergeEngine(schemaName, databaseType, metaData.getSchema(), props, metaData.getRuleMetaData().getRules());
+            MergedResult mergedResult = mergeEngine.merge(result, logicSQL.getSqlStatementContext());
+            return createEnumerable(mergedResult, result.get(0).getMetaData());
         } catch (final SQLException ex) {
             throw new ShardingSphereException(ex);
         } finally {
             ExecuteProcessEngine.clean();
         }
     }
+    
+    private SqlString createSQLString(final FederationTableMetaData tableMetaData, final FilterableTableScanContext scanContext, final DatabaseType databaseType) {
+        SqlDialect sqlDialect = SQL_DIALECTS.getOrDefault(databaseType.getClass(), MysqlSqlDialect.DEFAULT);
+        return new RelToSqlConverter(sqlDialect).visitRoot(createRelNode(tableMetaData, scanContext)).asStatement().toSqlString(sqlDialect);
+    }
+    
+    @SneakyThrows
+    private void setParameters(final Collection<ExecutionGroup<JDBCExecutionUnit>> inputGroups, final List<Object> parameters) {
+        for (ExecutionGroup<JDBCExecutionUnit> each : inputGroups) {
+            for (JDBCExecutionUnit executionUnit : each.getInputs()) {
+                if (!(executionUnit.getStorageResource() instanceof PreparedStatement)) {
+                    continue;
+                }
+                setParameters((PreparedStatement) executionUnit.getStorageResource(), parameters);
+            }
+        }
+    }
+    
+    @SneakyThrows
+    private void setParameters(final PreparedStatement preparedStatement, final List<Object> parameters) {
+        for (int i = 0; i < parameters.size(); i++) {
+            Object parameter = parameters.get(i);
+            preparedStatement.setObject(i + 1, parameter);
+        }
+    }
+    
+    private List<Object> getParameters(final ImmutableList<Integer> parameterIndices) {
+        if (null == parameterIndices) {
+            return Collections.emptyList();
+        }
+        List<Object> result = new ArrayList<>();
+        for (Integer each : parameterIndices) {
+            result.add(parameters.get(each));
+        }
+        return result;
+    }
+    
+    private RelNode createRelNode(final FederationTableMetaData tableMetaData, final FilterableTableScanContext scanContext) {
+        RelOptCluster relOptCluster = optimizerContext.getPlannerContexts().get(schemaName).getConverter().getCluster();
+        RelOptSchema relOptSchema = (RelOptSchema) optimizerContext.getPlannerContexts().get(schemaName).getValidator().getCatalogReader();
+        RelBuilder builder = RelFactories.LOGICAL_BUILDER.create(relOptCluster, relOptSchema).scan(tableMetaData.getName()).filter(scanContext.getFilters());
+        if (null != scanContext.getProjects()) {
+            builder.project(createProjections(scanContext.getProjects(), builder, tableMetaData.getColumnNames()));
+        }
+        return builder.build();
+    }
+    
+    private Collection<RexNode> createProjections(final int[] projects, final RelBuilder relBuilder, final List<String> columnNames) {
+        Collection<RexNode> result = new LinkedList<>();
+        for (int each : projects) {
+            result.add(relBuilder.field(columnNames.get(each)));
+        }
+        return result;
+    }
+    
+    private AbstractEnumerable<Object[]> createEnumerable(final MergedResult mergedResult, final QueryResultMetaData metaData) {
+        return new AbstractEnumerable<Object[]>() {
+            
+            @Override
+            public Enumerator<Object[]> enumerator() {
+                return new FilterableRowEnumerator(mergedResult, metaData);
+            }
+        };
+    }
+    
+    private LogicSQL createLogicSQL(final Map<String, ShardingSphereMetaData> metaDataMap, final String sql, final List<Object> parameters, final SQLStatement sqlStatement) {
+        SQLStatementContext<?> sqlStatementContext = SQLStatementContextFactory.newInstance(metaDataMap, parameters, sqlStatement, sql);
+        return new LogicSQL(sqlStatementContext, sql, parameters);
+    }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContext.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContext.java
index d82263b..d892f42 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContext.java
@@ -22,6 +22,8 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.federation.optimizer.context.parser.OptimizerParserContext;
 import org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContext;
 import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationMetaData;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
 import java.util.Map;
 
@@ -32,7 +34,11 @@ import java.util.Map;
 @Getter
 public final class OptimizerContext {
     
-    private final FederationMetaData metaData;
+    private final SQLParserRule sqlParserRule;
+    
+    private final FederationMetaData federationMetaData;
+    
+    private final Map<String, ShardingSphereMetaData> metaDataMap;
     
     private final Map<String, OptimizerParserContext> parserContexts;
     
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContextFactory.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContextFactory.java
index 170df49..c8b15a1 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContextFactory.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/OptimizerContextFactory.java
@@ -25,6 +25,8 @@ import org.apache.shardingsphere.infra.federation.optimizer.context.planner.Opti
 import org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContextFactory;
 import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationMetaData;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
 import java.util.Map;
 
@@ -38,12 +40,14 @@ public final class OptimizerContextFactory {
      * Create optimize context.
      *
      * @param metaDataMap meta data map
+     * @param globalRuleMetaData global rule meta data
      * @return created optimizer context
      */
-    public static OptimizerContext create(final Map<String, ShardingSphereMetaData> metaDataMap) {
+    public static OptimizerContext create(final Map<String, ShardingSphereMetaData> metaDataMap, final ShardingSphereRuleMetaData globalRuleMetaData) {
         FederationMetaData federationMetaData = new FederationMetaData(metaDataMap);
         Map<String, OptimizerParserContext> parserContexts = OptimizerParserContextFactory.create(metaDataMap);
         Map<String, OptimizerPlannerContext> plannerContexts = OptimizerPlannerContextFactory.create(federationMetaData);
-        return new OptimizerContext(federationMetaData, parserContexts, plannerContexts);
+        SQLParserRule sqlParserRule = globalRuleMetaData.findSingleRule(SQLParserRule.class).orElse(null);
+        return new OptimizerContext(sqlParserRule, federationMetaData, metaDataMap, parserContexts, plannerContexts);
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
index 072e2b4..fda7b2a 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
@@ -68,7 +68,8 @@ public final class OptimizerPlannerContextFactory {
             RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl();
             CalciteCatalogReader catalogReader = createCatalogReader(schemaName, federationSchema, relDataTypeFactory, connectionConfig);
             SqlValidator validator = createValidator(catalogReader, relDataTypeFactory, connectionConfig);
-            result.put(schemaName, new OptimizerPlannerContext(validator, createConverter(catalogReader, validator, relDataTypeFactory)));
+            SqlToRelConverter converter = createConverter(catalogReader, validator, relDataTypeFactory);
+            result.put(schemaName, new OptimizerPlannerContext(validator, converter));
         }
         return result;
     }
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
index d7fd09d..ba21e39 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
@@ -22,20 +22,26 @@ import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
 import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContextFactory;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
+import org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
 import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.schema.model.ColumnMetaData;
 import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.parser.config.SQLParserRuleConfiguration;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import org.apache.shardingsphere.parser.rule.builder.DefaultSQLParserRuleConfigurationBuilder;
+import org.apache.shardingsphere.sql.parser.api.CacheOption;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.sql.Types;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -100,7 +106,14 @@ public final class ShardingSphereOptimizerTest {
         tableMetaDataMap.put("t_order_federate", createOrderTableMetaData());
         tableMetaDataMap.put("t_user_info", createUserInfoTableMetaData());
         ShardingSphereMetaData metaData = new ShardingSphereMetaData(schemaName, mockResource(), null, new ShardingSphereSchema(tableMetaDataMap));
-        optimizer = new ShardingSphereOptimizer(OptimizerContextFactory.create(Collections.singletonMap(schemaName, metaData)));
+        optimizer = new ShardingSphereOptimizer(OptimizerContextFactory.create(Collections.singletonMap(schemaName, metaData), createGlobalRuleMetaData()));
+    }
+    
+    private ShardingSphereRuleMetaData createGlobalRuleMetaData() {
+        Collection<ShardingSphereRule> rules = new LinkedList<>();
+        CacheOption cacheOption = new CacheOption(128, 1024L, 4);
+        rules.add(new SQLParserRule(new SQLParserRuleConfiguration(false, cacheOption, cacheOption)));
+        return new ShardingSphereRuleMetaData(Collections.emptyList(), rules);
     }
     
     private ShardingSphereResource mockResource() {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index f2c25ec..b04833b 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -58,7 +58,7 @@ public final class DriverJDBCExecutor {
         this.metaDataContexts = metaDataContexts;
         this.jdbcExecutor = jdbcExecutor;
         metadataRefreshEngine = new MetaDataRefreshEngine(metaDataContexts.getMetaData(schemaName),
-                metaDataContexts.getOptimizerContext().getMetaData().getSchemas().get(schemaName), metaDataContexts.getProps());
+                metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().get(schemaName), metaDataContexts.getProps());
     }
     
     /**
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index bf36cb7..6f7d295 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -82,7 +82,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
@@ -180,7 +179,12 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
                 return statements.iterator().next().executeQuery();
             }
             clearPrevious();
-            executionContext = createExecutionContext();
+            LogicSQL logicSQL = createLogicSQL();
+            // TODO move federation route logic to binder
+            executionContext = createExecutionContext(logicSQL);
+            if (executionContext.getRouteContext().isFederated()) {
+                return executeFederationQuery(logicSQL);
+            }
             List<QueryResult> queryResults = executeQuery0();
             MergedResult mergedResult = mergeQuery(queryResults);
             return new ShardingSphereResultSet(getShardingSphereResultSet(), mergedResult, this, executionContext);
@@ -195,10 +199,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         replaySetParameter();
     }
     
-    private List<ResultSet> getShardingSphereResultSet() throws SQLException {
-        if (executionContext.getRouteContext().isFederated()) {
-            return 
Collections.singletonList(executor.getFederationExecutor().getResultSet());
-        }
+    private List<ResultSet> getShardingSphereResultSet() {
         List<ResultSet> result = new ArrayList<>(statements.size());
         for (PreparedStatement each : statements) {
             ResultSet resultSet = getResultSet(each);
@@ -212,9 +213,6 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
             return 
executor.getRawExecutor().execute(createRawExecutionGroupContext(), 
executionContext.getLogicSQL(),
                     new RawSQLExecutorCallback()).stream().map(each -> 
(QueryResult) each).collect(Collectors.toList());
         }
-        if (executionContext.getRouteContext().isFederated()) {
-            return executeFederatedQuery();
-        }
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext();
         cacheStatements(executionGroupContext.getInputGroups());
         return 
executor.getRegularExecutor().executeQuery(executionGroupContext, 
executionContext.getLogicSQL(),
@@ -222,13 +220,10 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
                         SQLExecutorExceptionHandler.isExceptionThrown()));
     }
     
-    private List<QueryResult> executeFederatedQuery() throws SQLException {
-        if (executionContext.getExecutionUnits().isEmpty()) {
-            return Collections.emptyList();
-        }
+    private ResultSet executeFederationQuery(final LogicSQL logicSQL) throws 
SQLException {
         PreparedStatementExecuteQueryCallback callback = new 
PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchema()).getResource().getDatabaseType(),
                  sqlStatement, 
SQLExecutorExceptionHandler.isExceptionThrown());
-        return 
executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(),
 callback, executionContext);
+        return 
executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(),
 callback, logicSQL);
     }
     
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine() {
@@ -245,7 +240,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
                 return statements.iterator().next().executeUpdate();
             }
             clearPrevious();
-            executionContext = createExecutionContext();
+            executionContext = createExecutionContext(createLogicSQL());
             if 
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
                 Collection<ExecuteResult> executeResults = 
executor.getRawExecutor().execute(createRawExecutionGroupContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
                 return accumulate(executeResults);
@@ -291,15 +286,16 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
                 return statements.iterator().next().execute();
             }
             clearPrevious();
-            executionContext = createExecutionContext();
+            LogicSQL logicSQL = createLogicSQL();
+            executionContext = createExecutionContext(logicSQL);
             if 
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
                 // TODO process getStatement
                 Collection<ExecuteResult> executeResults = 
executor.getRawExecutor().execute(createRawExecutionGroupContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
                 return executeResults.iterator().next() instanceof QueryResult;
             }
             if (executionContext.getRouteContext().isFederated()) {
-                List<QueryResult> queryResults = executeFederatedQuery();
-                return !queryResults.isEmpty();
+                ResultSet resultSet = executeFederationQuery(logicSQL);
+                return null != resultSet;
             }
             ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionGroupContext();
             cacheStatements(executionGroupContext.getInputGroups());
@@ -342,6 +338,9 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         if (null != currentResultSet) {
             return currentResultSet;
         }
+        if (executionContext.getRouteContext().isFederated()) {
+            return executor.getFederationExecutor().getResultSet();
+        }
         if (executionContext.getSqlStatementContext() instanceof 
SelectStatementContext || 
executionContext.getSqlStatementContext().getSqlStatement() instanceof 
DALStatement) {
             List<ResultSet> resultSets = getResultSets();
             MergedResult mergedResult = 
mergeQuery(getQueryResults(resultSets));
@@ -363,9 +362,6 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         for (Statement each : statements) {
             result.add(each.getResultSet());
         }
-        if (executionContext.getRouteContext().isFederated()) {
-            result.add(executor.getFederationExecutor().getResultSet());
-        }
         return result;
     }
     
@@ -379,8 +375,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         return result;
     }
     
-    private ExecutionContext createExecutionContext() {
-        LogicSQL logicSQL = createLogicSQL();
+    private ExecutionContext createExecutionContext(final LogicSQL logicSQL) {
         
SQLCheckEngine.check(logicSQL.getSqlStatementContext().getSqlStatement(), 
logicSQL.getParameters(), 
                 
metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules(),
 connection.getSchema(), metaDataContexts.getMetaDataMap(), null);
         ExecutionContext result = 
kernelProcessor.generateExecutionContext(logicSQL, 
metaDataContexts.getMetaData(connection.getSchema()), 
metaDataContexts.getProps());
@@ -452,7 +447,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     @Override
     public void addBatch() {
         try {
-            executionContext = createExecutionContext();
+            executionContext = createExecutionContext(createLogicSQL());
             
batchPreparedStatementExecutor.addBatchForExecutionUnits(executionContext.getExecutionUnits());
         } finally {
             currentResultSet = null;
diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index e92ef61..0872bcf 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -133,7 +133,12 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         }
         ResultSet result;
         try {
-            executionContext = createExecutionContext(sql);
+            LogicSQL logicSQL = createLogicSQL(sql);
+            executionContext = createExecutionContext(logicSQL);
+            // TODO move federation route logic to binder
+            if (executionContext.getRouteContext().isFederated()) {
+                return executeFederationQuery(logicSQL);
+            }
             List<QueryResult> queryResults = executeQuery0();
             MergedResult mergedResult = mergeQuery(queryResults);
             result = new 
ShardingSphereResultSet(getShardingSphereResultSets(), mergedResult, this, 
executionContext);
@@ -144,9 +149,8 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         return result;
     }
     
-    private List<ResultSet> getShardingSphereResultSets() throws SQLException {
-        return executionContext.getRouteContext().isFederated()
-                ? 
Collections.singletonList(executor.getFederationExecutor().getResultSet()) : 
statements.stream().map(this::getResultSet).collect(Collectors.toList());
+    private List<ResultSet> getShardingSphereResultSets() {
+        return 
statements.stream().map(this::getResultSet).collect(Collectors.toList());
     }
     
     private List<QueryResult> executeQuery0() throws SQLException {
@@ -154,9 +158,6 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
             return 
executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getLogicSQL(),
                     new RawSQLExecutorCallback()).stream().map(each -> 
(QueryResult) each).collect(Collectors.toList());
         }
-        if (executionContext.getRouteContext().isFederated()) {
-            return executeFederationQuery();
-        }
         ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionContext();
         cacheStatements(executionGroupContext.getInputGroups());
         StatementExecuteQueryCallback callback = new 
StatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchema()).getResource().getDatabaseType(),
@@ -164,13 +165,10 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         return 
executor.getRegularExecutor().executeQuery(executionGroupContext, 
executionContext.getLogicSQL(), callback);
     }
     
-    private List<QueryResult> executeFederationQuery() throws SQLException {
-        if (executionContext.getExecutionUnits().isEmpty()) {
-            return Collections.emptyList();
-        }
+    private ResultSet executeFederationQuery(final LogicSQL logicSQL) throws 
SQLException {
         StatementExecuteQueryCallback callback = new 
StatementExecuteQueryCallback(metaDataContexts.getMetaData(connection.getSchema()).getResource().getDatabaseType(),
                 executionContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
-        return 
executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(),
 callback, executionContext);
+        return 
executor.getFederationExecutor().executeQuery(createDriverExecutionPrepareEngine(),
 callback, logicSQL);
     }
     
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine() {
@@ -182,7 +180,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     @Override
     public int executeUpdate(final String sql) throws SQLException {
         try {
-            executionContext = createExecutionContext(sql);
+            executionContext = createExecutionContext(createLogicSQL(sql));
             if 
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
                 return 
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
             }
@@ -201,7 +199,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
             returnGeneratedKeys = true;
         }
         try {
-            executionContext = createExecutionContext(sql);
+            executionContext = createExecutionContext(createLogicSQL(sql));
             if 
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
                 return 
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
             }
@@ -218,7 +216,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     public int executeUpdate(final String sql, final int[] columnIndexes) 
throws SQLException {
         returnGeneratedKeys = true;
         try {
-            executionContext = createExecutionContext(sql);
+            executionContext = createExecutionContext(createLogicSQL(sql));
             if 
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
                 return 
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
             }
@@ -235,7 +233,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     public int executeUpdate(final String sql, final String[] columnNames) 
throws SQLException {
         returnGeneratedKeys = true;
         try {
-            executionContext = createExecutionContext(sql);
+            executionContext = createExecutionContext(createLogicSQL(sql));
             if 
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
                 return 
accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback()));
             }
@@ -321,15 +319,17 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     
     private boolean execute0(final String sql, final ExecuteCallback callback) 
throws SQLException {
         try {
-            executionContext = createExecutionContext(sql);
+            LogicSQL logicSQL = createLogicSQL(sql);
+            executionContext = createExecutionContext(logicSQL);
             if 
(metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each
 -> each instanceof RawExecutionRule)) {
                 // TODO process getStatement
                 Collection<ExecuteResult> results = 
executor.getRawExecutor().execute(createRawExecutionContext(), 
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
                 return results.iterator().next() instanceof QueryResult;
             }
+            // TODO move federation route logic to binder
             if (executionContext.getRouteContext().isFederated()) {
-                List<QueryResult> queryResults = executeFederationQuery();
-                return !queryResults.isEmpty();
+                ResultSet resultSet = executeFederationQuery(logicSQL);
+                return null != resultSet;
             }
             ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = 
createExecutionContext();
             cacheStatements(executionGroupContext.getInputGroups());
@@ -360,9 +360,8 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         return optionalSQLParserRule.orElse(null);
     }
     
-    private ExecutionContext createExecutionContext(final String sql) throws 
SQLException {
+    private ExecutionContext createExecutionContext(final LogicSQL logicSQL) 
throws SQLException {
         clearStatements();
-        LogicSQL logicSQL = createLogicSQL(sql);
         
SQLCheckEngine.check(logicSQL.getSqlStatementContext().getSqlStatement(), 
logicSQL.getParameters(), 
                 
metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules(),
 connection.getSchema(), metaDataContexts.getMetaDataMap(), null);
         return kernelProcessor.generateExecutionContext(logicSQL, 
metaDataContexts.getMetaData(connection.getSchema()), 
metaDataContexts.getProps());
@@ -397,6 +396,9 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         if (null != currentResultSet) {
             return currentResultSet;
         }
+        if (executionContext.getRouteContext().isFederated()) {
+            return executor.getFederationExecutor().getResultSet();
+        }
         if (executionContext.getSqlStatementContext() instanceof 
SelectStatementContext || 
executionContext.getSqlStatementContext().getSqlStatement() instanceof 
DALStatement) {
             List<ResultSet> resultSets = getResultSets();
             MergedResult mergedResult = 
mergeQuery(getQueryResults(resultSets));
@@ -418,9 +420,6 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         for (Statement each : statements) {
             result.add(each.getResultSet());
         }
-        if (executionContext.getRouteContext().isFederated()) {
-            result.add(executor.getFederationExecutor().getResultSet());
-        }
         return result;
     }
     
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 0553def..7869045 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -127,8 +127,8 @@ public final class ContextManager implements AutoCloseable {
             return;
         }
         MetaDataContexts newMetaDataContexts = 
buildNewMetaDataContext(schemaName);
-        
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().put(schemaName,
-                
newMetaDataContexts.getOptimizerContext().getMetaData().getSchemas().get(schemaName));
+        
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().put(schemaName,
+                
newMetaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().get(schemaName));
         metaDataContexts.getMetaDataMap().put(schemaName, 
newMetaDataContexts.getMetaData(schemaName));
     }
     
@@ -139,7 +139,7 @@ public final class ContextManager implements AutoCloseable {
      */
     public void deleteSchema(final String schemaName) {
         if (metaDataContexts.getMetaDataMap().containsKey(schemaName)) {
-            
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().remove(schemaName);
+            
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().remove(schemaName);
             
metaDataContexts.getOptimizerContext().getParserContexts().remove(schemaName);
             
metaDataContexts.getOptimizerContext().getPlannerContexts().remove(schemaName);
             ShardingSphereMetaData removeMetaData = 
metaDataContexts.getMetaDataMap().remove(schemaName);
@@ -189,7 +189,7 @@ public final class ContextManager implements AutoCloseable {
     public void alterRuleConfiguration(final String schemaName, final 
Collection<RuleConfiguration> ruleConfigs) {
         try {
             MetaDataContexts changedMetaDataContexts = 
buildChangedMetaDataContext(metaDataContexts.getMetaDataMap().get(schemaName), 
ruleConfigs);
-            
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().putAll(changedMetaDataContexts.getOptimizerContext().getMetaData().getSchemas());
+            
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas());
             Map<String, ShardingSphereMetaData> metaDataMap = new 
HashMap<>(metaDataContexts.getMetaDataMap());
             metaDataMap.putAll(changedMetaDataContexts.getMetaDataMap());
             renewMetaDataContexts(rebuildMetaDataContexts(metaDataMap));
@@ -207,7 +207,7 @@ public final class ContextManager implements AutoCloseable {
     public void alterDataSourceConfiguration(final String schemaName, final 
Map<String, DataSourceConfiguration> dataSourceConfigurations) {
         try {
             MetaDataContexts changedMetaDataContext = 
buildChangedMetaDataContextWithChangedDataSource(metaDataContexts.getMetaDataMap().get(schemaName),
 dataSourceConfigurations);
-            
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getMetaData().getSchemas());
+            
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getFederationMetaData().getSchemas());
             Map<String, ShardingSphereMetaData> metaDataMap = new 
HashMap<>(metaDataContexts.getMetaDataMap());
             metaDataMap.putAll(changedMetaDataContext.getMetaDataMap());
             Collection<DataSource> pendingClosedDataSources = 
getPendingClosedDataSources(schemaName, dataSourceConfigurations);
@@ -232,8 +232,8 @@ public final class ContextManager implements AutoCloseable {
                 
metaDataContexts.getMetaData(schemaName).getRuleMetaData().getRules()));
         Map<String, ShardingSphereMetaData> kernelMetaDataMap = new 
HashMap<>(metaDataContexts.getMetaDataMap());
         kernelMetaDataMap.put(schemaName, kernelMetaData);
-        
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().put(schemaName,
-                new FederationSchemaMetaData(schemaName, 
SchemaBuilder.buildFederationSchema(tableMetaDataList,
+        
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().put(schemaName,
+                new FederationSchemaMetaData(schemaName, 
SchemaBuilder.buildKernelSchema(tableMetaDataList,
                         
metaDataContexts.getMetaData(schemaName).getRuleMetaData().getRules()).getTables()));
         renewMetaDataContexts(rebuildMetaDataContexts(kernelMetaDataMap));
     }
@@ -321,7 +321,7 @@ public final class ContextManager implements AutoCloseable {
         Collection<MutableDataNodeRule> rules = 
metaDataContexts.getMetaData(schemaName).getRuleMetaData().findRules(MutableDataNodeRule.class);
         for (String table : tables) {
             metaDataContexts.getMetaData(schemaName).getSchema().remove(table);
-            
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().get(schemaName).remove(table);
+            
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().get(schemaName).remove(table);
             rules.forEach(rule -> rule.remove(table));
         }
         metaDataContexts.getMetaDataPersistService().ifPresent(optional -> 
optional.getSchemaMetaDataService().persist(schemaName, 
metaDataContexts.getMetaData(schemaName).getSchema()));
@@ -395,7 +395,7 @@ public final class ContextManager implements AutoCloseable {
     private void refreshMetaDataContext(final String schemaName, final 
Map<String, DataSourceConfiguration> dataSourceConfigs) throws SQLException {
         MetaDataContexts changedMetaDataContext = 
buildChangedMetaDataContextWithAddedDataSource(metaDataContexts.getMetaDataMap().get(schemaName),
 dataSourceConfigs);
         
metaDataContexts.getMetaDataMap().putAll(changedMetaDataContext.getMetaDataMap());
-        
metaDataContexts.getOptimizerContext().getMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getMetaData().getSchemas());
+        
metaDataContexts.getOptimizerContext().getFederationMetaData().getSchemas().putAll(changedMetaDataContext.getOptimizerContext().getFederationMetaData().getSchemas());
         
metaDataContexts.getOptimizerContext().getParserContexts().putAll(changedMetaDataContext.getOptimizerContext().getParserContexts());
         
metaDataContexts.getOptimizerContext().getPlannerContexts().putAll(changedMetaDataContext.getOptimizerContext().getPlannerContexts());
         renewTransactionContext(schemaName, 
metaDataContexts.getMetaData(schemaName).getResource());
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
index 6e2fc1f..6bf3d0d 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContexts.java
@@ -20,11 +20,11 @@ package org.apache.shardingsphere.mode.metadata;
 import lombok.Getter;
 import 
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
+import 
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
+import 
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContextFactory;
 import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
-import 
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
-import 
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContextFactory;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
 
 import java.util.Collection;
@@ -54,8 +54,8 @@ public final class MetaDataContexts implements AutoCloseable {
     private final ConfigurationProperties props;
     
     public MetaDataContexts(final MetaDataPersistService 
metaDataPersistService) {
-        this(metaDataPersistService, new LinkedHashMap<>(), new 
ShardingSphereRuleMetaData(Collections.emptyList(), Collections.emptyList()),
-                null, new ConfigurationProperties(new Properties()), 
OptimizerContextFactory.create(new HashMap<>()));
+        this(metaDataPersistService, new LinkedHashMap<>(), new 
ShardingSphereRuleMetaData(Collections.emptyList(), Collections.emptyList()), 
null, 
+                new ConfigurationProperties(new Properties()), 
OptimizerContextFactory.create(new HashMap<>(), new 
ShardingSphereRuleMetaData(Collections.emptyList(), Collections.emptyList())));
     }
     
     public MetaDataContexts(final MetaDataPersistService 
metaDataPersistService, final Map<String, ShardingSphereMetaData> metaDataMap, 
final ShardingSphereRuleMetaData globalRuleMetaData,
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
index f1e7afd..b07ab05 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKe
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRecognizer;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
+import 
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContextFactory;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.resource.CachedDatabaseMetaData;
 import org.apache.shardingsphere.infra.metadata.resource.DataSourcesMetaData;
@@ -32,7 +33,6 @@ import 
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
 import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilder;
 import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
-import 
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContextFactory;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.infra.rule.builder.global.GlobalRulesBuilder;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
@@ -88,8 +88,7 @@ public final class MetaDataContextsBuilder {
      * @return meta data contexts
      */
     public MetaDataContexts build(final MetaDataPersistService 
metaDataPersistService) throws SQLException {
-        Map<String, ShardingSphereMetaData> kernelMetaData = new 
HashMap<>(schemaRuleConfigs.size(), 1);
-        Map<String, ShardingSphereMetaData> federationMetaData = new 
HashMap<>(schemaRuleConfigs.size(), 1);
+        Map<String, ShardingSphereMetaData> metaData = new 
HashMap<>(schemaRuleConfigs.size(), 1);
         for (String each : schemaRuleConfigs.keySet()) {
             Map<String, DataSource> dataSourceMap = dataSources.get(each);
             Collection<RuleConfiguration> ruleConfigs = 
schemaRuleConfigs.get(each);
@@ -97,11 +96,10 @@ public final class MetaDataContextsBuilder {
             ShardingSphereRuleMetaData ruleMetaData = new 
ShardingSphereRuleMetaData(ruleConfigs, rules.get(each));
             ShardingSphereResource resource = buildResource(databaseType, 
dataSourceMap);
             Collection<TableMetaData> tableMetaDataList = 
schemas.get(each).getTables().values();
-            federationMetaData.put(each, new ShardingSphereMetaData(each, 
resource, ruleMetaData, SchemaBuilder.buildFederationSchema(tableMetaDataList, 
rules.get(each))));
-            kernelMetaData.put(each, new ShardingSphereMetaData(each, 
resource, ruleMetaData, SchemaBuilder.buildKernelSchema(tableMetaDataList, 
rules.get(each))));
+            metaData.put(each, new ShardingSphereMetaData(each, resource, 
ruleMetaData, SchemaBuilder.buildKernelSchema(tableMetaDataList, 
rules.get(each))));
         }
-        return new MetaDataContexts(metaDataPersistService, kernelMetaData, 
-                buildGlobalSchemaMetaData(kernelMetaData), executorEngine, 
props, OptimizerContextFactory.create(federationMetaData));
+        ShardingSphereRuleMetaData globalMetaData = 
buildGlobalSchemaMetaData(metaData);
+        return new MetaDataContexts(metaDataPersistService, metaData, 
globalMetaData, executorEngine, props, OptimizerContextFactory.create(metaData, 
globalMetaData));
     }
     
     private ShardingSphereRuleMetaData buildGlobalSchemaMetaData(final 
Map<String, ShardingSphereMetaData> mataDataMap) {
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index 9139964..e3b4948 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -21,19 +21,34 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.binder.LogicSQL;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import 
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
 import org.apache.shardingsphere.infra.context.refresher.MetaDataRefreshEngine;
 import org.apache.shardingsphere.infra.database.DefaultSchema;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.metadata.JDBCQueryResultMetaData;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
+import 
org.apache.shardingsphere.infra.federation.executor.FederationExecutorFactory;
 import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
+import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseCell;
 import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
@@ -44,13 +59,17 @@ import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryRespon
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeaderBuilder;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import 
org.apache.shardingsphere.sharding.merge.dql.iterator.IteratorStreamMergedResult;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
 
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
@@ -83,15 +102,24 @@ public final class DatabaseCommunicationEngine {
     
     private final Collection<ResultSet> cachedResultSets = new 
CopyOnWriteArrayList<>();
     
+    private final FederationExecutor federationExecutor;
+    
+    private final JDBCBackendConnection backendConnection;
+    
     public DatabaseCommunicationEngine(final String driverType, final 
ShardingSphereMetaData metaData, final LogicSQL logicSQL, final 
JDBCBackendConnection backendConnection) {
         this.driverType = driverType;
         this.metaData = metaData;
         this.logicSQL = logicSQL;
+        this.backendConnection = backendConnection;
         proxySQLExecutor = new ProxySQLExecutor(driverType, backendConnection, 
this);
         kernelProcessor = new KernelProcessor();
+        String schemaName = 
backendConnection.getConnectionSession().getSchemaName();
         metadataRefreshEngine = new MetaDataRefreshEngine(metaData,
-                
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getOptimizerContext().getMetaData().getSchemas().get(backendConnection.getConnectionSession().getSchemaName()),
+                
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getOptimizerContext().getFederationMetaData().getSchemas().get(schemaName),
                 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps());
+        MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
+        federationExecutor = FederationExecutorFactory.newInstance(schemaName, 
metaDataContexts.getOptimizerContext(), 
+                metaDataContexts.getProps(), new 
JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), 
backendConnection.isSerialExecute()));
     }
     
     /**
@@ -120,6 +148,12 @@ public final class DatabaseCommunicationEngine {
      */
     public ResponseHeader execute() throws SQLException {
         ExecutionContext executionContext = 
kernelProcessor.generateExecutionContext(logicSQL, metaData, 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps());
+        // TODO move federation route logic to binder
+        if (executionContext.getRouteContext().isFederated()) {
+            MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
+            ResultSet resultSet = doExecuteFederation(logicSQL, 
metaDataContexts);
+            return processExecuteFederation(resultSet, metaDataContexts);
+        }
         if (executionContext.getExecutionUnits().isEmpty()) {
             return new 
UpdateResponseHeader(executionContext.getSqlStatementContext().getSqlStatement());
         }
@@ -131,6 +165,33 @@ public final class DatabaseCommunicationEngine {
                 : processExecuteUpdate(executionContext, 
executeResults.stream().map(each -> (UpdateResult) 
each).collect(Collectors.toList()));
     }
     
+    private ResultSet doExecuteFederation(final LogicSQL logicSQL, final 
MetaDataContexts metaDataContexts) throws SQLException {
+        boolean isReturnGeneratedKeys = 
logicSQL.getSqlStatementContext().getSqlStatement() instanceof 
MySQLInsertStatement;
+        DatabaseType databaseType = 
metaDataContexts.getMetaData(backendConnection.getConnectionSession().getSchemaName()).getResource().getDatabaseType();
+        ProxyJDBCExecutorCallback callback = 
ProxyJDBCExecutorCallbackFactory.newInstance(driverType, databaseType,
+                logicSQL.getSqlStatementContext().getSqlStatement(), this, 
isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown(), true);
+        backendConnection.setFederationExecutor(federationExecutor);
+        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, 
metaDataContexts);
+        return federationExecutor.executeQuery(prepareEngine, callback, 
logicSQL);
+    }
+    
+    private ResponseHeader processExecuteFederation(final ResultSet resultSet, 
final MetaDataContexts metaDataContexts) throws SQLException {
+        int columnCount = resultSet.getMetaData().getColumnCount();
+        queryHeaders = new ArrayList<>(columnCount);
+        for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
+            ShardingSphereMetaData metaData = 
metaDataContexts.getMetaData(backendConnection.getConnectionSession().getSchemaName());
+            queryHeaders.add(QueryHeaderBuilder.build(new 
JDBCQueryResultMetaData(resultSet.getMetaData()), metaData, columnIndex));
+        }
+        mergedResult = new 
IteratorStreamMergedResult(Collections.singletonList(new 
JDBCStreamQueryResult(resultSet)));
+        return new QueryResponseHeader(queryHeaders);
+    }
+    
+    private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine(final boolean isReturnGeneratedKeys, final 
MetaDataContexts metaData) {
+        int maxConnectionsSizePerQuery = 
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        return new DriverExecutionPrepareEngine<>(driverType, 
maxConnectionsSizePerQuery, backendConnection, new 
StatementOption(isReturnGeneratedKeys),
+                
metaData.getMetaData(backendConnection.getConnectionSession().getSchemaName()).getRuleMetaData().getRules());
+    }
+    
     private Collection<ExecuteResult> doExecute(final ExecutionContext 
executionContext) throws SQLException {
         Collection<ExecuteResult> result = 
proxySQLExecutor.execute(executionContext);
         refreshMetaData(executionContext);
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index e6bb8b0..a870ceb 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -33,19 +33,15 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
-import 
org.apache.shardingsphere.infra.federation.executor.FederationExecutorFactory;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.RawExecutionRule;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.ProxyJDBCExecutor;
-import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
-import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
-import 
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactionException;
+import 
org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
@@ -58,7 +54,6 @@ import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
  * Proxy SQL Executor.
@@ -69,25 +64,18 @@ public final class ProxySQLExecutor {
     
     private final JDBCBackendConnection backendConnection;
     
-    private final DatabaseCommunicationEngine databaseCommunicationEngine;
-    
     private final ProxyJDBCExecutor jdbcExecutor;
     
     private final RawExecutor rawExecutor;
     
-    private final FederationExecutor federationExecutor;
-    
     public ProxySQLExecutor(final String type, final JDBCBackendConnection 
backendConnection, final DatabaseCommunicationEngine 
databaseCommunicationEngine) {
         this.type = type;
         this.backendConnection = backendConnection;
-        this.databaseCommunicationEngine = databaseCommunicationEngine;
         ExecutorEngine executorEngine = 
BackendExecutorContext.getInstance().getExecutorEngine();
         boolean isSerialExecute = backendConnection.isSerialExecute();
         MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
         jdbcExecutor = new ProxyJDBCExecutor(type, 
backendConnection.getConnectionSession(), databaseCommunicationEngine, new 
JDBCExecutor(executorEngine, isSerialExecute));
         rawExecutor = new RawExecutor(executorEngine, isSerialExecute, 
metaDataContexts.getProps());
-        federationExecutor = FederationExecutorFactory.newInstance(
-                backendConnection.getConnectionSession().getSchemaName(), 
metaDataContexts.getOptimizerContext(), metaDataContexts.getProps(), new 
JDBCExecutor(executorEngine, isSerialExecute));
     }
     
     /**
@@ -133,9 +121,6 @@ public final class ProxySQLExecutor {
         if (rules.stream().anyMatch(each -> each instanceof RawExecutionRule)) 
{
             return rawExecute(executionContext, rules, 
maxConnectionsSizePerQuery);
         }
-        if (executionContext.getRouteContext().isFederated()) {
-            return federateExecute(executionContext, isReturnGeneratedKeys, 
SQLExecutorExceptionHandler.isExceptionThrown());
-        }
         return useDriverToExecute(executionContext, rules, 
maxConnectionsSizePerQuery, isReturnGeneratedKeys, 
SQLExecutorExceptionHandler.isExceptionThrown());
     }
     
@@ -153,25 +138,6 @@ public final class ProxySQLExecutor {
         return rawExecutor.execute(executionGroupContext, 
executionContext.getLogicSQL(), new RawSQLExecutorCallback());
     }
     
-    private Collection<ExecuteResult> federateExecute(final ExecutionContext 
executionContext, final boolean isReturnGeneratedKeys, final boolean 
isExceptionThrown) throws SQLException {
-        if (executionContext.getExecutionUnits().isEmpty()) {
-            return Collections.emptyList();
-        }
-        MetaDataContexts metaData = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
-        String schemaName = 
backendConnection.getConnectionSession().getSchemaName();
-        ProxyJDBCExecutorCallback callback = 
ProxyJDBCExecutorCallbackFactory.newInstance(type, 
metaData.getMetaData(schemaName).getResource().getDatabaseType(),
-                executionContext.getSqlStatementContext().getSqlStatement(), 
databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, true);
-        backendConnection.setFederationExecutor(federationExecutor);
-        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, 
metaData);
-        return federationExecutor.executeQuery(prepareEngine, callback, 
executionContext).stream().map(each -> (ExecuteResult) 
each).collect(Collectors.toList());
-    }
-    
-    private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine(final boolean isReturnGeneratedKeys, final 
MetaDataContexts metaData) {
-        int maxConnectionsSizePerQuery = 
metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        return new DriverExecutionPrepareEngine<>(type, 
maxConnectionsSizePerQuery, backendConnection, new 
StatementOption(isReturnGeneratedKeys),
-                
metaData.getMetaData(backendConnection.getConnectionSession().getSchemaName()).getRuleMetaData().getRules());
-    }
-    
     private Collection<ExecuteResult> useDriverToExecute(final 
ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, 
                                                          final int 
maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean 
isExceptionThrown) throws SQLException {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = new DriverExecutionPrepareEngine<>(
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dataset/dbtbl_with_readwrite_splitting_and_encrypt/select_join_encrypt.xml
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dataset/dbtbl_with_readwrite_splitting_and_encrypt/select_join_encrypt.xml
index 8444e86..113265b 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dataset/dbtbl_with_readwrite_splitting_and_encrypt/select_join_encrypt.xml
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dataset/dbtbl_with_readwrite_splitting_and_encrypt/select_join_encrypt.xml
@@ -22,18 +22,10 @@
         <column name="pwd" />
         <column name="status" />
         <column name="item_id" />
-        <column name="user_id" />
-        <column name="status" />
+        <column name="user_id0" />
+        <column name="status0" />
         <column name="creation_date" />
     </metadata>
     <row values="0, 10000, a00, init_read, 100000, 0, init_read, 2017-08-08" />
     <row values="11, 11001, b11, init_read, 110001, 11, init_read, 2017-08-08" 
/>
-    <row values="22, 12002, c22, init_read, 120002, 22, init_read, 2017-08-08" 
/>
-    <row values="33, 13003, d33, init_read, 130003, 33, init_read, 2017-08-08" 
/>
-    <row values="44, 14004, e44, init_read, 140004, 44, init_read, 2017-08-08" 
/>
-    <row values="55, 15005, f55, init_read, 150005, 55, init_read, 2017-08-08" 
/>
-    <row values="66, 16006, g66, init_read, 160006, 66, init_read, 2017-08-08" 
/>
-    <row values="77, 17007, h77, init_read, 170007, 77, init_read, 2017-08-08" 
/>
-    <row values="88, 18008, i88, init_read, 180008, 88, init_read, 2017-08-08" 
/>
-    <row values="99, 19009, j99, init_read, 190009, 99, init_read, 2017-08-08" 
/>
 </dataset>
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dql-integration-test-cases.xml
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dql-integration-test-cases.xml
index 0eff84c..201bd60 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dql-integration-test-cases.xml
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/resources/cases/dql/dql-integration-test-cases.xml
@@ -513,7 +513,7 @@
         <assertion expected-data-file="select_in_or_encrypt.xml"/>
     </test-case>
     
-    <test-case sql="SELECT * FROM t_user u INNER JOIN t_user_item m ON 
u.user_id=m.user_id WHERE u.user_id IN (0, 11, 22, 33, 44, 55, 66, 77, 88, 99)" 
scenario-types="encrypt,dbtbl_with_readwrite_splitting_and_encrypt">
+    <test-case sql="SELECT * FROM t_user u INNER JOIN t_user_item m ON 
u.user_id=m.user_id WHERE u.user_id IN (0, 11)" 
scenario-types="encrypt,dbtbl_with_readwrite_splitting_and_encrypt">
         <assertion expected-data-file="select_join_encrypt.xml"/>
     </test-case>
 

Reply via email to