Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21143#discussion_r184601574
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -224,24 +226,29 @@ object DataSourceV2Relation {
             )
     
           case filterSupport: SupportsPushDownFilters =>
    -        // A map from original Catalyst expressions to corresponding translated data source
    -        // filters. If a predicate is not in this map, it means it cannot be pushed down.
    -        val translatedMap: Map[Expression, Filter] = filters.flatMap { p =>
    -          DataSourceStrategy.translateFilter(p).map(f => p -> f)
    -        }.toMap
    +        val translateResults: Seq[(Option[Filter], Expression)] = filters.map { p =>
    +          (DataSourceStrategy.translateFilter(p), p)
    +        }
     
             // Catalyst predicate expressions that cannot be converted to data source filters.
    -        val nonConvertiblePredicates = filters.filterNot(translatedMap.contains)
    +        val nonConvertiblePredicates = translateResults.collect {
    +          case (None, p) => p
    +        }
     
    -        // Data source filters that cannot be pushed down. An unhandled filter means
    -        // the data source cannot guarantee the rows returned can pass the filter.
    +        // A map from translated data source filters to original Catalyst predicate expressions.
    +        val translatedMap: Map[Filter, Expression] = translateResults.collect {
    +          case (Some(f), p) => (f, p)
    +        }.toMap
    +
    +        // Data source filters that need to be evaluated again after scanning, which means
    +        // the data source cannot guarantee the rows returned can pass these filters.
             // As a result we must return them so Spark can plan an extra filter operator.
    -        val unhandledFilters = filterSupport.pushFilters(translatedMap.values.toArray).toSet
    -        val (unhandledPredicates, pushedPredicates) = translatedMap.partition { case (_, f) =>
    -          unhandledFilters.contains(f)
    -        }
    +        val afterScanPredicates =
    +          filterSupport.pushFilters(translatedMap.keys.toArray).map(translatedMap)
    +        // The filters which are marked as pushed to this data source
    +        val pushedPredicates = filterSupport.pushedFilters().map(translatedMap)
    --- End diff --
    
    Add logInfo messages for nonConvertiblePredicates, afterScanPredicates, and
    pushedPredicates.
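    
    The suggested logging could look something like the sketch below. This is
    illustrative only: it assumes the enclosing scope mixes in Spark's internal
    Logging trait (so logInfo is available), and the message wording is not from
    the PR itself.
    
    ```scala
    // Sketch only: assumes the surrounding class/object extends
    // org.apache.spark.internal.Logging; message text is illustrative.
    logInfo(s"Predicates that could not be translated to data source filters: " +
      s"${nonConvertiblePredicates.mkString(", ")}")
    logInfo(s"Predicates to be re-evaluated by Spark after scanning: " +
      s"${afterScanPredicates.mkString(", ")}")
    logInfo(s"Predicates pushed down to the data source: " +
      s"${pushedPredicates.mkString(", ")}")
    ```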

