Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21319#discussion_r188289317
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 ---
    @@ -17,48 +17,81 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet}
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, 
Expression}
     import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
     import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    +import org.apache.spark.sql.sources
    +import 
org.apache.spark.sql.sources.v2.reader.{SupportsPushDownCatalystFilters, 
SupportsPushDownFilters, SupportsPushDownRequiredColumns}
     
     object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
       override def apply(plan: LogicalPlan): LogicalPlan = plan match {
         // PhysicalOperation guarantees that filters are deterministic; no 
need to check
         case PhysicalOperation(project, filters, relation: 
DataSourceV2Relation) =>
    -      assert(relation.filters.isEmpty, "data source v2 should do push down 
only once.")
    +      val newReader = relation.createFreshReader
    +      var newRelation = relation.copy(optimizedReader = Some(newReader))
     
    -      val projectAttrs = project.map(_.toAttribute)
    -      val projectSet = AttributeSet(project.flatMap(_.references))
    -      val filterSet = AttributeSet(filters.flatMap(_.references))
    +      val postScanFilters: Seq[Expression] = newReader match {
    +        case r: SupportsPushDownCatalystFilters =>
    +          val postScanFilters = r.pushCatalystFilters(filters.toArray)
    +          newRelation.copy(pushedFilters = r.pushedCatalystFilters())
    --- End diff --
    
    newRelation = newRelation.copy(pushedFilters = pushedFilters)
    
    Alternatively, have the `match { ... }` return `pushedFilters` as part of its result, and do the `copy` once after the `match`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to