Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21319#discussion_r188232881
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala ---
    @@ -17,48 +17,81 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet}
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression}
     import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
     import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.DataSourceStrategy
    +import org.apache.spark.sql.sources
    +import org.apache.spark.sql.sources.v2.reader.{SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
     
     object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
       override def apply(plan: LogicalPlan): LogicalPlan = plan match {
         // PhysicalOperation guarantees that filters are deterministic; no need to check
         case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
    -      assert(relation.filters.isEmpty, "data source v2 should do push down only once.")
    +      val newReader = relation.createFreshReader
    +      var newRelation = relation.copy(optimizedReader = Some(newReader))
     
    -      val projectAttrs = project.map(_.toAttribute)
    -      val projectSet = AttributeSet(project.flatMap(_.references))
    -      val filterSet = AttributeSet(filters.flatMap(_.references))
    +      val postScanFilters: Seq[Expression] = newReader match {
    +        case r: SupportsPushDownCatalystFilters =>
    +          val postScanFilters = r.pushCatalystFilters(filters.toArray)
    +          newRelation = newRelation.copy(pushedFilters = r.pushedCatalystFilters())
    +          postScanFilters
     
    -      val projection = if (filterSet.subsetOf(projectSet) &&
    -          AttributeSet(projectAttrs) == projectSet) {
    -        // When the required projection contains all of the filter columns and column pruning alone
    -        // can produce the required projection, push the required projection.
    -        // A final projection may still be needed if the data source produces a different column
    -        // order or if it cannot prune all of the nested columns.
    -        projectAttrs
    -      } else {
    -        // When there are filter columns not already in the required projection or when the required
    -        // projection is more complicated than column pruning, base column pruning on the set of
    -        // all columns needed by both.
    -        (projectSet ++ filterSet).toSeq
    +        case r: SupportsPushDownFilters =>
    +          // A map from translated data source filters to original Catalyst filter expressions.
    +          val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
    +          // Catalyst filter expressions that can't be translated to data source filters.
    +          val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]
    +
    +          for (filterExpr <- filters) {
    +            val translated = DataSourceStrategy.translateFilter(filterExpr)
    +            if (translated.isDefined) {
    +              translatedFilterToExpr(translated.get) = filterExpr
    +            } else {
    +              untranslatableExprs += filterExpr
    +            }
    +          }
    +
    +          // Data source filters that need to be evaluated again after scanning, which means
    +          // the data source cannot guarantee the rows returned can pass these filters.
    +          // As a result we must return them so Spark can plan an extra filter operator.
    +          val postScanFilters =
    +          r.pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)
    --- End diff ---
    
    nit: indent
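
For context on the block above: the loop implements a translate-and-map-back pattern. Each Catalyst predicate is translated to a public sources.Filter; the translated filters are offered to the reader, and whatever the reader hands back as not fully handled is mapped through translatedFilterToExpr to recover the original expressions, so Spark can plan an extra Filter operator on top of the scan. Below is a minimal, self-contained sketch of that pattern; every type and name in it (Expr, SourceFilter, translate, pushFilters) is a hypothetical stand-in for Catalyst's Expression, sources.Filter, DataSourceStrategy.translateFilter and the reader's pushFilters, not the real API:

    import scala.collection.mutable

    object PushdownSketch {
      // Hypothetical stand-ins for Catalyst's Expression and sources.Filter.
      sealed trait Expr
      case class EqualTo(col: String, value: Int) extends Expr
      case class Opaque(desc: String) extends Expr // has no data source counterpart

      case class SourceFilter(col: String, value: Int)

      // Stand-in for DataSourceStrategy.translateFilter: only EqualTo translates.
      def translate(e: Expr): Option[SourceFilter] = e match {
        case EqualTo(c, v) => Some(SourceFilter(c, v))
        case _             => None
      }

      // Stand-in for the reader's pushFilters: pretend the source can fully
      // evaluate everything except filters on column "b", which it returns
      // as filters that must be re-checked after the scan.
      def pushFilters(filters: Array[SourceFilter]): Array[SourceFilter] =
        filters.filter(_.col == "b")

      def main(args: Array[String]): Unit = {
        val filters: Seq[Expr] = Seq(EqualTo("a", 1), EqualTo("b", 2), Opaque("a + b > 3"))

        val translatedFilterToExpr = mutable.HashMap.empty[SourceFilter, Expr]
        val untranslatableExprs = mutable.ArrayBuffer.empty[Expr]

        for (filterExpr <- filters) {
          translate(filterExpr) match {
            case Some(t) => translatedFilterToExpr(t) = filterExpr
            case None    => untranslatableExprs += filterExpr
          }
        }

        // Map the filters the source could not fully handle back to their
        // original expressions; together with the untranslatable ones they
        // must be evaluated by an extra Filter operator on top of the scan.
        val postScanFilters =
          pushFilters(translatedFilterToExpr.keys.toArray).map(translatedFilterToExpr)

        println((postScanFilters ++ untranslatableExprs).mkString(", "))
        // prints: EqualTo(b,2), Opaque(a + b > 3)
      }
    }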

