spark git commit: [SPARK-16686][SQL] Remove PushProjectThroughSample since it is handled by ColumnPruning
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d0707c6ba -> 3276ccfac

[SPARK-16686][SQL] Remove PushProjectThroughSample since it is handled by ColumnPruning

We push down `Project` through `Sample` in the `Optimizer` by the rule `PushProjectThroughSample`. However, if the projected columns produce new output, they see the whole data instead of the sampled data. This introduces an inconsistency between the original plan (Sample then Project) and the optimized plan (Project then Sample). In the extreme case attached to the JIRA, where the projected column is a UDF that is supposed to see only the sampled rows, the result of the UDF will be incorrect.

Since the rule `ColumnPruning` already handles general `Project` pushdown, we don't need `PushProjectThroughSample` anymore. The rule `ColumnPruning` also avoids the issue described above.

Jenkins tests.

Author: Liang-Chi Hsieh

Closes #14327 from viirya/fix-sample-pushdown.

(cherry picked from commit 7b06a8948fc16d3c14e240fdd632b79ce1651008)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3276ccfa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3276ccfa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3276ccfa

Branch: refs/heads/branch-2.0
Commit: 3276ccfac807514d5a959415bcf58d2aa6ed8fbc
Parents: d0707c6
Author: Liang-Chi Hsieh
Authored: Tue Jul 26 12:00:01 2016 +0800
Committer: Reynold Xin
Committed: Fri Aug 19 11:18:55 2016 -0700

--
 .../sql/catalyst/optimizer/Optimizer.scala       | 12 --
 .../catalyst/optimizer/ColumnPruningSuite.scala  | 15 +
 .../optimizer/FilterPushdownSuite.scala          | 17 -
 .../org/apache/spark/sql/DatasetSuite.scala      | 25 +
 4 files changed, 40 insertions(+), 29 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3276ccfa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 19d3c39..88cc0e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -75,7 +75,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
     Batch("Operator Optimizations", fixedPoint,
       // Operator push down
       PushThroughSetOperations,
-      PushProjectThroughSample,
       ReorderJoin,
       EliminateOuterJoin,
       PushPredicateThroughJoin,
@@ -147,17 +146,6 @@ class SimpleTestOptimizer extends Optimizer(
   new SimpleCatalystConf(caseSensitiveAnalysis = true))

 /**
- * Pushes projects down beneath Sample to enable column pruning with sampling.
- */
-object PushProjectThroughSample extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    // Push down projection into sample
-    case Project(projectList, Sample(lb, up, replace, seed, child)) =>
-      Sample(lb, up, replace, seed, Project(projectList, child))()
-  }
-}
-
-/**
  * Removes the Project only conducting Alias of its child node.
  * It is created mainly for removing extra Project added in EliminateSerialization rule,
  * but can also benefit other operators.
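The removed rule above rewrites Project-over-Sample into Sample-over-Project. To make the inconsistency concrete, here is a minimal, self-contained sketch in plain Scala (no Spark or Catalyst APIs; the object, the sample function, and the counting UDFs are made up for illustration): it mimics "Sample then Project" versus "Project then Sample" and shows that the pushed-down ordering evaluates the projected expression over the whole input rather than over the sampled rows.

object SamplePushdownSketch {
  // Deterministic stand-in for Sample: keep rows at even positions.
  def sample(rows: Seq[Int]): Seq[Int] =
    rows.zipWithIndex.collect { case (r, i) if i % 2 == 0 => r }

  def main(args: Array[String]): Unit = {
    val rows = (1 to 10).toSeq

    var callsSampleFirst = 0
    var callsProjectFirst = 0
    val udfSampleFirst: Int => Int = { r => callsSampleFirst += 1; r * 2 }
    val udfProjectFirst: Int => Int = { r => callsProjectFirst += 1; r * 2 }

    // Original plan shape: Sample(child), then Project(udf) -- the UDF sees 5 rows.
    sample(rows).map(udfSampleFirst)
    // After the pushdown: Project(udf), then Sample -- the UDF sees all 10 rows.
    sample(rows.map(udfProjectFirst))

    println(s"UDF invocations, Sample then Project: $callsSampleFirst")   // 5
    println(s"UDF invocations, Project then Sample: $callsProjectFirst")  // 10
  }
}

With that ordering difference, a UDF that is nondeterministic or side-effecting can produce different results before and after optimization, which is the inconsistency described in the JIRA.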
http://git-wip-us.apache.org/repos/asf/spark/blob/3276ccfa/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index b5664a5..589607e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -346,5 +346,20 @@ class ColumnPruningSuite extends PlanTest {
     comparePlans(Optimize.execute(plan1.analyze), correctAnswer1)
   }

+  test("push project down into sample") {
+    val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+    val x = testRelation.subquery('x)
+
+    val query1 = Sample(0.0, 0.6, false, 11L, x)().select('a)
+    val optimized1 = Optimize.execute(query1.analyze)
+    val expected1 = Sample(0.0, 0.6, false, 11L, x.select('a))()
+    comparePlans(optimized1, expected1.analyze)
+
+    val query2 = Sample(0.0, 0.6, false, 11L, x)().select('a as 'aa)
spark git commit: [SPARK-16686][SQL] Remove PushProjectThroughSample since it is handled by ColumnPruning
Repository: spark
Updated Branches:
  refs/heads/master 815f3eece -> 7b06a8948

[SPARK-16686][SQL] Remove PushProjectThroughSample since it is handled by ColumnPruning

## What changes were proposed in this pull request?

We push down `Project` through `Sample` in the `Optimizer` by the rule `PushProjectThroughSample`. However, if the projected columns produce new output, they see the whole data instead of the sampled data. This introduces an inconsistency between the original plan (Sample then Project) and the optimized plan (Project then Sample). In the extreme case attached to the JIRA, where the projected column is a UDF that is supposed to see only the sampled rows, the result of the UDF will be incorrect.

Since the rule `ColumnPruning` already handles general `Project` pushdown, we don't need `PushProjectThroughSample` anymore. The rule `ColumnPruning` also avoids the issue described above.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh

Closes #14327 from viirya/fix-sample-pushdown.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b06a894
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b06a894
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b06a894

Branch: refs/heads/master
Commit: 7b06a8948fc16d3c14e240fdd632b79ce1651008
Parents: 815f3ee
Author: Liang-Chi Hsieh
Authored: Tue Jul 26 12:00:01 2016 +0800
Committer: Wenchen Fan
Committed: Tue Jul 26 12:00:01 2016 +0800

--
 .../sql/catalyst/optimizer/Optimizer.scala       | 12 --
 .../catalyst/optimizer/ColumnPruningSuite.scala  | 15 +
 .../optimizer/FilterPushdownSuite.scala          | 17 -
 .../org/apache/spark/sql/DatasetSuite.scala      | 25 +
 4 files changed, 40 insertions(+), 29 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/7b06a894/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c8e9d8e..fe328fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -76,7 +76,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
     Batch("Operator Optimizations", fixedPoint,
       // Operator push down
       PushThroughSetOperations,
-      PushProjectThroughSample,
       ReorderJoin,
       EliminateOuterJoin,
       PushPredicateThroughJoin,
@@ -149,17 +148,6 @@ class SimpleTestOptimizer extends Optimizer(
   new SimpleCatalystConf(caseSensitiveAnalysis = true))

 /**
- * Pushes projects down beneath Sample to enable column pruning with sampling.
- */
-object PushProjectThroughSample extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    // Push down projection into sample
-    case Project(projectList, Sample(lb, up, replace, seed, child)) =>
-      Sample(lb, up, replace, seed, Project(projectList, child))()
-  }
-}
-
-/**
  * Removes the Project only conducting Alias of its child node.
  * It is created mainly for removing extra Project added in EliminateSerialization rule,
  * but can also benefit other operators.
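The test added to ColumnPruningSuite below checks that the pruning-based rewrite still happens after the rule is removed. As a rough illustration of why `ColumnPruning` is sufficient, here is a toy plan algebra in plain Scala (not Catalyst's classes; the `pruneColumns` helper and all types are hypothetical): instead of moving the whole projection below the sample, it inserts an attribute-only projection beneath the Sample to drop unused columns, while expression-producing projections such as UDF calls stay above the sampling operator.

object ColumnPruningSketch {
  sealed trait Plan { def output: Set[String] }
  case class Relation(output: Set[String]) extends Plan
  case class Sample(child: Plan) extends Plan { def output: Set[String] = child.output }
  // exprs maps each produced column to the set of input columns it reads.
  case class Project(exprs: Map[String, Set[String]], child: Plan) extends Plan {
    def output: Set[String] = exprs.keySet
  }

  // Prune unused child columns beneath Sample without reordering Project and Sample.
  def pruneColumns(plan: Plan): Plan = plan match {
    case p @ Project(exprs, Sample(child)) =>
      val needed = exprs.values.flatten.toSet
      if (needed == child.output) p
      else Project(exprs, Sample(Project(needed.map(c => c -> Set(c)).toMap, child)))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    // "aa" is computed from "a" only, so "b" and "c" are pruned below the Sample,
    // while the expression that produces "aa" remains above it.
    val plan = Project(Map("aa" -> Set("a")), Sample(Relation(Set("a", "b", "c"))))
    println(pruneColumns(plan))
  }
}

This mirrors the intent described above: pruning still limits the columns that flow through the Sample, but the projection that produces new output is never pushed beneath it.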
http://git-wip-us.apache.org/repos/asf/spark/blob/7b06a894/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index b5664a5..589607e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -346,5 +346,20 @@ class ColumnPruningSuite extends PlanTest {
     comparePlans(Optimize.execute(plan1.analyze), correctAnswer1)
   }

+  test("push project down into sample") {
+    val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+    val x = testRelation.subquery('x)
+
+    val query1 = Sample(0.0, 0.6, false, 11L, x)().select('a)
+    val optimized1 = Optimize.execute(query1.analyze)
+    val expected1 = Sample(0.0, 0.6, false, 11L, x.select('a))()
+    comparePlans(optimized1, expected1.analyze)
+
+    val query2 = Sample(0.0, 0.6, false, 11L, x)().select('a as 'aa)
+    val optimized2 =