This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dcdcb80c536 [SPARK-42134][SQL] Fix getPartitionFiltersAndDataFilters() to handle filters without referenced attributes
dcdcb80c536 is described below

commit dcdcb80c53681d1daff416c007cf8a2810155625
Author: Peter Toth <peter.t...@gmail.com>
AuthorDate: Fri Jan 20 18:35:33 2023 -0800

    [SPARK-42134][SQL] Fix getPartitionFiltersAndDataFilters() to handle filters without referenced attributes
    
    ### What changes were proposed in this pull request?
    This is a small correctness fix to `DataSourceUtils.getPartitionFiltersAndDataFilters()`
    so that it handles filters without any referenced attributes correctly. For example,
    without the fix the following query on a ParquetV2 source:
    ```
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    spark.range(1).write.mode("overwrite").format("parquet").save(path)
    df = spark.read.parquet(path).toDF("i")
    f = udf(lambda x: False, "boolean")(lit(1))
    val r = df.filter(f)
    r.show()
    ```
    returns
    ```
    +---+
    |  i|
    +---+
    |  0|
    +---+
    ```
    but it should return an empty result.
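    For reference, a sketch of the expected `show()` output after the fix (the
    standard rendering of an empty result with column `i`):
    ```
    +---+
    |  i|
    +---+
    +---+
    ```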
    The root cause of the issue is that during `V2ScanRelationPushDown` a filter
    that doesn't reference any column is incorrectly identified as a partition
    filter.
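
    A minimal sketch of why the old check misclassifies such filters, using plain
    Scala sets rather than Spark's internal `AttributeSet` (the names here are
    illustrative): the empty set is a subset of every set, so a filter that
    references no columns always passed the partition-filter test.
    ```
    // Model of the split in DataSourceUtils.getPartitionFiltersAndDataFilters().
    val partitionColumns = Set("p1", "p2")
    // A filter like udf(...)(lit(1)) references no columns at all.
    val udfFilterRefs = Set.empty[String]

    // Old check: trivially true for an empty reference set, so the filter
    // was wrongly routed to partition pruning and never evaluated on rows.
    udfFilterRefs.subsetOf(partitionColumns)                           // true

    // Fixed check: require at least one reference, so reference-free
    // filters stay data filters and are evaluated against the data.
    udfFilterRefs.nonEmpty && udfFilterRefs.subsetOf(partitionColumns) // false
    ```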
    
    ### Why are the changes needed?
    To fix a correctness issue.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, fixes a correctness issue.
    
    ### How was this patch tested?
    Added a new unit test.
    
    Closes #39676 from peter-toth/SPARK-42134-fix-getpartitionfiltersanddatafilters.
    
    Authored-by: Peter Toth <peter.t...@gmail.com>
    Signed-off-by: huaxingao <huaxin_...@apple.com>
---
 python/pyspark/sql/tests/test_udf.py                   | 18 ++++++++++++++++++
 .../sql/execution/datasources/DataSourceUtils.scala    |  2 +-
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index 79fee7a48e5..1a2ec213ca6 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -681,6 +681,24 @@ class BaseUDFTests(object):
         finally:
             shutil.rmtree(path)
 
+    # SPARK-42134
+    def test_file_dsv2_with_udf_filter(self):
+        from pyspark.sql.functions import lit
+
+        path = tempfile.mkdtemp()
+        shutil.rmtree(path)
+
+        try:
+            with self.sql_conf({"spark.sql.sources.useV1SourceList": ""}):
+                self.spark.range(1).write.mode("overwrite").format("parquet").save(path)
+                df = self.spark.read.parquet(path).toDF("i")
+                f = udf(lambda x: False, "boolean")(lit(1))
+                result = df.filter(f)
+                self.assertEqual(0, result.count())
+
+        finally:
+            shutil.rmtree(path)
+
     # SPARK-25591
     def test_same_accumulator_in_udfs(self):
         data_schema = StructType(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 26f22670a51..5eb422f80e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -273,7 +273,7 @@ object DataSourceUtils extends PredicateHelper {
     }
     val partitionSet = AttributeSet(partitionColumns)
     val (partitionFilters, dataFilters) = normalizedFilters.partition(f =>
-      f.references.subsetOf(partitionSet)
+      f.references.nonEmpty && f.references.subsetOf(partitionSet)
     )
     val extraPartitionFilter =
       dataFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionSet))

