szehon-ho commented on code in PR #54459:
URL: https://github.com/apache/spark/pull/54459#discussion_r2896752207


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -96,18 +149,55 @@ object PushDownUtils {
           }
         }
 
-        // Data source filters that need to be evaluated again after scanning. which means
-        // the data source cannot guarantee the rows returned can pass these filters.
-        // As a result we must return it so Spark can plan an extra filter operator.
-        val postScanFilters = r.pushPredicates(translatedFilters.toArray).map { predicate =>
-          DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, translatedFilterToExpr)
+        // Post-scan filters candidates: those the data source rejected in the first pass
+        // and need to be evaluated by Spark after the scan.
+        val returnedFirstPassFilters = r.pushPredicates(translatedFilters.toArray).map {
+          predicate =>
+            DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, translatedFilterToExpr)
+        }
+
+        val finalPostScanFilters = (partitionSchema, r.supportsEnhancedPartitionFiltering()) match {
+          // If the scan supports enhanced partition filtering, convert to PartitionPredicates
+          // (see SPARK-55596). PartitionPredicates are pushed to the scan in a second pass.
+          case (Some(structType), true) =>
+            //
+            val (postScanPartitionFilters, postScanDataFilters) =
+              DataSourceUtils.getPartitionFiltersAndDataFilters(
+                structType, returnedFirstPassFilters.toIndexedSeq)
+            val (untranslatablePartitionFilters, untranslatableDataFilters) =
+              DataSourceUtils.getPartitionFiltersAndDataFilters(
+                structType, untranslatableExprs.toSeq)
+
+            // Push second-pass partition filters as PartitionPredicates
+            val allPartitionPredicates = (postScanPartitionFilters ++
+              untranslatablePartitionFilters)
+              .map(expr => new PartitionPredicateImpl(expr, toAttributes(structType)))
+            val returnedSecondPassPartitionFilters =
+              r.pushPredicates(allPartitionPredicates.toArray).map { predicate =>
+                V2ExpressionUtils.toCatalyst(predicate).getOrElse(
+                  DataSourceV2Strategy.rebuildExpressionFromFilter(
+                    predicate, translatedFilterToExpr))
+              }
+
+            // Normally translated filters (postScanFilters) are simple filters that can be
+            // evaluated faster, while the untranslated filters are complicated filters that take
+            // more time to evaluate, so we want to evaluate the postScanFilters filters first.
+            val untranslatableSet = ExpressionSet(untranslatableExprs)
+            val returnedPostScanPartitionFilters =
+              returnedSecondPassPartitionFilters.filter(e => !untranslatableSet.contains(e))
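
As context for the hunk above, the split performed by `DataSourceUtils.getPartitionFiltersAndDataFilters` can be sketched without Spark: a filter counts as a partition filter only when every column it references is a partition column. Here `Filter` is a hypothetical stand-in for Catalyst's `Expression`, not the actual API.

```scala
// Hypothetical, Spark-free model of the partition/data filter split:
// a filter is a partition filter only if every column it references
// belongs to the partition schema; anything mixed stays a data filter.
case class Filter(name: String, references: Set[String])

def splitFilters(
    partitionCols: Set[String],
    filters: Seq[Filter]): (Seq[Filter], Seq[Filter]) =
  filters.partition(f => f.references.nonEmpty && f.references.subsetOf(partitionCols))
```

Note that a filter referencing both a partition column and a data column (e.g. `region = 'EU' AND id > 5`) fails the subset check and lands on the data side, which is why only pure partition predicates are re-pushed in the second pass.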

Review Comment:
   That is a good catch. I thought this was handled earlier, but it actually is not: DSv2 currently receives all expressions (as long as they can be translated) and has to implement this logic itself. Let me add some checks (the ones used for file-based sources).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

