This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new dcdcb80c536 [SPARK-42134][SQL] Fix getPartitionFiltersAndDataFilters() to handle filters without referenced attributes

dcdcb80c536 is described below

commit dcdcb80c53681d1daff416c007cf8a2810155625
Author: Peter Toth <peter.t...@gmail.com>
AuthorDate: Fri Jan 20 18:35:33 2023 -0800

    [SPARK-42134][SQL] Fix getPartitionFiltersAndDataFilters() to handle filters without referenced attributes

    ### What changes were proposed in this pull request?
    This is a small correctness fix to `DataSourceUtils.getPartitionFiltersAndDataFilters()` so that it handles filters that reference no attributes correctly. E.g. without the fix the following query on a ParquetV2 source:
    ```
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    spark.range(1).write.mode("overwrite").format("parquet").save(path)
    df = spark.read.parquet(path).toDF("i")
    f = udf(lambda x: False, "boolean")(lit(1))
    r = df.filter(f)
    r.show()
    ```
    returns
    ```
    +---+
    |  i|
    +---+
    |  0|
    +---+
    ```
    but it should return an empty result. The root cause of the issue is that during `V2ScanRelationPushDown` a filter that doesn't reference any column is incorrectly identified as a partition filter.

    ### Why are the changes needed?
    To fix a correctness issue.

    ### Does this PR introduce _any_ user-facing change?
    Yes, it fixes a correctness issue.

    ### How was this patch tested?
    Added a new UT.

    Closes #39676 from peter-toth/SPARK-42134-fix-getpartitionfiltersanddatafilters.
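The misclassification described above can be illustrated without Spark at all: an empty set is a subset of every set, so a subset-only check trivially accepts a filter that references no columns. A minimal sketch with plain Python sets (these are not Spark internals; `partition_set` and the filter reference sets are hypothetical):

```python
# A filter built from udf(...)(lit(1)) references no columns at all.
partition_set = {"year", "month"}  # hypothetical partition columns
udf_filter_refs = set()            # reference set of a literal-only UDF filter

# Old check: empty set is a subset of any set, so this is trivially True
# and the filter was wrongly treated as a partition filter.
old_check = udf_filter_refs.issubset(partition_set)

# Fixed check: additionally require a non-empty reference set, mirroring
# `f.references.nonEmpty && f.references.subsetOf(partitionSet)`.
new_check = bool(udf_filter_refs) and udf_filter_refs.issubset(partition_set)

print(old_check)  # True
print(new_check)  # False
```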
Authored-by: Peter Toth <peter.t...@gmail.com>
Signed-off-by: huaxingao <huaxin_...@apple.com>
---
 python/pyspark/sql/tests/test_udf.py                 | 18 ++++++++++++++++++
 .../sql/execution/datasources/DataSourceUtils.scala  |  2 +-
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index 79fee7a48e5..1a2ec213ca6 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -681,6 +681,24 @@ class BaseUDFTests(object):
         finally:
             shutil.rmtree(path)

+    # SPARK-42134
+    def test_file_dsv2_with_udf_filter(self):
+        from pyspark.sql.functions import lit
+
+        path = tempfile.mkdtemp()
+        shutil.rmtree(path)
+
+        try:
+            with self.sql_conf({"spark.sql.sources.useV1SourceList": ""}):
+                self.spark.range(1).write.mode("overwrite").format("parquet").save(path)
+                df = self.spark.read.parquet(path).toDF("i")
+                f = udf(lambda x: False, "boolean")(lit(1))
+                result = df.filter(f)
+                self.assertEqual(0, result.count())
+
+        finally:
+            shutil.rmtree(path)
+
     # SPARK-25591
     def test_same_accumulator_in_udfs(self):
         data_schema = StructType(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 26f22670a51..5eb422f80e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -273,7 +273,7 @@ object DataSourceUtils extends PredicateHelper {
     }
     val partitionSet = AttributeSet(partitionColumns)
     val (partitionFilters, dataFilters) = normalizedFilters.partition(f =>
-      f.references.subsetOf(partitionSet)
+      f.references.nonEmpty && f.references.subsetOf(partitionSet)
     )
     val extraPartitionFilter =
       dataFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionSet))
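To see the effect of the one-line Scala change on the filter split as a whole, here is a hedged Python model of the `normalizedFilters.partition(...)` step (the helper `split_filters` and the sample filters are hypothetical; each filter is represented by a label plus the set of column names it references):

```python
# Model of the fixed partition/data split in
# DataSourceUtils.getPartitionFiltersAndDataFilters().
def split_filters(filters, partition_set):
    partition_filters, data_filters = [], []
    for label, refs in filters:
        # The fix: require a non-empty reference set before the subset
        # test, so reference-free filters (e.g. a UDF applied to a
        # literal) land in data_filters instead of partition_filters.
        if refs and refs.issubset(partition_set):
            partition_filters.append(label)
        else:
            data_filters.append(label)
    return partition_filters, data_filters

filters = [
    ("year = 2023", {"year"}),   # references only partition columns
    ("i > 0", {"i"}),            # references a data column
    ("udf(lit(1))", set()),      # references nothing
]
parts, data = split_filters(filters, {"year", "month"})
print(parts)  # ['year = 2023']
print(data)   # ['i > 0', 'udf(lit(1))']
```

Without the `refs and` guard, the reference-free `udf(lit(1))` entry would end up in `partition_filters`, which is exactly the misclassification the commit fixes.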
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org