[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21143
[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r184601974

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -224,24 +226,29 @@ object DataSourceV2Relation {
         )

       case filterSupport: SupportsPushDownFilters =>
-        // A map from original Catalyst expressions to corresponding translated data source
-        // filters. If a predicate is not in this map, it means it cannot be pushed down.
-        val translatedMap: Map[Expression, Filter] = filters.flatMap { p =>
-          DataSourceStrategy.translateFilter(p).map(f => p -> f)
-        }.toMap
+        val translateResults: Seq[(Option[Filter], Expression)] = filters.map { p =>
+          (DataSourceStrategy.translateFilter(p), p)
+        }

         // Catalyst predicate expressions that cannot be converted to data source filters.
-        val nonConvertiblePredicates = filters.filterNot(translatedMap.contains)
+        val nonConvertiblePredicates = translateResults.collect {
--- End diff --

Avoid using both `predicates` and `filters` as variable names in the same function; it just confuses the reader.
[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r184601574

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -224,24 +226,29 @@ object DataSourceV2Relation {
         )

       case filterSupport: SupportsPushDownFilters =>
-        // A map from original Catalyst expressions to corresponding translated data source
-        // filters. If a predicate is not in this map, it means it cannot be pushed down.
-        val translatedMap: Map[Expression, Filter] = filters.flatMap { p =>
-          DataSourceStrategy.translateFilter(p).map(f => p -> f)
-        }.toMap
+        val translateResults: Seq[(Option[Filter], Expression)] = filters.map { p =>
+          (DataSourceStrategy.translateFilter(p), p)
+        }

         // Catalyst predicate expressions that cannot be converted to data source filters.
-        val nonConvertiblePredicates = filters.filterNot(translatedMap.contains)
+        val nonConvertiblePredicates = translateResults.collect {
+          case (None, p) => p
+        }

-        // Data source filters that cannot be pushed down. An unhandled filter means
-        // the data source cannot guarantee the rows returned can pass the filter.
+        // A map from translated data source filters to original catalyst predicate expressions.
+        val translatedMap: Map[Filter, Expression] = translateResults.collect {
+          case (Some(f), p) => (f, p)
+        }.toMap
+
+        // Data source filters that need to be evaluated again after scanning. which means
+        // the data source cannot guarantee the rows returned can pass these filters.
         // As a result we must return it so Spark can plan an extra filter operator.
-        val unhandledFilters = filterSupport.pushFilters(translatedMap.values.toArray).toSet
-        val (unhandledPredicates, pushedPredicates) = translatedMap.partition { case (_, f) =>
-          unhandledFilters.contains(f)
-        }
+        val afterScanPredicates =
+          filterSupport.pushFilters(translatedMap.keys.toArray).map(translatedMap)
+        // The filters which are marked as pushed to this data source
+        val pushedPredicates = filterSupport.pushedFilters().map(translatedMap)
--- End diff --

Add `logInfo` messages for `nonConvertiblePredicates`, `afterScanPredicates`, and `pushedPredicates`.
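For readers wondering what such logging could look like, here is a minimal sketch of the suggestion, not the actual change: it assumes the enclosing object mixes in `org.apache.spark.internal.Logging` (which provides `logInfo`) and that the three vals from the diff above are in scope.

```
// Sketch only: assumes org.apache.spark.internal.Logging is mixed in and that
// nonConvertiblePredicates, afterScanPredicates and pushedPredicates are in scope.
if (nonConvertiblePredicates.nonEmpty) {
  logInfo("Predicates not convertible to data source filters: " +
    nonConvertiblePredicates.mkString(", "))
}
if (afterScanPredicates.nonEmpty) {
  logInfo("Predicates evaluated again after the scan: " +
    afterScanPredicates.mkString(", "))
}
if (pushedPredicates.nonEmpty) {
  logInfo("Predicates pushed down to the data source: " +
    pushedPredicates.mkString(", "))
}
```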
[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r184600798

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -224,24 +226,29 @@ object DataSourceV2Relation {
         )
--- End diff --

The indentation between lines 221 and 223 looks odd. Could you also correct it?
[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r184600589

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -56,7 +56,7 @@ case class DataSourceV2Relation(
   lazy val (
--- End diff --

uh... Then, we should mention it in the comment too. For example:

```
// afterScanFilters: predicates that need to be evaluated after the scan.
// pushedFilters: predicates that will be pushed down and only evaluated in the underlying data sources.
// Note: afterScanFilters and pushedFilters can overlap. For example, the parquet row group filter
```
[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r184599222

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -56,7 +56,7 @@ case class DataSourceV2Relation(
   lazy val (
--- End diff --

They can overlap (think about the Parquet row group filter), so I'd remove the `only` from the above comment.
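To make the overlap concrete: a source like Parquet can accept a filter for coarse-grained pruning (row group statistics) yet still be unable to guarantee row-level filtering, so the same filter legitimately appears both in `pushedFilters()` and in the set Spark must re-evaluate after the scan. Below is a self-contained mock that only mirrors the `SupportsPushDownFilters` contract (it does not extend the real interface) and exists purely to illustrate that point; the class and object names are made up for the example.

```
import org.apache.spark.sql.sources.{Filter, GreaterThan}

// Hypothetical stand-in that mirrors the SupportsPushDownFilters contract.
class ParquetLikeFilterSupport {
  private var pushed: Array[Filter] = Array.empty

  // Accept every filter for row-group-level pruning, but hand all of them back
  // as "unhandled": row-group statistics can skip whole groups, yet they cannot
  // guarantee that every surviving row satisfies the filter, so Spark must
  // still evaluate it after the scan.
  def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushed = filters
    filters
  }

  def pushedFilters(): Array[Filter] = pushed
}

object OverlapExample extends App {
  val support = new ParquetLikeFilterSupport
  val afterScan = support.pushFilters(Array(GreaterThan("a", 1)))
  // Both arrays contain GreaterThan(a, 1): the two sets overlap.
  println(s"after scan: ${afterScan.toSeq}, pushed: ${support.pushedFilters().toSeq}")
}
```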
[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r184598059

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -56,7 +56,7 @@ case class DataSourceV2Relation(
   lazy val (
--- End diff --

// afterScanFilters: predicates that need to be evaluated after the scan.
// pushedFilters: predicates that will be pushed down and only evaluated in the underlying data sources.
[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r183925902

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -237,11 +239,16 @@ object DataSourceV2Relation {
         // the data source cannot guarantee the rows returned can pass the filter.
         // As a result we must return it so Spark can plan an extra filter operator.
         val unhandledFilters = filterSupport.pushFilters(translatedMap.values.toArray).toSet
-        val (unhandledPredicates, pushedPredicates) = translatedMap.partition { case (_, f) =>
+        val unhandledPredicates = translatedMap.filter { case (_, f) =>
           unhandledFilters.contains(f)
-        }
-
-        (nonConvertiblePredicates ++ unhandledPredicates.keys, pushedPredicates.keys.toSeq)
+        }.keys
+        // The filters which are marked as pushed to this data source
+        val pushedFilters = filterSupport.pushedFilters()
+        val pushedPredicates = translatedMap.filter { case (_, f) =>
+          pushedFilters.contains(f)
+        }.keys.toSeq
+
+        (nonConvertiblePredicates ++ unhandledPredicates, pushedPredicates)
--- End diff --

Or

```
val translatedResults: Seq[(Option[Filter], Expression)] = filters.map { p =>
  (DataSourceStrategy.translateFilter(p), p)
}

// A map from original Catalyst expressions to corresponding translated data source
// filters. If a predicate is not in this map, it means it cannot be pushed down.
val translatedMap: Map[Filter, Expression] = translatedResults.collect {
  case (f, p) if f.isDefined => (f.get, p)
}.toMap

// Catalyst predicate expressions that cannot be converted to data source filters.
val nonConvertiblePredicates = translatedResults.collect {
  case (f, p) if f.isEmpty => p
}

// Data source filters that cannot be pushed down. An unhandled filter means
// the data source cannot guarantee the rows returned can pass the filter.
// As a result we must return it so Spark can plan an extra filter operator.
val unhandledPredicates = filterSupport.pushFilters(translatedMap.keys.toArray).map(translatedMap)

// The filters which are marked as pushed to this data source
val pushedPredicates = filterSupport.pushedFilters().map(translatedMap)

(nonConvertiblePredicates ++ unhandledPredicates, pushedPredicates)
```
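A side note on the `.map(translatedMap)` calls in both suggestions in this thread: a Scala `Map` is also a `Function1`, so passing it to `.map` performs a key lookup per element. A missing key would throw `NoSuchElementException`, but that should not happen here because a well-behaved source only returns filters from `pushFilters`/`pushedFilters` that were handed in, i.e. keys of `translatedMap`. A tiny, self-contained illustration with plain strings standing in for `Filter` and `Expression`:

```
// Strings stand in for Filter (keys) and Expression (values).
val translated = Map("f1" -> "p1", "f2" -> "p2")

// A Scala Map extends Function1, so it can be passed directly to .map:
// each array element is looked up as a key, yielding the mapped value.
val unhandled = Array("f2").map(translated)   // Array("p2")
println(unhandled.mkString(", "))             // p2
```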
[GitHub] spark pull request #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21143#discussion_r183924792

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -237,11 +239,16 @@ object DataSourceV2Relation {
         // the data source cannot guarantee the rows returned can pass the filter.
         // As a result we must return it so Spark can plan an extra filter operator.
         val unhandledFilters = filterSupport.pushFilters(translatedMap.values.toArray).toSet
-        val (unhandledPredicates, pushedPredicates) = translatedMap.partition { case (_, f) =>
+        val unhandledPredicates = translatedMap.filter { case (_, f) =>
           unhandledFilters.contains(f)
-        }
-
-        (nonConvertiblePredicates ++ unhandledPredicates.keys, pushedPredicates.keys.toSeq)
+        }.keys
+        // The filters which are marked as pushed to this data source
+        val pushedFilters = filterSupport.pushedFilters()
+        val pushedPredicates = translatedMap.filter { case (_, f) =>
+          pushedFilters.contains(f)
+        }.keys.toSeq
+
+        (nonConvertiblePredicates ++ unhandledPredicates, pushedPredicates)
--- End diff --

maybe we can reverse the map and make it simpler:

```
val translatedMap: Map[Filter, Expression] = filters.flatMap { p =>
  DataSourceStrategy.translateFilter(p).map(f => f -> p)
}.toMap

// Catalyst predicate expressions that cannot be converted to data source filters.
val convertiblePredicates = translatedMap.values.toSet
val nonConvertiblePredicates = filters.filterNot(convertiblePredicates.contains)

// Data source filters that cannot be pushed down. An unhandled filter means
// the data source cannot guarantee the rows returned can pass the filter.
// As a result we must return it so Spark can plan an extra filter operator.
val unhandledPredicates = filterSupport.pushFilters(translatedMap.keys.toArray).map(translatedMap)

// The filters which are marked as pushed to this data source
val pushedPredicates = filterSupport.pushedFilters().map(translatedMap)

(nonConvertiblePredicates ++ unhandledPredicates, pushedPredicates)
```