[spark] branch master updated: [SPARK-44340][SPARK-44341][SQL][PYTHON][FOLLOWUP] Set partition index correctly for WindowGroupLimitExec, WindowExec and WindowInPandasExec
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ea3061beedf [SPARK-44340][SPARK-44341][SQL][PYTHON][FOLLOWUP] Set partition index correctly for WindowGroupLimitExec,WindowExec and WindowInPandasExec ea3061beedf is described below commit ea3061beedf7dc10f14e8de27d540dbcc5894fe7 Author: Jiaan Geng AuthorDate: Mon Jul 31 13:53:32 2023 +0800 [SPARK-44340][SPARK-44341][SQL][PYTHON][FOLLOWUP] Set partition index correctly for WindowGroupLimitExec,WindowExec and WindowInPandasExec ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/41899 and https://github.com/apache/spark/pull/41939, to set the partition index correctly even if it's not used for now. It also contains a few code cleanup. ### Why are the changes needed? future-proof ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? existing tests Closes #42208 from beliefer/SPARK-44340_followup. Authored-by: Jiaan Geng Signed-off-by: Wenchen Fan --- .../apache/spark/sql/execution/python/WindowInPandasExec.scala| 8 +++- .../scala/org/apache/spark/sql/execution/window/WindowExec.scala | 8 +++- .../apache/spark/sql/execution/window/WindowGroupLimitExec.scala | 8 +++- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index ba1f2c132ff..ee0044162b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -80,24 +80,22 @@ case class WindowInPandasExec( ) protected override def doExecute(): RDD[InternalRow] = { -val spillSize = longMetric("spillSize") - val evaluatorFactory = new WindowInPandasEvaluatorFactory( windowExpression, partitionSpec, orderSpec, child.output, -spillSize, +longMetric("spillSize"), pythonMetrics) // Start processing. if (conf.usePartitionEvaluator) { child.execute().mapPartitionsWithEvaluator(evaluatorFactory) } else { - child.execute().mapPartitions { iter => + child.execute().mapPartitionsWithIndex { (index, rowIterator) => val evaluator = evaluatorFactory.createEvaluator() -evaluator.eval(0, iter) +evaluator.eval(index, rowIterator) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala index 35e59aef94f..9ecd1c587a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala @@ -95,23 +95,21 @@ case class WindowExec( ) protected override def doExecute(): RDD[InternalRow] = { -val spillSize = longMetric("spillSize") - val evaluatorFactory = new WindowEvaluatorFactory( windowExpression, partitionSpec, orderSpec, child.output, -spillSize) +longMetric("spillSize")) // Start processing. 
if (conf.usePartitionEvaluator) { child.execute().mapPartitionsWithEvaluator(evaluatorFactory) } else { - child.execute().mapPartitions { iter => + child.execute().mapPartitionsWithIndex { (index, rowIterator) => val evaluator = evaluatorFactory.createEvaluator() -evaluator.eval(0, iter) +evaluator.eval(index, rowIterator) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala index 98969f60c2b..e975f3b219a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala @@ -72,8 +72,6 @@ case class WindowGroupLimitExec( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) protected override def doExecute(): RDD[InternalRow] = { -val numOutputRows = longMetric("numOutputRows") - val evaluatorFactory = new WindowGroupLimitEvaluatorFactory( partitionSpec, @@ -81,14 +79,14 @@ case class WindowGroupLimitExec( rankLikeFunction, limit, child.output, -numOutputRows) +longMetric("numOutputRows")) if (conf.us
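The core of the change, repeated across the three operators, is swapping `mapPartitions` for `mapPartitionsWithIndex` so the evaluator receives the real partition index instead of a hard-coded 0. A minimal sketch of the difference on a plain RDD follows; the session setup and data are illustrative and not taken from the patch.

```scala
import org.apache.spark.sql.SparkSession

object PartitionIndexSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("partition-index-sketch").getOrCreate()
    val rdd = spark.sparkContext.parallelize(1 to 10, numSlices = 2)

    // Before the follow-up: the partition index is not available inside the
    // closure, so callers had to pass a constant 0 to the evaluator.
    val withoutIndex = rdd.mapPartitions { iter =>
      iter.map(value => (0, value))
    }

    // After the follow-up: each partition's closure receives its real index,
    // mirroring evaluator.eval(index, rowIterator) in the patched operators.
    val withIndex = rdd.mapPartitionsWithIndex { (index, iter) =>
      iter.map(value => (index, value))
    }

    withoutIndex.collect().foreach(println)
    withIndex.collect().foreach(println)
    spark.stop()
  }
}
```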
[spark] branch branch-3.5 updated: [SPARK-44542][CORE] Eagerly load SparkExitCode class in exception handler
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 47224b39f6c [SPARK-44542][CORE] Eagerly load SparkExitCode class in exception handler 47224b39f6c is described below commit 47224b39f6c937cadf5946870a4dc8d0dabdfa40 Author: Xianjin AuthorDate: Sun Jul 30 22:12:39 2023 -0500 [SPARK-44542][CORE] Eagerly load SparkExitCode class in exception handler ### What changes were proposed in this pull request? 1. eagerly load SparkExitCode class in the the SparkUncaughtExceptionHandler ### Why are the changes needed? In some extreme case, it's possible for SparkUncaughtExceptionHandler's exit/halt process function calls throw an exception if the SparkExitCode is not loaded earlier, See corresponding jira: [SPARK-44542](https://issues.apache.org/jira/browse/SPARK-44542) for more details. By eagerly load SparkExitCode class, we can make sure at least the halt/exit would work properly. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No logic change, hence no new UTs. Closes #42195 from advancedxy/SPARK-44542. Authored-by: Xianjin Signed-off-by: Sean Owen (cherry picked from commit 32498b390db99c9451b14c643456437a023c0d93) Signed-off-by: Sean Owen --- .../scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala | 6 ++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala index e7712875536..b24129eb369 100644 --- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala +++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala @@ -28,6 +28,12 @@ import org.apache.spark.internal.Logging private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true) extends Thread.UncaughtExceptionHandler with Logging { + locally { +// eagerly load SparkExitCode class, so the System.exit and runtime.halt have a chance to be +// executed when the disk containing Spark jars is corrupted. See SPARK-44542 for more details. +val _ = SparkExitCode.OOM + } + override def uncaughtException(thread: Thread, exception: Throwable): Unit = { try { // Make it explicit that uncaught exceptions are thrown when container is shutting down. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-44542][CORE] Eagerly load SparkExitCode class in exception handler
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 32498b390db [SPARK-44542][CORE] Eagerly load SparkExitCode class in exception handler 32498b390db is described below commit 32498b390db99c9451b14c643456437a023c0d93 Author: Xianjin AuthorDate: Sun Jul 30 22:12:39 2023 -0500 [SPARK-44542][CORE] Eagerly load SparkExitCode class in exception handler ### What changes were proposed in this pull request? 1. eagerly load SparkExitCode class in the the SparkUncaughtExceptionHandler ### Why are the changes needed? In some extreme case, it's possible for SparkUncaughtExceptionHandler's exit/halt process function calls throw an exception if the SparkExitCode is not loaded earlier, See corresponding jira: [SPARK-44542](https://issues.apache.org/jira/browse/SPARK-44542) for more details. By eagerly load SparkExitCode class, we can make sure at least the halt/exit would work properly. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No logic change, hence no new UTs. Closes #42195 from advancedxy/SPARK-44542. Authored-by: Xianjin Signed-off-by: Sean Owen --- .../scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala | 6 ++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala index e7712875536..b24129eb369 100644 --- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala +++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala @@ -28,6 +28,12 @@ import org.apache.spark.internal.Logging private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true) extends Thread.UncaughtExceptionHandler with Logging { + locally { +// eagerly load SparkExitCode class, so the System.exit and runtime.halt have a chance to be +// executed when the disk containing Spark jars is corrupted. See SPARK-44542 for more details. +val _ = SparkExitCode.OOM + } + override def uncaughtException(thread: Thread, exception: Throwable): Unit = { try { // Make it explicit that uncaught exceptions are thrown when container is shutting down. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
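The `locally { val _ = SparkExitCode.OOM }` block works by forcing the JVM to load and initialize the class while the handler is being constructed, so a later exit/halt path cannot fail with a class-loading error. Below is a standalone sketch of the same idiom using a stand-in object rather than Spark's `SparkExitCode`; names and the simplified handler body are illustrative only.

```scala
// Stand-in for SparkExitCode; 52 matches Spark's OOM exit code but is only illustrative here.
object DemoExitCode {
  val OOM: Int = 52
}

class DemoUncaughtExceptionHandler extends Thread.UncaughtExceptionHandler {
  locally {
    // Touching a field forces DemoExitCode to be loaded and initialized now, at
    // construction time. If the jar backing the class later becomes unreadable
    // (e.g. a corrupted disk), the handler can still exit cleanly instead of
    // hitting NoClassDefFoundError inside uncaughtException.
    val _ = DemoExitCode.OOM
  }

  override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
    // Simplified: the real handler also logs and distinguishes OOM from other errors.
    System.exit(DemoExitCode.OOM)
  }
}

object DemoExitCodeApp {
  def main(args: Array[String]): Unit = {
    Thread.setDefaultUncaughtExceptionHandler(new DemoUncaughtExceptionHandler)
    // Any uncaught exception on any thread now terminates the JVM with code 52.
  }
}
```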
[spark] branch branch-3.5 updated: [MINOR][TESTS] Rename ArrowParityTests to JobCancellationTests
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 455ffd3527b [MINOR][TESTS] Rename ArrowParityTests to JobCancellationTests 455ffd3527b is described below commit 455ffd3527ba572d867b6d331cbac217f34eb3e0 Author: Hyukjin Kwon AuthorDate: Mon Jul 31 11:40:21 2023 +0900 [MINOR][TESTS] Rename ArrowParityTests to JobCancellationTests ### What changes were proposed in this pull request? This PR is a followup of https://github.com/apache/spark/pull/42120 that fixes the test class name from `ArrowParityTests` to `JobCancellationTests`. ### Why are the changes needed? The test class name doesn't make sense. It tests job cancellation. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unittests should be good enough. Closes #42229 from HyukjinKwon/minor-rename. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon (cherry picked from commit af6e1775185bf8f400215a636e4c7a133683700a) Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/tests/connect/test_session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests/connect/test_session.py b/python/pyspark/sql/tests/connect/test_session.py index 0482f119d63..131b1b853ac 100644 --- a/python/pyspark/sql/tests/connect/test_session.py +++ b/python/pyspark/sql/tests/connect/test_session.py @@ -77,7 +77,7 @@ class SparkSessionTestCase(unittest.TestCase): session.stop() -class ArrowParityTests(ReusedConnectTestCase): +class JobCancellationTests(ReusedConnectTestCase): def test_tags(self): self.spark.clearTags() self.spark.addTag("a") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [MINOR][TESTS] Rename ArrowParityTests to JobCancellationTests
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new af6e1775185 [MINOR][TESTS] Rename ArrowParityTests to JobCancellationTests af6e1775185 is described below commit af6e1775185bf8f400215a636e4c7a133683700a Author: Hyukjin Kwon AuthorDate: Mon Jul 31 11:40:21 2023 +0900 [MINOR][TESTS] Rename ArrowParityTests to JobCancellationTests ### What changes were proposed in this pull request? This PR is a followup of https://github.com/apache/spark/pull/42120 that fixes the test class name from `ArrowParityTests` to `JobCancellationTests`. ### Why are the changes needed? The test class name doesn't make sense. It tests job cancellation. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unittests should be good enough. Closes #42229 from HyukjinKwon/minor-rename. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/tests/connect/test_session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests/connect/test_session.py b/python/pyspark/sql/tests/connect/test_session.py index 17dd4cefd21..365468c8608 100644 --- a/python/pyspark/sql/tests/connect/test_session.py +++ b/python/pyspark/sql/tests/connect/test_session.py @@ -78,7 +78,7 @@ class SparkSessionTestCase(unittest.TestCase): session.stop() -class ArrowParityTests(ReusedConnectTestCase): +class JobCancellationTests(ReusedConnectTestCase): def test_tags(self): self.spark.clearTags() self.spark.addTag("a") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-43402][SQL] FileSourceScanExec supports push down data filter with scalar subquery
This is an automated email from the ASF dual-hosted git repository. ulyssesyou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b8cb9d552aa [SPARK-43402][SQL] FileSourceScanExec supports push down data filter with scalar subquery b8cb9d552aa is described below commit b8cb9d5526d6d8211555d3d00fff1e394015 Author: ulysses-you AuthorDate: Mon Jul 31 10:14:58 2023 +0800 [SPARK-43402][SQL] FileSourceScanExec supports push down data filter with scalar subquery ### What changes were proposed in this pull request? Scalar subquery can be pushed down as data filter at runtime, since we always execute subquery first. Ideally, we can rewrite `ScalarSubquery` to `Literal` before pushing down filter. The main issue before we do not support that is `ReuseSubquery` is ineffective, see https://github.com/apache/spark/pull/22518. It is not a issue now. For example: ```sql SELECT * FROM t1 WHERE c1 > (SELECT min(c2) FROM t2) ``` ### Why are the changes needed? Improve peformance if data filter have scalar subquery. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? add test Closes #41088 from ulysses-you/SPARK-43402. Authored-by: ulysses-you Signed-off-by: Xiduo You --- .../spark/sql/execution/DataSourceScanExec.scala | 24 +++-- .../execution/datasources/FileSourceStrategy.scala | 18 --- .../org/apache/spark/sql/execution/subquery.scala | 6 ++- .../sql-tests/results/explain-aqe.sql.out | 58 ++ .../sql-tests/results/explain-cbo.sql.out | 12 - .../resources/sql-tests/results/explain.sql.out| 22 +--- .../approved-plans-v1_4/q14b.sf100/explain.txt | 22 .../approved-plans-v1_4/q14b.sf100/simplified.txt | 30 +-- .../approved-plans-v1_4/q14b/explain.txt | 22 .../approved-plans-v1_4/q14b/simplified.txt| 30 +-- .../approved-plans-v1_4/q54.sf100/explain.txt | 10 ++-- .../approved-plans-v1_4/q54.sf100/simplified.txt | 50 ++- .../approved-plans-v1_4/q54/explain.txt| 10 ++-- .../approved-plans-v1_4/q54/simplified.txt | 50 ++- .../approved-plans-v1_4/q58.sf100/explain.txt | 10 ++-- .../approved-plans-v1_4/q58.sf100/simplified.txt | 15 +++--- .../approved-plans-v1_4/q58/explain.txt| 10 ++-- .../approved-plans-v1_4/q58/simplified.txt | 15 +++--- .../approved-plans-v1_4/q6.sf100/explain.txt | 6 ++- .../approved-plans-v1_4/q6.sf100/simplified.txt| 25 +- .../approved-plans-v1_4/q6/explain.txt | 6 ++- .../approved-plans-v1_4/q6/simplified.txt | 25 +- .../approved-plans-v2_7/q14.sf100/explain.txt | 22 .../approved-plans-v2_7/q14.sf100/simplified.txt | 30 +-- .../approved-plans-v2_7/q14/explain.txt| 22 .../approved-plans-v2_7/q14/simplified.txt | 30 +-- .../approved-plans-v2_7/q6.sf100/explain.txt | 6 ++- .../approved-plans-v2_7/q6.sf100/simplified.txt| 25 +- .../approved-plans-v2_7/q6/explain.txt | 6 ++- .../approved-plans-v2_7/q6/simplified.txt | 25 +- .../resources/tpch-plan-stability/q22/explain.txt | 6 ++- .../tpch-plan-stability/q22/simplified.txt | 25 +- .../spark/sql/DynamicPartitionPruningSuite.scala | 3 +- .../scala/org/apache/spark/sql/SubquerySuite.scala | 45 - 34 files changed, 452 insertions(+), 269 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 6375cdacaa0..a739fa40c71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap} import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators @@ -370,8 +371,7 @@ trait FileSourceScanLike extends DataSourceScanExec { } } - @transient - protected lazy val pushedDownFilters = { + private def translatePushedDownFilters(dataFilters: Seq[Expression]): Seq[Filter] = { val supportNest
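For reference, a small end-to-end version of the query shape this PR targets. The query itself follows the example in the commit message; the parquet paths and table contents are made up for the illustration.

```scala
import org.apache.spark.sql.SparkSession

object ScalarSubqueryPushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("scalar-subquery-sketch").getOrCreate()
    import spark.implicits._

    Seq(1, 5, 10).toDF("c1").write.mode("overwrite").parquet("/tmp/t1")
    Seq(3, 4).toDF("c2").write.mode("overwrite").parquet("/tmp/t2")
    spark.read.parquet("/tmp/t1").createOrReplaceTempView("t1")
    spark.read.parquet("/tmp/t2").createOrReplaceTempView("t2")

    // The scalar subquery (min(c2) = 3 here) is always executed first; with this
    // change its result can be rewritten to a literal and pushed down to t1's
    // file scan as an ordinary data filter instead of being applied after the scan.
    val df = spark.sql("SELECT * FROM t1 WHERE c1 > (SELECT min(c2) FROM t2)")
    df.explain()
    df.show()
    spark.stop()
  }
}
```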
[spark] branch branch-3.4 updated: [DOCS] Update concat and concat_ws documentation to point out unexpected behavior
This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new c7671a2a066 [DOCS] Update concat and concat_ws documentation to point out unexpected behavior c7671a2a066 is described below commit c7671a2a0667d0d8f8bc191e5be93cd21b0bb4db Author: Christopher Watford <132389385+watford...@users.noreply.github.com> AuthorDate: Mon Jul 31 10:11:28 2023 +0800 [DOCS] Update concat and concat_ws documentation to point out unexpected behavior Add documentation covering unexpected behavior of concat and concat_ws with respect to null values. ### What changes were proposed in this pull request? Adds additional documentation to `concat` and `concat_ws`. ### Why are the changes needed? The behavior of `concat` and `concat_ws` were unexpected w.r.t. null values and the documentation did not help make their behavior clear. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Closes #42153 from watfordkcf/patch-1. Lead-authored-by: Christopher Watford <132389385+watford...@users.noreply.github.com> Co-authored-by: Christopher Watford Signed-off-by: Kent Yao (cherry picked from commit ff022e54c2caedb164ce943dc628fcf607eddcdd) Signed-off-by: Kent Yao --- .../apache/spark/sql/catalyst/expressions/stringExpressions.scala | 6 +- sql/core/src/main/scala/org/apache/spark/sql/functions.scala| 4 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 1e58384c81d..507e4200378 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -53,13 +53,17 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String} */ // scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(sep[, str | array(str)]+) - Returns the concatenation of the strings separated by `sep`.", + usage = "_FUNC_(sep[, str | array(str)]+) - Returns the concatenation of the strings separated by `sep`, skipping null values.", examples = """ Examples: > SELECT _FUNC_(' ', 'Spark', 'SQL'); Spark SQL > SELECT _FUNC_('s'); + > SELECT _FUNC_('/', 'foo', null, 'bar'); +foo/bar + > SELECT _FUNC_(null, 'Spark', 'SQL'); +NULL """, since = "1.5.0", group = "string_funcs") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index cb5c1ad5c49..960ce731ac6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2643,6 +2643,8 @@ object functions { * Concatenates multiple input string columns together into a single string column, * using the given separator. * + * @note Input strings which are null are skipped. + * * @group string_funcs * @since 1.5.0 */ @@ -3959,6 +3961,8 @@ object functions { * Concatenates multiple input columns together into a single column. * The function works with strings, binary and compatible array columns. * + * @note Returns null if any of the input columns are null. 
+ * * @group collection_funcs * @since 1.5.0 */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [DOCS] Update concat and concat_ws documentation to point out unexpected behavior
This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 4f3994c169f [DOCS] Update concat and concat_ws documentation to point out unexpected behavior 4f3994c169f is described below commit 4f3994c169faca6a712be3c30a62007422280093 Author: Christopher Watford <132389385+watford...@users.noreply.github.com> AuthorDate: Mon Jul 31 10:11:28 2023 +0800 [DOCS] Update concat and concat_ws documentation to point out unexpected behavior Add documentation covering unexpected behavior of concat and concat_ws with respect to null values. ### What changes were proposed in this pull request? Adds additional documentation to `concat` and `concat_ws`. ### Why are the changes needed? The behavior of `concat` and `concat_ws` were unexpected w.r.t. null values and the documentation did not help make their behavior clear. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Closes #42153 from watfordkcf/patch-1. Lead-authored-by: Christopher Watford <132389385+watford...@users.noreply.github.com> Co-authored-by: Christopher Watford Signed-off-by: Kent Yao (cherry picked from commit ff022e54c2caedb164ce943dc628fcf607eddcdd) Signed-off-by: Kent Yao --- .../apache/spark/sql/catalyst/expressions/stringExpressions.scala | 6 +- sql/core/src/main/scala/org/apache/spark/sql/functions.scala| 4 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 03596ac40b1..46f8e1a9d67 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -53,13 +53,17 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String} */ // scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(sep[, str | array(str)]+) - Returns the concatenation of the strings separated by `sep`.", + usage = "_FUNC_(sep[, str | array(str)]+) - Returns the concatenation of the strings separated by `sep`, skipping null values.", examples = """ Examples: > SELECT _FUNC_(' ', 'Spark', 'SQL'); Spark SQL > SELECT _FUNC_('s'); + > SELECT _FUNC_('/', 'foo', null, 'bar'); +foo/bar + > SELECT _FUNC_(null, 'Spark', 'SQL'); +NULL """, since = "1.5.0", group = "string_funcs") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index ca5e4422ca9..e7e8b945d91 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3822,6 +3822,8 @@ object functions { * Concatenates multiple input string columns together into a single string column, * using the given separator. * + * @note Input strings which are null are skipped. + * * @group string_funcs * @since 1.5.0 */ @@ -6019,6 +6021,8 @@ object functions { * Concatenates multiple input columns together into a single column. * The function works with strings, binary and compatible array columns. * + * @note Returns null if any of the input columns are null. 
+ * * @group collection_funcs * @since 1.5.0 */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [DOCS] Update concat and concat_ws documentation to point out unexpected behavior
This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ff022e54c2c [DOCS] Update concat and concat_ws documentation to point out unexpected behavior ff022e54c2c is described below commit ff022e54c2caedb164ce943dc628fcf607eddcdd Author: Christopher Watford <132389385+watford...@users.noreply.github.com> AuthorDate: Mon Jul 31 10:11:28 2023 +0800 [DOCS] Update concat and concat_ws documentation to point out unexpected behavior Add documentation covering unexpected behavior of concat and concat_ws with respect to null values. ### What changes were proposed in this pull request? Adds additional documentation to `concat` and `concat_ws`. ### Why are the changes needed? The behavior of `concat` and `concat_ws` were unexpected w.r.t. null values and the documentation did not help make their behavior clear. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Closes #42153 from watfordkcf/patch-1. Lead-authored-by: Christopher Watford <132389385+watford...@users.noreply.github.com> Co-authored-by: Christopher Watford Signed-off-by: Kent Yao --- .../apache/spark/sql/catalyst/expressions/stringExpressions.scala | 6 +- sql/core/src/main/scala/org/apache/spark/sql/functions.scala| 4 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 03596ac40b1..46f8e1a9d67 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -53,13 +53,17 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String} */ // scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(sep[, str | array(str)]+) - Returns the concatenation of the strings separated by `sep`.", + usage = "_FUNC_(sep[, str | array(str)]+) - Returns the concatenation of the strings separated by `sep`, skipping null values.", examples = """ Examples: > SELECT _FUNC_(' ', 'Spark', 'SQL'); Spark SQL > SELECT _FUNC_('s'); + > SELECT _FUNC_('/', 'foo', null, 'bar'); +foo/bar + > SELECT _FUNC_(null, 'Spark', 'SQL'); +NULL """, since = "1.5.0", group = "string_funcs") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index ca5e4422ca9..e7e8b945d91 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3822,6 +3822,8 @@ object functions { * Concatenates multiple input string columns together into a single string column, * using the given separator. * + * @note Input strings which are null are skipped. + * * @group string_funcs * @since 1.5.0 */ @@ -6019,6 +6021,8 @@ object functions { * Concatenates multiple input columns together into a single column. * The function works with strings, binary and compatible array columns. * + * @note Returns null if any of the input columns are null. + * * @group collection_funcs * @since 1.5.0 */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
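As a quick check of the behavior the new docs describe: `concat_ws` drops null inputs (and returns null only when the separator itself is null), while `concat` returns null as soon as any input is null. The snippet mirrors the SQL examples added to the expression description; the surrounding session setup is illustrative.

```scala
import org.apache.spark.sql.SparkSession

object ConcatNullBehavior {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("concat-null-behavior").getOrCreate()

    spark.sql(
      """SELECT
        |  concat_ws('/', 'foo', null, 'bar') AS ws,   -- 'foo/bar': null input skipped
        |  concat_ws(null, 'Spark', 'SQL')    AS ws2,  -- NULL: the separator is null
        |  concat('foo', null, 'bar')         AS c     -- NULL: any null input nullifies concat
        |""".stripMargin).show()

    spark.stop()
  }
}
```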
[spark] branch master updated: [MINOR][TESTS] Clearing residual files after SparkSubmitSuite
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c93b410c81e [MINOR][TESTS] Clearing residual files after SparkSubmitSuite c93b410c81e is described below commit c93b410c81e93cd81bed080aea2c8095d3acb956 Author: panbingkun AuthorDate: Mon Jul 31 10:40:22 2023 +0900 [MINOR][TESTS] Clearing residual files after SparkSubmitSuite ### What changes were proposed in this pull request? The pr aims to clear residual files after SparkSubmitSuite ("SPARK-35084: include jars of the --packages in k8s client mode & driver runs inside a POD") ### Why are the changes needed? Clear residual files after UT. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually Test. ./build/sbt "core/testOnly *SparkSubmitSuite -- -z \"SPARK-35084: include jars of the --packages in k8s client mode & driver runs inside a POD\"" Closes #42201 from panbingkun/minor_SparkSubmitSuite_clean. Authored-by: panbingkun Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 15 +-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 8e2d6e6cf5f..7c787f83c6a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -510,8 +510,19 @@ class SparkSubmitSuite "my.great.lib.MyLib", "my.great.dep.MyLib") val appArgs = new SparkSubmitArguments(clArgs) - val (_, _, sparkConf, _) = submit.prepareSubmitEnvironment(appArgs) - sparkConf.get("spark.jars").contains("mylib") shouldBe true + try { +val (_, _, sparkConf, _) = submit.prepareSubmitEnvironment(appArgs) +sparkConf.get("spark.jars").contains("mylib") shouldBe true + } finally { +val mainJarPath = Paths.get("my.great.dep_mylib-0.1.jar") +val depJarPath = Paths.get("my.great.lib_mylib-0.1.jar") +if (Files.exists(mainJarPath)) { + Files.delete(mainJarPath) +} +if (Files.exists(depJarPath)) { + Files.delete(depJarPath) +} + } } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
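The fix follows the usual try/finally cleanup pattern: run the assertion, then remove whatever jars the `--packages` resolution left in the working directory. A compact variant of the same idea, folding the exists-then-delete check into `Files.deleteIfExists`, might look like the sketch below; the helper name is hypothetical.

```scala
import java.nio.file.{Files, Paths}

object JarCleanupSketch {
  // Jar names taken from the test's finally block.
  private val leftoverJars = Seq("my.great.dep_mylib-0.1.jar", "my.great.lib_mylib-0.1.jar")

  // Hypothetical helper: run the test body, then always clean up the resolved jars.
  def withJarCleanup[T](body: => T): T =
    try body
    finally leftoverJars.map(Paths.get(_)).foreach(Files.deleteIfExists)

  def main(args: Array[String]): Unit = {
    withJarCleanup {
      // ... the prepareSubmitEnvironment(...) assertions would go here ...
      println("test body ran")
    }
  }
}
```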
[spark] branch master updated: [SPARK-44524][BUILD] Balancing pyspark-pandas-connect and pyspark-pandas-slow-connect GA testing time
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new be95298f246 [SPARK-44524][BUILD] Balancing pyspark-pandas-connect and pyspark-pandas-slow-connect GA testing time be95298f246 is described below commit be95298f24669cddaa65e412d7681476149e2977 Author: panbingkun AuthorDate: Mon Jul 31 09:13:18 2023 +0800 [SPARK-44524][BUILD] Balancing pyspark-pandas-connect and pyspark-pandas-slow-connect GA testing time ### What changes were proposed in this pull request? The pr aims to balancing `pyspark-pandas-connect` and `pyspark-pandas-slow-connect` GA testing time. ### Why are the changes needed? After pr: https://github.com/apache/spark/pull/42146, the difference in testing time between `pyspark-pandas-connect` and `pyspark-pandas-slow-connect` is a bit significant, which affects the overall running time. In order to make GA operation more efficient and stable. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Pass GA. - Manually monitor GA. Closes #42115 from panbingkun/free_disk_space. Lead-authored-by: panbingkun Co-authored-by: panbingkun <84731...@qq.com> Signed-off-by: Ruifeng Zheng --- dev/sparktestsupport/modules.py | 35 +-- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 79c3f8f26b1..6830039ba1c 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -891,7 +891,7 @@ pyspark_connect = Module( pyspark_pandas_connect = Module( name="pyspark-pandas-connect", -dependencies=[pyspark_connect, pyspark_pandas], +dependencies=[pyspark_connect, pyspark_pandas, pyspark_pandas_slow], source_file_regexes=[ "python/pyspark/pandas", ], @@ -949,23 +949,6 @@ pyspark_pandas_connect = Module( "pyspark.pandas.tests.connect.test_parity_utils", "pyspark.pandas.tests.connect.test_parity_window", "pyspark.pandas.tests.connect.indexes.test_parity_base", -], -excluded_python_implementations=[ -"PyPy" # Skip these tests under PyPy since they require numpy, pandas, and pyarrow and -# they aren't available there -], -) - - -# This module should contain the same test list with 'pyspark_pandas_slow' for maintenance. 
-pyspark_pandas_slow_connect = Module( -name="pyspark-pandas-slow-connect", -dependencies=[pyspark_connect, pyspark_pandas_slow], -source_file_regexes=[ -"python/pyspark/pandas", -], -python_test_goals=[ -# pandas-on-Spark unittests "pyspark.pandas.tests.connect.indexes.test_parity_datetime", "pyspark.pandas.tests.connect.indexes.test_parity_align", "pyspark.pandas.tests.connect.indexes.test_parity_indexing", @@ -985,6 +968,22 @@ pyspark_pandas_slow_connect = Module( "pyspark.pandas.tests.connect.computation.test_parity_melt", "pyspark.pandas.tests.connect.computation.test_parity_missing_data", "pyspark.pandas.tests.connect.computation.test_parity_pivot", +], +excluded_python_implementations=[ +"PyPy" # Skip these tests under PyPy since they require numpy, pandas, and pyarrow and +# they aren't available there +], +) + + +pyspark_pandas_slow_connect = Module( +name="pyspark-pandas-slow-connect", +dependencies=[pyspark_connect, pyspark_pandas, pyspark_pandas_slow], +source_file_regexes=[ +"python/pyspark/pandas", +], +python_test_goals=[ +# pandas-on-Spark unittests "pyspark.pandas.tests.connect.frame.test_parity_attrs", "pyspark.pandas.tests.connect.frame.test_parity_constructor", "pyspark.pandas.tests.connect.frame.test_parity_conversion", - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [SPARK-44587][SQL][CONNECT] Increase protobuf marshaller recursion limit
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 30efbfdcc8c [SPARK-44587][SQL][CONNECT] Increase protobuf marshaller recursion limit 30efbfdcc8c is described below commit 30efbfdcc8c237c536a2320a688675f4e69bb075 Author: Yihong He AuthorDate: Mon Jul 31 09:39:13 2023 +0900 [SPARK-44587][SQL][CONNECT] Increase protobuf marshaller recursion limit - Use customized marshallers for spark connect grpc methods - Increase Protobuf marshaller recursion limit - Nested DFs fail easily No `build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite"` Closes #42212 from heyihong/SPARK-44587-2. Authored-by: Yihong He Signed-off-by: Hyukjin Kwon (cherry picked from commit 55391f633a43113bdd36b5720f5a5f6d6a9daed8) Signed-off-by: Hyukjin Kwon --- .../org/apache/spark/sql/ClientE2ETestSuite.scala | 8 +++ .../apache/spark/sql/connect/config/Connect.scala | 10 .../sql/connect/service/SparkConnectService.scala | 60 +++--- 3 files changed, 72 insertions(+), 6 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index 36f47cc1fba..1403d460b51 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -43,6 +43,14 @@ import org.apache.spark.sql.types._ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateMethodTester { + test("spark deep recursion") { +var df = spark.range(1) +for (a <- 1 to 500) { + df = df.union(spark.range(a, a + 1)) +} +assert(df.collect().length == 501) + } + test("many tables") { withSQLConf("spark.sql.execution.arrow.maxRecordsPerBatch" -> "10") { val numTables = 20 diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala index 2a805c45392..15288a65a45 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala @@ -56,6 +56,16 @@ object Connect { .bytesConf(ByteUnit.BYTE) .createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE) + val CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT = +ConfigBuilder("spark.connect.grpc.marshallerRecursionLimit") + .internal() + .doc(""" + |Sets the recursion limit to grpc protobuf messages. 
+ |""".stripMargin) + .version("3.5.0") + .intConf + .createWithDefault(1024) + val CONNECT_EXTENSIONS_RELATION_CLASSES = ConfigBuilder("spark.connect.extensions.relation.classes") .doc(""" diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index 121d2accf6b..35a9df82d30 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -20,21 +20,27 @@ package org.apache.spark.sql.connect.service import java.util.UUID import java.util.concurrent.TimeUnit +import scala.jdk.CollectionConverters._ + import com.google.common.base.Ticker import com.google.common.cache.{CacheBuilder, RemovalListener, RemovalNotification} -import io.grpc.Server +import com.google.protobuf.MessageLite +import io.grpc.{BindableService, MethodDescriptor, Server, ServerMethodDefinition, ServerServiceDefinition} +import io.grpc.MethodDescriptor.PrototypeMarshaller import io.grpc.netty.NettyServerBuilder +import io.grpc.protobuf.lite.ProtoLiteUtils import io.grpc.protobuf.services.ProtoReflectionService import io.grpc.stub.StreamObserver import org.apache.commons.lang3.StringUtils import org.apache.spark.{SparkContext, SparkEnv, SparkSQLException} import org.apache.spark.connect.proto -import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse} +import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, SparkConnectServiceGrpc} +import org.apache.spark.connect.proto.SparkConnectServiceGrpc.AsyncService import org.apache.spark.internal.Logging import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connect.config.Connect.{
[spark] branch master updated: [SPARK-44587][SQL][CONNECT] Increase protobuf marshaller recursion limit
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 55391f633a4 [SPARK-44587][SQL][CONNECT] Increase protobuf marshaller recursion limit 55391f633a4 is described below commit 55391f633a43113bdd36b5720f5a5f6d6a9daed8 Author: Yihong He AuthorDate: Mon Jul 31 09:39:13 2023 +0900 [SPARK-44587][SQL][CONNECT] Increase protobuf marshaller recursion limit ### What changes were proposed in this pull request? - Use customized marshallers for spark connect grpc methods - Increase Protobuf marshaller recursion limit ### Why are the changes needed? - Nested DFs fail easily ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite"` Closes #42212 from heyihong/SPARK-44587-2. Authored-by: Yihong He Signed-off-by: Hyukjin Kwon --- .../org/apache/spark/sql/ClientE2ETestSuite.scala | 8 +++ .../apache/spark/sql/connect/config/Connect.scala | 10 .../sql/connect/service/SparkConnectService.scala | 60 +++--- 3 files changed, 72 insertions(+), 6 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index 36f47cc1fba..1403d460b51 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -43,6 +43,14 @@ import org.apache.spark.sql.types._ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateMethodTester { + test("spark deep recursion") { +var df = spark.range(1) +for (a <- 1 to 500) { + df = df.union(spark.range(a, a + 1)) +} +assert(df.collect().length == 501) + } + test("many tables") { withSQLConf("spark.sql.execution.arrow.maxRecordsPerBatch" -> "10") { val numTables = 20 diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala index 23aa42bad30..142b206fbf4 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala @@ -62,6 +62,16 @@ object Connect { .bytesConf(ByteUnit.BYTE) .createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE) + val CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT = +ConfigBuilder("spark.connect.grpc.marshallerRecursionLimit") + .internal() + .doc(""" + |Sets the recursion limit to grpc protobuf messages. 
+ |""".stripMargin) + .version("3.5.0") + .intConf + .createWithDefault(1024) + val CONNECT_EXTENSIONS_RELATION_CLASSES = ConfigBuilder("spark.connect.extensions.relation.classes") .doc(""" diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index 206e24714fe..8f93d5083f4 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -21,21 +21,27 @@ import java.net.InetSocketAddress import java.util.UUID import java.util.concurrent.TimeUnit +import scala.jdk.CollectionConverters._ + import com.google.common.base.Ticker import com.google.common.cache.{CacheBuilder, RemovalListener, RemovalNotification} -import io.grpc.Server +import com.google.protobuf.MessageLite +import io.grpc.{BindableService, MethodDescriptor, Server, ServerMethodDefinition, ServerServiceDefinition} +import io.grpc.MethodDescriptor.PrototypeMarshaller import io.grpc.netty.NettyServerBuilder +import io.grpc.protobuf.lite.ProtoLiteUtils import io.grpc.protobuf.services.ProtoReflectionService import io.grpc.stub.StreamObserver import org.apache.commons.lang3.StringUtils import org.apache.spark.{SparkContext, SparkEnv, SparkSQLException} import org.apache.spark.connect.proto -import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse} +import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, SparkConnectServiceGrpc} +import org.apache.spark.connect.proto.SparkConnectServiceGrpc.AsyncService import org.apache.spark.internal.Logging import org.apache.spark.internal.config.UI.UI_ENABLED import org.apa
[spark] branch branch-3.5 updated: [SPARK-44287][SQL][FOLLOWUP] Do not trigger execution too early
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 3a5b737526a [SPARK-44287][SQL][FOLLOWUP] Do not trigger execution too early 3a5b737526a is described below commit 3a5b737526af8b33f7c456d73133729dc159c0f6 Author: Wenchen Fan AuthorDate: Mon Jul 31 09:16:02 2023 +0900 [SPARK-44287][SQL][FOLLOWUP] Do not trigger execution too early ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/41839 , to fix an unintentional change. That PR added an optimization to return an empty iterator directly if the input iterator is empty. However, checking `inputIterator.hasNext` may trigger query execution, which is different than before. It should be completely lazy and wait for the root operator's iterator to trigger the execution. ### Why are the changes needed? fix unintentional change ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes #42226 from cloud-fan/fo. Authored-by: Wenchen Fan Signed-off-by: Hyukjin Kwon (cherry picked from commit 0f9cca5b419b09f25c45904aa81bf0515f9e7c44) Signed-off-by: Hyukjin Kwon --- .../sql/execution/ColumnarEvaluatorFactory.scala | 57 ++ 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala index 949722d3cc2..960d4b74a1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala @@ -70,42 +70,37 @@ class RowToColumnarEvaluatorFactory( inputs: Iterator[InternalRow]*): Iterator[ColumnarBatch] = { assert(inputs.length == 1) val rowIterator = inputs.head + new Iterator[ColumnarBatch] { +private lazy val converters = new RowToColumnConverter(schema) +private lazy val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) { + OffHeapColumnVector.allocateColumns(numRows, schema) +} else { + OnHeapColumnVector.allocateColumns(numRows, schema) +} +private lazy val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray) - if (rowIterator.hasNext) { -new Iterator[ColumnarBatch] { - private val converters = new RowToColumnConverter(schema) - private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) { -OffHeapColumnVector.allocateColumns(numRows, schema) - } else { -OnHeapColumnVector.allocateColumns(numRows, schema) - } - private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray) - - TaskContext.get().addTaskCompletionListener[Unit] { _ => -cb.close() - } +TaskContext.get().addTaskCompletionListener[Unit] { _ => + cb.close() +} - override def hasNext: Boolean = { -rowIterator.hasNext - } +override def hasNext: Boolean = { + rowIterator.hasNext +} - override def next(): ColumnarBatch = { -cb.setNumRows(0) -vectors.foreach(_.reset()) -var rowCount = 0 -while (rowCount < numRows && rowIterator.hasNext) { - val row = rowIterator.next() - converters.convert(row, vectors.toArray) - rowCount += 1 -} -cb.setNumRows(rowCount) -numInputRows += rowCount -numOutputBatches += 1 -cb +override def next(): ColumnarBatch = { + cb.setNumRows(0) + vectors.foreach(_.reset()) + var rowCount = 0 + while (rowCount < numRows && 
rowIterator.hasNext) { +val row = rowIterator.next() +converters.convert(row, vectors.toArray) +rowCount += 1 } + cb.setNumRows(rowCount) + numInputRows += rowCount + numOutputBatches += 1 + cb } - } else { -Iterator.empty } } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-44287][SQL][FOLLOWUP] Do not trigger execution too early
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0f9cca5b419 [SPARK-44287][SQL][FOLLOWUP] Do not trigger execution too early 0f9cca5b419 is described below commit 0f9cca5b419b09f25c45904aa81bf0515f9e7c44 Author: Wenchen Fan AuthorDate: Mon Jul 31 09:16:02 2023 +0900 [SPARK-44287][SQL][FOLLOWUP] Do not trigger execution too early ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/41839 , to fix an unintentional change. That PR added an optimization to return an empty iterator directly if the input iterator is empty. However, checking `inputIterator.hasNext` may trigger query execution, which is different than before. It should be completely lazy and wait for the root operator's iterator to trigger the execution. ### Why are the changes needed? fix unintentional change ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes #42226 from cloud-fan/fo. Authored-by: Wenchen Fan Signed-off-by: Hyukjin Kwon --- .../sql/execution/ColumnarEvaluatorFactory.scala | 57 ++ 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala index 949722d3cc2..960d4b74a1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala @@ -70,42 +70,37 @@ class RowToColumnarEvaluatorFactory( inputs: Iterator[InternalRow]*): Iterator[ColumnarBatch] = { assert(inputs.length == 1) val rowIterator = inputs.head + new Iterator[ColumnarBatch] { +private lazy val converters = new RowToColumnConverter(schema) +private lazy val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) { + OffHeapColumnVector.allocateColumns(numRows, schema) +} else { + OnHeapColumnVector.allocateColumns(numRows, schema) +} +private lazy val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray) - if (rowIterator.hasNext) { -new Iterator[ColumnarBatch] { - private val converters = new RowToColumnConverter(schema) - private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) { -OffHeapColumnVector.allocateColumns(numRows, schema) - } else { -OnHeapColumnVector.allocateColumns(numRows, schema) - } - private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray) - - TaskContext.get().addTaskCompletionListener[Unit] { _ => -cb.close() - } +TaskContext.get().addTaskCompletionListener[Unit] { _ => + cb.close() +} - override def hasNext: Boolean = { -rowIterator.hasNext - } +override def hasNext: Boolean = { + rowIterator.hasNext +} - override def next(): ColumnarBatch = { -cb.setNumRows(0) -vectors.foreach(_.reset()) -var rowCount = 0 -while (rowCount < numRows && rowIterator.hasNext) { - val row = rowIterator.next() - converters.convert(row, vectors.toArray) - rowCount += 1 -} -cb.setNumRows(rowCount) -numInputRows += rowCount -numOutputBatches += 1 -cb +override def next(): ColumnarBatch = { + cb.setNumRows(0) + vectors.foreach(_.reset()) + var rowCount = 0 + while (rowCount < numRows && rowIterator.hasNext) { +val row = rowIterator.next() +converters.convert(row, vectors.toArray) +rowCount += 1 } + 
cb.setNumRows(rowCount) + numInputRows += rowCount + numOutputBatches += 1 + cb } - } else { -Iterator.empty } } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
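The regression was subtle: deciding between `Iterator.empty` and a real wrapper requires calling `rowIterator.hasNext` while the operator is still being set up, which can start the upstream computation before the root operator pulls any rows. Moving the per-partition allocations behind `lazy val`s keeps construction side-effect free. A generic sketch of the two shapes follows; the names and the counter state are illustrative, not Spark's.

```scala
object LazyIteratorSketch {
  // Eager shape (the unintended change): source.hasNext is called while the
  // wrapper is being built, so upstream work can start too early.
  def wrapEager[T](source: Iterator[T]): Iterator[(Int, T)] =
    if (source.hasNext) new Iterator[(Int, T)] {
      private val counter = Array(0)                  // allocated up front
      def hasNext: Boolean = source.hasNext
      def next(): (Int, T) = { counter(0) += 1; (counter(0), source.next()) }
    } else Iterator.empty

  // Lazy shape (the fix): always return a wrapper; allocations sit behind
  // lazy vals and nothing touches `source` until hasNext/next is first called.
  def wrapLazy[T](source: Iterator[T]): Iterator[(Int, T)] = new Iterator[(Int, T)] {
    private lazy val counter = Array(0)
    def hasNext: Boolean = source.hasNext
    def next(): (Int, T) = { counter(0) += 1; (counter(0), source.next()) }
  }

  def main(args: Array[String]): Unit = {
    // With the lazy shape, building the wrapper neither probes nor consumes the source.
    val it = wrapLazy(Iterator(10, 20, 30))
    println(it.toList)   // List((1,10), (2,20), (3,30))
  }
}
```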
[spark] branch branch-3.5 updated: [SPARK-44597][PYTHON][TESTS] Migrate test_sql assert_eq to use assertDataFrameEqual
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 4bd975fa79a [SPARK-44597][PYTHON][TESTS] Migrate test_sql assert_eq to use assertDataFrameEqual 4bd975fa79a is described below commit 4bd975fa79a82a417bc9639dcc3305f5624ac861 Author: Amanda Liu AuthorDate: Mon Jul 31 09:03:20 2023 +0900 [SPARK-44597][PYTHON][TESTS] Migrate test_sql assert_eq to use assertDataFrameEqual ### What changes were proposed in this pull request? This PR updates the `python/pyspark/pandas/tests/test_sql.py` to use the new PySpark test util function, `assertDataFrameEqual`, introduced in [SPARK-44042](https://issues.apache.org/jira/browse/SPARK-44042). ### Why are the changes needed? This change is needed as part of the effort in [SPARK-44589](https://issues.apache.org/jira/browse/SPARK-44589) to unify the tests in the codebase to use new PySpark test utils. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests in `python/pyspark/pandas/tests/test_sql.py`. Closes #42217 from asl3/migrate-test-sql. Authored-by: Amanda Liu Signed-off-by: Hyukjin Kwon (cherry picked from commit 9f03f434358d98e87cd80228ea59f05935a37b9f) Signed-off-by: Hyukjin Kwon --- python/pyspark/pandas/tests/test_sql.py | 25 ++--- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/python/pyspark/pandas/tests/test_sql.py b/python/pyspark/pandas/tests/test_sql.py index ec56fe210f9..7800d5756b0 100644 --- a/python/pyspark/pandas/tests/test_sql.py +++ b/python/pyspark/pandas/tests/test_sql.py @@ -19,6 +19,7 @@ from pyspark import pandas as ps from pyspark.errors import ParseException from pyspark.testing.pandasutils import PandasOnSparkTestCase from pyspark.testing.sqlutils import SQLTestUtils +from pyspark.testing.utils import assertDataFrameEqual class SQLTestsMixin: @@ -49,7 +50,7 @@ class SQLTestsMixin: psdf_reset_index=psdf_reset_index, ) expected = psdf.iloc[[1, 2]] -self.assert_eq(actual, expected) +assertDataFrameEqual(actual, expected) # MultiIndex psdf = ps.DataFrame( @@ -65,27 +66,29 @@ class SQLTestsMixin: psdf_reset_index=psdf_reset_index, ) expected = psdf.iloc[[1, 2]] -self.assert_eq(actual, expected) +assertDataFrameEqual(actual, expected) def test_sql_with_pandas_objects(self): import pandas as pd pdf = pd.DataFrame({"a": [1, 2, 3, 4]}) -self.assert_eq(ps.sql("SELECT {col} + 1 as a FROM {tbl}", col=pdf.a, tbl=pdf), pdf + 1) +assertDataFrameEqual( +ps.sql("SELECT {col} + 1 as a FROM {tbl}", col=pdf.a, tbl=pdf), pdf + 1 +) def test_sql_with_python_objects(self): -self.assert_eq( +assertDataFrameEqual( ps.sql("SELECT {col} as a FROM range(1)", col="lit"), ps.DataFrame({"a": ["lit"]}) ) -self.assert_eq( +assertDataFrameEqual( ps.sql("SELECT id FROM range(10) WHERE id IN {pred}", col="lit", pred=(1, 2, 3)), ps.DataFrame({"id": [1, 2, 3]}), ) -self.assert_eq( +assertDataFrameEqual( ps.sql("SELECT {col} as a FROM range(1)", col="a'''c''d"), ps.DataFrame({"a": ["a'''c''d"]}), ) -self.assert_eq( +assertDataFrameEqual( ps.sql("SELECT id FROM range(10) WHERE id IN {pred}", col="a'''c''d", pred=(1, 2, 3)), ps.DataFrame({"id": [1, 2, 3]}), ) @@ -93,14 +96,14 @@ class SQLTestsMixin: def test_sql_with_pandas_on_spark_objects(self): psdf = ps.DataFrame({"a": [1, 2, 3, 4]}) -self.assert_eq(ps.sql("SELECT {col} FROM {tbl}", col=psdf.a, tbl=psdf), psdf) -self.assert_eq(ps.sql("SELECT {tbl.a} FROM 
{tbl}", tbl=psdf), psdf) +assertDataFrameEqual(ps.sql("SELECT {col} FROM {tbl}", col=psdf.a, tbl=psdf), psdf) +assertDataFrameEqual(ps.sql("SELECT {tbl.a} FROM {tbl}", tbl=psdf), psdf) psdf = ps.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}) -self.assert_eq( +assertDataFrameEqual( ps.sql("SELECT {col}, {col2} FROM {tbl}", col=psdf.A, col2=psdf.B, tbl=psdf), psdf ) -self.assert_eq(ps.sql("SELECT {tbl.A}, {tbl.B} FROM {tbl}", tbl=psdf), psdf) +assertDataFrameEqual(ps.sql("SELECT {tbl.A}, {tbl.B} FROM {tbl}", tbl=psdf), psdf) class SQLTests(SQLTestsMixin, PandasOnSparkTestCase, SQLTestUtils): - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-44597][PYTHON][TESTS] Migrate test_sql assert_eq to use assertDataFrameEqual
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 9f03f434358 [SPARK-44597][PYTHON][TESTS] Migrate test_sql assert_eq to use assertDataFrameEqual 9f03f434358 is described below commit 9f03f434358d98e87cd80228ea59f05935a37b9f Author: Amanda Liu AuthorDate: Mon Jul 31 09:03:20 2023 +0900 [SPARK-44597][PYTHON][TESTS] Migrate test_sql assert_eq to use assertDataFrameEqual ### What changes were proposed in this pull request? This PR updates the `python/pyspark/pandas/tests/test_sql.py` to use the new PySpark test util function, `assertDataFrameEqual`, introduced in [SPARK-44042](https://issues.apache.org/jira/browse/SPARK-44042). ### Why are the changes needed? This change is needed as part of the effort in [SPARK-44589](https://issues.apache.org/jira/browse/SPARK-44589) to unify the tests in the codebase to use new PySpark test utils. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests in `python/pyspark/pandas/tests/test_sql.py`. Closes #42217 from asl3/migrate-test-sql. Authored-by: Amanda Liu Signed-off-by: Hyukjin Kwon --- python/pyspark/pandas/tests/test_sql.py | 25 ++--- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/python/pyspark/pandas/tests/test_sql.py b/python/pyspark/pandas/tests/test_sql.py index ec56fe210f9..7800d5756b0 100644 --- a/python/pyspark/pandas/tests/test_sql.py +++ b/python/pyspark/pandas/tests/test_sql.py @@ -19,6 +19,7 @@ from pyspark import pandas as ps from pyspark.errors import ParseException from pyspark.testing.pandasutils import PandasOnSparkTestCase from pyspark.testing.sqlutils import SQLTestUtils +from pyspark.testing.utils import assertDataFrameEqual class SQLTestsMixin: @@ -49,7 +50,7 @@ class SQLTestsMixin: psdf_reset_index=psdf_reset_index, ) expected = psdf.iloc[[1, 2]] -self.assert_eq(actual, expected) +assertDataFrameEqual(actual, expected) # MultiIndex psdf = ps.DataFrame( @@ -65,27 +66,29 @@ class SQLTestsMixin: psdf_reset_index=psdf_reset_index, ) expected = psdf.iloc[[1, 2]] -self.assert_eq(actual, expected) +assertDataFrameEqual(actual, expected) def test_sql_with_pandas_objects(self): import pandas as pd pdf = pd.DataFrame({"a": [1, 2, 3, 4]}) -self.assert_eq(ps.sql("SELECT {col} + 1 as a FROM {tbl}", col=pdf.a, tbl=pdf), pdf + 1) +assertDataFrameEqual( +ps.sql("SELECT {col} + 1 as a FROM {tbl}", col=pdf.a, tbl=pdf), pdf + 1 +) def test_sql_with_python_objects(self): -self.assert_eq( +assertDataFrameEqual( ps.sql("SELECT {col} as a FROM range(1)", col="lit"), ps.DataFrame({"a": ["lit"]}) ) -self.assert_eq( +assertDataFrameEqual( ps.sql("SELECT id FROM range(10) WHERE id IN {pred}", col="lit", pred=(1, 2, 3)), ps.DataFrame({"id": [1, 2, 3]}), ) -self.assert_eq( +assertDataFrameEqual( ps.sql("SELECT {col} as a FROM range(1)", col="a'''c''d"), ps.DataFrame({"a": ["a'''c''d"]}), ) -self.assert_eq( +assertDataFrameEqual( ps.sql("SELECT id FROM range(10) WHERE id IN {pred}", col="a'''c''d", pred=(1, 2, 3)), ps.DataFrame({"id": [1, 2, 3]}), ) @@ -93,14 +96,14 @@ class SQLTestsMixin: def test_sql_with_pandas_on_spark_objects(self): psdf = ps.DataFrame({"a": [1, 2, 3, 4]}) -self.assert_eq(ps.sql("SELECT {col} FROM {tbl}", col=psdf.a, tbl=psdf), psdf) -self.assert_eq(ps.sql("SELECT {tbl.a} FROM {tbl}", tbl=psdf), psdf) +assertDataFrameEqual(ps.sql("SELECT {col} FROM {tbl}", col=psdf.a, tbl=psdf), 
psdf) +assertDataFrameEqual(ps.sql("SELECT {tbl.a} FROM {tbl}", tbl=psdf), psdf) psdf = ps.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}) -self.assert_eq( +assertDataFrameEqual( ps.sql("SELECT {col}, {col2} FROM {tbl}", col=psdf.A, col2=psdf.B, tbl=psdf), psdf ) -self.assert_eq(ps.sql("SELECT {tbl.A}, {tbl.B} FROM {tbl}", tbl=psdf), psdf) +assertDataFrameEqual(ps.sql("SELECT {tbl.A}, {tbl.B} FROM {tbl}", tbl=psdf), psdf) class SQLTests(SQLTestsMixin, PandasOnSparkTestCase, SQLTestUtils): - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org