This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 40e8a0abb54 Replace Programs.standard() with RBO and CBO optimize logic (#19624)
40e8a0abb54 is described below

commit 40e8a0abb54245a45841c6f47529afaf2b1ccaf6
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Thu Jul 28 13:33:43 2022 +0800

    Replace Programs.standard() with RBO and CBO optimize logic (#19624)
---
 .../optimizer/ShardingSphereOptimizer.java         |  21 +++--
 .../context/planner/OptimizerPlannerContext.java   |   3 +
 .../planner/OptimizerPlannerContextFactory.java    |   7 +-
 .../planner/QueryOptimizePlannerFactory.java       |  56 +++++++++++-
 .../optimizer/ShardingSphereOptimizerTest.java     | 101 ++++++++++++---------
 .../rulealtered/RuleAlteredJobPreparer.java        |   2 +-
 6 files changed, 129 insertions(+), 61 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizer.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizer.java
index 4dcb7e2acad..1c1e43af0ba 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizer.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizer.java
@@ -20,19 +20,14 @@ package 
org.apache.shardingsphere.infra.federation.optimizer;
 import lombok.RequiredArgsConstructor;
 import org.apache.calcite.adapter.enumerable.EnumerableConvention;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
-import org.apache.calcite.tools.Programs;
 import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 import 
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
 import 
org.apache.shardingsphere.infra.federation.optimizer.converter.SQLNodeConverterEngine;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
-import java.util.Collections;
-
 /**
  * ShardingSphere optimizer.
  */
@@ -53,16 +48,22 @@ public final class ShardingSphereOptimizer {
         try {
             SqlToRelConverter converter = 
context.getPlannerContexts().get(databaseName).getConverters().get(schemaName);
             SqlNode sqlNode = 
SQLNodeConverterEngine.convertToSQLNode(sqlStatement);
-            RelRoot relRoot = converter.convertQuery(sqlNode, true, true);
-            return optimize(converter, relRoot);
+            RelNode logicPlan = converter.convertQuery(sqlNode, true, 
true).rel;
+            RelNode bestPlan = optimizeWithRBO(logicPlan, 
context.getPlannerContexts().get(databaseName).getHepPlanners().get(schemaName));
+            return optimizeWithCBO(bestPlan, converter);
         } catch (final UnsupportedOperationException ex) {
             throw new ShardingSphereException(ex);
         }
     }
     
-    private RelNode optimize(final SqlToRelConverter converter, final RelRoot 
relRoot) {
+    private static RelNode optimizeWithRBO(final RelNode logicPlan, final 
RelOptPlanner hepPlanner) {
+        hepPlanner.setRoot(logicPlan);
+        return hepPlanner.findBestExp();
+    }
+    
+    private RelNode optimizeWithCBO(final RelNode bestPlan, final 
SqlToRelConverter converter) {
         RelOptPlanner planner = converter.getCluster().getPlanner();
-        RelTraitSet desiredTraitSet = 
relRoot.rel.getTraitSet().replace(EnumerableConvention.INSTANCE).simplify();
-        return Programs.standard().run(planner, relRoot.rel, desiredTraitSet, 
Collections.emptyList(), Collections.emptyList());
+        planner.setRoot(planner.changeTraits(bestPlan, 
converter.getCluster().traitSet().replace(EnumerableConvention.INSTANCE)));
+        return planner.findBestExp();
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContext.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContext.java
index bfc7a1cf955..fa7647cffd7 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContext.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.infra.federation.optimizer.context.planner;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
 
@@ -34,4 +35,6 @@ public final class OptimizerPlannerContext {
     private final Map<String, SqlValidator> validators;
     
     private final Map<String, SqlToRelConverter> converters;
+    
+    private final Map<String, RelOptPlanner> hepPlanners;
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
index ca257721938..ea0875e12f1 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
@@ -25,6 +25,7 @@ import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptTable.ViewExpander;
 import org.apache.calcite.prepare.CalciteCatalogReader;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -77,6 +78,7 @@ public final class OptimizerPlannerContextFactory {
     public static OptimizerPlannerContext create(final 
FederationDatabaseMetaData databaseMetaData) {
         Map<String, SqlValidator> validators = new LinkedHashMap<>();
         Map<String, SqlToRelConverter> converters = new LinkedHashMap<>();
+        Map<String, RelOptPlanner> hepPlanners = new LinkedHashMap<>();
         FederationDatabase federationDatabase = new 
FederationDatabase(databaseMetaData);
         for (Entry<String, Schema> entry : 
federationDatabase.getSubSchemaMap().entrySet()) {
             CalciteConnectionConfig connectionConfig = new 
CalciteConnectionConfigImpl(createConnectionProperties());
@@ -86,8 +88,9 @@ public final class OptimizerPlannerContextFactory {
             SqlToRelConverter converter = createConverter(catalogReader, 
validator, relDataTypeFactory);
             validators.put(entry.getKey(), validator);
             converters.put(entry.getKey(), converter);
+            hepPlanners.put(entry.getKey(), 
QueryOptimizePlannerFactory.createHepPlanner());
         }
-        return new OptimizerPlannerContext(validators, converters);
+        return new OptimizerPlannerContext(validators, converters, 
hepPlanners);
     }
     
     private static Properties createConnectionProperties() {
@@ -115,7 +118,7 @@ public final class OptimizerPlannerContextFactory {
     private static SqlToRelConverter createConverter(final 
CalciteCatalogReader catalogReader, final SqlValidator validator, final 
RelDataTypeFactory relDataTypeFactory) {
         ViewExpander expander = (rowType, queryString, schemaPath, viewPath) 
-> null;
         Config converterConfig = 
SqlToRelConverter.config().withTrimUnusedFields(true);
-        RelOptCluster cluster = 
RelOptCluster.create(QueryOptimizePlannerFactory.newInstance(), new 
RexBuilder(relDataTypeFactory));
+        RelOptCluster cluster = 
RelOptCluster.create(QueryOptimizePlannerFactory.createVolcanoPlanner(), new 
RexBuilder(relDataTypeFactory));
         return new SqlToRelConverter(expander, validator, catalogReader, 
cluster, StandardConvertletTable.INSTANCE, converterConfig);
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/planner/QueryOptimizePlannerFactory.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/planner/QueryOptimizePlannerFactory.java
index ca6680517d9..bc763329503 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/planner/QueryOptimizePlannerFactory.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/planner/QueryOptimizePlannerFactory.java
@@ -22,27 +22,49 @@ import lombok.NoArgsConstructor;
 import org.apache.calcite.adapter.enumerable.EnumerableRules;
 import org.apache.calcite.plan.ConventionTraitDef;
 import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
 import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.rules.CoreRules;
 
+import java.util.Collection;
+import java.util.LinkedList;
+
 /**
  * Query optimize planner factory.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class QueryOptimizePlannerFactory {
     
+    private static final int DEFAULT_MATCH_LIMIT = 1024;
+    
     /**
-     * Create new instance of query optimize planner.
+     * Create new instance of volcano planner.
      *
-     * @return created instance
+     * @return volcano planner instance
      */
-    public static RelOptPlanner newInstance() {
+    public static RelOptPlanner createVolcanoPlanner() {
         RelOptPlanner result = new VolcanoPlanner();
         setUpRules(result);
         return result;
     }
     
+    /**
+     * Create new instance of hep planner.
+     *
+     * @return hep planner instance
+     */
+    public static RelOptPlanner createHepPlanner() {
+        HepProgramBuilder builder = new HepProgramBuilder();
+        
builder.addGroupBegin().addRuleCollection(getSubQueryRules()).addGroupEnd().addMatchOrder(HepMatchOrder.DEPTH_FIRST);
+        
builder.addGroupBegin().addRuleCollection(getFilterRules()).addGroupEnd().addMatchOrder(HepMatchOrder.BOTTOM_UP);
+        builder.addMatchLimit(DEFAULT_MATCH_LIMIT);
+        return new HepPlanner(builder.build());
+    }
+    
     private static void setUpRules(final RelOptPlanner planner) {
         planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
         planner.addRelTraitDef(RelCollationTraitDef.INSTANCE);
@@ -55,5 +77,33 @@ public final class QueryOptimizePlannerFactory {
         planner.addRule(EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE);
         planner.addRule(EnumerableRules.ENUMERABLE_AGGREGATE_RULE);
         planner.addRule(EnumerableRules.ENUMERABLE_FILTER_RULE);
+        planner.addRule(EnumerableRules.ENUMERABLE_CORRELATE_RULE);
+    }
+    
+    private static Collection<RelOptRule> getSubQueryRules() {
+        Collection<RelOptRule> result = new LinkedList<>();
+        result.add(CoreRules.FILTER_SUB_QUERY_TO_CORRELATE);
+        result.add(CoreRules.PROJECT_SUB_QUERY_TO_CORRELATE);
+        result.add(CoreRules.JOIN_SUB_QUERY_TO_CORRELATE);
+        return result;
+    }
+    
+    private static Collection<RelOptRule> getFilterRules() {
+        Collection<RelOptRule> result = new LinkedList<>();
+        result.add(CoreRules.FILTER_INTO_JOIN);
+        result.add(CoreRules.JOIN_CONDITION_PUSH);
+        result.add(CoreRules.SORT_JOIN_TRANSPOSE);
+        result.add(CoreRules.PROJECT_CORRELATE_TRANSPOSE);
+        result.add(CoreRules.FILTER_AGGREGATE_TRANSPOSE);
+        result.add(CoreRules.FILTER_PROJECT_TRANSPOSE);
+        result.add(CoreRules.FILTER_SET_OP_TRANSPOSE);
+        result.add(CoreRules.FILTER_PROJECT_TRANSPOSE);
+        result.add(CoreRules.FILTER_REDUCE_EXPRESSIONS);
+        result.add(CoreRules.PROJECT_REDUCE_EXPRESSIONS);
+        result.add(CoreRules.FILTER_MERGE);
+        result.add(CoreRules.PROJECT_CALC_MERGE);
+        result.add(CoreRules.JOIN_PUSH_EXPRESSIONS);
+        result.add(CoreRules.JOIN_PUSH_TRANSITIVE_PREDICATES);
+        return result;
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
index 7728aa2a11a..581790733c0 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
@@ -133,12 +133,14 @@ public final class ShardingSphereOptimizerTest {
         ShardingSphereSQLParserEngine sqlParserEngine = 
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
 H2DatabaseType()));
         SQLStatement sqlStatement = 
sqlParserEngine.parse(SELECT_CROSS_JOIN_CONDITION, false);
         String actual = optimizer.optimize(databaseName, schemaName, 
sqlStatement).explain();
-        String expected = "EnumerableCalc(expr#0..4=[{inputs}], 
expr#5=[CAST($t3):INTEGER], expr#6=[13], expr#7=[=($t5, $t6)], 
proj#0..1=[{exprs}], user_id1=[$t3], $condition=[$t7])\n"
-                + "  EnumerableHashJoin(condition=[=($2, $4)], 
joinType=[inner])\n"
-                + "    EnumerableCalc(expr#0..2=[{inputs}], 
expr#3=[CAST($t1):VARCHAR], proj#0..1=[{exprs}], user_id0=[$t3])\n"
-                + "      EnumerableTableScan(table=[[federate_jdbc, 
t_order_federate]])\n"
-                + "    EnumerableCalc(expr#0..1=[{inputs}], 
expr#2=[CAST($t0):VARCHAR], user_id=[$t0], user_id0=[$t2])\n"
-                + "      EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
+        String expected = "EnumerableCalc(expr#0..4=[{inputs}], 
proj#0..1=[{exprs}], user_id0=[$t3])\n"
+                + "  EnumerableCalc(expr#0..6=[{inputs}], proj#0..2=[{exprs}], 
user_id1=[$t4], information=[$t5])\n"
+                + "    EnumerableHashJoin(condition=[=($3, $6)], 
joinType=[inner])\n"
+                + "      EnumerableCalc(expr#0..2=[{inputs}], 
expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])\n"
+                + "        EnumerableTableScan(table=[[federate_jdbc, 
t_order_federate]])\n"
+                + "      EnumerableCalc(expr#0..1=[{inputs}], 
expr#2=[CAST($t0):VARCHAR], proj#0..2=[{exprs}])\n"
+                + "        EnumerableFilter(condition=[=(CAST($0):INTEGER, 
13)])\n"
+                + "          EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
         assertThat(actual, is(expected));
     }
     
@@ -147,8 +149,9 @@ public final class ShardingSphereOptimizerTest {
         ShardingSphereSQLParserEngine sqlParserEngine = 
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
 H2DatabaseType()));
         SQLStatement sqlStatement = 
sqlParserEngine.parse(SELECT_WHERE_ALL_FIELDS, false);
         String actual = optimizer.optimize(databaseName, schemaName, 
sqlStatement).explain();
-        String expected = "EnumerableCalc(expr#0..1=[{inputs}], 
expr#2=[CAST($t0):INTEGER], expr#3=[12], expr#4=[=($t2, $t3)], 
proj#0..1=[{exprs}], $condition=[$t4])\n"
-                + "  EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
+        String expected = "EnumerableCalc(expr#0..1=[{inputs}], 
proj#0..1=[{exprs}])\n"
+                + "  EnumerableFilter(condition=[=(CAST($0):INTEGER, 12)])\n"
+                + "    EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
         assertThat(actual, is(expected));
     }
     
@@ -157,8 +160,9 @@ public final class ShardingSphereOptimizerTest {
         ShardingSphereSQLParserEngine sqlParserEngine = 
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
 H2DatabaseType()));
         SQLStatement sqlStatement = 
sqlParserEngine.parse(SELECT_WHERE_SINGLE_FIELD, false);
         String actual = optimizer.optimize(databaseName, schemaName, 
sqlStatement).explain();
-        String expected = "EnumerableCalc(expr#0..1=[{inputs}], 
expr#2=[CAST($t0):INTEGER], expr#3=[12], expr#4=[=($t2, $t3)], user_id=[$t0], 
$condition=[$t4])\n"
-                + "  EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
+        String expected = "EnumerableCalc(expr#0..1=[{inputs}], 
user_id=[$t0])\n"
+                + "  EnumerableFilter(condition=[=(CAST($0):INTEGER, 12)])\n"
+                + "    EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
         assertThat(actual, is(expected));
     }
     
@@ -167,11 +171,13 @@ public final class ShardingSphereOptimizerTest {
         ShardingSphereSQLParserEngine sqlParserEngine = 
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
 H2DatabaseType()));
         SQLStatement sqlStatement = sqlParserEngine.parse(SELECT_CROSS_WHERE, 
false);
         String actual = optimizer.optimize(databaseName, schemaName, 
sqlStatement).explain();
-        String expected = 
"EnumerableNestedLoopJoin(condition=[=(CAST($1):VARCHAR, CAST($2):VARCHAR)], 
joinType=[inner])\n"
-                + "  EnumerableCalc(expr#0..2=[{inputs}], 
proj#0..1=[{exprs}])\n"
-                + "    EnumerableTableScan(table=[[federate_jdbc, 
t_order_federate]])\n"
-                + "  EnumerableCalc(expr#0..1=[{inputs}], user_id=[$t0])\n"
-                + "    EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
+        String expected = "EnumerableCalc(expr#0..4=[{inputs}], 
proj#0..1=[{exprs}], user_id0=[$t3])\n"
+                + "  EnumerableCalc(expr#0..6=[{inputs}], proj#0..2=[{exprs}], 
user_id1=[$t4], information=[$t5])\n"
+                + "    EnumerableHashJoin(condition=[=($3, $6)], 
joinType=[inner])\n"
+                + "      EnumerableCalc(expr#0..2=[{inputs}], 
expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])\n"
+                + "        EnumerableTableScan(table=[[federate_jdbc, 
t_order_federate]])\n"
+                + "      EnumerableCalc(expr#0..1=[{inputs}], 
expr#2=[CAST($t0):VARCHAR], proj#0..2=[{exprs}])\n"
+                + "        EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
         assertThat(actual, is(expected));
     }
     
@@ -180,11 +186,11 @@ public final class ShardingSphereOptimizerTest {
         ShardingSphereSQLParserEngine sqlParserEngine = 
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
 H2DatabaseType()));
         SQLStatement sqlStatement = sqlParserEngine.parse(SELECT_CROSS_JOIN, 
false);
         String actual = optimizer.optimize(databaseName, schemaName, 
sqlStatement).explain();
-        String expected = "EnumerableCalc(expr#0..4=[{inputs}], 
proj#0..1=[{exprs}], user_id0=[$t3])\n"
-                + "  EnumerableHashJoin(condition=[=($2, $4)], 
joinType=[inner])\n"
-                + "    EnumerableCalc(expr#0..2=[{inputs}], 
expr#3=[CAST($t1):VARCHAR], proj#0..1=[{exprs}], user_id0=[$t3])\n"
+        String expected = "EnumerableCalc(expr#0..6=[{inputs}], 
proj#0..1=[{exprs}], user_id0=[$t4])\n"
+                + "  EnumerableHashJoin(condition=[=($3, $6)], 
joinType=[inner])\n"
+                + "    EnumerableCalc(expr#0..2=[{inputs}], 
expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])\n"
                 + "      EnumerableTableScan(table=[[federate_jdbc, 
t_order_federate]])\n"
-                + "    EnumerableCalc(expr#0..1=[{inputs}], 
expr#2=[CAST($t0):VARCHAR], user_id=[$t0], user_id0=[$t2])\n"
+                + "    EnumerableCalc(expr#0..1=[{inputs}], 
expr#2=[CAST($t0):VARCHAR], proj#0..2=[{exprs}])\n"
                 + "      EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
         assertThat(actual, is(expected));
     }
@@ -194,11 +200,14 @@ public final class ShardingSphereOptimizerTest {
         ShardingSphereSQLParserEngine sqlParserEngine = 
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
 H2DatabaseType()));
         SQLStatement sqlStatement = 
sqlParserEngine.parse(SELECT_CROSS_WHERE_CONDITION, false);
         String actual = optimizer.optimize(databaseName, schemaName, 
sqlStatement).explain();
-        String expected = 
"EnumerableNestedLoopJoin(condition=[=(CAST($1):VARCHAR, CAST($2):VARCHAR)], 
joinType=[inner])\n"
-                + "  EnumerableCalc(expr#0..2=[{inputs}], 
proj#0..1=[{exprs}])\n"
-                + "    EnumerableTableScan(table=[[federate_jdbc, 
t_order_federate]])\n"
-                + "  EnumerableCalc(expr#0..1=[{inputs}], 
expr#2=[CAST($t0):INTEGER], expr#3=[13], expr#4=[=($t2, $t3)], user_id=[$t0], 
$condition=[$t4])\n"
-                + "    EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
+        String expected = "EnumerableCalc(expr#0..4=[{inputs}], 
proj#0..1=[{exprs}], user_id0=[$t3])\n"
+                + "  EnumerableCalc(expr#0..6=[{inputs}], proj#0..2=[{exprs}], 
user_id1=[$t4], information=[$t5])\n"
+                + "    EnumerableHashJoin(condition=[=($3, $6)], 
joinType=[inner])\n"
+                + "      EnumerableCalc(expr#0..2=[{inputs}], 
expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])\n"
+                + "        EnumerableTableScan(table=[[federate_jdbc, 
t_order_federate]])\n"
+                + "      EnumerableCalc(expr#0..1=[{inputs}], 
expr#2=[CAST($t0):VARCHAR], proj#0..2=[{exprs}])\n"
+                + "        EnumerableFilter(condition=[=(CAST($0):INTEGER, 
13)])\n"
+                + "          EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
         assertThat(actual, is(expected));
     }
     
@@ -207,8 +216,9 @@ public final class ShardingSphereOptimizerTest {
         ShardingSphereSQLParserEngine sqlParserEngine = 
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
 H2DatabaseType()));
         SQLStatement sqlStatement = 
sqlParserEngine.parse(SELECT_SUBQUERY_FROM, false);
         String actual = optimizer.optimize(databaseName, schemaName, 
sqlStatement).explain();
-        String expected = "EnumerableCalc(expr#0..1=[{inputs}], 
expr#2=[CAST($t0):INTEGER], expr#3=[1], expr#4=[>($t2, $t3)], 
proj#0..1=[{exprs}], $condition=[$t4])\n"
-                + "  EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
+        String expected = "EnumerableCalc(expr#0..1=[{inputs}], 
proj#0..1=[{exprs}])\n"
+                + "  EnumerableFilter(condition=[>(CAST($0):INTEGER, 1)])\n"
+                + "    EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
         assertThat(actual, is(expected));
     }
     
@@ -217,13 +227,14 @@ public final class ShardingSphereOptimizerTest {
         ShardingSphereSQLParserEngine sqlParserEngine = 
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
 H2DatabaseType()));
         SQLStatement sqlStatement = 
sqlParserEngine.parse(SELECT_SUBQUERY_WHERE_EXIST, false);
         String actual = optimizer.optimize(databaseName, schemaName, 
sqlStatement).explain();
-        String expected = "EnumerableCalc(expr#0..4=[{inputs}], expr#5=[IS NOT 
NULL($t4)], proj#0..1=[{exprs}], $condition=[$t5])\n"
-                + "  EnumerableHashJoin(condition=[=($2, $3)], 
joinType=[left])\n"
-                + "    EnumerableCalc(expr#0..2=[{inputs}], 
expr#3=[CAST($t1):VARCHAR], proj#0..1=[{exprs}], user_id0=[$t3])\n"
+        String expected = "EnumerableCalc(expr#0..3=[{inputs}], 
proj#0..1=[{exprs}])\n"
+                + "  EnumerableFilter(condition=[IS NOT NULL($3)])\n"
+                + "    EnumerableCorrelate(correlation=[$cor0], 
joinType=[left], requiredColumns=[{1}])\n"
                 + "      EnumerableTableScan(table=[[federate_jdbc, 
t_order_federate]])\n"
-                + "    EnumerableAggregate(group=[{0}], agg#0=[MIN($1)])\n"
-                + "      EnumerableCalc(expr#0..1=[{inputs}], 
expr#2=[CAST($t0):VARCHAR], expr#3=[true], expr#4=[IS NOT NULL($t2)], 
user_id0=[$t2], $f0=[$t3], $condition=[$t4])\n"
-                + "        EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
+                + "      EnumerableAggregate(group=[{}], agg#0=[MIN($0)])\n"
+                + "        EnumerableCalc(expr#0..1=[{inputs}], expr#2=[true], 
$f0=[$t2])\n"
+                + "          
EnumerableFilter(condition=[=(CAST($cor0.user_id):VARCHAR, 
CAST($0):VARCHAR)])\n"
+                + "            EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
         assertThat(actual, is(expected));
     }
     
@@ -232,10 +243,9 @@ public final class ShardingSphereOptimizerTest {
         ShardingSphereSQLParserEngine sqlParserEngine = 
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
 H2DatabaseType()));
         SQLStatement sqlStatement = 
sqlParserEngine.parse(SELECT_SUBQUERY_WHERE_IN, false);
         String actual = optimizer.optimize(databaseName, schemaName, 
sqlStatement).explain();
-        String expected = "EnumerableCalc(expr#0..2=[{inputs}], 
proj#0..1=[{exprs}])\n"
-                + "  EnumerableHashJoin(condition=[=($1, $2)], 
joinType=[inner])\n"
-                + "    EnumerableCalc(expr#0..2=[{inputs}], 
proj#0..1=[{exprs}])\n"
-                + "      EnumerableTableScan(table=[[federate_jdbc, 
t_order_federate]])\n"
+        String expected = "EnumerableCalc(expr#0..3=[{inputs}], 
proj#0..1=[{exprs}])\n"
+                + "  EnumerableHashJoin(condition=[=($1, $3)], 
joinType=[inner])\n"
+                + "    EnumerableTableScan(table=[[federate_jdbc, 
t_order_federate]])\n"
                 + "    EnumerableAggregate(group=[{0}])\n"
                 + "      EnumerableCalc(expr#0..1=[{inputs}], user_id=[$t0])\n"
                 + "        EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
@@ -247,17 +257,18 @@ public final class ShardingSphereOptimizerTest {
         ShardingSphereSQLParserEngine sqlParserEngine = 
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
 H2DatabaseType()));
         SQLStatement sqlStatement = 
sqlParserEngine.parse(SELECT_SUBQUERY_WHERE_BETWEEN, false);
         String actual = optimizer.optimize(databaseName, schemaName, 
sqlStatement).explain();
-        String expected = "EnumerableCalc(expr#0..3=[{inputs}], 
proj#0..1=[{exprs}])\n"
-                + "  EnumerableNestedLoopJoin(condition=[<=($1, $3)], 
joinType=[inner])\n"
-                + "    EnumerableNestedLoopJoin(condition=[>=($1, $2)], 
joinType=[inner])\n"
-                + "      EnumerableCalc(expr#0..2=[{inputs}], 
proj#0..1=[{exprs}])\n"
-                + "        EnumerableTableScan(table=[[federate_jdbc, 
t_order_federate]])\n"
+        String expected = "EnumerableCalc(expr#0..4=[{inputs}], 
proj#0..1=[{exprs}])\n"
+                + "  EnumerableNestedLoopJoin(condition=[<=($1, $4)], 
joinType=[inner])\n"
+                + "    EnumerableNestedLoopJoin(condition=[>=($1, $3)], 
joinType=[inner])\n"
+                + "      EnumerableTableScan(table=[[federate_jdbc, 
t_order_federate]])\n"
                 + "      EnumerableAggregate(group=[{}], 
agg#0=[SINGLE_VALUE($0)])\n"
-                + "        EnumerableCalc(expr#0..1=[{inputs}], 
expr#2=[CAST($t1):VARCHAR], expr#3=['before':VARCHAR], expr#4=[=($t2, $t3)], 
user_id=[$t0], $condition=[$t4])\n"
-                + "          EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n"
+                + "        EnumerableCalc(expr#0..1=[{inputs}], 
user_id=[$t0])\n"
+                + "          EnumerableFilter(condition=[=(CAST($1):VARCHAR, 
'before')])\n"
+                + "            EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n"
                 + "    EnumerableAggregate(group=[{}], 
agg#0=[SINGLE_VALUE($0)])\n"
-                + "      EnumerableCalc(expr#0..1=[{inputs}], 
expr#2=[CAST($t1):VARCHAR], expr#3=['after':VARCHAR], expr#4=[=($t2, $t3)], 
user_id=[$t0], $condition=[$t4])\n"
-                + "        EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
+                + "      EnumerableCalc(expr#0..1=[{inputs}], user_id=[$t0])\n"
+                + "        EnumerableFilter(condition=[=(CAST($1):VARCHAR, 
'after')])\n"
+                + "          EnumerableTableScan(table=[[federate_jdbc, 
t_user_info]])\n";
         assertThat(actual, is(expected));
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 288d1cf0b85..6c56ec6179c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -152,7 +152,7 @@ public final class RuleAlteredJobPreparer {
                 jobContext.getDataSourceManager(), 
jobConfig.getTablesFirstDataNodes(), tableNameSchemaNameMapping);
         
dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
     }
-
+    
     private boolean isSourceAndTargetSchemaAvailable(final 
RuleAlteredJobConfiguration jobConfig) {
         DatabaseType sourceDatabaseType = 
DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType());
         DatabaseType targetDatabaseType = 
DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType());

Reply via email to