szehon-ho commented on code in PR #54459:
URL: https://github.com/apache/spark/pull/54459#discussion_r2909284758


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -96,18 +159,55 @@ object PushDownUtils {
           }
         }
 
-        // Data source filters that need to be evaluated again after scanning. 
which means
-        // the data source cannot guarantee the rows returned can pass these 
filters.
-        // As a result we must return it so Spark can plan an extra filter 
operator.
-        val postScanFilters = r.pushPredicates(translatedFilters.toArray).map 
{ predicate =>
-          DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, 
translatedFilterToExpr)
+        // Post-scan filters candidates: those the data source rejected in the 
first pass
+        // and need to be evaluated by Spark after the scan.
+        val returnedFirstPassFilters = 
r.pushPredicates(translatedFilters.toArray).map {
+          predicate =>
+            DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, 
translatedFilterToExpr)
+        }
+
+        val finalPostScanFilters = (partitionSchema, 
r.supportsEnhancedPartitionFiltering()) match {
+          // If the scan supports enhanced partition filtering, convert to 
PartitionPredicates
+          // (see SPARK-55596). PartitionPredicates are pushed to the scan in 
a second pass.
+          case (Some(structType), true) =>
+            val (postScanPartitionFilters, postScanDataFilters) =
+              DataSourceUtils.getPartitionFiltersAndDataFilters(
+                structType, returnedFirstPassFilters.toIndexedSeq)
+            val (untranslatablePartitionFilters, untranslatableDataFilters) =
+              DataSourceUtils.getPartitionFiltersAndDataFilters(
+                structType, untranslatableExprs.toSeq)
+
+            // Push second-pass partition filters as PartitionPredicates
+            val allPartitionFilters = postScanPartitionFilters ++ 
untranslatablePartitionFilters
+            val (partitionFiltersForPush, partitionFiltersNotPushed) =
+              allPartitionFilters.partition(isPartitionPushableFilter)
+            val partitionPredicatesForPush = partitionFiltersForPush
+              .map(expr => PartitionPredicateImpl(expr, 
toAttributes(structType)))
+            val returnedSecondPassPartitionFilters =
+              r.pushPredicates(partitionPredicatesForPush.toArray).map { 
predicate =>
+                V2ExpressionUtils.toCatalyst(predicate).getOrElse(
+                  // should not happen
+                  DataSourceV2Strategy.rebuildExpressionFromFilter(
+                    predicate, translatedFilterToExpr))
+              }
+
+            // Normally translated filters (postScanFilters) are simple 
filters that can be
+            // evaluated faster, while the untranslated filters are 
complicated filters that take
+            // more time to evaluate, so we want to evaluate the 
postScanFilters filters first.

Review Comment:
   this is a good catch.  I had this in mind but end up reversing it, fixed it.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -96,18 +159,55 @@ object PushDownUtils {
           }
         }
 
-        // Data source filters that need to be evaluated again after scanning. 
which means
-        // the data source cannot guarantee the rows returned can pass these 
filters.
-        // As a result we must return it so Spark can plan an extra filter 
operator.
-        val postScanFilters = r.pushPredicates(translatedFilters.toArray).map 
{ predicate =>
-          DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, 
translatedFilterToExpr)
+        // Post-scan filters candidates: those the data source rejected in the 
first pass
+        // and need to be evaluated by Spark after the scan.
+        val returnedFirstPassFilters = 
r.pushPredicates(translatedFilters.toArray).map {
+          predicate =>
+            DataSourceV2Strategy.rebuildExpressionFromFilter(predicate, 
translatedFilterToExpr)
+        }
+
+        val finalPostScanFilters = (partitionSchema, 
r.supportsEnhancedPartitionFiltering()) match {
+          // If the scan supports enhanced partition filtering, convert to 
PartitionPredicates
+          // (see SPARK-55596). PartitionPredicates are pushed to the scan in 
a second pass.
+          case (Some(structType), true) =>
+            val (postScanPartitionFilters, postScanDataFilters) =
+              DataSourceUtils.getPartitionFiltersAndDataFilters(
+                structType, returnedFirstPassFilters.toIndexedSeq)
+            val (untranslatablePartitionFilters, untranslatableDataFilters) =
+              DataSourceUtils.getPartitionFiltersAndDataFilters(
+                structType, untranslatableExprs.toSeq)
+
+            // Push second-pass partition filters as PartitionPredicates
+            val allPartitionFilters = postScanPartitionFilters ++ 
untranslatablePartitionFilters
+            val (partitionFiltersForPush, partitionFiltersNotPushed) =
+              allPartitionFilters.partition(isPartitionPushableFilter)
+            val partitionPredicatesForPush = partitionFiltersForPush
+              .map(expr => PartitionPredicateImpl(expr, 
toAttributes(structType)))
+            val returnedSecondPassPartitionFilters =
+              r.pushPredicates(partitionPredicatesForPush.toArray).map { 
predicate =>
+                V2ExpressionUtils.toCatalyst(predicate).getOrElse(
+                  // should not happen

Review Comment:
   ok done



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryEnhancedPartitionFilterTable.scala:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util
+
+import scala.collection.mutable.{ArrayBuffer, Buffer}
+
+import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.expressions.filter.{PartitionPredicate, 
Predicate}
+import org.apache.spark.sql.connector.read.{InputPartition, Scan, ScanBuilder, 
SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.ArrayImplicits._
+
+/**
+ * In-memory table whose scan builder implements enhanced partition filtering 
using
+ * PartitionPredicates pushed in a second pass.
+ */
+class InMemoryEnhancedPartitionFilterTable(
+    name: String,
+    columns: Array[Column],
+    partitioning: Array[Transform],
+    properties: util.Map[String, String])
+  extends InMemoryTable(name, columns, partitioning, properties) {
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
+    new InMemoryEnhancedPartitionFilterScanBuilder(schema())
+  }
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    InMemoryBaseTable.maybeSimulateFailedTableWrite(new 
CaseInsensitiveStringMap(properties))
+    InMemoryBaseTable.maybeSimulateFailedTableWrite(info.options)
+    new InMemoryWriterBuilderWithOverWrite(info)
+  }
+
+  class InMemoryEnhancedPartitionFilterScanBuilder(
+      tableSchema: StructType)
+    extends ScanBuilder
+    with SupportsPushDownV2Filters
+    with SupportsPushDownRequiredColumns {
+
+    private var readSchema: StructType = tableSchema
+    private val partitionPredicates: Buffer[PartitionPredicate] = 
ArrayBuffer.empty
+    private val firstPassPushedPredicates: Buffer[Predicate] = 
ArrayBuffer.empty
+
+    private val rejectPartitionPredicates =
+      InMemoryEnhancedPartitionFilterTable.this.properties.getOrDefault(
+        InMemoryEnhancedPartitionFilterTable.RejectPartitionPredicatesKey, 
"false")
+        .toBoolean
+
+    private val rejectDataPredicates =
+      InMemoryEnhancedPartitionFilterTable.this.properties.getOrDefault(
+        InMemoryEnhancedPartitionFilterTable.RejectDataPredicatesKey, "false")
+        .toBoolean
+
+    override def supportsEnhancedPartitionFiltering(): Boolean = true
+
+    override def pushPredicates(predicates: Array[Predicate]): 
Array[Predicate] = {
+      val partNames = 
InMemoryEnhancedPartitionFilterTable.this.partCols.flatMap(_.toSeq).toSet
+      def referencesOnlyPartitionCols(p: Predicate): Boolean =
+        p.references().forall(ref => 
partNames.contains(ref.fieldNames().mkString(".")))
+      def referencesOnlyDataCols(p: Predicate): Boolean =
+        p.references().forall(ref => 
!partNames.contains(ref.fieldNames().mkString(".")))
+
+      val returned = ArrayBuffer.empty[Predicate]
+
+      predicates.foreach {
+        case p: PartitionPredicate =>
+          if (rejectPartitionPredicates) {
+            returned += p
+          } else {
+            partitionPredicates += p
+          }
+        case p if referencesOnlyPartitionCols(p) &&
+            InMemoryTableWithV2Filter.supportsPredicates(Array(p)) =>
+          if (rejectPartitionPredicates) {
+            returned += p
+          } else {
+            firstPassPushedPredicates += p
+          }
+        case p if rejectDataPredicates && referencesOnlyDataCols(p) =>
+          // Reject: we are mocking a data source that can evaluate this data 
predicate
+        case p =>
+          returned += p
+      }
+
+      if (partitionPredicates.nonEmpty) Array.empty[Predicate]

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to