Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/20485#discussion_r165579903
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
---
@@ -81,33 +81,44 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
// TODO: add more push down rules.
-    pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
+    val columnPruned = pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
     // After column pruning, we may have redundant PROJECT nodes in the query plan, remove them.
- RemoveRedundantProject(filterPushed)
+ RemoveRedundantProject(columnPruned)
}
// TODO: nested fields pruning
-  private def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: AttributeSet): Unit = {
+  private def pushDownRequiredColumns(
+      plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = {
plan match {
- case Project(projectList, child) =>
+ case p @ Project(projectList, child) =>
val required = projectList.flatMap(_.references)
- pushDownRequiredColumns(child, AttributeSet(required))
+      p.copy(child = pushDownRequiredColumns(child, AttributeSet(required)))
- case Filter(condition, child) =>
+ case f @ Filter(condition, child) =>
val required = requiredByParent ++ condition.references
- pushDownRequiredColumns(child, required)
+ f.copy(child = pushDownRequiredColumns(child, required))
case relation: DataSourceV2Relation => relation.reader match {
case reader: SupportsPushDownRequiredColumns =>
+        // TODO: Enable the below assert after we make `DataSourceV2Relation` immutable. For now
+        // it's possible that the mutable reader is updated by someone else, and we need to
+        // always call `reader.pruneColumns` here to correct it.
+        // assert(relation.output.toStructType == reader.readSchema(),
+        //   "Schema of data source reader does not match the relation plan.")
+
        val requiredColumns = relation.output.filter(requiredByParent.contains)
        reader.pruneColumns(requiredColumns.toStructType)
-      case _ =>
+        val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
+        val newOutput = reader.readSchema().map(_.name).map(nameToAttr)
+        relation.copy(output = newOutput)
--- End diff ---
@rdblue This is the bug I mentioned before. Finally I figured out a way to fix it surgically: always run column pruning, even if no column needs to be pruned. This helps us correct the required schema of the reader if it's been updated by someone else.
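[Editor's note] The fix above hinges on the reader being mutable: another rule may have changed its required columns, so calling `pruneColumns` unconditionally resets the reader to what this rule actually requires. A minimal self-contained sketch of that idea, using a toy reader (all names below are illustrative, not Spark's real `SupportsPushDownRequiredColumns` API):

```scala
// Toy stand-in for a mutable DataSourceV2 reader: its required schema can be
// changed by anyone holding a reference to it.
class ToyMutableReader(full: Seq[String]) {
  private var required: Seq[String] = full
  def pruneColumns(cols: Seq[String]): Unit = required = cols
  def readSchema(): Seq[String] = required
}

object PruneAlwaysDemo {
  // Mirrors the diff's shape: always call pruneColumns (even when it is a
  // no-op), then rebuild the relation's output from what the reader reports.
  def pruneAlways(
      reader: ToyMutableReader,
      relationOutput: Seq[String],
      requiredByParent: Set[String]): Seq[String] = {
    // Keep only the columns the parent needs, preserving relation order.
    val requiredColumns = relationOutput.filter(requiredByParent.contains)
    reader.pruneColumns(requiredColumns) // always run, correcting stale state
    // Look up attributes by name, as the real code does with nameToAttr.
    val nameToAttr = relationOutput.map(n => n -> n).toMap
    reader.readSchema().map(nameToAttr)
  }

  def main(args: Array[String]): Unit = {
    val reader = new ToyMutableReader(Seq("a", "b", "c"))
    // Simulate someone else mutating the reader behind this rule's back.
    reader.pruneColumns(Seq("a"))
    // The parent needs a and b; pruning always runs and corrects the reader.
    val out = pruneAlways(reader, Seq("a", "b", "c"), Set("a", "b"))
    println(out.mkString(",")) // prints "a,b"
  }
}
```

Without the unconditional call, the reader above would keep reporting only `a`, and the relation's output would drift out of sync with the reader's schema.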
---