[spark] branch branch-3.3 updated: [SPARK-40152][SQL] Fix split_part codegen compilation issue
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 233a54d0ab3 [SPARK-40152][SQL] Fix split_part codegen compilation issue 233a54d0ab3 is described below commit 233a54d0ab39944ec815bd86d2fc6200c03ca79a Author: Yuming Wang AuthorDate: Sun Aug 21 14:30:05 2022 -0500 [SPARK-40152][SQL] Fix split_part codegen compilation issue ### What changes were proposed in this pull request? Fix `split_part` codegen compilation issue: ```sql SELECT split_part(str, delimiter, partNum) FROM VALUES ('11.12.13', '.', 3) AS v1(str, delimiter, partNum); ``` ``` org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 42, Column 1: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 42, Column 1: Expression "project_isNull_0 = false" is not a type ``` ### Why are the changes needed? Fix bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #37589 from wangyum/SPARK-40152. 
Authored-by: Yuming Wang Signed-off-by: Sean Owen (cherry picked from commit cf1a80eeae8bf815270fb39568b1846c2bd8d437) Signed-off-by: Sean Owen --- .../spark/sql/catalyst/expressions/collectionOperations.scala | 6 +++--- sql/core/src/test/resources/sql-tests/inputs/string-functions.sql | 1 + .../resources/sql-tests/results/ansi/string-functions.sql.out | 8 .../src/test/resources/sql-tests/results/string-functions.sql.out | 8 4 files changed, 20 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 4a5ae5d2e02..3786c1a33bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -2225,9 +2225,9 @@ case class ElementAt( case Some(value) => val defaultValueEval = value.genCode(ctx) s""" - ${defaultValueEval.code} - ${ev.isNull} = ${defaultValueEval.isNull} - ${ev.value} = ${defaultValueEval.value} + ${defaultValueEval.code}; + ${ev.isNull} = ${defaultValueEval.isNull}; + ${ev.value} = ${defaultValueEval.value}; """.stripMargin case None => s"${ev.isNull} = true;" } diff --git a/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql index e1c97b468f2..058ea891797 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql @@ -38,6 +38,7 @@ SELECT split_part('11.12.13', '.', 4); SELECT split_part('11.12.13', '.', 5); SELECT split_part('11.12.13', '.', -5); SELECT split_part(null, '.', 1); +SELECT split_part(str, delimiter, partNum) FROM VALUES ('11.12.13', '.', 3) AS v1(str, delimiter, partNum); -- substring function SELECT substr('Spark SQL', 5); diff --git 
a/sql/core/src/test/resources/sql-tests/results/ansi/string-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/string-functions.sql.out index 35ec3a97566..c7fda3f68bc 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/string-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/string-functions.sql.out @@ -213,6 +213,14 @@ struct NULL +-- !query +SELECT split_part(str, delimiter, partNum) FROM VALUES ('11.12.13', '.', 3) AS v1(str, delimiter, partNum) +-- !query schema +struct +-- !query output +13 + + -- !query SELECT substr('Spark SQL', 5) -- !query schema diff --git a/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out index dc72dfe137d..b1d49ae2876 100644 --- a/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out @@ -205,6 +205,14 @@ struct NULL +-- !query +SELECT split_part(str, delimiter, partNum) FROM VALUES ('11.12.13', '.', 3) AS v1(str, delimiter, partNum) +-- !query schema +struct +-- !query output +13 + + -- !query SELECT substr('Spark SQL', 5) -- !query schema - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@
[spark] branch master updated: [SPARK-40152][SQL] Fix split_part codegen compilation issue
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new cf1a80eeae8 [SPARK-40152][SQL] Fix split_part codegen compilation issue cf1a80eeae8 is described below commit cf1a80eeae8bf815270fb39568b1846c2bd8d437 Author: Yuming Wang AuthorDate: Sun Aug 21 14:30:05 2022 -0500 [SPARK-40152][SQL] Fix split_part codegen compilation issue ### What changes were proposed in this pull request? Fix `split_part` codegen compilation issue: ```sql SELECT split_part(str, delimiter, partNum) FROM VALUES ('11.12.13', '.', 3) AS v1(str, delimiter, partNum); ``` ``` org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 42, Column 1: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 42, Column 1: Expression "project_isNull_0 = false" is not a type ``` ### Why are the changes needed? Fix bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #37589 from wangyum/SPARK-40152. 
Authored-by: Yuming Wang Signed-off-by: Sean Owen --- .../spark/sql/catalyst/expressions/collectionOperations.scala | 6 +++--- sql/core/src/test/resources/sql-tests/inputs/string-functions.sql | 1 + .../resources/sql-tests/results/ansi/string-functions.sql.out | 8 .../src/test/resources/sql-tests/results/string-functions.sql.out | 8 4 files changed, 20 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 40eade75578..148bbc721e7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -2270,9 +2270,9 @@ case class ElementAt( case Some(value) => val defaultValueEval = value.genCode(ctx) s""" - ${defaultValueEval.code} - ${ev.isNull} = ${defaultValueEval.isNull} - ${ev.value} = ${defaultValueEval.value} + ${defaultValueEval.code}; + ${ev.isNull} = ${defaultValueEval.isNull}; + ${ev.value} = ${defaultValueEval.value}; """.stripMargin case None => s"${ev.isNull} = true;" } diff --git a/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql index e1c97b468f2..058ea891797 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql @@ -38,6 +38,7 @@ SELECT split_part('11.12.13', '.', 4); SELECT split_part('11.12.13', '.', 5); SELECT split_part('11.12.13', '.', -5); SELECT split_part(null, '.', 1); +SELECT split_part(str, delimiter, partNum) FROM VALUES ('11.12.13', '.', 3) AS v1(str, delimiter, partNum); -- substring function SELECT substr('Spark SQL', 5); diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/string-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/results/ansi/string-functions.sql.out index 08dcc011f24..add89a635a8 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/string-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/string-functions.sql.out @@ -239,6 +239,14 @@ struct NULL +-- !query +SELECT split_part(str, delimiter, partNum) FROM VALUES ('11.12.13', '.', 3) AS v1(str, delimiter, partNum) +-- !query schema +struct +-- !query output +13 + + -- !query SELECT substr('Spark SQL', 5) -- !query schema diff --git a/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out index a02b27142ff..dedbd29d4bb 100644 --- a/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out @@ -205,6 +205,14 @@ struct NULL +-- !query +SELECT split_part(str, delimiter, partNum) FROM VALUES ('11.12.13', '.', 3) AS v1(str, delimiter, partNum) +-- !query schema +struct +-- !query output +13 + + -- !query SELECT substr('Spark SQL', 5) -- !query schema - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40163][SQL] feat: SparkSession.config(Map)
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ec4d0ced0b9 [SPARK-40163][SQL] feat: SparkSession.config(Map) ec4d0ced0b9 is described below commit ec4d0ced0b912df16adff24196ac3c533588307f Author: seunggabi AuthorDate: Sun Aug 21 14:28:49 2022 -0500 [SPARK-40163][SQL] feat: SparkSession.config(Map) > https://issues.apache.org/jira/browse/SPARK-40163 ### What changes were proposed in this pull request? - as-is ```kotlin private fun config(builder: SparkSession.Builder): SparkSession.Builder { val map = YamlUtils.read(this::class.java, "spark", Extension.YAML) var b = builder map.keys.forEach { val k = it val v = map[k] b = when (v) { is Long -> b.config(k, v) is String -> b.config(k, v) is Double -> b.config(k, v) is Boolean -> b.config(k, v) else -> b } } return b } } ``` - to-be ```kotlin private fun config(builder: SparkSession.Builder): SparkSession.Builder { val map = YamlUtils.read(this::class.java, "spark", Extension.YAML) return builder.config(map) } } ``` ### Why are the changes needed? - String, Boolean, Long and Double values are all converted with `toString`, so callers can pass a whole map of options at once instead of dispatching on each value's type. ### Does this PR introduce _any_ user-facing change? Yes. It adds `SparkSession.Builder.config` overloads that accept a Scala `Map[String, Any]` or a Java `java.util.Map[String, Any]`. ### How was this patch tested? - Added unit tests. Closes #37478 from seunggabi/feat/spark-session-config-by-object. 
Authored-by: seunggabi Signed-off-by: Sean Owen --- .../scala/org/apache/spark/sql/SparkSession.scala | 25 ++ .../apache/spark/sql/JavaSparkSessionSuite.java| 56 ++ .../spark/sql/SparkSessionBuilderSuite.scala | 18 +++ 3 files changed, 99 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 5b212c3d595..4f5c89a796a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -859,6 +859,31 @@ object SparkSession extends Logging { this } +/** + * Sets a config option. Options set using this method are automatically propagated to + * both `SparkConf` and SparkSession's own configuration. + * + * @since 3.4.0 + */ +def config(map: Map[String, Any]): Builder = synchronized { + map.foreach { +kv: (String, Any) => { + options += kv._1 -> kv._2.toString +} + } + this +} + +/** + * Sets a config option. Options set using this method are automatically propagated to + * both `SparkConf` and SparkSession's own configuration. + * + * @since 3.4.0 + */ +def config(map: java.util.Map[String, Any]): Builder = synchronized { + config(map.asScala.toMap) +} + /** * Sets a list of config options based on the given `SparkConf`. * diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaSparkSessionSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaSparkSessionSuite.java new file mode 100644 index 000..00f744f4d86 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaSparkSessionSuite.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql; + +import org.apache.spark.sql.*; +import org.junit.After; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class JavaSparkSessionSuite { + private SparkSession spark; + + @After + public void tearDown() { +spark.stop(); +spark = null; + } + + @Test + public void config() { +// SPARK-40163: SparkSession.config(Map) +Map map = new HashMap() {{ + put("string",
[spark] branch master updated (3ed382f3919 -> 36921910ace)
This is an automated email from the ASF dual-hosted git repository. srowen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 3ed382f3919 [SPARK-40165][BUILD] Update test plugins to latest versions add 36921910ace [SPARK-40162][BUILD] Upgrade RoaringBitmap from 0.9.30 to 0.9.31 No new revisions were added by this update. Summary of changes: .../MapStatusesSerDeserBenchmark-jdk11-results.txt | 54 +++--- .../MapStatusesSerDeserBenchmark-jdk17-results.txt | 54 +++--- .../MapStatusesSerDeserBenchmark-results.txt | 54 +++--- dev/deps/spark-deps-hadoop-2-hive-2.3 | 4 +- dev/deps/spark-deps-hadoop-3-hive-2.3 | 4 +- pom.xml| 2 +- 6 files changed, 86 insertions(+), 86 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40165][BUILD] Update test plugins to latest versions
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3ed382f3919 [SPARK-40165][BUILD] Update test plugins to latest versions 3ed382f3919 is described below commit 3ed382f391911ec4a79fb204f4986728017dfa4a Author: panbingkun AuthorDate: Sun Aug 21 08:14:05 2022 -0500 [SPARK-40165][BUILD] Update test plugins to latest versions ### What changes were proposed in this pull request? This PR updates test plugins to their latest versions. ### Why are the changes needed? This brings improvements and bug fixes, such as the following: - 1.scalacheck (from 1.15.4 to 1.16.0) https://github.com/typelevel/scalacheck/releases https://github.com/typelevel/scalacheck/compare/1.15.4...v1.16.0 https://github.com/typelevel/scalacheck/commit/2ae1be5c8e5ee1c14abea607d631e334a56796de https://github.com/typelevel/scalacheck/commit/902121e498e59b5151066a6c1794cdf47a31428f - 2.maven-surefire-plugin (from 3.0.0-M5 to 3.0.0-M7) https://github.com/apache/maven-surefire/releases https://github.com/apache/maven-surefire/compare/surefire-3.0.0-M5...surefire-3.0.0-M7 - 3.maven-dependency-plugin (from 3.1.1 to 3.3.0) https://github.com/apache/maven-dependency-plugin/compare/maven-dependency-plugin-3.1.1...maven-dependency-plugin-3.3.0 https://github.com/apache/maven-dependency-plugin/commit/9646b0ed76a6d00e468c8eb1b6a27d260b09e944 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Passed GitHub Actions (GA) with the existing tests. Closes #37598 from panbingkun/upgrade_maven-dependency-plugin. 
Authored-by: panbingkun Signed-off-by: Sean Owen --- pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 6bba2d53888..0b121354810 100644 --- a/pom.xml +++ b/pom.xml @@ -1179,7 +1179,7 @@ org.scalacheck scalacheck_${scala.binary.version} -1.15.4 +1.16.0 test @@ -2932,7 +2932,7 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M5 + 3.0.0-M7 @@ -3172,7 +3172,7 @@ org.apache.maven.plugins maven-dependency-plugin - 3.1.1 + 3.3.0 default-cli - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (e69b2df7a3b -> cde71aaf173)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from e69b2df7a3b [SPARK-40161][PS] Make Series.mode apply PandasMode add cde71aaf173 [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns No new revisions were added by this update. Summary of changes: .../datasources/parquet/ParquetFileFormat.scala| 5 + .../datasources/parquet/ParquetQuerySuite.scala| 22 ++ 2 files changed, 27 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.2 updated: [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new e8a578ac757 [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns e8a578ac757 is described below commit e8a578ac757b4e53072af1bec908f6a1ff8ba611 Author: Ivan Sadikov AuthorDate: Sun Aug 21 18:59:48 2022 +0900 [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns ### What changes were proposed in this pull request? This PR fixes a correctness issue in Parquet DSv1 FileFormat when the projection does not contain the columns referenced in pushed filters. This typically happens when partition columns and data columns overlap. This could produce an empty result even when there are records matching the predicate, as the added unit test demonstrates. The problem is especially visible with `count()` and `show()` reporting different results; for example, show() would return 1+ records where count() would return 0. In Parquet, when a predicate is provided and the column index is enabled, we try to filter row ranges to figure out what the count should be. Unfortunately, if the projection is empty or is not in the set of filter columns, any checks on columns fail and 0 rows are returned (`RowRanges.EMPTY`) even though there is data matching the filter. Note that this is a mitigation, a quick fix. The actual fix needs to go into Parquet-MR: https://issues.apache.org/jira/browse/PARQUET-2170. The fix is not required in DSv2, where the overlapping columns are removed in `FileScanBuilder::readDataSchema()`. ### Why are the changes needed? 
Fixes a correctness issue when projection columns are not referenced by columns in pushed down filters or the schema is empty in Parquet DSv1. Downsides: Parquet column filter would be disabled if it had not been explicitly enabled which could affect performance. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added a unit test that reproduces this behaviour. The test fails without the fix and passes with the fix. Closes #37419 from sadikovi/SPARK-39833. Authored-by: Ivan Sadikov Signed-off-by: Hyukjin Kwon (cherry picked from commit cde71aaf173aadd14dd6393b09e9851b5caad903) Signed-off-by: Hyukjin Kwon --- .../datasources/parquet/ParquetFileFormat.scala| 5 + .../datasources/parquet/ParquetQuerySuite.scala| 22 ++ 2 files changed, 27 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index e5d33b84bf0..6b3922d11a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -228,6 +228,11 @@ class ParquetFileFormat SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sparkSession.sessionState.conf.isParquetINT96AsTimestamp) +// See PARQUET-2170. +// Disable column index optimisation when required schema does not have columns that appear in +// pushed filters to avoid getting incorrect results. 
+ hadoopConf.setBooleanIfUnset(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false) + val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 9ef43995467..47096948d21 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -933,6 +933,28 @@ class ParquetV1QuerySuite extends ParquetQuerySuite { } } } + + test("SPARK-39833: pushed filters with count()") { +withTempPath { path => + val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}" + Seq(0).toDF("COL").coalesce(1).write.save(p) + val df = spark.read.parquet(path.getCanonicalPath) + checkAnswer(df.filter("col = 0"), Seq(Row(0))) + assert(df.filter("col = 0").count() == 1, "col") + assert(df.filter("COL = 0").count() == 1, "COL") +} + } + + test("SPARK-39833: pus
[spark] branch branch-3.3 updated: [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 7c69614f067 [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns 7c69614f067 is described below commit 7c69614f067c9eb68d997e8881d9b5845cde00fd Author: Ivan Sadikov AuthorDate: Sun Aug 21 18:59:48 2022 +0900 [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns ### What changes were proposed in this pull request? This PR fixes a correctness issue in Parquet DSv1 FileFormat when the projection does not contain the columns referenced in pushed filters. This typically happens when partition columns and data columns overlap. This could produce an empty result even when there are records matching the predicate, as the added unit test demonstrates. The problem is especially visible with `count()` and `show()` reporting different results; for example, show() would return 1+ records where count() would return 0. In Parquet, when a predicate is provided and the column index is enabled, we try to filter row ranges to figure out what the count should be. Unfortunately, if the projection is empty or is not in the set of filter columns, any checks on columns fail and 0 rows are returned (`RowRanges.EMPTY`) even though there is data matching the filter. Note that this is a mitigation, a quick fix. The actual fix needs to go into Parquet-MR: https://issues.apache.org/jira/browse/PARQUET-2170. The fix is not required in DSv2, where the overlapping columns are removed in `FileScanBuilder::readDataSchema()`. ### Why are the changes needed? 
Fixes a correctness issue when projection columns are not referenced by columns in pushed down filters or the schema is empty in Parquet DSv1. Downsides: Parquet column filter would be disabled if it had not been explicitly enabled which could affect performance. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added a unit test that reproduces this behaviour. The test fails without the fix and passes with the fix. Closes #37419 from sadikovi/SPARK-39833. Authored-by: Ivan Sadikov Signed-off-by: Hyukjin Kwon (cherry picked from commit cde71aaf173aadd14dd6393b09e9851b5caad903) Signed-off-by: Hyukjin Kwon --- .../datasources/parquet/ParquetFileFormat.scala| 5 + .../datasources/parquet/ParquetQuerySuite.scala| 22 ++ 2 files changed, 27 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 9765e7c7801..2fa0854c983 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -230,6 +230,11 @@ class ParquetFileFormat SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sparkSession.sessionState.conf.isParquetINT96AsTimestamp) +// See PARQUET-2170. +// Disable column index optimisation when required schema does not have columns that appear in +// pushed filters to avoid getting incorrect results. 
+ hadoopConf.setBooleanIfUnset(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false) + val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 33656c84c88..d0a9a93b00f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -1065,6 +1065,28 @@ class ParquetV1QuerySuite extends ParquetQuerySuite { } } } + + test("SPARK-39833: pushed filters with count()") { +withTempPath { path => + val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}" + Seq(0).toDF("COL").coalesce(1).write.save(p) + val df = spark.read.parquet(path.getCanonicalPath) + checkAnswer(df.filter("col = 0"), Seq(Row(0))) + assert(df.filter("col = 0").count() == 1, "col") + assert(df.filter("COL = 0").count() == 1, "COL") +} + } + + test("SPARK-39833: p
[spark] branch master updated: [SPARK-40161][PS] Make Series.mode apply PandasMode
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e69b2df7a3b [SPARK-40161][PS] Make Series.mode apply PandasMode e69b2df7a3b is described below commit e69b2df7a3bb0cc77c315830180bb0c3e76957d7 Author: Ruifeng Zheng AuthorDate: Sun Aug 21 18:25:34 2022 +0900 [SPARK-40161][PS] Make Series.mode apply PandasMode ### What changes were proposed in this pull request? 1, move `PandasMode` into `pyspark.pandas.spark.functions` 2, apply `PandasMode` internally, so that only one pass on the dataset is needed ### Why are the changes needed? to simplify existing implementation ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing doctests Closes #37596 from zhengruifeng/ps_update_series_mode. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- python/pyspark/pandas/frame.py | 5 + python/pyspark/pandas/series.py | 12 ++-- python/pyspark/pandas/spark/functions.py | 5 + 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index df2b5fffa62..72913bc17d3 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -65,7 +65,6 @@ if TYPE_CHECKING: from pandas.core.dtypes.common import infer_dtype_from_object from pandas.core.accessor import CachedAccessor from pandas.core.dtypes.inference import is_sequence -from pyspark import SparkContext from pyspark import StorageLevel from pyspark.sql import Column, DataFrame as SparkDataFrame, functions as F from pyspark.sql.functions import pandas_udf @@ -12442,8 +12441,6 @@ defaultdict(, {'col..., 'col...})] if numeric_only is None and axis == 0: numeric_only = True -sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils - mode_scols: List[Column] = [] mode_col_names: List[str] = [] mode_labels: 
List[Label] = [] @@ -12455,7 +12452,7 @@ defaultdict(, {'col..., 'col...})] if not numeric_only or is_numeric: scol = psser.spark.column -mode_scol = Column(sql_utils.pandasMode(scol._jc, dropna)).alias(col_name) +mode_scol = SF.mode(scol, dropna).alias(col_name) mode_scols.append(mode_scol) mode_col_names.append(col_name) mode_labels.append(label) diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index e1b4ac3a3e3..fa99ddf76ce 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -4731,12 +4731,12 @@ class Series(Frame, IndexOpsMixin, Generic[T]): 3NaN dtype: float64 """ -ser_count = self.value_counts(dropna=dropna, sort=False) -sdf_count = ser_count._internal.spark_frame -most_value = ser_count.max() -sdf_most_value = sdf_count.filter("count == {}".format(str(most_value))) -sdf = sdf_most_value.select( -F.col(SPARK_DEFAULT_INDEX_NAME).alias(SPARK_DEFAULT_SERIES_NAME) +scol = self.spark.column +name = self._internal.data_spark_column_names[0] +sdf = ( +self._internal.spark_frame.select(SF.mode(scol, dropna).alias(name)) +.select(F.array_sort(F.col(name)).alias(name)) +.select(F.explode(F.col(name)).alias(name)) ) internal = InternalFrame(spark_frame=sdf, index_spark_columns=None, column_labels=[None]) ser_mode = first_series(DataFrame(internal)) diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py index 11f9dbbb8c0..58715b5f781 100644 --- a/python/pyspark/pandas/spark/functions.py +++ b/python/pyspark/pandas/spark/functions.py @@ -46,6 +46,11 @@ def kurt(col: Column) -> Column: return Column(sc._jvm.PythonSQLUtils.pandasKurtosis(col._jc)) +def mode(col: Column, dropna: bool) -> Column: +sc = SparkContext._active_spark_context +return Column(sc._jvm.PythonSQLUtils.pandasMode(col._jc, dropna)) + + def repeat(col: Column, n: Union[int, Column]) -> Column: """ Repeats a string column n times, and returns it as a new string column. 
- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-39925][SQL] Add array_sort(column, comparator) overload to DataFrame operations
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 1e26abf824b [SPARK-39925][SQL] Add array_sort(column, comparator) overload to DataFrame operations 1e26abf824b is described below commit 1e26abf824b7a1bb4e13cb5233d8f1d81388a095 Author: Brandon Dahler AuthorDate: Sun Aug 21 18:21:13 2022 +0900 [SPARK-39925][SQL] Add array_sort(column, comparator) overload to DataFrame operations ### What changes were proposed in this pull request? Adding a new `array_sort` overload to `org.apache.spark.sql.functions` that matches the new overload defined in [SPARK-29020](https://issues.apache.org/jira/browse/SPARK-29020) and added via #25728. ### Why are the changes needed? Adds access to the new overload for users of the DataFrame API so that they don't need to use the `expr` escape hatch. ### Does this PR introduce _any_ user-facing change? Yes, now allows users to optionally provide a comparator function to the `array_sort`, which opens up the ability to sort descending as well as sort items that aren't naturally orderable. Example: Old: ``` df.selectExpr("array_sort(a, (x, y) -> cardinality(x) - cardinality(y))"); ``` Added: ``` df.select(array_sort(col("a"), (x, y) => size(x) - size(y))); ``` ### How was this patch tested? Unit tests updated to validate that the overload matches the expression's behavior. Closes #37361 from brandondahler/features/ArraySortOverload. 
Authored-by: Brandon Dahler Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/sql/functions.scala | 13 +++ .../apache/spark/sql/DataFrameFunctionsSuite.scala | 42 ++ 2 files changed, 55 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 533c5614885..bd7473706ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3968,6 +3968,19 @@ object functions { */ def array_sort(e: Column): Column = withExpr { new ArraySort(e.expr) } + /** + * Sorts the input array based on the given comparator function. The comparator will take two + * arguments representing two elements of the array. It returns a negative integer, 0, or a + * positive integer as the first element is less than, equal to, or greater than the second + * element. If the comparator function returns null, the function will fail and raise an error. + * + * @group collection_funcs + * @since 3.4.0 + */ + def array_sort(e: Column, comparator: (Column, Column) => Column): Column = withExpr { +new ArraySort(e.expr, createLambda(comparator)) + } + /** * Remove all elements that equal to element from the given array. 
* diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index 9924fbfbf62..b80925f8638 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -434,6 +434,18 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { }) val df1 = Seq(Array[Int](3, 2, 5, 1, 2)).toDF("a") +checkAnswer( + df1.select(array_sort(col("a"), (x, y) => call_udf("fAsc", x, y))), + Seq( +Row(Seq(1, 2, 2, 3, 5))) +) + +checkAnswer( + df1.select(array_sort(col("a"), (x, y) => call_udf("fDesc", x, y))), + Seq( +Row(Seq(5, 3, 2, 2, 1))) +) + checkAnswer( df1.selectExpr("array_sort(a, (x, y) -> fAsc(x, y))"), Seq( @@ -447,6 +459,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { ) val df2 = Seq(Array[String]("bc", "ab", "dc")).toDF("a") +checkAnswer( + df2.select(array_sort(col("a"), (x, y) => call_udf("fString", x, y))), + Seq( +Row(Seq("dc", "bc", "ab"))) +) + checkAnswer( df2.selectExpr("array_sort(a, (x, y) -> fString(x, y))"), Seq( @@ -454,6 +472,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { ) val df3 = Seq(Array[String]("a", "abcd", "abc")).toDF("a") +checkAnswer( + df3.select(array_sort(col("a"), (x, y) => call_udf("fStringLength", x, y))), + Seq( +Row(Seq("a", "abc", "abcd"))) +) + checkAnswer( df3.selectExpr("array_sort(a, (x, y) -> fStringLength(x, y))"), Seq( @@ -462,6 +486,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { val df4 = Seq((Array[Array[Int]](Array(