Github user cloud-fan commented on a diff in the pull request:
    --- Diff: 
    @@ -17,15 +17,56 @@
     package org.apache.spark.sql.execution.datasources.v2
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, 
r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: 
DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns 
and column pruning alone
    +        // can produce the required projection, push the required 
    +        // A final projection may still be needed if the data source 
produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required 
projection or when the required
    +        // projection is more complicated than column pruning, base column 
pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +      val reader = relation.newReader
    --- End diff --
    it's nice to decouple the problem and do pushdown during planning, but I 
feel the cost is too high in this approach. For file-based data sources, we 
need to query hive metastore to apply partitioning pruning during filter 
pushdown, and this can be very expensive. Doing it twice looks scaring to me.
    cc @gatorsmile @dongjoon-hyun @mallman , please correct me if I have a 
wrong understanding.
    also cc @wzhfy do you have an estimation about how long it takes to move 
statistics to physical plan?


