This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 40e8a0abb54 Replace Programs.standard() with RBO and CBO optimize
logic (#19624)
40e8a0abb54 is described below
commit 40e8a0abb54245a45841c6f47529afaf2b1ccaf6
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Thu Jul 28 13:33:43 2022 +0800
Replace Programs.standard() with RBO and CBO optimize logic (#19624)
---
.../optimizer/ShardingSphereOptimizer.java | 21 +++--
.../context/planner/OptimizerPlannerContext.java | 3 +
.../planner/OptimizerPlannerContextFactory.java | 7 +-
.../planner/QueryOptimizePlannerFactory.java | 56 +++++++++++-
.../optimizer/ShardingSphereOptimizerTest.java | 101 ++++++++++++---------
.../rulealtered/RuleAlteredJobPreparer.java | 2 +-
6 files changed, 129 insertions(+), 61 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizer.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizer.java
index 4dcb7e2acad..1c1e43af0ba 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizer.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizer.java
@@ -20,19 +20,14 @@ package
org.apache.shardingsphere.infra.federation.optimizer;
import lombok.RequiredArgsConstructor;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql2rel.SqlToRelConverter;
-import org.apache.calcite.tools.Programs;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
import
org.apache.shardingsphere.infra.federation.optimizer.converter.SQLNodeConverterEngine;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import java.util.Collections;
-
/**
* ShardingSphere optimizer.
*/
@@ -53,16 +48,22 @@ public final class ShardingSphereOptimizer {
try {
SqlToRelConverter converter =
context.getPlannerContexts().get(databaseName).getConverters().get(schemaName);
SqlNode sqlNode =
SQLNodeConverterEngine.convertToSQLNode(sqlStatement);
- RelRoot relRoot = converter.convertQuery(sqlNode, true, true);
- return optimize(converter, relRoot);
+ RelNode logicPlan = converter.convertQuery(sqlNode, true,
true).rel;
+ RelNode bestPlan = optimizeWithRBO(logicPlan,
context.getPlannerContexts().get(databaseName).getHepPlanners().get(schemaName));
+ return optimizeWithCBO(bestPlan, converter);
} catch (final UnsupportedOperationException ex) {
throw new ShardingSphereException(ex);
}
}
- private RelNode optimize(final SqlToRelConverter converter, final RelRoot
relRoot) {
+ private static RelNode optimizeWithRBO(final RelNode logicPlan, final
RelOptPlanner hepPlanner) {
+ hepPlanner.setRoot(logicPlan);
+ return hepPlanner.findBestExp();
+ }
+
+ private RelNode optimizeWithCBO(final RelNode bestPlan, final
SqlToRelConverter converter) {
RelOptPlanner planner = converter.getCluster().getPlanner();
- RelTraitSet desiredTraitSet =
relRoot.rel.getTraitSet().replace(EnumerableConvention.INSTANCE).simplify();
- return Programs.standard().run(planner, relRoot.rel, desiredTraitSet,
Collections.emptyList(), Collections.emptyList());
+ planner.setRoot(planner.changeTraits(bestPlan,
converter.getCluster().traitSet().replace(EnumerableConvention.INSTANCE)));
+ return planner.findBestExp();
}
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContext.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContext.java
index bfc7a1cf955..fa7647cffd7 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContext.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContext.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.infra.federation.optimizer.context.planner;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
@@ -34,4 +35,6 @@ public final class OptimizerPlannerContext {
private final Map<String, SqlValidator> validators;
private final Map<String, SqlToRelConverter> converters;
+
+ private final Map<String, RelOptPlanner> hepPlanners;
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
index ca257721938..ea0875e12f1 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
@@ -25,6 +25,7 @@ import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable.ViewExpander;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -77,6 +78,7 @@ public final class OptimizerPlannerContextFactory {
public static OptimizerPlannerContext create(final
FederationDatabaseMetaData databaseMetaData) {
Map<String, SqlValidator> validators = new LinkedHashMap<>();
Map<String, SqlToRelConverter> converters = new LinkedHashMap<>();
+ Map<String, RelOptPlanner> hepPlanners = new LinkedHashMap<>();
FederationDatabase federationDatabase = new
FederationDatabase(databaseMetaData);
for (Entry<String, Schema> entry :
federationDatabase.getSubSchemaMap().entrySet()) {
CalciteConnectionConfig connectionConfig = new
CalciteConnectionConfigImpl(createConnectionProperties());
@@ -86,8 +88,9 @@ public final class OptimizerPlannerContextFactory {
SqlToRelConverter converter = createConverter(catalogReader,
validator, relDataTypeFactory);
validators.put(entry.getKey(), validator);
converters.put(entry.getKey(), converter);
+ hepPlanners.put(entry.getKey(),
QueryOptimizePlannerFactory.createHepPlanner());
}
- return new OptimizerPlannerContext(validators, converters);
+ return new OptimizerPlannerContext(validators, converters,
hepPlanners);
}
private static Properties createConnectionProperties() {
@@ -115,7 +118,7 @@ public final class OptimizerPlannerContextFactory {
private static SqlToRelConverter createConverter(final
CalciteCatalogReader catalogReader, final SqlValidator validator, final
RelDataTypeFactory relDataTypeFactory) {
ViewExpander expander = (rowType, queryString, schemaPath, viewPath)
-> null;
Config converterConfig =
SqlToRelConverter.config().withTrimUnusedFields(true);
- RelOptCluster cluster =
RelOptCluster.create(QueryOptimizePlannerFactory.newInstance(), new
RexBuilder(relDataTypeFactory));
+ RelOptCluster cluster =
RelOptCluster.create(QueryOptimizePlannerFactory.createVolcanoPlanner(), new
RexBuilder(relDataTypeFactory));
return new SqlToRelConverter(expander, validator, catalogReader,
cluster, StandardConvertletTable.INSTANCE, converterConfig);
}
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/planner/QueryOptimizePlannerFactory.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/planner/QueryOptimizePlannerFactory.java
index ca6680517d9..bc763329503 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/planner/QueryOptimizePlannerFactory.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/planner/QueryOptimizePlannerFactory.java
@@ -22,27 +22,49 @@ import lombok.NoArgsConstructor;
import org.apache.calcite.adapter.enumerable.EnumerableRules;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.rules.CoreRules;
+import java.util.Collection;
+import java.util.LinkedList;
+
/**
* Query optimize planner factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class QueryOptimizePlannerFactory {
+ private static final int DEFAULT_MATCH_LIMIT = 1024;
+
/**
- * Create new instance of query optimize planner.
+ * Create new instance of volcano planner.
*
- * @return created instance
+ * @return volcano planner instance
*/
- public static RelOptPlanner newInstance() {
+ public static RelOptPlanner createVolcanoPlanner() {
RelOptPlanner result = new VolcanoPlanner();
setUpRules(result);
return result;
}
+ /**
+ * Create new instance of hep planner.
+ *
+ * @return hep planner instance
+ */
+ public static RelOptPlanner createHepPlanner() {
+ HepProgramBuilder builder = new HepProgramBuilder();
+
builder.addGroupBegin().addRuleCollection(getSubQueryRules()).addGroupEnd().addMatchOrder(HepMatchOrder.DEPTH_FIRST);
+
builder.addGroupBegin().addRuleCollection(getFilterRules()).addGroupEnd().addMatchOrder(HepMatchOrder.BOTTOM_UP);
+ builder.addMatchLimit(DEFAULT_MATCH_LIMIT);
+ return new HepPlanner(builder.build());
+ }
+
private static void setUpRules(final RelOptPlanner planner) {
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
planner.addRelTraitDef(RelCollationTraitDef.INSTANCE);
@@ -55,5 +77,33 @@ public final class QueryOptimizePlannerFactory {
planner.addRule(EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_AGGREGATE_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_FILTER_RULE);
+ planner.addRule(EnumerableRules.ENUMERABLE_CORRELATE_RULE);
+ }
+
+ private static Collection<RelOptRule> getSubQueryRules() {
+ Collection<RelOptRule> result = new LinkedList<>();
+ result.add(CoreRules.FILTER_SUB_QUERY_TO_CORRELATE);
+ result.add(CoreRules.PROJECT_SUB_QUERY_TO_CORRELATE);
+ result.add(CoreRules.JOIN_SUB_QUERY_TO_CORRELATE);
+ return result;
+ }
+
+ private static Collection<RelOptRule> getFilterRules() {
+ Collection<RelOptRule> result = new LinkedList<>();
+ result.add(CoreRules.FILTER_INTO_JOIN);
+ result.add(CoreRules.JOIN_CONDITION_PUSH);
+ result.add(CoreRules.SORT_JOIN_TRANSPOSE);
+ result.add(CoreRules.PROJECT_CORRELATE_TRANSPOSE);
+ result.add(CoreRules.FILTER_AGGREGATE_TRANSPOSE);
+ result.add(CoreRules.FILTER_PROJECT_TRANSPOSE);
+ result.add(CoreRules.FILTER_SET_OP_TRANSPOSE);
+ result.add(CoreRules.FILTER_PROJECT_TRANSPOSE);
+ result.add(CoreRules.FILTER_REDUCE_EXPRESSIONS);
+ result.add(CoreRules.PROJECT_REDUCE_EXPRESSIONS);
+ result.add(CoreRules.FILTER_MERGE);
+ result.add(CoreRules.PROJECT_CALC_MERGE);
+ result.add(CoreRules.JOIN_PUSH_EXPRESSIONS);
+ result.add(CoreRules.JOIN_PUSH_TRANSITIVE_PREDICATES);
+ return result;
}
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
index 7728aa2a11a..581790733c0 100644
---
a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
+++
b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
@@ -133,12 +133,14 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine =
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
H2DatabaseType()));
SQLStatement sqlStatement =
sqlParserEngine.parse(SELECT_CROSS_JOIN_CONDITION, false);
String actual = optimizer.optimize(databaseName, schemaName,
sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..4=[{inputs}],
expr#5=[CAST($t3):INTEGER], expr#6=[13], expr#7=[=($t5, $t6)],
proj#0..1=[{exprs}], user_id1=[$t3], $condition=[$t7])\n"
- + " EnumerableHashJoin(condition=[=($2, $4)],
joinType=[inner])\n"
- + " EnumerableCalc(expr#0..2=[{inputs}],
expr#3=[CAST($t1):VARCHAR], proj#0..1=[{exprs}], user_id0=[$t3])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_order_federate]])\n"
- + " EnumerableCalc(expr#0..1=[{inputs}],
expr#2=[CAST($t0):VARCHAR], user_id=[$t0], user_id0=[$t2])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
+ String expected = "EnumerableCalc(expr#0..4=[{inputs}],
proj#0..1=[{exprs}], user_id0=[$t3])\n"
+ + " EnumerableCalc(expr#0..6=[{inputs}], proj#0..2=[{exprs}],
user_id1=[$t4], information=[$t5])\n"
+ + " EnumerableHashJoin(condition=[=($3, $6)],
joinType=[inner])\n"
+ + " EnumerableCalc(expr#0..2=[{inputs}],
expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_order_federate]])\n"
+ + " EnumerableCalc(expr#0..1=[{inputs}],
expr#2=[CAST($t0):VARCHAR], proj#0..2=[{exprs}])\n"
+ + " EnumerableFilter(condition=[=(CAST($0):INTEGER,
13)])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
assertThat(actual, is(expected));
}
@@ -147,8 +149,9 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine =
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
H2DatabaseType()));
SQLStatement sqlStatement =
sqlParserEngine.parse(SELECT_WHERE_ALL_FIELDS, false);
String actual = optimizer.optimize(databaseName, schemaName,
sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..1=[{inputs}],
expr#2=[CAST($t0):INTEGER], expr#3=[12], expr#4=[=($t2, $t3)],
proj#0..1=[{exprs}], $condition=[$t4])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
+ String expected = "EnumerableCalc(expr#0..1=[{inputs}],
proj#0..1=[{exprs}])\n"
+ + " EnumerableFilter(condition=[=(CAST($0):INTEGER, 12)])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
assertThat(actual, is(expected));
}
@@ -157,8 +160,9 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine =
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
H2DatabaseType()));
SQLStatement sqlStatement =
sqlParserEngine.parse(SELECT_WHERE_SINGLE_FIELD, false);
String actual = optimizer.optimize(databaseName, schemaName,
sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..1=[{inputs}],
expr#2=[CAST($t0):INTEGER], expr#3=[12], expr#4=[=($t2, $t3)], user_id=[$t0],
$condition=[$t4])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
+ String expected = "EnumerableCalc(expr#0..1=[{inputs}],
user_id=[$t0])\n"
+ + " EnumerableFilter(condition=[=(CAST($0):INTEGER, 12)])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
assertThat(actual, is(expected));
}
@@ -167,11 +171,13 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine =
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
H2DatabaseType()));
SQLStatement sqlStatement = sqlParserEngine.parse(SELECT_CROSS_WHERE,
false);
String actual = optimizer.optimize(databaseName, schemaName,
sqlStatement).explain();
- String expected =
"EnumerableNestedLoopJoin(condition=[=(CAST($1):VARCHAR, CAST($2):VARCHAR)],
joinType=[inner])\n"
- + " EnumerableCalc(expr#0..2=[{inputs}],
proj#0..1=[{exprs}])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_order_federate]])\n"
- + " EnumerableCalc(expr#0..1=[{inputs}], user_id=[$t0])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
+ String expected = "EnumerableCalc(expr#0..4=[{inputs}],
proj#0..1=[{exprs}], user_id0=[$t3])\n"
+ + " EnumerableCalc(expr#0..6=[{inputs}], proj#0..2=[{exprs}],
user_id1=[$t4], information=[$t5])\n"
+ + " EnumerableHashJoin(condition=[=($3, $6)],
joinType=[inner])\n"
+ + " EnumerableCalc(expr#0..2=[{inputs}],
expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_order_federate]])\n"
+ + " EnumerableCalc(expr#0..1=[{inputs}],
expr#2=[CAST($t0):VARCHAR], proj#0..2=[{exprs}])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
assertThat(actual, is(expected));
}
@@ -180,11 +186,11 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine =
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
H2DatabaseType()));
SQLStatement sqlStatement = sqlParserEngine.parse(SELECT_CROSS_JOIN,
false);
String actual = optimizer.optimize(databaseName, schemaName,
sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..4=[{inputs}],
proj#0..1=[{exprs}], user_id0=[$t3])\n"
- + " EnumerableHashJoin(condition=[=($2, $4)],
joinType=[inner])\n"
- + " EnumerableCalc(expr#0..2=[{inputs}],
expr#3=[CAST($t1):VARCHAR], proj#0..1=[{exprs}], user_id0=[$t3])\n"
+ String expected = "EnumerableCalc(expr#0..6=[{inputs}],
proj#0..1=[{exprs}], user_id0=[$t4])\n"
+ + " EnumerableHashJoin(condition=[=($3, $6)],
joinType=[inner])\n"
+ + " EnumerableCalc(expr#0..2=[{inputs}],
expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])\n"
+ " EnumerableTableScan(table=[[federate_jdbc,
t_order_federate]])\n"
- + " EnumerableCalc(expr#0..1=[{inputs}],
expr#2=[CAST($t0):VARCHAR], user_id=[$t0], user_id0=[$t2])\n"
+ + " EnumerableCalc(expr#0..1=[{inputs}],
expr#2=[CAST($t0):VARCHAR], proj#0..2=[{exprs}])\n"
+ " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
assertThat(actual, is(expected));
}
@@ -194,11 +200,14 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine =
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
H2DatabaseType()));
SQLStatement sqlStatement =
sqlParserEngine.parse(SELECT_CROSS_WHERE_CONDITION, false);
String actual = optimizer.optimize(databaseName, schemaName,
sqlStatement).explain();
- String expected =
"EnumerableNestedLoopJoin(condition=[=(CAST($1):VARCHAR, CAST($2):VARCHAR)],
joinType=[inner])\n"
- + " EnumerableCalc(expr#0..2=[{inputs}],
proj#0..1=[{exprs}])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_order_federate]])\n"
- + " EnumerableCalc(expr#0..1=[{inputs}],
expr#2=[CAST($t0):INTEGER], expr#3=[13], expr#4=[=($t2, $t3)], user_id=[$t0],
$condition=[$t4])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
+ String expected = "EnumerableCalc(expr#0..4=[{inputs}],
proj#0..1=[{exprs}], user_id0=[$t3])\n"
+ + " EnumerableCalc(expr#0..6=[{inputs}], proj#0..2=[{exprs}],
user_id1=[$t4], information=[$t5])\n"
+ + " EnumerableHashJoin(condition=[=($3, $6)],
joinType=[inner])\n"
+ + " EnumerableCalc(expr#0..2=[{inputs}],
expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_order_federate]])\n"
+ + " EnumerableCalc(expr#0..1=[{inputs}],
expr#2=[CAST($t0):VARCHAR], proj#0..2=[{exprs}])\n"
+ + " EnumerableFilter(condition=[=(CAST($0):INTEGER,
13)])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
assertThat(actual, is(expected));
}
@@ -207,8 +216,9 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine =
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
H2DatabaseType()));
SQLStatement sqlStatement =
sqlParserEngine.parse(SELECT_SUBQUERY_FROM, false);
String actual = optimizer.optimize(databaseName, schemaName,
sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..1=[{inputs}],
expr#2=[CAST($t0):INTEGER], expr#3=[1], expr#4=[>($t2, $t3)],
proj#0..1=[{exprs}], $condition=[$t4])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
+ String expected = "EnumerableCalc(expr#0..1=[{inputs}],
proj#0..1=[{exprs}])\n"
+ + " EnumerableFilter(condition=[>(CAST($0):INTEGER, 1)])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
assertThat(actual, is(expected));
}
@@ -217,13 +227,14 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine =
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
H2DatabaseType()));
SQLStatement sqlStatement =
sqlParserEngine.parse(SELECT_SUBQUERY_WHERE_EXIST, false);
String actual = optimizer.optimize(databaseName, schemaName,
sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..4=[{inputs}], expr#5=[IS NOT
NULL($t4)], proj#0..1=[{exprs}], $condition=[$t5])\n"
- + " EnumerableHashJoin(condition=[=($2, $3)],
joinType=[left])\n"
- + " EnumerableCalc(expr#0..2=[{inputs}],
expr#3=[CAST($t1):VARCHAR], proj#0..1=[{exprs}], user_id0=[$t3])\n"
+ String expected = "EnumerableCalc(expr#0..3=[{inputs}],
proj#0..1=[{exprs}])\n"
+ + " EnumerableFilter(condition=[IS NOT NULL($3)])\n"
+ + " EnumerableCorrelate(correlation=[$cor0],
joinType=[left], requiredColumns=[{1}])\n"
+ " EnumerableTableScan(table=[[federate_jdbc,
t_order_federate]])\n"
- + " EnumerableAggregate(group=[{0}], agg#0=[MIN($1)])\n"
- + " EnumerableCalc(expr#0..1=[{inputs}],
expr#2=[CAST($t0):VARCHAR], expr#3=[true], expr#4=[IS NOT NULL($t2)],
user_id0=[$t2], $f0=[$t3], $condition=[$t4])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
+ + " EnumerableAggregate(group=[{}], agg#0=[MIN($0)])\n"
+ + " EnumerableCalc(expr#0..1=[{inputs}], expr#2=[true],
$f0=[$t2])\n"
+ + "
EnumerableFilter(condition=[=(CAST($cor0.user_id):VARCHAR,
CAST($0):VARCHAR)])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
assertThat(actual, is(expected));
}
@@ -232,10 +243,9 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine =
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
H2DatabaseType()));
SQLStatement sqlStatement =
sqlParserEngine.parse(SELECT_SUBQUERY_WHERE_IN, false);
String actual = optimizer.optimize(databaseName, schemaName,
sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..2=[{inputs}],
proj#0..1=[{exprs}])\n"
- + " EnumerableHashJoin(condition=[=($1, $2)],
joinType=[inner])\n"
- + " EnumerableCalc(expr#0..2=[{inputs}],
proj#0..1=[{exprs}])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_order_federate]])\n"
+ String expected = "EnumerableCalc(expr#0..3=[{inputs}],
proj#0..1=[{exprs}])\n"
+ + " EnumerableHashJoin(condition=[=($1, $3)],
joinType=[inner])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_order_federate]])\n"
+ " EnumerableAggregate(group=[{0}])\n"
+ " EnumerableCalc(expr#0..1=[{inputs}], user_id=[$t0])\n"
+ " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
@@ -247,17 +257,18 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine =
sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new
H2DatabaseType()));
SQLStatement sqlStatement =
sqlParserEngine.parse(SELECT_SUBQUERY_WHERE_BETWEEN, false);
String actual = optimizer.optimize(databaseName, schemaName,
sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..3=[{inputs}],
proj#0..1=[{exprs}])\n"
- + " EnumerableNestedLoopJoin(condition=[<=($1, $3)],
joinType=[inner])\n"
- + " EnumerableNestedLoopJoin(condition=[>=($1, $2)],
joinType=[inner])\n"
- + " EnumerableCalc(expr#0..2=[{inputs}],
proj#0..1=[{exprs}])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_order_federate]])\n"
+ String expected = "EnumerableCalc(expr#0..4=[{inputs}],
proj#0..1=[{exprs}])\n"
+ + " EnumerableNestedLoopJoin(condition=[<=($1, $4)],
joinType=[inner])\n"
+ + " EnumerableNestedLoopJoin(condition=[>=($1, $3)],
joinType=[inner])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_order_federate]])\n"
+ " EnumerableAggregate(group=[{}],
agg#0=[SINGLE_VALUE($0)])\n"
- + " EnumerableCalc(expr#0..1=[{inputs}],
expr#2=[CAST($t1):VARCHAR], expr#3=['before':VARCHAR], expr#4=[=($t2, $t3)],
user_id=[$t0], $condition=[$t4])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n"
+ + " EnumerableCalc(expr#0..1=[{inputs}],
user_id=[$t0])\n"
+ + " EnumerableFilter(condition=[=(CAST($1):VARCHAR,
'before')])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n"
+ " EnumerableAggregate(group=[{}],
agg#0=[SINGLE_VALUE($0)])\n"
- + " EnumerableCalc(expr#0..1=[{inputs}],
expr#2=[CAST($t1):VARCHAR], expr#3=['after':VARCHAR], expr#4=[=($t2, $t3)],
user_id=[$t0], $condition=[$t4])\n"
- + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
+ + " EnumerableCalc(expr#0..1=[{inputs}], user_id=[$t0])\n"
+ + " EnumerableFilter(condition=[=(CAST($1):VARCHAR,
'after')])\n"
+ + " EnumerableTableScan(table=[[federate_jdbc,
t_user_info]])\n";
assertThat(actual, is(expected));
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 288d1cf0b85..6c56ec6179c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -152,7 +152,7 @@ public final class RuleAlteredJobPreparer {
jobContext.getDataSourceManager(),
jobConfig.getTablesFirstDataNodes(), tableNameSchemaNameMapping);
dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
}
-
+
private boolean isSourceAndTargetSchemaAvailable(final
RuleAlteredJobConfiguration jobConfig) {
DatabaseType sourceDatabaseType =
DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType());
DatabaseType targetDatabaseType =
DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType());