aokolnychyi commented on code in PR #54459:
URL: https://github.com/apache/spark/pull/54459#discussion_r2917935332


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -96,18 +151,54 @@ object PushDownUtils {
           }
         }
 
-        // Data source filters that need to be evaluated again after scanning. which means
-        // the data source cannot guarantee the rows returned can pass these filters.
-        // As a result we must return it so Spark can plan an extra filter operator.
-        val postScanFilters = r.pushPredicates(translatedFilters.toArray).map { predicate =>
-          DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, translatedFilterToExpr)
+        // Post-scan filters candidates: those the data source rejected in the first pass
+        // and need to be evaluated by Spark after the scan.
+        val returnedFirstPassFilters = r.pushPredicates(translatedFilters.toArray).map {
+          predicate =>
+            DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, translatedFilterToExpr)
+        }
+
+        val finalPostScanFilters = (partitionSchema, r.supportsEnhancedPartitionFiltering()) match {
+          // If the scan supports enhanced partition filtering, convert to PartitionPredicates
+          // (see SPARK-55596). PartitionPredicates are pushed to the scan in a second pass.
+          case (Some(structType), true) =>
+            val (postScanPartitionFilters, postScanDataFilters) =
+              DataSourceUtils.getPartitionFiltersAndDataFilters(
+                structType, returnedFirstPassFilters.toIndexedSeq)
+            val (untranslatablePartitionFilters, untranslatableDataFilters) =
+              DataSourceUtils.getPartitionFiltersAndDataFilters(
+                structType, untranslatableExprs.toSeq)
+
+            // Push second-pass partition filters as PartitionPredicates
+            val allPartitionFilters = postScanPartitionFilters ++ untranslatablePartitionFilters
+            val (partitionFiltersForPush, partitionFiltersNotPushed) =
+              allPartitionFilters.partition(isPartitionPushableFilter)
+            val partitionPredicatesForPush = partitionFiltersForPush
+              .map(expr => PartitionPredicateImpl(expr, toAttributes(structType)))

Review Comment:
   Are we calling `toAttributes(structType)` once per filter here? Is that needed, or could the result be computed once and reused?
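
   For illustration only, a minimal sketch of what computing the attributes once might look like. It uses hypothetical stand-in types (`Field`, `Schema`, `Attr`, `PartitionPredicate`) and a stand-in `toAttributes` rather than the real Spark classes, so it runs without Spark on the classpath. Whether sharing one attribute list across predicates is safe in the actual PR depends on how `PartitionPredicateImpl` uses it, which this hunk does not show.

```scala
// Hypothetical stand-ins for Spark's StructType, AttributeReference and
// PartitionPredicateImpl. None of these are the real Spark classes.
final case class Field(name: String)
final case class Schema(fields: Seq[Field])
final case class Attr(name: String)
final case class PartitionPredicate(expr: String, attrs: Seq[Attr])

object HoistSketch {
  // Counts conversions so the difference between the two variants is observable.
  var conversions = 0

  // Stand-in for toAttributes(structType): rebuilds the attribute list on every call.
  def toAttributes(schema: Schema): Seq[Attr] = {
    conversions += 1
    schema.fields.map(f => Attr(f.name))
  }

  // Shape used in the diff: the conversion runs once per filter.
  def perFilter(filters: Seq[String], schema: Schema): Seq[PartitionPredicate] =
    filters.map(expr => PartitionPredicate(expr, toAttributes(schema)))

  // Hoisted variant: convert once, then reuse the result for every filter.
  def hoisted(filters: Seq[String], schema: Schema): Seq[PartitionPredicate] = {
    val partitionAttrs = toAttributes(schema)
    filters.map(expr => PartitionPredicate(expr, partitionAttrs))
  }
}
```

   In this sketch both variants produce equal predicates; the hoisted one just avoids rebuilding the attribute list for each filter.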



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

