This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 344c41d9c43 Add new SQLFederationRelConverter to simplify engine logic and refactor sql federation executor package (#35483)
344c41d9c43 is described below

commit 344c41d9c4347bab1cd2aa36f2920ef8156996a8
Author: Zhengqiang Duan <duanzhengqi...@apache.org>
AuthorDate: Thu May 22 17:15:08 2025 +0800

    Add new SQLFederationRelConverter to simplify engine logic and refactor sql federation executor package (#35483)
    
    * Add new SQLFederationRelConverter to simplify engine logic
    
    * move LogicalScanPushDownRelBuilder and LogicalScanRelRewriter to rel package
    
    * refactor sql federation executor context
    
    * refactor sql federation executor package
    
    * fix checkstyle
---
 .../executor/engine/DriverExecuteExecutor.java     |   2 +-
 .../engine/DriverExecuteQueryExecutor.java         |   2 +-
 .../compiler/compiler/SQLStatementCompiler.java    |  21 ++--
 .../ScanImplementor.java}                          |  10 +-
 .../ScanImplementorContext.java}                   |   6 +-
 .../enumerator/EmptyDataRowEnumerator.java}        |   6 +-
 .../metadata/schema/SQLFederationTable.java        |  22 ++--
 .../builder}/LogicalScanPushDownRelBuilder.java    |   2 +-
 .../converter/SQLFederationRelConverter.java}      | 137 +++++++++++----------
 .../rewriter/LogicalScanRelRewriter.java}          |  14 +--
 .../compiler/sql/operator/logical/LogicalScan.java |   2 +-
 .../sql/operator/physical/EnumerableScan.java      |   2 +-
 .../compiler/it/SQLStatementCompilerIT.java        |  32 ++---
 .../context/SQLFederationContext.java              |   2 +-
 .../sqlfederation/engine/SQLFederationEngine.java  |  51 +++-----
 .../engine/processor/SQLFederationProcessor.java   |   6 +-
 .../processor/SQLFederationProcessorFactory.java   |   6 +-
 .../impl/StandardSQLFederationProcessor.java       |  36 +++---
 ...onBindContext.java => ExecutorBindContext.java} |  10 +-
 .../executor/context/ExecutorContext.java          |  65 ++++++++++
 .../context/SQLFederationExecutorContext.java      |  41 ------
 .../enumerator/jdbc/JDBCDataRowEnumerator.java}    |   6 +-
 .../memory/MemoryDataRowEnumerator.java}           |  13 +-
 .../memory/MemoryDataTypeConverter.java}           |   4 +-
 .../memory/MemoryTableStatisticsBuilder.java}      |  56 +++++----
 .../EnumerableScanImplementor.java}                |  96 ++++++---------
 .../EnumerableScanImplementorTest.java}            |  44 ++++---
 .../connector/StandardDatabaseConnector.java       |   2 +-
 .../handler/distsql/rul/PreviewExecutor.java       |   2 +-
 .../OpenGaussSystemCatalogAdminQueryExecutor.java  |   2 +-
 30 files changed, 352 insertions(+), 348 deletions(-)

diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
index 0de94186204..5713d36a04c 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
@@ -40,7 +40,7 @@ import 
org.apache.shardingsphere.infra.session.query.QueryContext;
 import 
org.apache.shardingsphere.mode.metadata.refresher.federation.FederationMetaDataRefreshEngine;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.tcl.TCLStatement;
 import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
+import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
index e5becef2394..f8a66409307 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteQueryExecutor.java
@@ -32,7 +32,7 @@ import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
+import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
diff --git 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/SQLStatementCompiler.java
 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/SQLStatementCompiler.java
index e30f4b2d6ba..c34e99c5606 100644
--- 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/SQLStatementCompiler.java
+++ 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/SQLStatementCompiler.java
@@ -25,14 +25,12 @@ import 
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQueryBase;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql2rel.SqlToRelConverter;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
 import 
org.apache.shardingsphere.sqlfederation.compiler.SQLFederationExecutionPlan;
 import 
org.apache.shardingsphere.sqlfederation.compiler.planner.builder.SQLFederationPlannerBuilder;
+import 
org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter;
+import 
org.apache.shardingsphere.sqlfederation.compiler.rel.rewriter.LogicalScanRelRewriter;
 import 
org.apache.shardingsphere.sqlfederation.compiler.sql.ast.converter.SQLNodeConverterEngine;
-import 
org.apache.shardingsphere.sqlfederation.compiler.sql.operator.util.LogicalScanRelShuttle;
-
-import java.util.Objects;
 
 /**
  * SQL statement compiler.
@@ -40,7 +38,7 @@ import java.util.Objects;
 @RequiredArgsConstructor
 public final class SQLStatementCompiler {
     
-    private final SqlToRelConverter converter;
+    private final SQLFederationRelConverter converter;
     
     private final Convention convention;
     
@@ -55,20 +53,21 @@ public final class SQLStatementCompiler {
         
RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.DEFAULT);
         SqlNode sqlNode = SQLNodeConverterEngine.convert(sqlStatement);
         RelNode logicalPlan = converter.convertQuery(sqlNode, true, true).rel;
-        RelDataType resultColumnType = 
Objects.requireNonNull(converter.validator).getValidatedNodeType(sqlNode);
-        RelNode replacePlan = LogicalScanRelShuttle.replace(logicalPlan, 
databaseType);
-        RelNode rewritePlan = rewrite(replacePlan, 
SQLFederationPlannerBuilder.buildHepPlanner());
+        RelDataType resultColumnType = converter.getValidatedNodeType(sqlNode);
+        RelNode rewritePlan = rewrite(logicalPlan, databaseType);
         RelNode physicalPlan = optimize(rewritePlan, converter);
         RelMetadataQueryBase.THREAD_PROVIDERS.remove();
         return new SQLFederationExecutionPlan(physicalPlan, resultColumnType);
     }
     
-    private RelNode rewrite(final RelNode logicalPlan, final RelOptPlanner 
hepPlanner) {
-        hepPlanner.setRoot(logicalPlan);
+    private RelNode rewrite(final RelNode logicalPlan, final String 
databaseType) {
+        RelNode rewrittenPlan = LogicalScanRelRewriter.rewrite(logicalPlan, 
databaseType);
+        RelOptPlanner hepPlanner = 
SQLFederationPlannerBuilder.buildHepPlanner();
+        hepPlanner.setRoot(rewrittenPlan);
         return hepPlanner.findBestExp();
     }
     
-    private RelNode optimize(final RelNode rewritePlan, final 
SqlToRelConverter converter) {
+    private RelNode optimize(final RelNode rewritePlan, final 
SQLFederationRelConverter converter) {
         RelOptPlanner planner = converter.getCluster().getPlanner();
         if 
(rewritePlan.getTraitSet().equals(converter.getCluster().traitSet().replace(convention)))
 {
             planner.setRoot(rewritePlan);
diff --git 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/ScanExecutor.java
 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/ScanImplementor.java
similarity index 81%
rename from 
kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/ScanExecutor.java
rename to 
kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/ScanImplementor.java
index 3fe845b6871..95624c4b0db 100644
--- 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/ScanExecutor.java
+++ 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/ScanImplementor.java
@@ -15,22 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table;
+package org.apache.shardingsphere.sqlfederation.compiler.implementor;
 
 import org.apache.calcite.linq4j.Enumerable;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
 
 /**
- * Scan executor interface.
+ * Scan implementor interface.
  */
-public interface ScanExecutor {
+public interface ScanImplementor {
     
     /**
-     * Execute.
+     * Implement.
      *
      * @param table table meta data
      * @param scanContext push down table scan context
      * @return query results
      */
-    Enumerable<Object> execute(ShardingSphereTable table, ScanExecutorContext 
scanContext);
+    Enumerable<Object> implement(ShardingSphereTable table, 
ScanImplementorContext scanContext);
 }
diff --git 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/ScanExecutorContext.java
 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/ScanImplementorContext.java
similarity index 87%
rename from 
kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/ScanExecutorContext.java
rename to 
kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/ScanImplementorContext.java
index f3f9e44b2ac..01d9773d670 100644
--- 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/ScanExecutorContext.java
+++ 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/ScanImplementorContext.java
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table;
+package org.apache.shardingsphere.sqlfederation.compiler.implementor;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.calcite.DataContext;
 
 /**
- * Scan executor context.
+ * Scan implementor context.
  */
 @RequiredArgsConstructor
 @Getter
-public final class ScanExecutorContext {
+public final class ScanImplementorContext {
     
     private final DataContext root;
     
diff --git 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/EmptyRowEnumerator.java
 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/enumerator/EmptyDataRowEnumerator.java
similarity index 85%
rename from 
kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/EmptyRowEnumerator.java
rename to 
kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/enumerator/EmptyDataRowEnumerator.java
index e4c6b65ff6e..f4426f56162 100644
--- 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/table/EmptyRowEnumerator.java
+++ 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/implementor/enumerator/EmptyDataRowEnumerator.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table;
+package 
org.apache.shardingsphere.sqlfederation.compiler.implementor.enumerator;
 
 import org.apache.calcite.linq4j.Enumerator;
 
 /**
- * Empty row enumerator.
+ * Empty data row enumerator.
  */
-public final class EmptyRowEnumerator implements Enumerator<Object> {
+public final class EmptyDataRowEnumerator implements Enumerator<Object> {
     
     @Override
     public Object current() {
diff --git 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/SQLFederationTable.java
 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/SQLFederationTable.java
index 3a990e7f7e6..547d721a55c 100644
--- 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/SQLFederationTable.java
+++ 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/schema/SQLFederationTable.java
@@ -46,9 +46,9 @@ import org.apache.calcite.schema.impl.AbstractTable;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.EmptyRowEnumerator;
-import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.ScanExecutor;
-import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.ScanExecutorContext;
+import 
org.apache.shardingsphere.sqlfederation.compiler.implementor.ScanImplementor;
+import 
org.apache.shardingsphere.sqlfederation.compiler.implementor.ScanImplementorContext;
+import 
org.apache.shardingsphere.sqlfederation.compiler.implementor.enumerator.EmptyDataRowEnumerator;
 import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.datatype.SQLFederationDataTypeBuilder;
 
 import java.lang.reflect.Type;
@@ -62,7 +62,7 @@ import java.util.List;
 @RequiredArgsConstructor
 public final class SQLFederationTable extends AbstractTable implements 
ModifiableTable, TranslatableTable {
     
-    private static final TransmittableThreadLocal<ScanExecutor> 
SCAN_EXECUTOR_HOLDER = new TransmittableThreadLocal<>();
+    private static final TransmittableThreadLocal<ScanImplementor> 
SCAN_EXECUTOR_HOLDER = new TransmittableThreadLocal<>();
     
     private final ShardingSphereTable table;
     
@@ -94,18 +94,18 @@ public final class SQLFederationTable extends AbstractTable 
implements Modifiabl
     }
     
     /**
-     * Execute.
+     * Implement.
      *
      * @param root data context
      * @param sql sql
      * @param paramIndexes param indexes
      * @return enumerable result
      */
-    public Enumerable<Object> execute(final DataContext root, final String 
sql, final int[] paramIndexes) {
+    public Enumerable<Object> implement(final DataContext root, final String 
sql, final int[] paramIndexes) {
         if (null == SCAN_EXECUTOR_HOLDER.get()) {
             return createEmptyEnumerable();
         }
-        return SCAN_EXECUTOR_HOLDER.get().execute(table, new 
ScanExecutorContext(root, sql, paramIndexes));
+        return SCAN_EXECUTOR_HOLDER.get().implement(table, new 
ScanImplementorContext(root, sql, paramIndexes));
     }
     
     private AbstractEnumerable<Object> createEmptyEnumerable() {
@@ -113,7 +113,7 @@ public final class SQLFederationTable extends AbstractTable 
implements Modifiabl
             
             @Override
             public Enumerator<Object> enumerator() {
-                return new EmptyRowEnumerator();
+                return new EmptyDataRowEnumerator();
             }
         };
     }
@@ -138,10 +138,10 @@ public final class SQLFederationTable extends 
AbstractTable implements Modifiabl
     /**
      * Set scan executor.
      *
-     * @param scanExecutor scan executor
+     * @param scanImplementor scan executor
      */
-    public void setScanExecutor(final ScanExecutor scanExecutor) {
-        SCAN_EXECUTOR_HOLDER.set(scanExecutor);
+    public void setScanExecutor(final ScanImplementor scanImplementor) {
+        SCAN_EXECUTOR_HOLDER.set(scanImplementor);
     }
     
     /**
diff --git 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/util/LogicalScanPushDownRelBuilder.java
 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/builder/LogicalScanPushDownRelBuilder.java
similarity index 95%
rename from 
kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/util/LogicalScanPushDownRelBuilder.java
rename to 
kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/builder/LogicalScanPushDownRelBuilder.java
index a63dce7ad8e..37a27201f7e 100644
--- 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/util/LogicalScanPushDownRelBuilder.java
+++ 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/builder/LogicalScanPushDownRelBuilder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sqlfederation.compiler.sql.operator.util;
+package org.apache.shardingsphere.sqlfederation.compiler.rel.builder;
 
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptCluster;
diff --git 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/util/SQLFederationValidatorUtils.java
 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/converter/SQLFederationRelConverter.java
similarity index 55%
rename from 
kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/util/SQLFederationValidatorUtils.java
rename to 
kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/converter/SQLFederationRelConverter.java
index 742b998c3ed..5f4777390d0 100644
--- 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/metadata/util/SQLFederationValidatorUtils.java
+++ 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/converter/SQLFederationRelConverter.java
@@ -15,20 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sqlfederation.compiler.metadata.util;
+package org.apache.shardingsphere.sqlfederation.compiler.rel.converter;
 
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptTable.ViewExpander;
 import org.apache.calcite.prepare.CalciteCatalogReader;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.fun.SqlLibrary;
 import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory;
@@ -38,25 +39,25 @@ import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
 import org.apache.calcite.sql2rel.SqlToRelConverter.Config;
 import org.apache.calcite.sql2rel.StandardConvertletTable;
-import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
-import 
org.apache.shardingsphere.sqlfederation.compiler.sql.function.SQLFederationFunctionRegister;
-import 
org.apache.shardingsphere.sqlfederation.compiler.sql.function.mysql.MySQLOperatorTable;
+import 
org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext;
 import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.catalog.SQLFederationCatalogReader;
+import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.datatype.SQLFederationDataTypeFactory;
 import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.view.ShardingSphereViewExpander;
 import 
org.apache.shardingsphere.sqlfederation.compiler.planner.builder.SQLFederationPlannerBuilder;
+import 
org.apache.shardingsphere.sqlfederation.compiler.sql.function.mysql.MySQLOperatorTable;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
- * SQL federation validator utility class.
+ * SQL federation rel converter.
  */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class SQLFederationValidatorUtils {
+public final class SQLFederationRelConverter {
     
     private static final Map<String, SqlLibrary> DATABASE_TYPE_SQL_LIBRARIES = 
new HashMap<>();
     
@@ -67,35 +68,19 @@ public final class SQLFederationValidatorUtils {
         DATABASE_TYPE_SQL_LIBRARIES.put("Oracle", SqlLibrary.ORACLE);
     }
     
-    /**
-     * Create catalog reader.
-     *
-     * @param schemaName schema name
-     * @param schema schema
-     * @param relDataTypeFactory rel data type factory
-     * @param connectionConfig connection config
-     * @param databaseType database type
-     * @return calcite catalog reader
-     */
-    public static CalciteCatalogReader createCatalogReader(final String 
schemaName, final Schema schema, final RelDataTypeFactory relDataTypeFactory, 
final CalciteConnectionConfig connectionConfig,
-                                                           final DatabaseType 
databaseType) {
-        CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
-        rootSchema.add(schemaName, schema);
-        
DatabaseTypedSPILoader.findService(SQLFederationFunctionRegister.class, 
databaseType).ifPresent(optional -> 
optional.registerFunction(rootSchema.plus(), schemaName));
-        return new SQLFederationCatalogReader(rootSchema, 
Collections.singletonList(schemaName), relDataTypeFactory, connectionConfig);
+    private final SqlToRelConverter sqlToRelConverter;
+    
+    public SQLFederationRelConverter(final CompilerContext compilerContext, 
final List<String> schemaPath, final DatabaseType databaseType, final 
Convention convention) {
+        JavaTypeFactory typeFactory = 
SQLFederationDataTypeFactory.getInstance();
+        CalciteConnectionConfig connectionConfig = 
compilerContext.getConnectionConfig();
+        CalciteCatalogReader catalogReader = new 
SQLFederationCatalogReader(compilerContext.getCalciteSchema(), schemaPath, 
typeFactory, connectionConfig);
+        SqlValidator validator = createSqlValidator(catalogReader, 
typeFactory, databaseType, connectionConfig);
+        RelOptCluster relOptCluster = createRelOptCluster(typeFactory, 
convention);
+        sqlToRelConverter = createSqlToRelConverter(catalogReader, validator, 
relOptCluster, compilerContext.getSqlParserRule(), databaseType, true);
     }
     
-    /**
-     * Create sql validator.
-     *
-     * @param catalogReader catalog reader
-     * @param relDataTypeFactory rel data type factory
-     * @param databaseType database type
-     * @param connectionConfig connection config
-     * @return sql validator
-     */
-    public static SqlValidator createSqlValidator(final CalciteCatalogReader 
catalogReader, final RelDataTypeFactory relDataTypeFactory, final DatabaseType 
databaseType,
-                                                  final 
CalciteConnectionConfig connectionConfig) {
+    private SqlValidator createSqlValidator(final CalciteCatalogReader 
catalogReader, final RelDataTypeFactory relDataTypeFactory, final DatabaseType 
databaseType,
+                                            final CalciteConnectionConfig 
connectionConfig) {
         SqlValidator.Config validatorConfig = 
SqlValidator.Config.DEFAULT.withLenientOperatorLookup(connectionConfig.lenientOperatorLookup()).withConformance(connectionConfig.conformance())
                 
.withDefaultNullCollation(connectionConfig.defaultNullCollation()).withIdentifierExpansion(true);
         SqlOperatorTable sqlOperatorTable = getSQLOperatorTable(catalogReader, 
databaseType.getTrunkDatabaseType().orElse(databaseType));
@@ -103,41 +88,63 @@ public final class SQLFederationValidatorUtils {
     }
     
     private static SqlOperatorTable getSQLOperatorTable(final 
CalciteCatalogReader catalogReader, final DatabaseType databaseType) {
-        return SqlOperatorTables.chain(Arrays.asList(new MySQLOperatorTable(),
-                
SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(Arrays.asList(SqlLibrary.STANDARD,
 DATABASE_TYPE_SQL_LIBRARIES.getOrDefault(databaseType.getType(), 
SqlLibrary.MYSQL))),
-                catalogReader));
+        SqlOperatorTable operatorTable =
+                
SqlLibraryOperatorTableFactory.INSTANCE.getOperatorTable(Arrays.asList(SqlLibrary.STANDARD,
 DATABASE_TYPE_SQL_LIBRARIES.getOrDefault(databaseType.getType(), 
SqlLibrary.MYSQL)));
+        return SqlOperatorTables.chain(Arrays.asList(new MySQLOperatorTable(), 
operatorTable, catalogReader));
     }
     
-    /**
-     * Create sql to rel converter.
-     *
-     * @param catalogReader catalog reader
-     * @param validator validator
-     * @param cluster cluster
-     * @param sqlParserRule sql parser rule
-     * @param databaseType database type
-     * @param needsViewExpand whether sql needs view expand or not
-     * @return sql to rel converter
-     */
-    public static SqlToRelConverter createSqlToRelConverter(final 
CalciteCatalogReader catalogReader, final SqlValidator validator, final 
RelOptCluster cluster, final SQLParserRule sqlParserRule,
-                                                            final DatabaseType 
databaseType, final boolean needsViewExpand) {
-        ViewExpander expander =
-                needsViewExpand ? new 
ShardingSphereViewExpander(sqlParserRule, databaseType, 
createSqlToRelConverter(catalogReader, validator, cluster, sqlParserRule, 
databaseType, false))
-                        : (rowType, queryString, schemaPath, viewPath) -> null;
+    private SqlToRelConverter createSqlToRelConverter(final 
CalciteCatalogReader catalogReader, final SqlValidator validator, final 
RelOptCluster cluster, final SQLParserRule sqlParserRule,
+                                                      final DatabaseType 
databaseType, final boolean needsViewExpand) {
+        ViewExpander expander = needsViewExpand
+                ? new ShardingSphereViewExpander(sqlParserRule, databaseType, 
createSqlToRelConverter(catalogReader, validator, cluster, sqlParserRule, 
databaseType, false))
+                : (rowType, queryString, schemaPath, viewPath) -> null;
         // TODO remove withRemoveSortInSubQuery when calcite can expand view 
which contains order by correctly
         Config converterConfig = 
SqlToRelConverter.config().withTrimUnusedFields(true).withRemoveSortInSubQuery(false).withExpand(true);
         return new SqlToRelConverter(expander, validator, catalogReader, 
cluster, StandardConvertletTable.INSTANCE, converterConfig);
     }
     
+    private RelOptCluster createRelOptCluster(final RelDataTypeFactory 
relDataTypeFactory, final Convention convention) {
+        RelOptPlanner volcanoPlanner = 
SQLFederationPlannerBuilder.buildVolcanoPlanner(convention);
+        return RelOptCluster.create(volcanoPlanner, new 
RexBuilder(relDataTypeFactory));
+    }
+    
     /**
-     * Create rel opt cluster.
+     * Get schema plus.
      *
-     * @param relDataTypeFactory rel data type factory
-     * @param convention convention
-     * @return rel opt cluster
+     * @return schema plus
      */
-    public static RelOptCluster createRelOptCluster(final RelDataTypeFactory 
relDataTypeFactory, final Convention convention) {
-        RelOptPlanner volcanoPlanner = 
SQLFederationPlannerBuilder.buildVolcanoPlanner(convention);
-        return RelOptCluster.create(volcanoPlanner, new 
RexBuilder(relDataTypeFactory));
+    public SchemaPlus getSchemaPlus() {
+        return 
sqlToRelConverter.validator.getCatalogReader().getRootSchema().plus();
+    }
+    
+    /**
+     * Convert query.
+     *
+     * @param sqlNode sql node
+     * @param needsValidation need validation
+     * @param top top
+     * @return rel root
+     */
+    public RelRoot convertQuery(final SqlNode sqlNode, final boolean 
needsValidation, final boolean top) {
+        return sqlToRelConverter.convertQuery(sqlNode, needsValidation, top);
+    }
+    
+    /**
+     * Get validated node type.
+     *
+     * @param sqlNode sql node
+     * @return rel data type
+     */
+    public RelDataType getValidatedNodeType(final SqlNode sqlNode) {
+        return 
Objects.requireNonNull(sqlToRelConverter.validator).getValidatedNodeType(sqlNode);
+    }
+    
+    /**
+     * Get cluster.
+     *
+     * @return cluster
+     */
+    public RelOptCluster getCluster() {
+        return sqlToRelConverter.getCluster();
     }
 }
diff --git 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/util/LogicalScanRelShuttle.java
 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/rewriter/LogicalScanRelRewriter.java
similarity index 79%
rename from 
kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/util/LogicalScanRelShuttle.java
rename to 
kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/rewriter/LogicalScanRelRewriter.java
index 0d99e240a4c..4c024759a58 100644
--- 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/util/LogicalScanRelShuttle.java
+++ 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/rel/rewriter/LogicalScanRelRewriter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sqlfederation.compiler.sql.operator.util;
+package org.apache.shardingsphere.sqlfederation.compiler.rel.rewriter;
 
 import lombok.AccessLevel;
 import lombok.RequiredArgsConstructor;
@@ -25,10 +25,10 @@ import org.apache.calcite.rel.core.TableScan;
 import 
org.apache.shardingsphere.sqlfederation.compiler.sql.operator.logical.LogicalScan;
 
 /**
- * Logical scan rel shuttle.
+ * Logical scan rel rewriter.
  */
 @RequiredArgsConstructor(access = AccessLevel.PRIVATE)
-public final class LogicalScanRelShuttle extends RelShuttleImpl {
+public final class LogicalScanRelRewriter extends RelShuttleImpl {
     
     private final String databaseType;
     
@@ -38,13 +38,13 @@ public final class LogicalScanRelShuttle extends 
RelShuttleImpl {
     }
     
     /**
-     * Replace table scan with logical scan.
+     * Rewrite table scan to logical scan.
      *
      * @param relNode rel node
      * @param databaseType database type
-     * @return replaced rel node
+     * @return rewritten rel node
      */
-    public static RelNode replace(final RelNode relNode, final String 
databaseType) {
-        return relNode.accept(new LogicalScanRelShuttle(databaseType));
+    public static RelNode rewrite(final RelNode relNode, final String 
databaseType) {
+        return relNode.accept(new LogicalScanRelRewriter(databaseType));
     }
 }
diff --git 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/logical/LogicalScan.java
 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/logical/LogicalScan.java
index 1d331840a71..ecff13c9a01 100644
--- 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/logical/LogicalScan.java
+++ 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/logical/LogicalScan.java
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalTableScan;
-import 
org.apache.shardingsphere.sqlfederation.compiler.sql.operator.util.LogicalScanPushDownRelBuilder;
+import 
org.apache.shardingsphere.sqlfederation.compiler.rel.builder.LogicalScanPushDownRelBuilder;
 
 import java.util.Collections;
 import java.util.Objects;
diff --git 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/physical/EnumerableScan.java
 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/physical/EnumerableScan.java
index 91753425e3a..09713b01c88 100644
--- 
a/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/physical/EnumerableScan.java
+++ 
b/kernel/sql-federation/compiler/src/main/java/org/apache/shardingsphere/sqlfederation/compiler/sql/operator/physical/EnumerableScan.java
@@ -84,7 +84,7 @@ public final class EnumerableScan extends TableScan 
implements EnumerableRel {
     public Result implement(final EnumerableRelImplementor implementor, final 
Prefer pref) {
         PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), 
getPushDownRowType(), pref.preferArray());
         int[] paramIndexes = null == sqlString.getDynamicParameters() ? new 
int[]{} : getParamIndexes(sqlString.getDynamicParameters());
-        return implementor.result(physType, 
Blocks.toBlock(Expressions.call(Objects.requireNonNull(table.getExpression(SQLFederationTable.class)),
 "execute", implementor.getRootExpression(),
+        return implementor.result(physType, 
Blocks.toBlock(Expressions.call(Objects.requireNonNull(table.getExpression(SQLFederationTable.class)),
 "implement", implementor.getRootExpression(),
                 Expressions.constant(sqlString.getSql().replace("u&'\\", 
"'\\u")), Expressions.constant(paramIndexes))));
     }
     
diff --git 
a/kernel/sql-federation/compiler/src/test/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/it/SQLStatementCompilerIT.java
 
b/kernel/sql-federation/compiler/src/test/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/it/SQLStatementCompilerIT.java
index 1fe8747357f..d2073118c67 100644
--- 
a/kernel/sql-federation/compiler/src/test/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/it/SQLStatementCompilerIT.java
+++ 
b/kernel/sql-federation/compiler/src/test/java/org/apache/shardingsphere/sqlfederation/compiler/compiler/it/SQLStatementCompilerIT.java
@@ -19,14 +19,8 @@ package 
org.apache.shardingsphere.sqlfederation.compiler.compiler.it;
 
 import lombok.SneakyThrows;
 import org.apache.calcite.adapter.enumerable.EnumerableConvention;
-import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.config.CalciteConnectionConfigImpl;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
@@ -36,10 +30,10 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import 
org.apache.shardingsphere.parser.rule.builder.DefaultSQLParserRuleConfigurationBuilder;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
 import 
org.apache.shardingsphere.sqlfederation.compiler.compiler.SQLStatementCompiler;
+import 
org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext;
 import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.datatype.SQLFederationDataTypeFactory;
 import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.SQLFederationSchema;
-import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.util.SQLFederationValidatorUtils;
-import 
org.apache.shardingsphere.sqlfederation.compiler.planner.builder.SQLFederationPlannerBuilder;
+import 
org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -83,8 +77,12 @@ class SQLStatementCompilerIT {
         tables.add(createTProductDetailMetaData());
         tables.add(createMultiTypesFirstTableMetaData());
         tables.add(createMultiTypesSecondTableMetaData());
-        sqlStatementCompiler =
-                new SQLStatementCompiler(createSqlToRelConverter(new 
ShardingSphereSchema("foo_db", tables, Collections.emptyList())), 
EnumerableConvention.INSTANCE);
+        CalciteSchema calciteSchema = CalciteSchema.createRootSchema(true);
+        DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "H2");
+        calciteSchema.add(SCHEMA_NAME,
+                new SQLFederationSchema(SCHEMA_NAME, new 
ShardingSphereSchema("foo_db", tables, Collections.emptyList()), databaseType, 
SQLFederationDataTypeFactory.getInstance()));
+        sqlStatementCompiler = new SQLStatementCompiler(new 
SQLFederationRelConverter(new CompilerContext(mock(SQLParserRule.class), 
calciteSchema, new CalciteConnectionConfigImpl(new Properties())),
+                Collections.singletonList("federate_jdbc"), databaseType, 
EnumerableConvention.INSTANCE), EnumerableConvention.INSTANCE);
     }
     
     private ShardingSphereTable createOrderFederationTableMetaData() {
@@ -233,18 +231,6 @@ class SQLStatementCompilerIT {
                 Collections.emptyList(), Collections.emptyList());
     }
     
-    private SqlToRelConverter createSqlToRelConverter(final 
ShardingSphereSchema schema) {
-        CalciteConnectionConfig connectionConfig = new 
CalciteConnectionConfigImpl(new Properties());
-        RelDataTypeFactory relDataTypeFactory = 
SQLFederationDataTypeFactory.getInstance();
-        DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "H2");
-        SQLFederationSchema sqlFederationSchema = new 
SQLFederationSchema(SCHEMA_NAME, schema, databaseType, 
SQLFederationDataTypeFactory.getInstance());
-        CalciteCatalogReader catalogReader =
-                SQLFederationValidatorUtils.createCatalogReader(SCHEMA_NAME, 
sqlFederationSchema, relDataTypeFactory, connectionConfig, databaseType);
-        SqlValidator validator = 
SQLFederationValidatorUtils.createSqlValidator(catalogReader, 
relDataTypeFactory, databaseType, connectionConfig);
-        RelOptCluster cluster = 
RelOptCluster.create(SQLFederationPlannerBuilder.buildVolcanoPlanner(EnumerableConvention.INSTANCE),
 new RexBuilder(relDataTypeFactory));
-        return 
SQLFederationValidatorUtils.createSqlToRelConverter(catalogReader, validator, 
cluster, mock(SQLParserRule.class), databaseType, false);
-    }
-    
     @ParameterizedTest(name = "{0}")
     @ArgumentsSource(TestCaseArgumentsProvider.class)
     void assertCompile(final TestCase testcase) {
diff --git 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationContext.java
 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/context/SQLFederationContext.java
similarity index 95%
rename from 
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationContext.java
rename to 
kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/context/SQLFederationContext.java
index bce17720cdc..958bd470093 100644
--- 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationContext.java
+++ 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/context/SQLFederationContext.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sqlfederation.executor.context;
+package org.apache.shardingsphere.sqlfederation.context;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
diff --git 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
index 007c2da6135..ab7c0d68554 100644
--- 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
+++ 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
@@ -19,20 +19,13 @@ package org.apache.shardingsphere.sqlfederation.engine;
 
 import com.google.common.base.Joiner;
 import lombok.Getter;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.prepare.CalciteCatalogReader;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.calcite.sql2rel.SqlToRelConverter;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.dal.ExplainStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.metadata.DialectDatabaseMetaData;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -54,18 +47,16 @@ import 
org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.dml.SelectStatement;
-import 
org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessor;
-import 
org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessorFactory;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
-import 
org.apache.shardingsphere.sqlfederation.compiler.SQLFederationExecutionPlan;
 import 
org.apache.shardingsphere.sqlfederation.compiler.SQLFederationCompilerEngine;
+import 
org.apache.shardingsphere.sqlfederation.compiler.SQLFederationExecutionPlan;
 import 
org.apache.shardingsphere.sqlfederation.compiler.compiler.SQLStatementCompiler;
 import 
org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext;
 import 
org.apache.shardingsphere.sqlfederation.compiler.exception.SQLFederationUnsupportedSQLException;
-import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.catalog.SQLFederationCatalogReader;
-import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.datatype.SQLFederationDataTypeFactory;
-import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.util.SQLFederationValidatorUtils;
 import 
org.apache.shardingsphere.sqlfederation.compiler.planner.cache.ExecutionPlanCacheKey;
+import 
org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter;
+import 
org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessor;
+import 
org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessorFactory;
+import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext;
 import org.apache.shardingsphere.sqlfederation.rule.SQLFederationRule;
 import org.apache.shardingsphere.sqlfederation.spi.SQLFederationDecider;
 
@@ -113,7 +104,7 @@ public final class SQLFederationEngine implements 
AutoCloseable {
         this.currentDatabaseName = currentDatabaseName;
         this.currentSchemaName = currentSchemaName;
         sqlFederationRule = 
metaData.getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class);
-        processor = 
SQLFederationProcessorFactory.getInstance().newInstance(metaData, statistics, 
jdbcExecutor);
+        processor = 
SQLFederationProcessorFactory.getInstance().newInstance(statistics, 
jdbcExecutor);
     }
     
     /**
@@ -193,11 +184,13 @@ public final class SQLFederationEngine implements 
AutoCloseable {
         queryContext = federationContext.getQueryContext();
         try {
             SQLStatementContext sqlStatementContext = 
queryContext.getSqlStatementContext();
-            SqlToRelConverter converter = 
creeateSQLToRelConverter(sqlStatementContext, processor.getConvention());
-            schemaPlus = 
converter.validator.getCatalogReader().getRootSchema().plus();
-            processor.prepare(prepareEngine, callback, currentDatabaseName, 
currentSchemaName, federationContext, sqlFederationRule.getCompilerContext(), 
schemaPlus);
-            SQLFederationExecutionPlan executionPlan =
-                    compileQuery(converter, currentDatabaseName, 
currentSchemaName, federationContext.getMetaData(), sqlStatementContext, 
queryContext.getSql(), processor.getConvention());
+            CompilerContext compilerContext = 
sqlFederationRule.getCompilerContext();
+            SQLFederationRelConverter converter = new 
SQLFederationRelConverter(compilerContext,
+                    getSchemaPath(sqlStatementContext), 
sqlStatementContext.getDatabaseType(), processor.getConvention());
+            schemaPlus = converter.getSchemaPlus();
+            processor.prepare(prepareEngine, callback, currentDatabaseName, 
currentSchemaName, federationContext, compilerContext, schemaPlus);
+            SQLFederationExecutionPlan executionPlan = compileQuery(converter, 
currentDatabaseName,
+                    currentSchemaName, federationContext.getMetaData(), 
sqlStatementContext, queryContext.getSql(), processor.getConvention());
             resultSet = processor.executePlan(prepareEngine, callback, 
executionPlan, converter, federationContext, schemaPlus);
             return resultSet;
             // CHECKSTYLE:OFF
@@ -209,20 +202,8 @@ public final class SQLFederationEngine implements 
AutoCloseable {
         }
     }
     
-    private SqlToRelConverter creeateSQLToRelConverter(final 
SQLStatementContext sqlStatementContext, final Convention convention) {
-        CompilerContext compilerContext = 
sqlFederationRule.getCompilerContext();
-        JavaTypeFactory typeFactory = 
SQLFederationDataTypeFactory.getInstance();
-        CalciteConnectionConfig connectionConfig = 
compilerContext.getConnectionConfig();
-        DatabaseType databaseType = sqlStatementContext.getDatabaseType();
-        DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData();
-        List<String> schemaPath = getSchemaPath(dialectDatabaseMetaData, 
sqlStatementContext);
-        CalciteCatalogReader catalogReader = new 
SQLFederationCatalogReader(compilerContext.getCalciteSchema(), schemaPath, 
typeFactory, connectionConfig);
-        SqlValidator validator = 
SQLFederationValidatorUtils.createSqlValidator(catalogReader, typeFactory, 
databaseType, connectionConfig);
-        RelOptCluster relOptCluster = 
SQLFederationValidatorUtils.createRelOptCluster(typeFactory, convention);
-        return 
SQLFederationValidatorUtils.createSqlToRelConverter(catalogReader, validator, 
relOptCluster, compilerContext.getSqlParserRule(), databaseType, true);
-    }
-    
-    private List<String> getSchemaPath(final DialectDatabaseMetaData 
dialectDatabaseMetaData, final SQLStatementContext sqlStatementContext) {
+    private List<String> getSchemaPath(final SQLStatementContext 
sqlStatementContext) {
+        DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(sqlStatementContext.getDatabaseType()).getDialectDatabaseMetaData();
         // TODO set default schema according to search path result
         if 
(dialectDatabaseMetaData.getSchemaOption().getDefaultSchema().isPresent()) {
             return sqlStatementContext instanceof TableAvailable && 
((TableAvailable) 
sqlStatementContext).getTablesContext().getSimpleTables().stream().anyMatch(each
 -> each.getOwner().isPresent())
@@ -232,7 +213,7 @@ public final class SQLFederationEngine implements 
AutoCloseable {
         return Collections.singletonList(currentDatabaseName);
     }
     
-    private SQLFederationExecutionPlan compileQuery(final SqlToRelConverter 
converter, final String databaseName, final String schemaName, final 
ShardingSphereMetaData metaData,
+    private SQLFederationExecutionPlan compileQuery(final 
SQLFederationRelConverter converter, final String databaseName, final String 
schemaName, final ShardingSphereMetaData metaData,
                                                     final SQLStatementContext 
sqlStatementContext, final String sql, final Convention convention) {
         SQLStatementCompiler sqlStatementCompiler = new 
SQLStatementCompiler(converter, convention);
         SQLFederationCompilerEngine compilerEngine = new 
SQLFederationCompilerEngine(databaseName, schemaName, 
sqlFederationRule.getConfiguration().getExecutionPlanCache());
diff --git 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java
 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java
index 4895625ee23..8c7ad6d7059 100644
--- 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java
+++ 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java
@@ -19,15 +19,15 @@ package 
org.apache.shardingsphere.sqlfederation.engine.processor;
 
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql2rel.SqlToRelConverter;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
 import 
org.apache.shardingsphere.sqlfederation.compiler.SQLFederationExecutionPlan;
 import 
org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext;
+import 
org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter;
+import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -73,7 +73,7 @@ public interface SQLFederationProcessor {
      * @return resultset
      */
     ResultSet executePlan(DriverExecutionPrepareEngine<JDBCExecutionUnit, 
Connection> prepareEngine, JDBCExecutorCallback<? extends ExecuteResult> 
callback,
-                          SQLFederationExecutionPlan executionPlan, 
SqlToRelConverter converter, SQLFederationContext federationContext, SchemaPlus 
schemaPlus);
+                          SQLFederationExecutionPlan executionPlan, 
SQLFederationRelConverter converter, SQLFederationContext federationContext, 
SchemaPlus schemaPlus);
     
     /**
      * Get conversion.
diff --git 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessorFactory.java
 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessorFactory.java
index 634ea5907bd..91bd0bcc8bf 100644
--- 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessorFactory.java
+++ 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessorFactory.java
@@ -20,7 +20,6 @@ package 
org.apache.shardingsphere.sqlfederation.engine.processor;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
 import 
org.apache.shardingsphere.sqlfederation.engine.processor.impl.StandardSQLFederationProcessor;
 
@@ -44,12 +43,11 @@ public final class SQLFederationProcessorFactory {
     /**
      * Create new instance of {@link SQLFederationProcessor}.
      *
-     * @param metaData shardingSphere meta data
      * @param statistics shardingSphere statistics
      * @param jdbcExecutor JDBC executor
      * @return created instance
      */
-    public SQLFederationProcessor newInstance(final ShardingSphereMetaData 
metaData, final ShardingSphereStatistics statistics, final JDBCExecutor 
jdbcExecutor) {
-        return new StandardSQLFederationProcessor(metaData, statistics, 
jdbcExecutor);
+    public SQLFederationProcessor newInstance(final ShardingSphereStatistics 
statistics, final JDBCExecutor jdbcExecutor) {
+        return new StandardSQLFederationProcessor(statistics, jdbcExecutor);
     }
 }
diff --git 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java
 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java
index f12c70574f5..88ed595e514 100644
--- 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java
+++ 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java
@@ -27,7 +27,7 @@ import org.apache.calcite.plan.Convention;
 import org.apache.calcite.runtime.Bindable;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
-import org.apache.calcite.sql2rel.SqlToRelConverter;
+import 
org.apache.shardingsphere.infra.binder.context.segment.select.projection.Projection;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
@@ -40,19 +40,19 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
 import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
-import 
org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessor;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationBindContext;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext;
-import 
org.apache.shardingsphere.sqlfederation.executor.enumerable.EnumerableScanExecutor;
 import 
org.apache.shardingsphere.sqlfederation.compiler.SQLFederationExecutionPlan;
 import 
org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext;
 import 
org.apache.shardingsphere.sqlfederation.compiler.exception.SQLFederationSchemaNotFoundException;
 import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.SQLFederationTable;
+import 
org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter;
+import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext;
+import 
org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessor;
+import 
org.apache.shardingsphere.sqlfederation.executor.context.ExecutorBindContext;
+import 
org.apache.shardingsphere.sqlfederation.executor.context.ExecutorContext;
+import 
org.apache.shardingsphere.sqlfederation.executor.enumerable.implementor.EnumerableScanImplementor;
 import 
org.apache.shardingsphere.sqlfederation.resultset.SQLFederationResultSet;
 
 import java.sql.Connection;
@@ -69,12 +69,12 @@ import java.util.Map;
 @RequiredArgsConstructor
 public final class StandardSQLFederationProcessor implements 
SQLFederationProcessor {
     
-    private final ShardingSphereMetaData metaData;
-    
     private final ShardingSphereStatistics statistics;
     
     private final JDBCExecutor jdbcExecutor;
     
+    private ExecutorContext executorContext;
+    
     @Override
     public void prepare(final DriverExecutionPrepareEngine<JDBCExecutionUnit, 
Connection> prepareEngine, final JDBCExecutorCallback<? extends ExecuteResult> 
callback,
                         final String currentDatabaseName, final String 
currentSchemaName, final SQLFederationContext federationContext, final 
CompilerContext compilerContext,
@@ -82,9 +82,9 @@ public final class StandardSQLFederationProcessor implements 
SQLFederationProces
         if (null == schemaPlus) {
             return;
         }
-        SQLFederationExecutorContext executorContext = new 
SQLFederationExecutorContext(currentDatabaseName, currentSchemaName, 
metaData.getProps());
-        EnumerableScanExecutor scanExecutor =
-                new EnumerableScanExecutor(prepareEngine, jdbcExecutor, 
callback, compilerContext, executorContext, federationContext, 
metaData.getGlobalRuleMetaData(), statistics);
+        executorContext =
+                new ExecutorContext(prepareEngine, jdbcExecutor, callback, 
statistics, currentDatabaseName, currentSchemaName, 
federationContext.isPreview(), federationContext.getProcessId());
+        EnumerableScanImplementor scanExecutor = new 
EnumerableScanImplementor(federationContext.getQueryContext(), compilerContext, 
executorContext);
         SQLStatementContext sqlStatementContext = 
federationContext.getQueryContext().getSqlStatementContext();
         Collection<SimpleTableSegment> simpleTables = sqlStatementContext 
instanceof TableAvailable
                 ? ((TableAvailable) 
sqlStatementContext).getTablesContext().getSimpleTables()
@@ -134,14 +134,18 @@ public final class StandardSQLFederationProcessor 
implements SQLFederationProces
     @SuppressWarnings("unchecked")
     @Override
     public ResultSet executePlan(final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final JDBCExecutorCallback<? extends ExecuteResult> callback,
-                                 final SQLFederationExecutionPlan 
executionPlan, final SqlToRelConverter converter, final SQLFederationContext 
federationContext,
+                                 final SQLFederationExecutionPlan 
executionPlan, final SQLFederationRelConverter converter, final 
SQLFederationContext federationContext,
                                  final SchemaPlus schemaPlus) {
         Bindable<Object> executablePlan = 
EnumerableInterpretable.toBindable(Collections.emptyMap(), null, 
(EnumerableRel) executionPlan.getPhysicalPlan(), Prefer.ARRAY);
         Map<String, Object> params = 
createParameters(federationContext.getQueryContext().getParameters());
-        Enumerator<Object> enumerator = executablePlan.bind(new 
SQLFederationBindContext(converter, params)).enumerator();
+        Enumerator<Object> enumerator = executablePlan.bind(new 
ExecutorBindContext(converter, params)).enumerator();
         SelectStatementContext selectStatementContext = 
(SelectStatementContext) 
federationContext.getQueryContext().getSqlStatementContext();
-        return new SQLFederationResultSet(enumerator, schemaPlus, 
selectStatementContext.getProjectionsContext().getExpandProjections(), 
selectStatementContext.getDatabaseType(),
-                executionPlan.getResultColumnType());
+        List<Projection> expandProjections = 
selectStatementContext.getProjectionsContext().getExpandProjections();
+        SQLFederationResultSet result = new SQLFederationResultSet(enumerator, 
schemaPlus, expandProjections, selectStatementContext.getDatabaseType(), 
executionPlan.getResultColumnType());
+        if (federationContext.isPreview()) {
+            
federationContext.getPreviewExecutionUnits().addAll(executorContext.getPreviewExecutionUnits());
+        }
+        return result;
     }
     
     private Map<String, Object> createParameters(final List<Object> params) {
diff --git 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationBindContext.java
 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/ExecutorBindContext.java
similarity index 85%
rename from 
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationBindContext.java
rename to 
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/ExecutorBindContext.java
index d896ce68fc0..45328373232 100644
--- 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationBindContext.java
+++ 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/ExecutorBindContext.java
@@ -23,24 +23,24 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.linq4j.QueryProvider;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql2rel.SqlToRelConverter;
+import 
org.apache.shardingsphere.sqlfederation.compiler.rel.converter.SQLFederationRelConverter;
 
 import java.util.Map;
 
 /**
- * SQL federation bind context.
+ * Executor bind context.
  */
 @RequiredArgsConstructor
-public final class SQLFederationBindContext implements DataContext {
+public final class ExecutorBindContext implements DataContext {
     
-    private final SqlToRelConverter converter;
+    private final SQLFederationRelConverter converter;
     
     @Getter
     private final Map<String, Object> parameters;
     
     @Override
     public SchemaPlus getRootSchema() {
-        return converter.validator.getCatalogReader().getRootSchema().plus();
+        return converter.getSchemaPlus();
     }
     
     @Override
diff --git 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/ExecutorContext.java
 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/ExecutorContext.java
new file mode 100644
index 00000000000..14fc4c57d1e
--- /dev/null
+++ 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/ExecutorContext.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sqlfederation.executor.context;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
+import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
+
+import java.sql.Connection;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Executor context.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class ExecutorContext {
+    
+    private final ProcessEngine processEngine = new ProcessEngine();
+    
+    private final Map<String, Integer> connectionOffsets = new 
LinkedHashMap<>();
+    
+    private final Collection<ExecutionUnit> previewExecutionUnits = new 
LinkedList<>();
+    
+    private final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine;
+    
+    private final JDBCExecutor jdbcExecutor;
+    
+    private final JDBCExecutorCallback<? extends ExecuteResult> callback;
+    
+    private final ShardingSphereStatistics statistics;
+    
+    private final String currentDatabaseName;
+    
+    private final String currentSchemaName;
+    
+    private final boolean preview;
+    
+    private final String processId;
+}
diff --git 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationExecutorContext.java
 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationExecutorContext.java
deleted file mode 100644
index b06b5773dd0..00000000000
--- 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationExecutorContext.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.sqlfederation.executor.context;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * SQL federation executor context.
- */
-@RequiredArgsConstructor
-@Getter
-public final class SQLFederationExecutorContext {
-    
-    private final String currentDatabaseName;
-    
-    private final String currentSchemaName;
-    
-    private final ConfigurationProperties props;
-    
-    private final Map<String, Integer> connectionOffsets = new 
LinkedHashMap<>();
-}
diff --git 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/JDBCRowEnumerator.java
 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/jdbc/JDBCDataRowEnumerator.java
similarity index 96%
rename from 
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/JDBCRowEnumerator.java
rename to 
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/jdbc/JDBCDataRowEnumerator.java
index bb26814c93b..bbb8f361294 100644
--- 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/JDBCRowEnumerator.java
+++ 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/jdbc/JDBCDataRowEnumerator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sqlfederation.executor.enumerator;
+package 
org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.jdbc;
 
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
@@ -32,10 +32,10 @@ import java.time.LocalDateTime;
 import java.util.Collection;
 
 /**
- * JDBC row enumerator.
+ * JDBC data row enumerator.
  */
 @RequiredArgsConstructor
-public final class JDBCRowEnumerator implements Enumerator<Object> {
+public final class JDBCDataRowEnumerator implements Enumerator<Object> {
     
     private final MergedResult queryResult;
     
diff --git 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/MemoryRowEnumerator.java
 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryDataRowEnumerator.java
similarity index 78%
rename from 
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/MemoryRowEnumerator.java
rename to 
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryDataRowEnumerator.java
index 2f283f71b47..4bcf0b4c5f2 100644
--- 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/MemoryRowEnumerator.java
+++ 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryDataRowEnumerator.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sqlfederation.executor.enumerator;
+package 
org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.memory;
 
 import org.apache.calcite.linq4j.Enumerator;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
 import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
-import org.apache.shardingsphere.sqlfederation.executor.utils.EnumeratorUtils;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,9 +28,9 @@ import java.util.Iterator;
 import java.util.Map;
 
 /**
- * Memory row enumerator.
+ * Memory data row enumerator.
  */
-public final class MemoryRowEnumerator implements Enumerator<Object> {
+public final class MemoryDataRowEnumerator implements Enumerator<Object> {
     
     private final Collection<RowStatistics> rows;
     
@@ -41,9 +40,9 @@ public final class MemoryRowEnumerator implements 
Enumerator<Object> {
     
     private Object current;
     
-    public MemoryRowEnumerator(final Collection<RowStatistics> rows, final 
Collection<ShardingSphereColumn> columns, final DatabaseType databaseType) {
+    public MemoryDataRowEnumerator(final Collection<RowStatistics> rows, final 
Collection<ShardingSphereColumn> columns, final DatabaseType databaseType) {
         this.rows = rows;
-        columnTypes = EnumeratorUtils.createColumnTypes(new 
ArrayList<>(columns), databaseType);
+        columnTypes = MemoryDataTypeConverter.createColumnTypes(new 
ArrayList<>(columns), databaseType);
         iterator = rows.iterator();
     }
     
@@ -55,7 +54,7 @@ public final class MemoryRowEnumerator implements 
Enumerator<Object> {
     @Override
     public boolean moveNext() {
         if (iterator.hasNext()) {
-            current = EnumeratorUtils.convertToTargetType(columnTypes, 
iterator.next().getRows().toArray());
+            current = MemoryDataTypeConverter.convertToTargetType(columnTypes, 
iterator.next().getRows().toArray());
             return true;
         }
         current = null;
diff --git 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/EnumeratorUtils.java
 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryDataTypeConverter.java
similarity index 96%
rename from 
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/EnumeratorUtils.java
rename to 
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryDataTypeConverter.java
index c67c5485cb5..fc0ba4e164d 100644
--- 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/EnumeratorUtils.java
+++ 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryDataTypeConverter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sqlfederation.executor.utils;
+package 
org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.memory;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -35,7 +35,7 @@ import java.util.Optional;
  * Enumerator utilities.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class EnumeratorUtils {
+public final class MemoryDataTypeConverter {
     
     /**
      * Create column types.
diff --git 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/StatisticsAssembleUtils.java
 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryTableStatisticsBuilder.java
similarity index 55%
rename from 
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/StatisticsAssembleUtils.java
rename to 
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryTableStatisticsBuilder.java
index e638f61748e..28e374bcbf8 100644
--- 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/StatisticsAssembleUtils.java
+++ 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/enumerator/memory/MemoryTableStatisticsBuilder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sqlfederation.executor.utils;
+package 
org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.memory;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -31,61 +31,75 @@ import 
org.apache.shardingsphere.infra.metadata.user.Grantee;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.LinkedList;
 
 /**
- * Statistics assemble utils.
+ * Memory table statistics builder.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class StatisticsAssembleUtils {
+public final class MemoryTableStatisticsBuilder {
     
     /**
-     * Assemble table statistics.
+     * Build table statistics.
      *
      * @param table table
      * @param metaData meta data
      * @param driverQuerySystemCatalogOption driver query system catalog option
      * @return table statistics
      */
-    public static TableStatistics assembleTableStatistics(final 
ShardingSphereTable table, final ShardingSphereMetaData metaData,
-                                                          final 
DialectDriverQuerySystemCatalogOption driverQuerySystemCatalogOption) {
-        TableStatistics result = new TableStatistics(table.getName());
+    public static TableStatistics buildTableStatistics(final 
ShardingSphereTable table, final ShardingSphereMetaData metaData,
+                                                       final 
DialectDriverQuerySystemCatalogOption driverQuerySystemCatalogOption) {
         if 
(driverQuerySystemCatalogOption.isDatabaseDataTable(table.getName())) {
-            assembleDatabaseData(result, metaData.getAllDatabases(), 
driverQuerySystemCatalogOption.getDatCompatibility());
-        } else if 
(driverQuerySystemCatalogOption.isTableDataTable(table.getName())) {
-            for (ShardingSphereDatabase each : metaData.getAllDatabases()) {
-                assembleTableData(result, each.getAllSchemas());
-            }
-        } else if 
(driverQuerySystemCatalogOption.isRoleDataTable(table.getName())) {
-            assembleRoleData(result, metaData);
+            return buildDatabaseData(table.getName(), 
metaData.getAllDatabases(), 
driverQuerySystemCatalogOption.getDatCompatibility());
         }
-        return result;
+        if (driverQuerySystemCatalogOption.isTableDataTable(table.getName())) {
+            return buildTableData(table.getName(), metaData);
+        }
+        if (driverQuerySystemCatalogOption.isRoleDataTable(table.getName())) {
+            return buildRoleData(table.getName(), metaData);
+        }
+        return new TableStatistics(table.getName());
     }
     
-    private static void assembleDatabaseData(final TableStatistics 
tableStatistics, final Collection<ShardingSphereDatabase> databases, final 
String datCompatibility) {
+    private static TableStatistics buildDatabaseData(final String tableName, 
final Collection<ShardingSphereDatabase> databases, final String 
datCompatibility) {
+        TableStatistics result = new TableStatistics(tableName);
         for (ShardingSphereDatabase each : databases) {
             Object[] rows = new Object[15];
             rows[0] = each.getName();
             rows[11] = datCompatibility;
-            tableStatistics.getRows().add(new 
RowStatistics(Arrays.asList(rows)));
+            result.getRows().add(new RowStatistics(Arrays.asList(rows)));
         }
+        return result;
     }
     
-    private static void assembleTableData(final TableStatistics 
tableStatistics, final Collection<ShardingSphereSchema> schemas) {
+    private static TableStatistics buildTableData(final String tableName, 
final ShardingSphereMetaData metaData) {
+        TableStatistics result = new TableStatistics(tableName);
+        for (ShardingSphereDatabase each : metaData.getAllDatabases()) {
+            result.getRows().addAll(buildTableData(each.getAllSchemas()));
+        }
+        return result;
+    }
+    
+    private static Collection<RowStatistics> buildTableData(final 
Collection<ShardingSphereSchema> schemas) {
+        Collection<RowStatistics> result = new LinkedList<>();
         for (ShardingSphereSchema schema : schemas) {
             for (ShardingSphereTable each : schema.getAllTables()) {
                 Object[] rows = new Object[10];
                 rows[0] = schema.getName();
                 rows[1] = each.getName();
-                tableStatistics.getRows().add(new 
RowStatistics(Arrays.asList(rows)));
+                result.add(new RowStatistics(Arrays.asList(rows)));
             }
         }
+        return result;
     }
     
-    private static void assembleRoleData(final TableStatistics 
tableStatistics, final ShardingSphereMetaData metaData) {
+    private static TableStatistics buildRoleData(final String tableName, final 
ShardingSphereMetaData metaData) {
+        TableStatistics result = new TableStatistics(tableName);
         for (Grantee each : 
metaData.getGlobalRuleMetaData().getSingleRule(AuthorityRule.class).getGrantees())
 {
             Object[] rows = new Object[27];
             rows[0] = each.getUsername();
-            tableStatistics.getRows().add(new 
RowStatistics(Arrays.asList(rows)));
+            result.getRows().add(new RowStatistics(Arrays.asList(rows)));
         }
+        return result;
     }
 }
diff --git 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/implementor/EnumerableScanImplementor.java
similarity index 70%
rename from 
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
rename to 
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/implementor/EnumerableScanImplementor.java
index 6e441ce3aed..7d569fd774c 100644
--- 
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
+++ 
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/implementor/EnumerableScanImplementor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sqlfederation.executor.enumerable;
+package 
org.apache.shardingsphere.sqlfederation.executor.enumerable.implementor;
 
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
@@ -38,11 +38,7 @@ import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupRepor
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
@@ -50,23 +46,19 @@ import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
 import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext;
-import 
org.apache.shardingsphere.sqlfederation.executor.enumerator.JDBCRowEnumerator;
-import 
org.apache.shardingsphere.sqlfederation.executor.enumerator.MemoryRowEnumerator;
-import 
org.apache.shardingsphere.sqlfederation.executor.utils.StatisticsAssembleUtils;
 import 
org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext;
-import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.EmptyRowEnumerator;
-import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.ScanExecutor;
-import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.ScanExecutorContext;
+import 
org.apache.shardingsphere.sqlfederation.compiler.implementor.ScanImplementor;
+import 
org.apache.shardingsphere.sqlfederation.compiler.implementor.ScanImplementorContext;
+import 
org.apache.shardingsphere.sqlfederation.compiler.implementor.enumerator.EmptyDataRowEnumerator;
+import 
org.apache.shardingsphere.sqlfederation.executor.context.ExecutorContext;
+import 
org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.jdbc.JDBCDataRowEnumerator;
+import 
org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.memory.MemoryDataRowEnumerator;
+import 
org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.memory.MemoryTableStatisticsBuilder;
 
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -79,42 +71,32 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
- * Enumerable scan executor.
+ * Enumerable scan implementor.
  */
 @RequiredArgsConstructor
-public final class EnumerableScanExecutor implements ScanExecutor {
+public final class EnumerableScanImplementor implements ScanImplementor {
     
-    private final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine;
-    
-    private final JDBCExecutor jdbcExecutor;
-    
-    private final JDBCExecutorCallback<? extends ExecuteResult> callback;
+    private final QueryContext queryContext;
     
     private final CompilerContext compilerContext;
     
-    private final SQLFederationExecutorContext executorContext;
-    
-    private final SQLFederationContext federationContext;
-    
-    private final RuleMetaData globalRuleMetaData;
-    
-    private final ShardingSphereStatistics statistics;
+    private final ExecutorContext executorContext;
     
     private final ProcessEngine processEngine = new ProcessEngine();
     
     @Override
-    public Enumerable<Object> execute(final ShardingSphereTable table, final 
ScanExecutorContext scanContext) {
-        SQLStatementContext sqlStatementContext = 
federationContext.getQueryContext().getSqlStatementContext();
+    public Enumerable<Object> implement(final ShardingSphereTable table, final 
ScanImplementorContext scanContext) {
+        SQLStatementContext sqlStatementContext = 
queryContext.getSqlStatementContext();
         if (containsSystemSchema(sqlStatementContext)) {
             return createMemoryEnumerable(sqlStatementContext, table);
         }
-        QueryContext queryContext = 
createQueryContext(federationContext.getMetaData(), scanContext, 
sqlStatementContext.getDatabaseType(), 
federationContext.getQueryContext().isUseCache());
-        ExecutionContext executionContext = new 
KernelProcessor().generateExecutionContext(queryContext, globalRuleMetaData, 
executorContext.getProps());
-        if (federationContext.isPreview()) {
-            
federationContext.getPreviewExecutionUnits().addAll(executionContext.getExecutionUnits());
+        QueryContext scanQueryContext = 
createQueryContext(queryContext.getMetaData(), scanContext, 
sqlStatementContext.getDatabaseType(), queryContext.isUseCache());
+        ExecutionContext executionContext = new 
KernelProcessor().generateExecutionContext(scanQueryContext, 
queryContext.getMetaData().getGlobalRuleMetaData(), 
queryContext.getMetaData().getProps());
+        if (executorContext.isPreview()) {
+            
executorContext.getPreviewExecutionUnits().addAll(executionContext.getExecutionUnits());
             return createEmptyEnumerable();
         }
-        return createJDBCEnumerable(queryContext, 
federationContext.getMetaData().getDatabase(executorContext.getCurrentDatabaseName()),
 executionContext);
+        return createJDBCEnumerable(scanQueryContext, 
queryContext.getMetaData().getDatabase(executorContext.getCurrentDatabaseName()),
 executionContext);
     }
     
     private boolean containsSystemSchema(final SQLStatementContext 
sqlStatementContext) {
@@ -128,29 +110,33 @@ public final class EnumerableScanExecutor implements 
ScanExecutor {
         return false;
     }
     
-    private AbstractEnumerable<Object> createJDBCEnumerable(final QueryContext 
queryContext, final ShardingSphereDatabase database, final ExecutionContext 
context) {
+    private AbstractEnumerable<Object> createJDBCEnumerable(final QueryContext 
queryContext, final ShardingSphereDatabase database, final ExecutionContext 
executionContext) {
         return new AbstractEnumerable<Object>() {
             
             @SneakyThrows
             @Override
             public Enumerator<Object> enumerator() {
-                computeConnectionOffsets(context);
-                // TODO pass grantee from proxy and jdbc adapter
-                ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext 
= prepareEngine.prepare(
-                        database.getName(), context.getRouteContext(), 
executorContext.getConnectionOffsets(),
-                        context.getExecutionUnits(), new 
ExecutionGroupReportContext(federationContext.getProcessId(), 
database.getName()));
+                computeConnectionOffsets(executionContext);
+                ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext 
= prepare(database, executionContext);
                 setParameters(executionGroupContext.getInputGroups());
-                
ShardingSpherePreconditions.checkState(!ProcessRegistry.getInstance().get(federationContext.getProcessId()).isInterrupted(),
 SQLExecutionInterruptedException::new);
-                processEngine.executeSQL(executionGroupContext, 
federationContext.getQueryContext());
-                List<QueryResult> queryResults = 
jdbcExecutor.execute(executionGroupContext, 
callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
-                MergeEngine mergeEngine = new 
MergeEngine(federationContext.getMetaData(), database, 
executorContext.getProps(), 
federationContext.getQueryContext().getConnectionContext());
+                
ShardingSpherePreconditions.checkState(!ProcessRegistry.getInstance().get(executorContext.getProcessId()).isInterrupted(),
 SQLExecutionInterruptedException::new);
+                processEngine.executeSQL(executionGroupContext, queryContext);
+                List<QueryResult> queryResults =
+                        
executorContext.getJdbcExecutor().execute(executionGroupContext, 
executorContext.getCallback()).stream().map(QueryResult.class::cast).collect(Collectors.toList());
+                MergeEngine mergeEngine = new 
MergeEngine(queryContext.getMetaData(), database, 
queryContext.getMetaData().getProps(), queryContext.getConnectionContext());
                 MergedResult mergedResult = mergeEngine.merge(queryResults, 
queryContext.getSqlStatementContext());
                 Collection<Statement> statements = 
getStatements(executionGroupContext.getInputGroups());
-                return new JDBCRowEnumerator(mergedResult, 
queryResults.get(0).getMetaData(), statements);
+                return new JDBCDataRowEnumerator(mergedResult, 
queryResults.get(0).getMetaData(), statements);
             }
         };
     }
     
+    private ExecutionGroupContext<JDBCExecutionUnit> prepare(final 
ShardingSphereDatabase database, final ExecutionContext executionContext) 
throws SQLException {
+        // TODO pass grantee from proxy and jdbc adapter
+        return executorContext.getPrepareEngine().prepare(database.getName(), 
executionContext.getRouteContext(), executorContext.getConnectionOffsets(), 
executionContext.getExecutionUnits(),
+                new 
ExecutionGroupReportContext(executorContext.getProcessId(), 
database.getName()));
+    }
+    
     private void computeConnectionOffsets(final ExecutionContext context) {
         for (ExecutionUnit each : context.getExecutionUnits()) {
             if 
(executorContext.getConnectionOffsets().containsKey(each.getDataSourceName())) {
@@ -166,13 +152,13 @@ public final class EnumerableScanExecutor implements 
ScanExecutor {
         DatabaseType databaseType = sqlStatementContext.getDatabaseType();
         Optional<DialectDriverQuerySystemCatalogOption> 
driverQuerySystemCatalogOption = new 
DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().getDriverQuerySystemCatalogOption();
         if (driverQuerySystemCatalogOption.isPresent() && 
driverQuerySystemCatalogOption.get().isSystemTable(table.getName())) {
-            return 
createMemoryEnumerator(StatisticsAssembleUtils.assembleTableStatistics(table, 
federationContext.getMetaData(), driverQuerySystemCatalogOption.get()), table, 
databaseType);
+            return 
createMemoryEnumerator(MemoryTableStatisticsBuilder.buildTableStatistics(table, 
queryContext.getMetaData(), driverQuerySystemCatalogOption.get()), table, 
databaseType);
         }
         ShardingSpherePreconditions.checkState(sqlStatementContext instanceof 
TableAvailable,
                 () -> new IllegalStateException(String.format("Can not support 
%s in sql federation", 
sqlStatementContext.getSqlStatement().getClass().getSimpleName())));
         String databaseName = ((TableAvailable) 
sqlStatementContext).getTablesContext().getDatabaseName().orElse(executorContext.getCurrentDatabaseName());
         String schemaName = ((TableAvailable) 
sqlStatementContext).getTablesContext().getSchemaName().orElse(executorContext.getCurrentSchemaName());
-        Optional<TableStatistics> tableStatistics = 
Optional.ofNullable(statistics.getDatabaseStatistics(databaseName))
+        Optional<TableStatistics> tableStatistics = 
Optional.ofNullable(executorContext.getStatistics().getDatabaseStatistics(databaseName))
                 .map(optional -> 
optional.getSchemaStatistics(schemaName)).map(optional -> 
optional.getTableStatistics(table.getName()));
         return tableStatistics.map(optional -> 
createMemoryEnumerator(optional, table, 
databaseType)).orElseGet(this::createEmptyEnumerable);
     }
@@ -182,7 +168,7 @@ public final class EnumerableScanExecutor implements 
ScanExecutor {
             
             @Override
             public Enumerator<Object> enumerator() {
-                return new MemoryRowEnumerator(tableStatistics.getRows(), 
table.getAllColumns(), databaseType);
+                return new MemoryDataRowEnumerator(tableStatistics.getRows(), 
table.getAllColumns(), databaseType);
             }
         };
     }
@@ -215,13 +201,13 @@ public final class EnumerableScanExecutor implements 
ScanExecutor {
         }
     }
     
-    private QueryContext createQueryContext(final ShardingSphereMetaData 
metaData, final ScanExecutorContext sqlString, final DatabaseType databaseType, 
final boolean useCache) {
+    private QueryContext createQueryContext(final ShardingSphereMetaData 
metaData, final ScanImplementorContext sqlString, final DatabaseType 
databaseType, final boolean useCache) {
         String sql = sqlString.getSql().replace(System.lineSeparator(), " ");
         SQLStatement sqlStatement = 
compilerContext.getSqlParserRule().getSQLParserEngine(databaseType).parse(sql, 
useCache);
         List<Object> params = getParameters(sqlString.getParamIndexes());
         HintValueContext hintValueContext = new HintValueContext();
         SQLStatementContext sqlStatementContext = new SQLBindEngine(metaData, 
executorContext.getCurrentDatabaseName(), hintValueContext).bind(sqlStatement, 
params);
-        return new QueryContext(sqlStatementContext, sql, params, 
hintValueContext, federationContext.getQueryContext().getConnectionContext(), 
metaData, useCache);
+        return new QueryContext(sqlStatementContext, sql, params, 
hintValueContext, queryContext.getConnectionContext(), metaData, useCache);
     }
     
     private List<Object> getParameters(final int[] paramIndexes) {
@@ -230,7 +216,7 @@ public final class EnumerableScanExecutor implements 
ScanExecutor {
         }
         List<Object> result = new ArrayList<>(paramIndexes.length);
         for (int each : paramIndexes) {
-            
result.add(federationContext.getQueryContext().getParameters().get(each));
+            result.add(queryContext.getParameters().get(each));
         }
         return result;
     }
@@ -240,7 +226,7 @@ public final class EnumerableScanExecutor implements 
ScanExecutor {
             
             @Override
             public Enumerator<Object> enumerator() {
-                return new EmptyRowEnumerator();
+                return new EmptyDataRowEnumerator();
             }
         };
     }
diff --git 
a/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutorTest.java
 
b/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/executor/EnumerableScanImplementorTest.java
similarity index 77%
rename from 
kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutorTest.java
rename to 
kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/executor/EnumerableScanImplementorTest.java
index 941bd0f7b1f..a7dd882a505 100644
--- 
a/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutorTest.java
+++ 
b/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/executor/EnumerableScanImplementorTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sqlfederation.executor.enumerable;
+package org.apache.shardingsphere.sqlfederation.executor.executor;
 
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
@@ -28,11 +28,12 @@ import 
org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
 import org.apache.shardingsphere.infra.metadata.statistics.SchemaStatistics;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
 import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext;
 import 
org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext;
-import 
org.apache.shardingsphere.sqlfederation.compiler.metadata.schema.table.ScanExecutorContext;
+import 
org.apache.shardingsphere.sqlfederation.compiler.implementor.ScanImplementorContext;
+import 
org.apache.shardingsphere.sqlfederation.executor.context.ExecutorContext;
+import 
org.apache.shardingsphere.sqlfederation.executor.enumerable.implementor.EnumerableScanImplementor;
 import org.junit.jupiter.api.Test;
 
 import java.sql.Types;
@@ -46,30 +47,23 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-class EnumerableScanExecutorTest {
+class EnumerableScanImplementorTest {
     
     @Test
-    void assertExecuteWithStatistics() {
+    void assertImplementWithStatistics() {
         CompilerContext compilerContext = mock(CompilerContext.class, 
RETURNS_DEEP_STUBS);
-        SQLFederationExecutorContext executorContext = 
mock(SQLFederationExecutorContext.class);
+        ExecutorContext executorContext = mock(ExecutorContext.class);
         when(executorContext.getCurrentDatabaseName()).thenReturn("foo_db");
         when(executorContext.getCurrentSchemaName()).thenReturn("pg_catalog");
-        ShardingSphereStatistics statistics = 
mock(ShardingSphereStatistics.class, RETURNS_DEEP_STUBS);
-        DatabaseStatistics databaseStatistics = mock(DatabaseStatistics.class, 
RETURNS_DEEP_STUBS);
-        
when(statistics.getDatabaseStatistics("foo_db")).thenReturn(databaseStatistics);
-        SchemaStatistics schemaStatistics = mock(SchemaStatistics.class, 
RETURNS_DEEP_STUBS);
-        
when(databaseStatistics.getSchemaStatistics("pg_catalog")).thenReturn(schemaStatistics);
-        TableStatistics tableStatistics = mock(TableStatistics.class);
-        
when(tableStatistics.getRows()).thenReturn(Collections.singletonList(new 
RowStatistics(Collections.singletonList(1))));
-        
when(schemaStatistics.getTableStatistics("test")).thenReturn(tableStatistics);
+        ShardingSphereStatistics statistics = mockStatistics();
+        when(executorContext.getStatistics()).thenReturn(statistics);
         ShardingSphereTable table = mock(ShardingSphereTable.class, 
RETURNS_DEEP_STUBS);
         when(table.getName()).thenReturn("test");
         when(table.getAllColumns()).thenReturn(Collections.singleton(new 
ShardingSphereColumn("id", Types.INTEGER, true, false, false, false, true, 
false)));
-        SQLFederationContext federationContext = 
mock(SQLFederationContext.class, RETURNS_DEEP_STUBS);
+        QueryContext queryContext = mock(QueryContext.class, 
RETURNS_DEEP_STUBS);
         SelectStatementContext selectStatementContext = 
mockSelectStatementContext();
-        
when(federationContext.getQueryContext().getSqlStatementContext()).thenReturn(selectStatementContext);
-        Enumerable<Object> enumerable =
-                new EnumerableScanExecutor(null, null, null, compilerContext, 
executorContext, federationContext, null, statistics).execute(table, 
mock(ScanExecutorContext.class));
+        
when(queryContext.getSqlStatementContext()).thenReturn(selectStatementContext);
+        Enumerable<Object> enumerable = new 
EnumerableScanImplementor(queryContext, compilerContext, 
executorContext).implement(table, mock(ScanImplementorContext.class));
         try (Enumerator<Object> actual = enumerable.enumerator()) {
             actual.moveNext();
             Object row = actual.current();
@@ -78,6 +72,18 @@ class EnumerableScanExecutorTest {
         }
     }
     
+    private ShardingSphereStatistics mockStatistics() {
+        ShardingSphereStatistics result = mock(ShardingSphereStatistics.class, 
RETURNS_DEEP_STUBS);
+        DatabaseStatistics databaseStatistics = mock(DatabaseStatistics.class, 
RETURNS_DEEP_STUBS);
+        
when(result.getDatabaseStatistics("foo_db")).thenReturn(databaseStatistics);
+        SchemaStatistics schemaStatistics = mock(SchemaStatistics.class, 
RETURNS_DEEP_STUBS);
+        
when(databaseStatistics.getSchemaStatistics("pg_catalog")).thenReturn(schemaStatistics);
+        TableStatistics tableStatistics = mock(TableStatistics.class);
+        
when(tableStatistics.getRows()).thenReturn(Collections.singletonList(new 
RowStatistics(Collections.singletonList(1))));
+        
when(schemaStatistics.getTableStatistics("test")).thenReturn(tableStatistics);
+        return result;
+    }
+    
     private SelectStatementContext mockSelectStatementContext() {
         SelectStatementContext result = mock(SelectStatementContext.class, 
RETURNS_DEEP_STUBS);
         
when(result.getDatabaseType()).thenReturn(TypedSPILoader.getService(DatabaseType.class,
 "PostgreSQL"));
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java
index 2304b7dc529..9899928c731 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java
@@ -77,7 +77,7 @@ import 
org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatemen
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.dml.DMLStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.dml.InsertStatement;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.dml.SelectStatement;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
+import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext;
 import org.apache.shardingsphere.transaction.api.TransactionType;
 import 
org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
 
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
index 736588e0a0b..2ce102b6ef5 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
@@ -58,7 +58,7 @@ import 
org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnection
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
 import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
+import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext;
 
 import java.sql.Connection;
 import java.sql.SQLException;
diff --git 
a/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
 
b/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
index ac11c30a7e6..90f194d7a49 100644
--- 
a/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
+++ 
b/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
@@ -48,7 +48,7 @@ import 
org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
 import 
org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
 import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
+import org.apache.shardingsphere.sqlfederation.context.SQLFederationContext;
 
 import java.sql.Connection;
 import java.sql.ResultSet;

Reply via email to