Repository: flink Updated Branches: refs/heads/master 186b12309 -> 8efacf588
[FLINK-5406] [table] Add normalization phase for predicate logical plan rewriting This closes #3101. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8efacf58 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8efacf58 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8efacf58 Branch: refs/heads/master Commit: 8efacf588cee45eb99a24628136ced308c4fb418 Parents: 186b123 Author: godfreyhe <godfre...@163.com> Authored: Thu Jan 12 18:42:49 2017 +0800 Committer: twalthr <twal...@apache.org> Committed: Tue Feb 14 14:30:04 2017 +0100 ---------------------------------------------------------------------- .../flink/table/api/BatchTableEnvironment.scala | 52 +++---- .../table/api/StreamTableEnvironment.scala | 48 ++++--- .../flink/table/api/TableEnvironment.scala | 103 ++++++++++++-- .../flink/table/calcite/CalciteConfig.scala | 90 ++++++++++--- .../table/plan/nodes/dataset/DataSetCalc.scala | 4 +- .../flink/table/plan/rules/FlinkRuleSets.scala | 32 +++-- .../api/java/batch/TableEnvironmentITCase.java | 2 +- .../apache/flink/table/AggregationTest.scala | 4 +- .../flink/table/CalciteConfigBuilderTest.scala | 135 +++++++++++++++---- .../flink/table/TableEnvironmentTest.scala | 4 +- .../api/scala/batch/sql/SetOperatorsTest.scala | 2 +- .../scala/batch/table/FieldProjectionTest.scala | 4 +- .../expressions/utils/ExpressionTestBase.scala | 25 +++- .../plan/rules/NormalizationRulesTest.scala | 98 ++++++++++++++ 14 files changed, 477 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 2dec00e..b48e9f9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -20,12 +20,12 @@ package org.apache.flink.table.api import _root_.java.util.concurrent.atomic.AtomicInteger -import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.plan.hep.HepMatchOrder import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql2rel.RelDecorrelator -import org.apache.calcite.tools.{Programs, RuleSet} +import org.apache.calcite.tools.RuleSet import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.api.java.typeutils.GenericTypeInfo @@ -199,9 +199,14 @@ abstract class BatchTableEnvironment( } /** - * Returns the built-in rules that are defined by the environment. + * Returns the built-in normalization rules that are defined by the environment. */ - protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASET_OPT_RULES + protected def getBuiltInNormRuleSet: RuleSet = FlinkRuleSets.DATASET_NORM_RULES + + /** + * Returns the built-in optimization rules that are defined by the environment. 
+ */ + protected def getBuiltInOptRuleSet: RuleSet = FlinkRuleSets.DATASET_OPT_RULES /** * Generates the optimized [[RelNode]] tree from the original relational node tree. @@ -211,32 +216,27 @@ abstract class BatchTableEnvironment( */ private[flink] def optimize(relNode: RelNode): RelNode = { - // decorrelate + // 1. decorrelate val decorPlan = RelDecorrelator.decorrelateQuery(relNode) - // optimize the logical Flink plan - val optProgram = Programs.ofRules(getRuleSet) - val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify() + // 2. normalize the logical plan + val normRuleSet = getNormRuleSet + val normalizedPlan = if (normRuleSet.iterator().hasNext) { + runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, decorPlan, decorPlan.getTraitSet) + } else { + decorPlan + } - val dataSetPlan = try { - optProgram.run(getPlanner, decorPlan, flinkOutputProps) - } catch { - case e: CannotPlanException => - throw new TableException( - s"Cannot generate a valid execution plan for the given query: \n\n" + - s"${RelOptUtil.toString(relNode)}\n" + - s"This exception indicates that the query uses an unsupported SQL feature.\n" + - s"Please check the documentation for the set of currently supported SQL features.") - case t: TableException => - throw new TableException( - s"Cannot generate a valid execution plan for the given query: \n\n" + - s"${RelOptUtil.toString(relNode)}\n" + - s"${t.msg}\n" + - s"Please check the documentation for the set of currently supported SQL features.") - case a: AssertionError => - throw a.getCause + // 3. optimize the logical Flink plan + val optRuleSet = getOptRuleSet + val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify() + val optimizedPlan = if (optRuleSet.iterator().hasNext) { + runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps) + } else { + normalizedPlan } - dataSetPlan + + optimizedPlan } /** http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 19c4af1..d927c3a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -20,14 +20,14 @@ package org.apache.flink.table.api import _root_.java.util.concurrent.atomic.AtomicInteger -import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.plan.hep.HepMatchOrder import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql2rel.RelDecorrelator -import org.apache.calcite.tools.{Programs, RuleSet} +import org.apache.calcite.tools.RuleSet import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo} +import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.explain.PlanJsonParser @@ -211,9 +211,14 @@ abstract class StreamTableEnvironment( 
} /** - * Returns the built-in rules that are defined by the environment. + * Returns the built-in normalization rules that are defined by the environment. */ - protected def getBuiltInRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES + protected def getBuiltInNormRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_NORM_RULES + + /** + * Returns the built-in optimization rules that are defined by the environment. + */ + protected def getBuiltInOptRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES /** * Generates the optimized [[RelNode]] tree from the original relational node tree. @@ -222,25 +227,28 @@ abstract class StreamTableEnvironment( * @return The optimized [[RelNode]] tree */ private[flink] def optimize(relNode: RelNode): RelNode = { - // decorrelate - val decorPlan = RelDecorrelator.decorrelateQuery(relNode) - // optimize the logical Flink plan - val optProgram = Programs.ofRules(getRuleSet) - val flinkOutputProps = relNode.getTraitSet.replace(DataStreamConvention.INSTANCE).simplify() + // 1. decorrelate + val decorPlan = RelDecorrelator.decorrelateQuery(relNode) - val dataStreamPlan = try { - optProgram.run(getPlanner, decorPlan, flinkOutputProps) + // 2. normalize the logical plan + val normRuleSet = getNormRuleSet + val normalizedPlan = if (normRuleSet.iterator().hasNext) { + runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, decorPlan, decorPlan.getTraitSet) + } else { + decorPlan } - catch { - case e: CannotPlanException => - throw TableException( - s"Cannot generate a valid execution plan for the given query: \n\n" + - s"${RelOptUtil.toString(relNode)}\n" + - s"This exception indicates that the query uses an unsupported SQL feature.\n" + - s"Please check the documentation for the set of currently supported SQL features.", e) + + // 3. 
optimize the logical Flink plan + val optRuleSet = getOptRuleSet + val flinkOutputProps = relNode.getTraitSet.replace(DataStreamConvention.INSTANCE).simplify() + val optimizedPlan = if (optRuleSet.iterator().hasNext) { + runVolcanoPlanner(optRuleSet, normalizedPlan, flinkOutputProps) + } else { + normalizedPlan } - dataStreamPlan + + optimizedPlan } http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index b36441a..4a36320 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -23,14 +23,17 @@ import _root_.java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.config.Lex import org.apache.calcite.jdbc.CalciteSchema -import org.apache.calcite.plan.RelOptPlanner +import org.apache.calcite.plan.RelOptPlanner.CannotPlanException +import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder} +import org.apache.calcite.plan.{RelOptPlanner, RelOptUtil, RelTraitSet} +import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.schema.SchemaPlus import org.apache.calcite.schema.impl.AbstractTable import org.apache.calcite.sql.SqlOperatorTable import org.apache.calcite.sql.parser.SqlParser import org.apache.calcite.sql.util.ChainedSqlOperatorTable -import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets} +import org.apache.calcite.tools._ import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType @@ -119,20 +122,41 @@ abstract class TableEnvironment(val config: TableConfig) { } /** - * Returns the rule set for this environment including a custom Calcite configuration. + * Returns the normalization rule set for this environment + * including a custom RuleSet configuration. */ - protected def getRuleSet: RuleSet = { + protected def getNormRuleSet: RuleSet = { val calciteConfig = config.getCalciteConfig - calciteConfig.getRuleSet match { + calciteConfig.getNormRuleSet match { case None => - getBuiltInRuleSet + getBuiltInNormRuleSet case Some(ruleSet) => - if (calciteConfig.replacesRuleSet) { + if (calciteConfig.replacesNormRuleSet) { ruleSet } else { - RuleSets.ofList((getBuiltInRuleSet.asScala ++ ruleSet.asScala).asJava) + RuleSets.ofList((getBuiltInNormRuleSet.asScala ++ ruleSet.asScala).asJava) + } + } + } + + /** + * Returns the optimization rule set for this environment + * including a custom RuleSet configuration. + */ + protected def getOptRuleSet: RuleSet = { + val calciteConfig = config.getCalciteConfig + calciteConfig.getOptRuleSet match { + + case None => + getBuiltInOptRuleSet + + case Some(ruleSet) => + if (calciteConfig.replacesOptRuleSet) { + ruleSet + } else { + RuleSets.ofList((getBuiltInOptRuleSet.asScala ++ ruleSet.asScala).asJava) } } } @@ -158,9 +182,68 @@ abstract class TableEnvironment(val config: TableConfig) { } /** - * Returns the built-in rules that are defined by the environment. 
+ * Returns the built-in normalization rules that are defined by the environment. */ - protected def getBuiltInRuleSet: RuleSet + protected def getBuiltInNormRuleSet: RuleSet + + /** + * Returns the built-in optimization rules that are defined by the environment. + */ + protected def getBuiltInOptRuleSet: RuleSet + + /** + * Runs the rule-based HepPlanner over the input with the given rule set and match order. + */ + protected def runHepPlanner( + hepMatchOrder: HepMatchOrder, + ruleSet: RuleSet, + input: RelNode, + targetTraits: RelTraitSet): RelNode = { + val builder = new HepProgramBuilder + builder.addMatchOrder(hepMatchOrder) + + val it = ruleSet.iterator() + while (it.hasNext) { + builder.addRuleInstance(it.next()) + } + + val planner = new HepPlanner(builder.build, frameworkConfig.getContext) + planner.setRoot(input) + if (input.getTraitSet != targetTraits) { + planner.changeTraits(input, targetTraits.simplify) + } + planner.findBestExp + } + + /** + * Runs the cost-based VolcanoPlanner over the input with the given rule set. + */ + protected def runVolcanoPlanner( + ruleSet: RuleSet, + input: RelNode, + targetTraits: RelTraitSet): RelNode = { + val optProgram = Programs.ofRules(ruleSet) + + val output = try { + optProgram.run(getPlanner, input, targetTraits) + } catch { + case e: CannotPlanException => + throw new TableException( + s"Cannot generate a valid execution plan for the given query: \n\n" + + s"${RelOptUtil.toString(input)}\n" + + s"This exception indicates that the query uses an unsupported SQL feature.\n" + + s"Please check the documentation for the set of currently supported SQL features.") + case t: TableException => + throw new TableException( + s"Cannot generate a valid execution plan for the given query: \n\n" + + s"${RelOptUtil.toString(input)}\n" + + s"${t.msg}\n" + + s"Please check the documentation for the set of currently supported SQL features.") + case a: AssertionError => + throw a.getCause + } + output + } /** * Registers a [[ScalarFunction]] under a unique name. Replaces already existing http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala index f646caf..65a61b2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala @@ -31,8 +31,11 @@ import scala.collection.JavaConverters._ * Builder for creating a Calcite configuration. */ class CalciteConfigBuilder { - private var replaceRules: Boolean = false - private var ruleSets: List[RuleSet] = Nil + private var replaceNormRules: Boolean = false + private var normRuleSets: List[RuleSet] = Nil + + private var replaceOptRules: Boolean = false + private var optRuleSets: List[RuleSet] = Nil private var replaceOperatorTable: Boolean = false private var operatorTables: List[SqlOperatorTable] = Nil @@ -40,21 +43,40 @@ class CalciteConfigBuilder { private var replaceSqlParserConfig: Option[SqlParser.Config] = None /** - * Replaces the built-in rule set with the given rule set. + * Replaces the built-in normalization rule set with the given rule set.
+ */ + def replaceNormRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = { + Preconditions.checkNotNull(replaceRuleSet) + normRuleSets = List(replaceRuleSet) + replaceNormRules = true + this + } + + /** + * Appends the given normalization rule set to the built-in rule set. + */ + def addNormRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = { + Preconditions.checkNotNull(addedRuleSet) + normRuleSets = addedRuleSet :: normRuleSets + this + } + + /** + * Replaces the built-in optimization rule set with the given rule set. */ - def replaceRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = { + def replaceOptRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = { Preconditions.checkNotNull(replaceRuleSet) - ruleSets = List(replaceRuleSet) - replaceRules = true + optRuleSets = List(replaceRuleSet) + replaceOptRules = true this } /** - * Appends the given rule set to the built-in rule set. + * Appends the given optimization rule set to the built-in rule set. */ - def addRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = { + def addOptRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = { Preconditions.checkNotNull(addedRuleSet) - ruleSets = addedRuleSet :: ruleSets + optRuleSets = addedRuleSet :: optRuleSets this } @@ -87,32 +109,45 @@ class CalciteConfigBuilder { } private class CalciteConfigImpl( - val getRuleSet: Option[RuleSet], - val replacesRuleSet: Boolean, - val getSqlOperatorTable: Option[SqlOperatorTable], - val replacesSqlOperatorTable: Boolean, - val getSqlParserConfig: Option[SqlParser.Config]) + val getNormRuleSet: Option[RuleSet], + val replacesNormRuleSet: Boolean, + val getOptRuleSet: Option[RuleSet], + val replacesOptRuleSet: Boolean, + val getSqlOperatorTable: Option[SqlOperatorTable], + val replacesSqlOperatorTable: Boolean, + val getSqlParserConfig: Option[SqlParser.Config]) extends CalciteConfig /** * Builds a new [[CalciteConfig]]. */ def build(): CalciteConfig = new CalciteConfigImpl( - ruleSets match { + normRuleSets match { case Nil => None case h :: Nil => Some(h) case _ => // concat rule sets - val concatRules = ruleSets.foldLeft(Nil: Iterable[RelOptRule])( (c, r) => r.asScala ++ c) + val concatRules = + normRuleSets.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c) Some(RuleSets.ofList(concatRules.asJava)) }, - this.replaceRules, + replaceNormRules, + optRuleSets match { + case Nil => None + case h :: Nil => Some(h) + case _ => + // concat rule sets + val concatRules = + optRuleSets.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c) + Some(RuleSets.ofList(concatRules.asJava)) + }, + replaceOptRules, operatorTables match { case Nil => None case h :: Nil => Some(h) case _ => // chain operator tables - Some(operatorTables.reduce( (x, y) => ChainedSqlOperatorTable.of(x, y))) + Some(operatorTables.reduce((x, y) => ChainedSqlOperatorTable.of(x, y))) }, this.replaceOperatorTable, replaceSqlParserConfig) @@ -122,15 +157,26 @@ class CalciteConfigBuilder { * Calcite configuration for defining a custom Calcite configuration for Table and SQL API. */ trait CalciteConfig { + + /** + * Returns whether this configuration replaces the built-in normalization rule set. + */ + def replacesNormRuleSet: Boolean + + /** + * Returns a custom normalization rule set. + */ + def getNormRuleSet: Option[RuleSet] + /** - * Returns whether this configuration replaces the built-in rule set. + * Returns whether this configuration replaces the built-in optimization rule set. 
*/ - def replacesRuleSet: Boolean + def replacesOptRuleSet: Boolean /** - * Returns a custom rule set. + * Returns a custom optimization rule set. */ - def getRuleSet: Option[RuleSet] + def getOptRuleSet: Option[RuleSet] /** * Returns whether this configuration replaces the built-in SQL operator table. http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala index 245a038..9b3ff63 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala @@ -24,7 +24,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.calcite.rex._ import org.apache.flink.api.common.functions.FlatMapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.BatchTableEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory @@ -78,9 +77,12 @@ class DataSetCalc( // compute number of expressions that do not access a field or literal, i.e. computations, // conditions, etc. We only want to account for computations, not for simple projections. + // CASTs in the RexProgram are reduced as far as possible by ReduceExpressionsRule + // during the normalization stage, so we ignore CASTs here in the optimization stage.
val compCnt = calcProgram.getExprList.asScala.toList.count { case i: RexInputRef => false case l: RexLiteral => false + case c: RexCall if c.getOperator.getName.equals("CAST") => false case _ => true } http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index 0b60848..a24a06d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -28,6 +28,17 @@ import org.apache.flink.table.plan.rules.datastream.{DataStreamCalcRule, DataStr object FlinkRuleSets { /** + * RuleSet to normalize plans for batch / DataSet execution + */ + val DATASET_NORM_RULES: RuleSet = RuleSets.ofList( + // simplify expressions rules + ReduceExpressionsRule.FILTER_INSTANCE, + ReduceExpressionsRule.PROJECT_INSTANCE, + ReduceExpressionsRule.CALC_INSTANCE, + ReduceExpressionsRule.JOIN_INSTANCE + ) + + /** * RuleSet to optimize plans for batch / DataSet execution */ val DATASET_OPT_RULES: RuleSet = RuleSets.ofList( @@ -75,12 +86,6 @@ object FlinkRuleSets { // remove unnecessary sort rule SortRemoveRule.INSTANCE, - // simplify expressions rules - ReduceExpressionsRule.FILTER_INSTANCE, - ReduceExpressionsRule.PROJECT_INSTANCE, - ReduceExpressionsRule.CALC_INSTANCE, - ReduceExpressionsRule.JOIN_INSTANCE, - // prune empty results rules PruneEmptyRules.AGGREGATE_INSTANCE, PruneEmptyRules.FILTER_INSTANCE, @@ -117,6 +122,16 @@ object FlinkRuleSets { ) /** + * RuleSet to normalize plans for stream / DataStream execution + */ + val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList( + // simplify expressions rules + ReduceExpressionsRule.FILTER_INSTANCE, + ReduceExpressionsRule.PROJECT_INSTANCE, + ReduceExpressionsRule.CALC_INSTANCE + ) + + /** * RuleSet to optimize plans for stream / DataStream execution */ val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList( @@ -142,11 +157,6 @@ object FlinkRuleSets { FilterProjectTransposeRule.INSTANCE, ProjectRemoveRule.INSTANCE, - // simplify expressions rules - ReduceExpressionsRule.FILTER_INSTANCE, - ReduceExpressionsRule.PROJECT_INSTANCE, - ReduceExpressionsRule.CALC_INSTANCE, - // merge and push unions rules UnionEliminatorRule.INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java index dece295..ebe79fa 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java @@ -482,7 +482,7 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config()); - CalciteConfig cc = new CalciteConfigBuilder().replaceRuleSet(RuleSets.ofList()).build(); + CalciteConfig cc = new CalciteConfigBuilder().replaceOptRuleSet(RuleSets.ofList()).build(); tableEnv.getConfig().setCalciteConfig(cc); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala index 708e007..aad3403 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala @@ -230,7 +230,9 @@ class AggregationTest extends TableTestBase { val calcNode = unaryNode( "DataSetCalc", batchTableNode(0), - term("select", "a", "b", "c"), + // ReduceExpressionsRule forcibly adds a CAST to the Project node + // if the input of the Project node contains a constant expression. + term("select", "CAST(1) AS a", "b", "c"), term("where", "=(a, 1)") ) http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala index e69bd11..6c07e28 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala @@ -18,12 +18,12 @@ package org.apache.flink.table -import org.apache.calcite.rel.rules.{CalcSplitRule, CalcMergeRule, FilterMergeRule} -import org.apache.calcite.sql.fun.{SqlStdOperatorTable, OracleSqlOperatorTable} +import org.apache.calcite.rel.rules._ +import org.apache.calcite.sql.fun.{OracleSqlOperatorTable, SqlStdOperatorTable} import org.apache.calcite.tools.RuleSets -import org.apache.flink.table.calcite.{CalciteConfigBuilder, CalciteConfig} -import org.junit.Test +import org.apache.flink.table.calcite.{CalciteConfig, CalciteConfigBuilder} import org.junit.Assert._ +import org.junit.Test import scala.collection.JavaConverters._ @@ -32,38 +32,117 @@ class CalciteConfigBuilderTest { @Test def testDefaultRules(): Unit = { + val cc: CalciteConfig = new CalciteConfigBuilder().build() + + assertFalse(cc.replacesNormRuleSet) + assertFalse(cc.getNormRuleSet.isDefined) + + assertFalse(cc.replacesOptRuleSet) + assertFalse(cc.getOptRuleSet.isDefined) + } + + @Test + def testRules(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() + .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) + .replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) + .build() + + assertFalse(cc.replacesNormRuleSet) + assertTrue(cc.getNormRuleSet.isDefined) + + assertTrue(cc.replacesOptRuleSet) + assertTrue(cc.getOptRuleSet.isDefined) + } + + @Test + def testReplaceNormalizationRules(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() +
.replaceNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) + .build() + + assertEquals(true, cc.replacesNormRuleSet) + assertTrue(cc.getNormRuleSet.isDefined) + val cSet = cc.getNormRuleSet.get.iterator().asScala.toSet + assertEquals(1, cSet.size) + assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE)) + } + + @Test + def testReplaceNormalizationAddRules(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() + .replaceNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) + .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE)) + .build() + + assertEquals(true, cc.replacesNormRuleSet) + assertTrue(cc.getNormRuleSet.isDefined) + val cSet = cc.getNormRuleSet.get.iterator().asScala.toSet + assertEquals(2, cSet.size) + assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE)) + assertTrue(cSet.contains(ReduceExpressionsRule.PROJECT_INSTANCE)) + } + + @Test + def testAddNormalizationRules(): Unit = { + + val cc: CalciteConfig = new CalciteConfigBuilder() + .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) + .build() + + assertEquals(false, cc.replacesNormRuleSet) + assertTrue(cc.getNormRuleSet.isDefined) + val cSet = cc.getNormRuleSet.get.iterator().asScala.toSet + assertEquals(1, cSet.size) + assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE)) + } + + @Test + def testAddAddNormalizationRules(): Unit = { + val cc: CalciteConfig = new CalciteConfigBuilder() + .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE)) + .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE, + ReduceExpressionsRule.CALC_INSTANCE)) .build() - assertEquals(false, cc.replacesRuleSet) - assertFalse(cc.getRuleSet.isDefined) + assertEquals(false, cc.replacesNormRuleSet) + assertTrue(cc.getNormRuleSet.isDefined) + val cList = cc.getNormRuleSet.get.iterator().asScala.toList + assertEquals(3, cList.size) + assertEquals(cList.head, ReduceExpressionsRule.FILTER_INSTANCE) + assertEquals(cList(1), ReduceExpressionsRule.PROJECT_INSTANCE) + assertEquals(cList(2), ReduceExpressionsRule.CALC_INSTANCE) } @Test - def testReplaceRules(): Unit = { + def testReplaceOptimizationRules(): Unit = { val cc: CalciteConfig = new CalciteConfigBuilder() - .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) + .replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) .build() - assertEquals(true, cc.replacesRuleSet) - assertTrue(cc.getRuleSet.isDefined) - val cSet = cc.getRuleSet.get.iterator().asScala.toSet + assertEquals(true, cc.replacesOptRuleSet) + assertTrue(cc.getOptRuleSet.isDefined) + val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet assertEquals(1, cSet.size) assertTrue(cSet.contains(FilterMergeRule.INSTANCE)) } @Test - def testReplaceAddRules(): Unit = { + def testReplaceOptimizationAddRules(): Unit = { val cc: CalciteConfig = new CalciteConfigBuilder() - .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) - .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE)) + .replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) + .addOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE)) .build() - assertEquals(true, cc.replacesRuleSet) - assertTrue(cc.getRuleSet.isDefined) - val cSet = cc.getRuleSet.get.iterator().asScala.toSet + assertEquals(true, cc.replacesOptRuleSet) + assertTrue(cc.getOptRuleSet.isDefined) + val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet assertEquals(3, cSet.size) 
assertTrue(cSet.contains(FilterMergeRule.INSTANCE)) assertTrue(cSet.contains(CalcMergeRule.INSTANCE)) @@ -71,30 +150,30 @@ class CalciteConfigBuilderTest { } @Test - def testAddRules(): Unit = { + def testAddOptimizationRules(): Unit = { val cc: CalciteConfig = new CalciteConfigBuilder() - .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) + .addOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) .build() - assertEquals(false, cc.replacesRuleSet) - assertTrue(cc.getRuleSet.isDefined) - val cSet = cc.getRuleSet.get.iterator().asScala.toSet + assertEquals(false, cc.replacesOptRuleSet) + assertTrue(cc.getOptRuleSet.isDefined) + val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet assertEquals(1, cSet.size) assertTrue(cSet.contains(FilterMergeRule.INSTANCE)) } @Test - def testAddAddRules(): Unit = { + def testAddAddOptimizationRules(): Unit = { val cc: CalciteConfig = new CalciteConfigBuilder() - .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) - .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE)) + .addOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE)) + .addOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE)) .build() - assertEquals(false, cc.replacesRuleSet) - assertTrue(cc.getRuleSet.isDefined) - val cSet = cc.getRuleSet.get.iterator().asScala.toSet + assertEquals(false, cc.replacesOptRuleSet) + assertTrue(cc.getOptRuleSet.isDefined) + val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet assertEquals(3, cSet.size) assertTrue(cSet.contains(FilterMergeRule.INSTANCE)) assertTrue(cSet.contains(CalcMergeRule.INSTANCE)) http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala index 1f73427..8ce27e8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala @@ -285,7 +285,9 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) { override protected def checkValidTableName(name: String): Unit = ??? - override protected def getBuiltInRuleSet: RuleSet = ??? + override protected def getBuiltInNormRuleSet: RuleSet = ??? + + override protected def getBuiltInOptRuleSet: RuleSet = ??? override def registerTableSource(name: String, tableSource: TableSource[_]) = ??? 
} http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala index be98a89..d70a32a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala @@ -79,7 +79,7 @@ class SetOperatorsTest extends TableTestBase { term("join", "b_long", "a_long"), term("joinType", "InnerJoin") ), - term("select", "true AS $f0", "a_long") + term("select", "a_long", "true AS $f0") ), term("groupBy", "a_long"), term("select", "a_long", "MIN($f0) AS $f1") http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala index a7da5b5..d053b9f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala @@ -297,8 +297,8 @@ class FieldProjectionTest extends TableTestBase { term("groupBy", "word"), term("select", "word", "SUM(frequency) AS TMP_0") ), - term("select", "word, frequency"), - term("where", "=(frequency, 2)") + term("select", "word, TMP_0 AS frequency"), + term("where", "=(TMP_0, 2)") ) util.verifyTable(resultTable, expected) http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala index b4327ec..679942c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.expressions.utils +import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder} import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql2rel.RelDecorrelator @@ -58,6 +59,16 @@ abstract class ExpressionTestBase { context._2.getTypeFactory) private val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES) + private def hepPlanner = { + val builder = new HepProgramBuilder + builder.addMatchOrder(HepMatchOrder.BOTTOM_UP) + val it = FlinkRuleSets.DATASET_NORM_RULES.iterator() + while (it.hasNext) { + builder.addRuleInstance(it.next()) + } + new 
HepPlanner(builder.build, context._2.getFrameworkConfig.getContext) + } + private def prepareContext(typeInfo: TypeInformation[Any]): (RelBuilder, TableEnvironment) = { // create DataSetTable val dataSetMock = mock(classOf[DataSet[Any]]) @@ -140,10 +151,20 @@ abstract class ExpressionTestBase { val validated = planner.validate(parsed) val converted = planner.rel(validated).rel - // create DataSetCalc val decorPlan = RelDecorrelator.decorrelateQuery(converted) + + // normalize + val normalizedPlan = if (FlinkRuleSets.DATASET_NORM_RULES.iterator().hasNext) { + val planner = hepPlanner + planner.setRoot(decorPlan) + planner.findBestExp + } else { + decorPlan + } + + // create DataSetCalc val flinkOutputProps = converted.getTraitSet.replace(DataSetConvention.INSTANCE).simplify() - val dataSetCalc = optProgram.run(context._2.getPlanner, decorPlan, flinkOutputProps) + val dataSetCalc = optProgram.run(context._2.getPlanner, normalizedPlan, flinkOutputProps) // extract RexNode val calcProgram = dataSetCalc http://git-wip-us.apache.org/repos/asf/flink/blob/8efacf58/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala new file mode 100644 index 0000000..8b6d6cf --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.rules + +import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule +import org.apache.calcite.tools.RuleSets +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.calcite.{CalciteConfig, CalciteConfigBuilder} +import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.table.utils.TableTestUtil._ +import org.junit.Test + +class NormalizationRulesTest extends TableTestBase { + + @Test + def testApplyNormalizationRuleForBatchSQL(): Unit = { + val util = batchTestUtil() + + // rewrite distinct aggregate + val cc: CalciteConfig = new CalciteConfigBuilder() + .replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN)) + .replaceOptRuleSet(RuleSets.ofList()) + .build() + util.tEnv.getConfig.setCalciteConfig(cc) + + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + + val sqlQuery = "SELECT " + + "COUNT(DISTINCT a) " + + "FROM MyTable GROUP BY b" + + // expect double aggregate + val expected = unaryNode("LogicalProject", + unaryNode("LogicalAggregate", + unaryNode("LogicalAggregate", + unaryNode("LogicalProject", + values("LogicalTableScan", term("table", "[MyTable]")), + term("b", "$1"), term("a", "$0")), + term("group", "{0, 1}")), + term("group", "{0}"), term("EXPR$0", "COUNT($1)") + ), + term("EXPR$0", "$1") + ) + + util.verifySql(sqlQuery, expected) + } + + @Test + def testApplyNormalizationRuleForStreamSQL(): Unit = { + val util = streamTestUtil() + + // rewrite distinct aggregate + val cc: CalciteConfig = new CalciteConfigBuilder() + .replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN)) + .replaceOptRuleSet(RuleSets.ofList()) + .build() + util.tEnv.getConfig.setCalciteConfig(cc) + + util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + + val sqlQuery = "SELECT " + + "COUNT(DISTINCT a) " + + "FROM MyTable GROUP BY b" + + // expect double aggregate + val expected = unaryNode( + "LogicalProject", + unaryNode("LogicalAggregate", + unaryNode("LogicalAggregate", + unaryNode("LogicalProject", + values("LogicalTableScan", term("table", "[MyTable]")), + term("b", "$1"), term("a", "$0")), + term("group", "{0, 1}")), + term("group", "{0}"), term("EXPR$0", "COUNT($1)") + ), + term("EXPR$0", "$1") + ) + + util.verifySql(sqlQuery, expected) + } + +}
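----------------------------------------------------------------------

For readers of this change, a minimal usage sketch of the new normalization hook, assuming a Scala batch program (the names `env`, `tEnv`, and `cc` are illustrative and not part of the commit):

    // Illustrative sketch; variable names below are assumptions.
    import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule
    import org.apache.calcite.tools.RuleSets
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.calcite.CalciteConfigBuilder

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Rewrite DISTINCT aggregates during the normalization phase, in addition
    // to the built-in DATASET_NORM_RULES; the optimization phase is untouched.
    val cc = new CalciteConfigBuilder()
      .addNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
      .build()
    tEnv.getConfig.setCalciteConfig(cc)

With this configuration, optimize() runs the HepPlanner bottom-up over the decorrelated plan with the combined normalization rules before handing the result to the Volcano planner.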
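As the CalciteConfigBuilderTest changes above exercise, each phase's rule set can be extended or replaced independently: add*RuleSet appends the given rules to the built-in set when the environment resolves it, while replace*RuleSet discards the built-in set. A short sketch of the observable difference (variable names are illustrative):

    import org.apache.calcite.rel.rules.{CalcMergeRule, FilterMergeRule}
    import org.apache.calcite.tools.RuleSets
    import org.apache.flink.table.calcite.CalciteConfigBuilder

    // addOptRuleSet: the given rules are appended to the built-in
    // optimization rules when the environment resolves getOptRuleSet.
    val added = new CalciteConfigBuilder()
      .addOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
      .build()
    assert(!added.replacesOptRuleSet)

    // replaceOptRuleSet: only the given rules remain; the built-in
    // optimization rules are discarded.
    val replaced = new CalciteConfigBuilder()
      .replaceOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE))
      .build()
    assert(replaced.replacesOptRuleSet)

The same pair of methods exists for the normalization phase (addNormRuleSet / replaceNormRuleSet).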