peter-toth commented on a change in pull request #31573:
URL: https://github.com/apache/spark/pull/31573#discussion_r576989630



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -335,6 +336,19 @@ case class FileSourceScanExec(
     dataFilters.flatMap(DataSourceStrategy.translateFilter(_, 
supportNestedPredicatePushdown))
   }
 
+  @transient
+  private lazy val runtimePushedDownFilters = {
+    dataFilters.flatMap {
+      case e: Expression if ExecSubqueryExpression.hasScalarSubquery(e) =>
+        val updatedValue = e.transform {
+          case s: ScalarSubquery => s.value
+        }
+        Some(updatedValue)
+      case _ =>
+        Nil
+    }.flatMap(translateFilter(_, 
DataSourceUtils.supportNestedPredicatePushdown(relation)))

Review comment:
       nit: IMO we can combine the 2 `flatMap`s and the code is still easy to 
read

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
##########
@@ -166,14 +167,11 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
       val partitionKeyFilters = 
DataSourceStrategy.getPushedDownFilters(partitionColumns,
         normalizedFilters)
 
-      // subquery expressions are filtered out because they can't be used to 
prune buckets or pushed
-      // down as data filters, yet they would be executed
-      val normalizedFiltersWithoutSubqueries =
-        normalizedFilters.filterNot(SubqueryExpression.hasSubquery)
-
       val bucketSpec: Option[BucketSpec] = fsRelation.bucketSpec
       val bucketSet = if (shouldPruneBuckets(bucketSpec)) {
-        genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec.get)
+        // subquery expressions are filtered out because they can't be used to 
prune buckets
+        // or pushed down as data filters, yet they would be executed

Review comment:
       nit: `or pushed down as data filters` is not valid here

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
##########
@@ -184,7 +182,9 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
       // Partition keys are not available in the statistics of the files.
       // `dataColumns` might have partition columns, we need to filter them 
out.
       val dataColumnsWithoutPartitionCols = 
dataColumns.filterNot(partitionColumns.contains)
-      val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f =>
+      // Non-scalar subquery expressions are filtered out because they can't 
be used to prune
+      // buckets or pushed down as data filters, yet they would be executed

Review comment:
       bi: `to prune buckets` is not valid here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to