GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194871032
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns and column pruning alone
    +        // can produce the required projection, push the required projection.
    +        // A final projection may still be needed if the data source produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required projection or when the required
    +        // projection is more complicated than column pruning, base column pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
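
    For context on the pruning rule in the diff above: with a hypothetical table t(a, b, c) and the query SELECT a FROM t WHERE b > 0, projectSet = {a} and filterSet = {b}, so the subset check fails and the pushed projection is the union {a, b}; column c is still pruned. A REPL-style trace with plain sets standing in for `AttributeSet`:

        val projectSet = Set("a")           // columns referenced by the projection
        val filterSet  = Set("b")           // columns referenced by the filters
        val output     = Set("a", "b", "c") // relation.output, simplified
        val projection =
          if (filterSet.subsetOf(projectSet) && output == projectSet) output
          else projectSet ++ filterSet      // => Set(a, b): column c is pruned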
    
    OK we need to make a decision here:
    1. Apply operator pushdown twice (proposed by this PR). This moves the pushdown logic into the planner, which is cleaner and closer to the ideal design. The drawback is that, until statistics move to the physical plan, we have duplicated pushdown code in `DataSourceV2Relation`, and applying pushdown twice carries a performance penalty.
    2. Apply operator pushdown only once, in the planner. Like option 1, this is cleaner. The drawback is that, until statistics move to the physical plan, data source v2 can't report statistics after filter pushdown.
    3. Apply operator pushdown only once, in the optimizer (proposed by https://github.com/apache/spark/pull/21319). This has no performance penalty and can report statistics after filter pushdown. The drawback is that, until statistics move to the physical plan, we keep a temporary `DataSourceReader` inside `DataSourceV2Relation`, which is hacky (see the sketch below).
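
    To make the contrast concrete, here is a minimal sketch of options 1/2 versus option 3. The types are simplified stand-ins invented for illustration, not the real `DataSourceReader`/`DataSourceV2Relation` APIs:

        object PushdownSketch {
          trait Filter
          trait Reader {
            def pushFilters(filters: Seq[Filter]): Unit
            def estimateRowCount(): Long  // stats reflecting whatever was pushed
          }

          // Options 1/2: the logical relation stays immutable and only knows how
          // to create readers; the planner builds one and applies pushdown while
          // converting to a physical scan. Option 1 additionally builds a second
          // reader in the optimizer just for stats, hence "pushdown twice".
          case class CleanRelation(newReader: () => Reader)

          def planScan(r: CleanRelation, filters: Seq[Filter]): Reader = {
            val reader = r.newReader()   // fresh reader at planning time
            reader.pushFilters(filters)  // pushdown lives in the planner
            reader
          }

          // Option 3: the optimizer applies pushdown once and caches the reader
          // inside the logical relation, so stats-after-filter come for free.
          // Holding a stateful reader in a logical plan node is the hacky part.
          case class HackyRelation(reader: Reader) {
            def rowCount: Long = reader.estimateRowCount()
          }
        }

    In this sketch only `HackyRelation` can answer `rowCount` with filters already pushed and without building a second reader, which is exactly the stats-after-filter benefit (and the hackiness) of option 3.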
    
    The tradeoff is: shall we bear with the hacky code and move forward with data source v2 operator pushdown support? Shall we keep the code clean and accept some performance penalty (applying pushdown twice, or not reporting stats after filter)? Or shall we hold back and first think about how to move stats to the physical plan?
    
    cc @marmbrus @jose-torres 

