Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21503#discussion_r194871032
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
@@ -17,15 +17,56 @@
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case r: DataSourceV2Relation =>
-      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      val projectSet = AttributeSet(project.flatMap(_.references))
+      val filterSet = AttributeSet(filters.flatMap(_.references))
+
+      val projection = if (filterSet.subsetOf(projectSet) &&
+          AttributeSet(relation.output) == projectSet) {
+        // When the required projection contains all of the filter columns and column pruning alone
+        // can produce the required projection, push the required projection.
+        // A final projection may still be needed if the data source produces a different column
+        // order or if it cannot prune all of the nested columns.
+        relation.output
+      } else {
+        // When there are filter columns not already in the required projection or when the required
+        // projection is more complicated than column pruning, base column pruning on the set of
+        // all columns needed by both.
+        (projectSet ++ filterSet).toSeq
+      }
+
+      val reader = relation.newReader
--- End diff ---
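For reference, the pruning decision in the diff above reduces to simple set logic over attribute references. Here is a minimal, self-contained sketch of that decision, with plain strings standing in for Catalyst attributes; `chooseProjection` and every name in it are illustrative stand-ins, not Spark API:

    object PruningDecisionSketch {
      def chooseProjection(
          output: Seq[String],      // stands in for relation.output
          projectRefs: Set[String], // columns referenced by the projection
          filterRefs: Set[String]   // columns referenced by the filters
      ): Seq[String] = {
        if (filterRefs.subsetOf(projectRefs) && output.toSet == projectRefs) {
          // Column pruning alone can produce the required projection and it
          // already covers every filter column: push the relation's output.
          output
        } else {
          // Otherwise prune to the union of columns needed by both.
          (projectRefs ++ filterRefs).toSeq
        }
      }

      def main(args: Array[String]): Unit = {
        // Filters only touch projected columns and pruning alone suffices:
        println(chooseProjection(Seq("a", "b"), Set("a", "b"), Set("a")))
        // A filter column ("b") is not in the projection, so push the union:
        println(chooseProjection(Seq("a", "b", "c"), Set("a"), Set("b")))
      }
    }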
OK we need to make a decision here:
1. Apply operator pushdown twice (proposed by this PR). This moves the pushdown logic into the planner, which is cleaner and closer to the ideal design. The drawback is that, until statistics move to the physical plan, we have some duplicated pushdown code in `DataSourceV2Relation`, and applying pushdown twice has a performance penalty.
2. Apply operator pushdown only once, in the planner. Like option 1, this is cleaner. The drawback is that, until statistics move to the physical plan, data source v2 can't report statistics after filters.
3. Apply operator pushdown only once, in the optimizer (proposed by https://github.com/apache/spark/pull/21319). This has no performance penalty and lets us report statistics after filters. The drawback is that, until statistics move to the physical plan, we carry a temporary `DataSourceReader` inside `DataSourceV2Relation`, which is hacky. The sketch after this list contrasts this approach with option 1.
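To make the option 1 vs. option 3 difference concrete, here is a minimal sketch with toy stand-ins; nothing below is Spark's actual DataSourceV2 API, and the statistics are faked. Option 1 builds a fresh reader each time, so pushdown runs once when the optimizer asks for statistics and again when the planner builds the scan; option 3 caches one reader in the logical relation, so pushdown runs exactly once and the stats reflect the filters, at the cost of physical-side state living in a logical plan node:

    object PushdownTradeoffSketch {
      // A toy reader that accepts pushed filters and fakes post-filter stats.
      final class ToyReader {
        var pushed: Seq[String] = Nil
        def pushFilters(fs: Seq[String]): Unit = { pushed = fs }
        def estimatedRows: Long = 1000L / (pushed.size + 1) // fake statistics
      }

      // Option 1: every call builds a fresh reader, so pushdown runs once for
      // optimizer statistics and again at planning time.
      final class RelationPushTwice(filters: Seq[String]) {
        def newReader(): ToyReader = {
          val r = new ToyReader
          r.pushFilters(filters)
          r
        }
        def stats: Long = newReader().estimatedRows // first pushdown
      }

      // Option 3: one reader is created lazily and cached in the logical
      // relation (the "temporary DataSourceReader"), so pushdown runs once and
      // stats reflect the filters, but a mutable reader now lives in a
      // logical plan node.
      final class RelationPushOnce(filters: Seq[String]) {
        lazy val reader: ToyReader = {
          val r = new ToyReader
          r.pushFilters(filters)
          r
        }
        def stats: Long = reader.estimatedRows
      }

      def main(args: Array[String]): Unit = {
        val twice = new RelationPushTwice(Seq("a > 1"))
        println(twice.stats)              // pushdown #1 (for statistics)
        println(twice.newReader().pushed) // pushdown #2 (at planning time)

        val once = new RelationPushOnce(Seq("a > 1"))
        println(once.stats)         // pushdown happens exactly once...
        println(once.reader.pushed) // ...and the planner reuses the same reader
      }
    }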
The tradeoff is: shall we bear with the hacky code and move forward with data source v2 operator pushdown support? Shall we make the code cleaner and bear with some performance penalty (applying pushdown twice) or with not reporting stats after filters? Or shall we hold back and first think about how to move stats to the physical plan?
cc @marmbrus @jose-torres