[GitHub] spark pull request #20485: [SPARK-23315][SQL] failed to get output from cano...

2018-02-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20485


---




[GitHub] spark pull request #20485: [SPARK-23315][SQL] failed to get output from cano...

2018-02-06 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20485#discussion_r166436103
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala ---
@@ -81,33 +81,44 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
 
     // TODO: add more push down rules.
 
-    pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
+    val columnPruned = pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
     // After column pruning, we may have redundant PROJECT nodes in the query plan, remove them.
-    RemoveRedundantProject(filterPushed)
+    RemoveRedundantProject(columnPruned)
   }
 
   // TODO: nested fields pruning
-  private def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: AttributeSet): Unit = {
+  private def pushDownRequiredColumns(
+      plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = {
     plan match {
-      case Project(projectList, child) =>
+      case p @ Project(projectList, child) =>
         val required = projectList.flatMap(_.references)
-        pushDownRequiredColumns(child, AttributeSet(required))
+        p.copy(child = pushDownRequiredColumns(child, AttributeSet(required)))
 
-      case Filter(condition, child) =>
+      case f @ Filter(condition, child) =>
         val required = requiredByParent ++ condition.references
-        pushDownRequiredColumns(child, required)
+        f.copy(child = pushDownRequiredColumns(child, required))
 
       case relation: DataSourceV2Relation => relation.reader match {
         case reader: SupportsPushDownRequiredColumns =>
+          // TODO: Enable the below assert after we make `DataSourceV2Relation` immutable. Fow now
--- End diff --

Typo: Fow


---




[GitHub] spark pull request #20485: [SPARK-23315][SQL] failed to get output from cano...

2018-02-02 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20485#discussion_r165579903
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala ---
@@ -81,33 +81,44 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
 
     // TODO: add more push down rules.
 
-    pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
+    val columnPruned = pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
     // After column pruning, we may have redundant PROJECT nodes in the query plan, remove them.
-    RemoveRedundantProject(filterPushed)
+    RemoveRedundantProject(columnPruned)
   }
 
   // TODO: nested fields pruning
-  private def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: AttributeSet): Unit = {
+  private def pushDownRequiredColumns(
+      plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = {
     plan match {
-      case Project(projectList, child) =>
+      case p @ Project(projectList, child) =>
        val required = projectList.flatMap(_.references)
-        pushDownRequiredColumns(child, AttributeSet(required))
+        p.copy(child = pushDownRequiredColumns(child, AttributeSet(required)))
 
-      case Filter(condition, child) =>
+      case f @ Filter(condition, child) =>
        val required = requiredByParent ++ condition.references
-        pushDownRequiredColumns(child, required)
+        f.copy(child = pushDownRequiredColumns(child, required))
 
       case relation: DataSourceV2Relation => relation.reader match {
         case reader: SupportsPushDownRequiredColumns =>
+          // TODO: Enable the below assert after we make `DataSourceV2Relation` immutable. Fow now
+          // it's possible that the mutable reader being updated by someone else, and we need to
+          // always call `reader.pruneColumns` here to correct it.
+          // assert(relation.output.toStructType == reader.readSchema(),
+          //  "Schema of data source reader does not match the relation plan.")
+
           val requiredColumns = relation.output.filter(requiredByParent.contains)
           reader.pruneColumns(requiredColumns.toStructType)
 
-        case _ =>
+          val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
+          val newOutput = reader.readSchema().map(_.name).map(nameToAttr)
+          relation.copy(output = newOutput)
--- End diff --

@rdblue This is the bug I mentioned before. I finally figured out a way to
fix it surgically: always run column pruning, even if no column needs to be
pruned. This lets us correct the required schema of the reader in case it was
updated by someone else.
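
For readers skimming the thread, here is a minimal self-contained sketch of
the pattern in the diff above. Attr, Plan, Project, Relation and MutableReader
are simplified stand-ins, not Catalyst's LogicalPlan, DataSourceV2Relation or
SupportsPushDownRequiredColumns; the point is that the rule returns a rewritten
plan, always prunes, and rebuilds the relation output from whatever schema the
reader actually reports.

object ColumnPruningSketch {
  case class Attr(name: String)

  trait MutableReader {
    def pruneColumns(required: Seq[String]): Unit
    def readSchema(): Seq[String]
  }

  sealed trait Plan { def output: Seq[Attr] }
  case class Relation(output: Seq[Attr], reader: MutableReader) extends Plan
  case class Project(projectList: Seq[Attr], child: Plan) extends Plan {
    def output: Seq[Attr] = projectList
  }

  // Returns a rewritten plan instead of mutating in place. The relation case
  // always calls pruneColumns and rebuilds its output from the schema the
  // reader reports, so a reader mutated elsewhere is brought back in sync.
  def pushDownRequiredColumns(plan: Plan, requiredByParent: Set[String]): Plan =
    plan match {
      case p @ Project(projectList, child) =>
        p.copy(child = pushDownRequiredColumns(child, projectList.map(_.name).toSet))

      case r @ Relation(output, reader) =>
        val requiredColumns = output.filter(a => requiredByParent.contains(a.name))
        reader.pruneColumns(requiredColumns.map(_.name))
        val nameToAttr = output.map(a => a.name -> a).toMap
        r.copy(output = reader.readSchema().map(nameToAttr))
    }
}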


---




[GitHub] spark pull request #20485: [SPARK-23315][SQL] failed to get output from cano...

2018-02-02 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20485#discussion_r165578555
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala ---
@@ -99,15 +100,22 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
 
       case relation: DataSourceV2Relation => relation.reader match {
         case reader: SupportsPushDownRequiredColumns =>
+          // TODO: Enable the below assert after we make `DataSourceV2Relation` immutable. Fow now
+          // it's possible that the mutable reader being updated by someone else, and we need to
+          // always call `reader.pruneColumns` here to correct it.
+          // assert(relation.output.toStructType == reader.readSchema(),
+          //  "Schema of data source reader does not match the relation plan.")
+
           val requiredColumns = relation.output.filter(requiredByParent.contains)
           reader.pruneColumns(requiredColumns.toStructType)
+          relation.copy(output = requiredColumns)
--- End diff --

@rdblue This is the bug I mentioned before. I finally figured out a way to
fix it surgically: always run column pruning, even if no column needs to be
pruned. This lets us correct the required schema of the reader in case it was
updated by someone else.
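
To make the "updated by someone else" scenario concrete, here is a hedged toy
demo; ToyReader is not the real data source v2 reader interface, just a
mutable schema holder. If the prune were skipped because no column needs to be
dropped, the reader would stay out of sync with the plan.

object AlwaysPruneDemo extends App {
  class ToyReader(var schema: Seq[String]) {
    def pruneColumns(required: Seq[String]): Unit = { schema = required }
    def readSchema(): Seq[String] = schema
  }

  val reader = new ToyReader(Seq("a", "b", "c"))
  val relationOutput = Seq("a", "b", "c") // what the plan believes it reads

  // "Someone else" (another rule sharing the same mutable reader) prunes it.
  reader.pruneColumns(Seq("a"))

  // Skipping the prune here would leave the reader reporting Seq("a") and
  // disagreeing with the plan. Calling pruneColumns unconditionally restores
  // the invariant.
  reader.pruneColumns(relationOutput)
  assert(reader.readSchema() == relationOutput)
}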




---




[GitHub] spark pull request #20485: [SPARK-23315][SQL] failed to get output from cano...

2018-02-01 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/20485

[SPARK-23315][SQL] failed to get output from canonicalized data source v2 related plans

## What changes were proposed in this pull request?

`DataSourceV2Relation` keeps a `fullOutput` and resolves the real output on demand by column name lookup. This breaks after the plan is canonicalized, because canonicalization replaces all attribute names with "None".

To fix this, `DataSourceV2Relation` should just keep `output`, and update `output` when doing column pruning.
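
As a hedged illustration of the failure mode (toy types, not the real Catalyst
classes), the sketch below contrasts the old name-lookup shape with the new
keep-the-output shape under a canonicalization step that blanks out attribute
names:

object CanonicalizeDemo extends App {
  case class Attr(name: String)

  // Old shape: keep fullOutput and resolve the visible output by name lookup.
  case class RelationByName(fullOutput: Seq[Attr], requestedNames: Seq[String]) {
    def output: Seq[Attr] = requestedNames.flatMap(n => fullOutput.find(_.name == n))
  }

  // Shape after this PR: keep the resolved output directly.
  case class RelationByOutput(output: Seq[Attr])

  // Canonicalization, modelled here as blanking out attribute names.
  def canonicalize(attrs: Seq[Attr]): Seq[Attr] = attrs.map(_ => Attr("None"))

  val byName = RelationByName(Seq(Attr("i"), Attr("j")), Seq("i"))
  // After canonicalization the name lookup no longer matches anything.
  println(byName.copy(fullOutput = canonicalize(byName.fullOutput)).output) // List()

  val byOutput = RelationByOutput(Seq(Attr("i")))
  // Keeping output directly survives canonicalization.
  println(byOutput.copy(output = canonicalize(byOutput.output)).output) // List(Attr(None))
}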

## How was this patch tested?

a new test case

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark canonicalize

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20485.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20485


commit 75950a1725f01c31764ac31d16acd6e2078956c6
Author: Wenchen Fan 
Date:   2018-02-02T07:53:07Z

failed to get output from canonicalized data source v2 related plans




---
