spark git commit: [SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 fbe65c592 -> 90d71bff0


[SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

Propagate pushed filters to PhysicalRDD in DataSourceStrategy.apply
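
As a rough illustration of the effect (the path and data here are
hypothetical, and the printed plan shape is paraphrased rather than
verbatim):

    // A query whose predicate the Parquet source can handle.
    val df = sqlContext.read.parquet("/tmp/events").filter("key > 3")
    // Before this patch the scan node printed only the relation; with it,
    // the node name also carries a suffix such as
    //   " PushedFilter: [GreaterThan(key,3)] "
    // so plans with and without filter pushdown are distinguishable.
    df.explain()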

Author: Zee Chen 

Closes #9679 from zeocio/spark-11390.

(cherry picked from commit 985b38dd2fa5d8f1e23f1c420ce6262e7e3ed181)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d71bff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d71bff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d71bff

Branch: refs/heads/branch-1.6
Commit: 90d71bff0c583830aa3fd96b1dd3607f0cb0cbee
Parents: fbe65c5
Author: Zee Chen 
Authored: Mon Nov 16 14:21:28 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 14:21:41 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/ExistingRDD.scala  |  6 ++++--
 .../execution/datasources/DataSourceStrategy.scala    |  6 ++++--
 .../org/apache/spark/sql/execution/PlannerSuite.scala | 14 ++++++++++++++
 3 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 8b41d3d..62620ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -106,7 +106,9 @@ private[sql] object PhysicalRDD {
   def createFromDataSource(
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
-      relation: BaseRelation): PhysicalRDD = {
-    PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation])
+      relation: BaseRelation,
+      extraInformation: String = ""): PhysicalRDD = {
+    PhysicalRDD(output, rdd, relation.toString + extraInformation,
+      relation.isInstanceOf[HadoopFsRelation])
   }
 }
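
A minimal sketch of the resulting call shapes (names as in the diff
above; output, rdd and relation stand for values already in scope at
a call site):

    // Existing callers compile unchanged: extraInformation defaults to "".
    val plain = PhysicalRDD.createFromDataSource(output, rdd, relation)
    // DataSourceStrategy can now append the pushed-filter description
    // to the node name shown in explain output.
    val annotated = PhysicalRDD.createFromDataSource(
      output, rdd, relation, extraInformation = " PushedFilter: [...] ")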

http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 9bbbfa7..544d5ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -315,6 +315,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
+    val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
+
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
         filterSet.subsetOf(projectSet)) {
@@ -332,7 +334,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -342,7 +344,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       execution.Project(
         projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }
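
The suffix itself is plain string formatting; a self-contained sketch
(the filter renderings below are made up but mirror the style of
org.apache.spark.sql.sources filters):

    val pushedFilters = Seq("GreaterThan(key,3)", "IsNotNull(key)")
    val pushedFiltersString =
      pushedFilters.mkString(" PushedFilter: [", ",", "] ")
    // " PushedFilter: [GreaterThan(key,3),IsNotNull(key)] "
    println(pushedFiltersString)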

http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index be53ec3..dfec139 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

spark git commit: [SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master b1a966262 -> 985b38dd2


[SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

Propagate pushed filters to PhysicalRDD in DataSourceStrategy.apply

Author: Zee Chen 

Closes #9679 from zeocio/spark-11390.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/985b38dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/985b38dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/985b38dd

Branch: refs/heads/master
Commit: 985b38dd2fa5d8f1e23f1c420ce6262e7e3ed181
Parents: b1a9662
Author: Zee Chen 
Authored: Mon Nov 16 14:21:28 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 14:21:28 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/ExistingRDD.scala  |  6 ++++--
 .../execution/datasources/DataSourceStrategy.scala    |  6 ++++--
 .../org/apache/spark/sql/execution/PlannerSuite.scala | 14 ++++++++++++++
 3 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 8b41d3d..62620ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -106,7 +106,9 @@ private[sql] object PhysicalRDD {
   def createFromDataSource(
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
-      relation: BaseRelation): PhysicalRDD = {
-    PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation])
+      relation: BaseRelation,
+      extraInformation: String = ""): PhysicalRDD = {
+    PhysicalRDD(output, rdd, relation.toString + extraInformation,
+      relation.isInstanceOf[HadoopFsRelation])
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 9bbbfa7..544d5ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -315,6 +315,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
+    val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
+
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
         filterSet.subsetOf(projectSet)) {
@@ -332,7 +334,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -342,7 +344,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       execution.Project(
         projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index be53ec3..dfec139 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -160,6 +160,20 @@ class PlannerSuite extends SharedSQLContext {
 }
   }
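
The 14 test lines added here are truncated above. A hypothetical
sketch of a test exercising the new plan string (helper names such as
withTempPath and testData come from Spark's SQLTestUtils/SQLTestData
mix-ins; the test actually committed may differ):

    test("SPARK-11390 explain should print PushedFilters of PhysicalRDD") {
      withTempPath { file =>
        val path = file.getCanonicalPath
        // Write a small Parquet table, then query it with a pushable filter.
        testData.write.parquet(path)
        val df = sqlContext.read.parquet(path)
        val plan = df.filter("key > 3").queryExecution.sparkPlan.toString
        // With this patch the scan node's name includes the pushed filters.
        assert(plan.contains("PushedFilter"))
      }
    }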