spark git commit: [SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates correctly in non-deterministic condition.
Repository: spark Updated Branches: refs/heads/branch-2.0 ca8130050 -> 7ffafa3bf [SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates correctly in non-deterministic condition. ## What changes were proposed in this pull request? Currently our Optimizer may reorder the predicates to run them more efficient, but in non-deterministic condition, change the order between deterministic parts and non-deterministic parts may change the number of input rows. For example: ```SELECT a FROM t WHERE rand() < 0.1 AND a = 1``` And ```SELECT a FROM t WHERE a = 1 AND rand() < 0.1``` may call rand() for different times and therefore the output rows differ. This PR improved this condition by checking whether the predicate is placed before any non-deterministic predicates. ## How was this patch tested? Expanded related testcases in FilterPushdownSuite. Author: èæå Closes #14012 from jiangxb1987/ppd. (cherry picked from commit f376c37268848dbb4b2fb57677e22ef2bf207b49) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ffafa3b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ffafa3b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ffafa3b Branch: refs/heads/branch-2.0 Commit: 7ffafa3bfecb8bc92b79eddea1ca18166efd3385 Parents: ca81300 Author: èæå Authored: Thu Jul 14 00:21:27 2016 +0800 Committer: Josh Rosen Committed: Thu Sep 29 11:44:00 2016 -0700 -- .../sql/catalyst/optimizer/Optimizer.scala | 44 +--- .../optimizer/FilterPushdownSuite.scala | 8 ++-- 2 files changed, 33 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7ffafa3b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d824c2e..35b122d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1031,19 +1031,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be -// pushed beneath must satisfy the following two conditions: +// pushed beneath must satisfy the following conditions: // 1. All the expressions are part of window partitioning key. The expressions can be compound. -// 2. Deterministic +// 2. Deterministic. +// 3. Placed before any non-deterministic predicates. case filter @ Filter(condition, w: Window) if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) - val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => -cond.references.subsetOf(partitionAttrs) && cond.deterministic && - // This is for ensuring all the partitioning expressions have been converted to alias - // in Analyzer. Thus, we do not need to check if the expressions in conditions are - // the same as the expressions used in partitioning columns. - partitionAttrs.forall(_.isInstanceOf[Attribute]) + + val (candidates, containingNonDeterministic) = +splitConjunctivePredicates(condition).span(_.deterministic) + + val (pushDown, rest) = candidates.partition { cond => +cond.references.subsetOf(partitionAttrs) } + + val stayUp = rest ++ containingNonDeterministic + if (pushDown.nonEmpty) { val pushDownPredicate = pushDown.reduce(And) val newWindow = w.copy(child = Filter(pushDownPredicate, w.child)) @@ -1062,11 +1066,16 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // For each filter, expand the alias and check if the filter can be evaluated using // attributes produced by the aggregate operator's child operator. - val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => + val (candidates, containingNonDeterministic) = +splitConjunctivePredicates(condition).span(_.deterministic) + + val (pushDown, rest) = candidates.partition { cond => val replaced = replaceAlias(cond, aliasMap) -replaced.references.subsetOf(aggregate.child.outputSet) && replaced.deterministic +replaced.references.subsetOf(aggregate.child.outputSet
spark git commit: [SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates correctly in non-deterministic condition.
Repository: spark Updated Branches: refs/heads/master ea06e4ef3 -> f376c3726 [SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates correctly in non-deterministic condition. ## What changes were proposed in this pull request? Currently our Optimizer may reorder the predicates to run them more efficient, but in non-deterministic condition, change the order between deterministic parts and non-deterministic parts may change the number of input rows. For example: ```SELECT a FROM t WHERE rand() < 0.1 AND a = 1``` And ```SELECT a FROM t WHERE a = 1 AND rand() < 0.1``` may call rand() for different times and therefore the output rows differ. This PR improved this condition by checking whether the predicate is placed before any non-deterministic predicates. ## How was this patch tested? Expanded related testcases in FilterPushdownSuite. Author: èæå Closes #14012 from jiangxb1987/ppd. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f376c372 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f376c372 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f376c372 Branch: refs/heads/master Commit: f376c37268848dbb4b2fb57677e22ef2bf207b49 Parents: ea06e4e Author: èæå Authored: Thu Jul 14 00:21:27 2016 +0800 Committer: Cheng Lian Committed: Thu Jul 14 00:21:27 2016 +0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 44 +--- .../optimizer/FilterPushdownSuite.scala | 8 ++-- 2 files changed, 33 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f376c372/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 368e9a5..08fb019 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1128,19 +1128,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be -// pushed beneath must satisfy the following two conditions: +// pushed beneath must satisfy the following conditions: // 1. All the expressions are part of window partitioning key. The expressions can be compound. -// 2. Deterministic +// 2. Deterministic. +// 3. Placed before any non-deterministic predicates. case filter @ Filter(condition, w: Window) if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) - val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => -cond.references.subsetOf(partitionAttrs) && cond.deterministic && - // This is for ensuring all the partitioning expressions have been converted to alias - // in Analyzer. Thus, we do not need to check if the expressions in conditions are - // the same as the expressions used in partitioning columns. - partitionAttrs.forall(_.isInstanceOf[Attribute]) + + val (candidates, containingNonDeterministic) = +splitConjunctivePredicates(condition).span(_.deterministic) + + val (pushDown, rest) = candidates.partition { cond => +cond.references.subsetOf(partitionAttrs) } + + val stayUp = rest ++ containingNonDeterministic + if (pushDown.nonEmpty) { val pushDownPredicate = pushDown.reduce(And) val newWindow = w.copy(child = Filter(pushDownPredicate, w.child)) @@ -1159,11 +1163,16 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // For each filter, expand the alias and check if the filter can be evaluated using // attributes produced by the aggregate operator's child operator. - val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => + val (candidates, containingNonDeterministic) = +splitConjunctivePredicates(condition).span(_.deterministic) + + val (pushDown, rest) = candidates.partition { cond => val replaced = replaceAlias(cond, aliasMap) -replaced.references.subsetOf(aggregate.child.outputSet) && replaced.deterministic +replaced.references.subsetOf(aggregate.child.outputSet) } + val stayUp = rest ++ containingNonDeterministic + if (pushDown.nonEmpty) {