(spark) branch master updated: [MINOR] Fix the grammar of some comments on renaming error classes
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e521d3c1f357 [MINOR] Fix the grammar of some comments on renaming error classes e521d3c1f357 is described below commit e521d3c1f3578a87e18e5a034705d2520f7b3707 Author: Nicholas Chammas AuthorDate: Thu May 2 08:46:50 2024 +0900 [MINOR] Fix the grammar of some comments on renaming error classes ### What changes were proposed in this pull request? Minor fixes to the English of some comments I added in #44920. ### Why are the changes needed? Proper English -- OK, not _proper_, but more correct at least -- makes things easier to read. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Not tested beyond CI. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46321 from nchammas/error-cond-typo. 
Authored-by: Nicholas Chammas Signed-off-by: Hyukjin Kwon --- .../utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala | 2 +- .../main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala| 4 ++-- core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala| 2 +- python/pyspark/errors/error_classes.py| 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala b/common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala index 6bdafb11e4bd..db5eff72e124 100644 --- a/common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala +++ b/common/utils/src/main/scala/org/apache/spark/SparkThrowableHelper.scala @@ -33,7 +33,7 @@ private[spark] object ErrorMessageFormat extends Enumeration { private[spark] object SparkThrowableHelper { val errorReader = new ErrorClassesJsonReader( // Note that though we call them "error classes" here, the proper name is "error conditions", -// hence why the name of the JSON file different. We will address this inconsistency as part +// hence why the name of the JSON file is different. 
We will address this inconsistency as part // of this ticket: https://issues.apache.org/jira/browse/SPARK-47429 Seq(SparkClassUtils.getSparkClassLoader.getResource("error/error-conditions.json"))) diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala index 65688f7db352..8dc4e543060d 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala @@ -27,8 +27,8 @@ private object KafkaExceptionsHelper { val errorClassesJsonReader: ErrorClassesJsonReader = new ErrorClassesJsonReader( // Note that though we call them "error classes" here, the proper name is "error conditions", - // hence why the name of the JSON file different. We will address this inconsistency as part - // of this ticket: https://issues.apache.org/jira/browse/SPARK-47429 + // hence why the name of the JSON file is different. We will address this inconsistency as + // part of this ticket: https://issues.apache.org/jira/browse/SPARK-47429 Seq(getClass.getClassLoader.getResource("error/kafka-error-conditions.json"))) } diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala index 9e795c3e32ec..231cfdc3f32f 100644 --- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala @@ -51,7 +51,7 @@ class SparkThrowableSuite extends SparkFunSuite { private val errorJsonFilePath = getWorkspaceFilePath( // Note that though we call them "error classes" here, the proper name is "error conditions", -// hence why the name of the JSON file different. We will address this inconsistency as part +// hence why the name of the JSON file is different. 
We will address this inconsistency as part // of this ticket: https://issues.apache.org/jira/browse/SPARK-47429 "common", "utils", "src", "main", "resources", "error", "error-conditions.json") diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index c6b60c79b34d..30869a3fbb2d 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.p
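The Scala `ErrorClassesJsonReader` touched above reads a JSON file that maps error condition names (historically called "error classes") to message templates. A minimal Python sketch of that idea follows; the condition names, template syntax, and `format_error` helper are illustrative, not Spark's actual implementation:

```python
import json

# Hypothetical miniature of an error-conditions JSON file: each condition
# maps to a message template with <name> placeholders.
ERROR_CONDITIONS = json.loads("""
{
  "INVALID_UDF_EVAL_TYPE": {
    "message": "The UDF must have an eval type of <eval_type>."
  },
  "CANNOT_PARSE_DATATYPE": {
    "message": "Unable to parse the data type: <msg>."
  }
}
""")

def format_error(condition: str, params: dict) -> str:
    """Render a condition's message template, substituting <name> placeholders."""
    template = ERROR_CONDITIONS[condition]["message"]
    for name, value in params.items():
        template = template.replace(f"<{name}>", str(value))
    return f"[{condition}] {template}"

print(format_error("INVALID_UDF_EVAL_TYPE",
                   {"eval_type": "SQL_GROUPED_MAP_PANDAS_UDF"}))
```

Keeping the conditions in a data file rather than in code is what lets multiple implementations (JVM, Python, Kafka connector) share one catalog, which is why the file naming inconsistency matters enough to track in SPARK-47429.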
(spark) branch master updated (fd57c3493af7 -> f86a51921f73)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from fd57c3493af7 [SPARK-47911][SQL] Introduces a universal BinaryFormatter to make binary output consistent add f86a51921f73 [SPARK-48062][PYTHON][SS][TESTS] Add pyspark test for SimpleDataSourceStreamingReader No new revisions were added by this update. Summary of changes: .../sql/tests/test_python_streaming_datasource.py | 111 ++--- 1 file changed, 76 insertions(+), 35 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated (c71d02ab7c80 -> 991763c2cdf8)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from c71d02ab7c80 [SPARK-48028][TESTS] Regenerate benchmark results after turning ANSI on add 991763c2cdf8 [SPARK-46894][PYTHON] Move PySpark error conditions into standalone JSON file No new revisions were added by this update. Summary of changes: python/MANIFEST.in |9 +- python/docs/source/getting_started/install.rst |4 +- .../{error_classes.py => error-conditions.json}| 28 - python/pyspark/errors/error_classes.py | 1165 +--- python/pyspark/errors/exceptions/__init__.py | 40 +- python/pyspark/errors_doc_gen.py |2 +- 6 files changed, 29 insertions(+), 1219 deletions(-) copy python/pyspark/errors/{error_classes.py => error-conditions.json} (97%)
(spark) branch master updated (12a507464f10 -> 332570f42203)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 12a507464f10 [SPARK-47566][SQL] Support SubstringIndex function to work with collated strings add 332570f42203 [SPARK-48052][PYTHON][CONNECT] Recover `pyspark-connect` CI by parent classes No new revisions were added by this update. Summary of changes: python/pyspark/ml/functions.py | 4 +- python/pyspark/sql/connect/dataframe.py | 48 + python/pyspark/sql/dataframe.py | 115 3 files changed, 97 insertions(+), 70 deletions(-)
(spark) branch master updated (87b20b166c41 -> e0af82497607)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 87b20b166c41 [SPARK-47585][SQL] SQL core: Migrate logInfo with variables to structured logging framework add e0af82497607 [SPARK-48053][PYTHON][CONNECT] SparkSession.createDataFrame should warn for unsupported options No new revisions were added by this update. Summary of changes: python/pyspark/sql/connect/session.py | 8 python/pyspark/sql/session.py | 6 -- 2 files changed, 12 insertions(+), 2 deletions(-)
(spark) branch master updated: [SPARK-48039][PYTHON][CONNECT] Update the error class for `group.apply`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c74f584481d9 [SPARK-48039][PYTHON][CONNECT] Update the error class for `group.apply` c74f584481d9 is described below commit c74f584481d9bcefda7e8ac2a37feb2d61891fe4 Author: Ruifeng Zheng AuthorDate: Mon Apr 29 20:06:22 2024 +0900 [SPARK-48039][PYTHON][CONNECT] Update the error class for `group.apply` ### What changes were proposed in this pull request? Update the error class for `group.apply` ### Why are the changes needed? https://github.com/apache/spark/commit/eae91ee3c96b6887581e59821d905b8ea94f6bc0 introduced a dedicated error class `INVALID_UDF_EVAL_TYPE` for `group.apply`, but only used it in Spark Connect. This PR uses this error class in Spark Classic, to make it consistent. And also enable a parity test `GroupedApplyInPandasTests.test_wrong_args ` ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes #46277 from zhengruifeng/fix_test_wrong_args. 
Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/pandas/group_ops.py | 10 -- .../tests/connect/test_parity_pandas_grouped_map.py | 4 .../sql/tests/pandas/test_pandas_grouped_map.py | 21 +++-- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/python/pyspark/sql/pandas/group_ops.py b/python/pyspark/sql/pandas/group_ops.py index d5b214e2f7d5..3d1c50d94902 100644 --- a/python/pyspark/sql/pandas/group_ops.py +++ b/python/pyspark/sql/pandas/group_ops.py @@ -18,7 +18,7 @@ import sys from typing import List, Union, TYPE_CHECKING, cast import warnings -from pyspark.errors import PySparkValueError +from pyspark.errors import PySparkTypeError from pyspark.util import PythonEvalType from pyspark.sql.column import Column from pyspark.sql.dataframe import DataFrame @@ -100,11 +100,9 @@ class PandasGroupedOpsMixin: != PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF ) ): -raise PySparkValueError( -error_class="INVALID_PANDAS_UDF", -message_parameters={ -"detail": "the udf argument must be a pandas_udf of type GROUPED_MAP." 
-}, +raise PySparkTypeError( +error_class="INVALID_UDF_EVAL_TYPE", +message_parameters={"eval_type": "SQL_GROUPED_MAP_PANDAS_UDF"}, ) warnings.warn( diff --git a/python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py b/python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py index f0e7eeb606ca..1cc4ce012623 100644 --- a/python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py @@ -30,10 +30,6 @@ class GroupedApplyInPandasTests(GroupedApplyInPandasTestsMixin, ReusedConnectTes def test_wrong_return_type(self): super().test_wrong_return_type() -@unittest.skip("Fails in Spark Connect, should enable.") -def test_wrong_args(self): -super().test_wrong_args() - @unittest.skip("Fails in Spark Connect, should enable.") def test_unsupported_types(self): super().test_unsupported_types() diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py index 0396006e2b36..f43dafc0a4a1 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py @@ -52,7 +52,7 @@ from pyspark.sql.types import ( MapType, YearMonthIntervalType, ) -from pyspark.errors import PythonException, PySparkTypeError +from pyspark.errors import PythonException, PySparkTypeError, PySparkValueError from pyspark.testing.sqlutils import ( ReusedSQLTestCase, have_pandas, @@ -421,22 +421,23 @@ class GroupedApplyInPandasTestsMixin: def check_wrong_args(self): df = self.data -with self.assertRaisesRegex(ValueError, "Invalid function"): +with self.assertRaisesRegex(PySparkTypeError, "INVALID_UDF_EVAL_TYPE"): df.groupby("id").apply(lambda x: x) -with self.assertRaisesRegex(ValueError, "Invalid function"): +with self.assertRaisesRegex(PySparkTypeError, "INVALID_UDF_EVAL_TYPE"): df.groupby("id").apply(udf(lambda x: x, DoubleType())) -with self.assertRaisesRegex(ValueError, 
"Invalid function"): +with
(spark) branch master updated: [SPARK-48002][PYTHON][SS][TESTS] Adds sleep before event testing after query termination
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new eaed585c55e5 [SPARK-48002][PYTHON][SS][TESTS] Adds sleep before event testing after query termination eaed585c55e5 is described below commit eaed585c55e5f242fdc397e62dbc9e068b033af4 Author: Hyukjin Kwon AuthorDate: Mon Apr 29 16:32:28 2024 +0900 [SPARK-48002][PYTHON][SS][TESTS] Adds sleep before event testing after query termination ### What changes were proposed in this pull request? This PR is a followup of https://github.com/apache/spark/pull/46237 that makes to wait 5 secs after the query termination to make sure the events arrive. ### Why are the changes needed? To deflake the test. It's flaky (https://github.com/apache/spark/actions/runs/8873809388/job/24360221027) ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Manually. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46275 from HyukjinKwon/SPARK-48002-followup. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/tests/streaming/test_streaming_listener.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py b/python/pyspark/sql/tests/streaming/test_streaming_listener.py index 9e4325e3c6ab..15f5575d3647 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py +++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py @@ -232,6 +232,8 @@ class StreamingListenerTestsMixin: while q.lastProgress is None or q.lastProgress["batchId"] == 0: q.awaitTermination(0.5) +time.sleep(5) + self.assertTrue(error_listener.num_rows > 0) self.assertTrue(error_listener.num_error_rows > 0) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
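The fix above inserts a fixed `time.sleep(5)` after the progress-waiting loop so that asynchronously delivered listener events have time to arrive. A common, slightly more robust variant of the same deflaking idea is to poll for the condition with a deadline; this is a generic sketch, not the Spark test itself:

```python
import time

def wait_until(condition, timeout=5.0, interval=0.1):
    """Poll `condition` until it returns True or `timeout` seconds elapse.
    Returns True as soon as the condition holds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval)
    return condition()

# Example: an "event" that becomes observable a little later, like a
# listener callback firing after query termination.
arrives_at = time.monotonic() + 0.3
assert wait_until(lambda: time.monotonic() >= arrives_at, timeout=2.0)
```

In the listener test this shape would replace the fixed sleep with something like `wait_until(lambda: error_listener.num_rows > 0)`, finishing early when events arrive fast while still tolerating slow CI machines.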
(spark) branch master updated (506b2d5eb8d9 -> 8c446f35dc03)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 506b2d5eb8d9 [SPARK-48014][SQL] Change the makeFromJava error in EvaluatePython to a user-facing error add 8c446f35dc03 [SPARK-47292][SS] safeMapToJValue should consider null typed values No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/streaming/progress.scala | 2 +- .../org/apache/spark/sql/streaming/progress.scala | 2 +- .../StreamingQueryStatusAndProgressSuite.scala | 23 ++ 3 files changed, 25 insertions(+), 2 deletions(-)
(spark) branch master updated (023f07d845c3 -> 506b2d5eb8d9)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 023f07d845c3 [SPARK-47933][CONNECT][PYTHON][FOLLOW-UP] Remove `pyspark.sql.classic` reference in `pyspark.ml.stat` add 506b2d5eb8d9 [SPARK-48014][SQL] Change the makeFromJava error in EvaluatePython to a user-facing error No new revisions were added by this update. Summary of changes: common/utils/src/main/resources/error/error-conditions.json | 6 ++ .../apache/spark/sql/execution/python/EvaluatePython.scala| 11 ++- 2 files changed, 12 insertions(+), 5 deletions(-)
(spark) branch master updated: [SPARK-47933][CONNECT][PYTHON][FOLLOW-UP] Remove `pyspark.sql.classic` reference in `pyspark.ml.stat`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 023f07d845c3 [SPARK-47933][CONNECT][PYTHON][FOLLOW-UP] Remove `pyspark.sql.classic` reference in `pyspark.ml.stat` 023f07d845c3 is described below commit 023f07d845c304cfb7d231e85e0700807ee4a113 Author: Hyukjin Kwon AuthorDate: Mon Apr 29 08:42:22 2024 +0900 [SPARK-47933][CONNECT][PYTHON][FOLLOW-UP] Remove `pyspark.sql.classic` reference in `pyspark.ml.stat` ### What changes were proposed in this pull request? This PR is a followup of https://github.com/apache/spark/pull/46155 that removes the reference of `_to_seq` that `pyspark-connect` package does not have. ### Why are the changes needed? To recover the CI https://github.com/apache/spark/actions/runs/8861971303 ### Does this PR introduce _any_ user-facing change? No, the main change has not been released out yet. ### How was this patch tested? Manually tested. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46262 from HyukjinKwon/SPARK-47933-followup4. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/ml/stat.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/ml/stat.py b/python/pyspark/ml/stat.py index d6020607aff2..4dcc96190952 100644 --- a/python/pyspark/ml/stat.py +++ b/python/pyspark/ml/stat.py @@ -23,7 +23,6 @@ from pyspark.ml.common import _java2py, _py2java from pyspark.ml.linalg import Matrix, Vector from pyspark.ml.wrapper import JavaWrapper, _jvm from pyspark.sql.column import Column -from pyspark.sql.classic.column import _to_seq from pyspark.sql.dataframe import DataFrame from pyspark.sql.functions import lit @@ -432,6 +431,7 @@ class Summarizer: :py:class:`pyspark.ml.stat.SummaryBuilder` """ from pyspark.core.context import SparkContext +from pyspark.sql.classic.column import _to_seq sc = SparkContext._active_spark_context assert sc is not None - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
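The patch above moves the `_to_seq` import from module level into the function body, so merely importing `pyspark.ml.stat` no longer requires `pyspark.sql.classic` (which the `pyspark-connect` package does not ship). The general deferred-import pattern, sketched with a stdlib module standing in for the optional dependency:

```python
import math  # module-level imports stay limited to what every install has

def stddev(xs):
    # Deferred import: `statistics` (standing in for the classic-only
    # pyspark.sql.classic.column._to_seq) is resolved only when this code
    # path actually runs, so importing this module never fails on
    # installations that lack the optional dependency.
    import statistics
    return statistics.pstdev(xs)

print(stddev([1.0, 2.0, 3.0]))
```

The trade-off is that a missing dependency surfaces at call time rather than import time, which is exactly the behavior wanted here: Connect users can import the module, and only classic-only code paths require the classic package.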
(spark) branch master updated: [SPARK-48024][PYTHON][CONNECT][TESTS] Enable `UDFParityTests.test_udf_timestamp_ntz`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 657d9d0df5d6 [SPARK-48024][PYTHON][CONNECT][TESTS] Enable `UDFParityTests.test_udf_timestamp_ntz` 657d9d0df5d6 is described below commit 657d9d0df5d60c26ab1d5efb8db32abad8ff08ea Author: Ruifeng Zheng AuthorDate: Sun Apr 28 17:52:12 2024 +0900 [SPARK-48024][PYTHON][CONNECT][TESTS] Enable `UDFParityTests.test_udf_timestamp_ntz` ### What changes were proposed in this pull request? Enable `UDFParityTests.test_udf_timestamp_ntz` ### Why are the changes needed? for test coverage ### Does this PR introduce _any_ user-facing change? no, test only ### How was this patch tested? ci and manually test: ``` (spark_dev_312) ➜ spark git:(master) ✗ python/run-tests -k --python-executables python3 --testnames 'pyspark.sql.tests.connect.test_parity_udf UDFParityTests.test_udf_timestamp_ntz' Running PySpark tests. Output is in /Users/ruifeng.zheng/Dev/spark/python/unit-tests.log Will test against the following Python executables: ['python3'] Will test the following Python tests: ['pyspark.sql.tests.connect.test_parity_udf UDFParityTests.test_udf_timestamp_ntz'] python3 python_implementation is CPython python3 version is: Python 3.12.2 Starting test(python3): pyspark.sql.tests.connect.test_parity_udf UDFParityTests.test_udf_timestamp_ntz (temp output: /Users/ruifeng.zheng/Dev/spark/python/target/90afedde-8472-496c-8741-a3fd5792f6e2/python3__pyspark.sql.tests.connect.test_parity_udf_UDFParityTests.test_udf_timestamp_ntz__7yrowv9l.log) Finished test(python3): pyspark.sql.tests.connect.test_parity_udf UDFParityTests.test_udf_timestamp_ntz (10s) Tests passed in 10 seconds ``` ### Was this patch authored or co-authored using generative AI tooling? no Closes #46257 from zhengruifeng/enable_test_udf_timestamp_ntz. 
Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/tests/connect/test_parity_udf.py | 4 1 file changed, 4 deletions(-) diff --git a/python/pyspark/sql/tests/connect/test_parity_udf.py b/python/pyspark/sql/tests/connect/test_parity_udf.py index 17d7ae0eb9fc..5507f8e9f289 100644 --- a/python/pyspark/sql/tests/connect/test_parity_udf.py +++ b/python/pyspark/sql/tests/connect/test_parity_udf.py @@ -44,10 +44,6 @@ class UDFParityTests(BaseUDFTestsMixin, ReusedConnectTestCase): def test_same_accumulator_in_udfs(self): super().test_same_accumulator_in_udfs() -@unittest.skip("Spark Connect does not support spark.conf but the test depends on it.") -def test_udf_timestamp_ntz(self): -super().test_udf_timestamp_ntz() - @unittest.skip("Spark Connect does not support broadcast but the test depends on it.") def test_broadcast_in_udf(self): super().test_broadcast_in_udf() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-48002][PYTHON][SS] Add test for observed metrics in PySpark StreamingQueryListener
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a80579bbcf74 [SPARK-48002][PYTHON][SS] Add test for observed metrics in PySpark StreamingQueryListener a80579bbcf74 is described below commit a80579bbcf74c7bcfe60cb6d74a68d4c1574c14f Author: Wei Liu AuthorDate: Sun Apr 28 17:46:42 2024 +0900 [SPARK-48002][PYTHON][SS] Add test for observed metrics in PySpark StreamingQueryListener ### What changes were proposed in this pull request? Following this doc test revisit PR https://github.com/apache/spark/pull/46189, for extra safety, add a unit test that verify observed metrics works for StreamingQueryListeners for both classic spark and spark connect. ### Why are the changes needed? Additional test coverage ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Test only addition ### Was this patch authored or co-authored using generative AI tooling? No Closes #46237 from WweiL/test-observed-metrics. 
Authored-by: Wei Liu Signed-off-by: Hyukjin Kwon --- .../sql/tests/streaming/test_streaming_listener.py | 46 ++ 1 file changed, 46 insertions(+) diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py b/python/pyspark/sql/tests/streaming/test_streaming_listener.py index 1920f8255744..9e4325e3c6ab 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py +++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py @@ -30,6 +30,7 @@ from pyspark.sql.streaming.listener import ( StateOperatorProgress, StreamingQueryProgress, ) +from pyspark.sql.functions import count, col, lit from pyspark.testing.sqlutils import ReusedSQLTestCase @@ -193,6 +194,51 @@ class StreamingListenerTestsMixin: self.assertTrue(isinstance(progress.numOutputRows, int)) self.assertTrue(isinstance(progress.metrics, dict)) +# This is a generic test work for both classic Spark and Spark Connect +def test_listener_observed_metrics(self): +class MyErrorListener(StreamingQueryListener): +def __init__(self): +self.num_rows = -1 +self.num_error_rows = -1 + +def onQueryStarted(self, event): +pass + +def onQueryProgress(self, event): +row = event.progress.observedMetrics.get("my_event") +# Save observed metrics for later verification +self.num_rows = row["rc"] +self.num_error_rows = row["erc"] + +def onQueryIdle(self, event): +pass + +def onQueryTerminated(self, event): +pass + +try: +error_listener = MyErrorListener() +self.spark.streams.addListener(error_listener) + +sdf = self.spark.readStream.format("rate").load().withColumn("error", col("value")) + +# Observe row count (rc) and error row count (erc) in the streaming Dataset +observed_ds = sdf.observe( +"my_event", count(lit(1)).alias("rc"), count(col("error")).alias("erc") +) + +q = observed_ds.writeStream.format("console").start() + +while q.lastProgress is None or q.lastProgress["batchId"] == 0: +q.awaitTermination(0.5) + +self.assertTrue(error_listener.num_rows > 0) 
+self.assertTrue(error_listener.num_error_rows > 0) + +finally: +q.stop() +self.spark.streams.removeListener(error_listener) + class StreamingListenerTests(StreamingListenerTestsMixin, ReusedSQLTestCase): def test_number_of_public_methods(self): - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
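The test above registers a `StreamingQueryListener` whose `onQueryProgress` callback stashes observed metrics for later assertions. The underlying listener-bus pattern can be sketched without Spark; the class and event names here are illustrative stand-ins, not PySpark APIs:

```python
class ProgressEvent:
    """Stand-in for QueryProgressEvent: carries observed metrics by name."""
    def __init__(self, observed_metrics):
        class Progress:
            pass
        self.progress = Progress()
        self.progress.observedMetrics = observed_metrics

class MetricsListener:
    """Mimics MyErrorListener above: the bus invokes the callback, and the
    listener records metrics so the test body can assert on them later."""
    def __init__(self):
        self.num_rows = -1
        self.num_error_rows = -1

    def on_query_progress(self, event):
        row = event.progress.observedMetrics.get("my_event")
        self.num_rows = row["rc"]
        self.num_error_rows = row["erc"]

class ListenerBus:
    def __init__(self):
        self._listeners = []

    def add_listener(self, listener):
        self._listeners.append(listener)

    def remove_listener(self, listener):
        self._listeners.remove(listener)

    def post(self, event):
        # Dispatch the event to every registered listener.
        for listener in self._listeners:
            listener.on_query_progress(event)

bus = ListenerBus()
listener = MetricsListener()
bus.add_listener(listener)
bus.post(ProgressEvent({"my_event": {"rc": 10, "erc": 10}}))
assert listener.num_rows > 0 and listener.num_error_rows > 0
```

Because dispatch is asynchronous in the real system, the test's `finally` block that stops the query and removes the listener is important: it prevents a leaked listener from receiving events posted by later tests.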
(spark) branch master updated (e04ac56e645f -> 95d6c615c081)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from e04ac56e645f [SPARK-45225][SQL][FOLLOW-UP] XML: Fix nested XSD file path resolution add 95d6c615c081 [SPARK-47355][SQL] Use wildcard imports in CollationTypeCasts No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/catalyst/analysis/CollationTypeCasts.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
(spark) branch master updated: [SPARK-45225][SQL][FOLLOW-UP] XML: Fix nested XSD file path resolution
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e04ac56e645f [SPARK-45225][SQL][FOLLOW-UP] XML: Fix nested XSD file path resolution e04ac56e645f is described below commit e04ac56e645f1c0ed5f5134686ddebdbae524d12 Author: Sandip Agarwala <131817656+sandip...@users.noreply.github.com> AuthorDate: Fri Apr 26 17:21:32 2024 +0900 [SPARK-45225][SQL][FOLLOW-UP] XML: Fix nested XSD file path resolution ### What changes were proposed in this pull request? This PR adds support to correctly resolve the path of nested XSD provided with `rowValidationXSDPath` option and `XSDToSchema` API. ### Why are the changes needed? Nested XSD were not resolved correctly. ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? Added a new test ### Was this patch authored or co-authored using generative AI tooling? No Closes #46235 from sandip-db/xml_nested_xsd. 
Authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com> Signed-off-by: Hyukjin Kwon --- .../apache/spark/sql/catalyst/xml/ValidatorUtil.scala | 2 +- .../sql/execution/datasources/xml/XSDToSchema.scala| 2 +- .../spark/sql/execution/datasources/xml/XmlSuite.scala | 18 ++ 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala index 3d93c4e8742a..a49de687a27d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala @@ -42,7 +42,7 @@ object ValidatorUtil extends Logging { val in = openSchemaFile(new Path(key)) try { val schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI) - schemaFactory.newSchema(new StreamSource(in)) + schemaFactory.newSchema(new StreamSource(in, key)) } finally { in.close() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala index 87082299615c..c03c0ba11de5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala @@ -47,7 +47,7 @@ object XSDToSchema extends Logging{ def read(xsdPath: Path): StructType = { val in = ValidatorUtil.openSchemaFile(xsdPath) val xmlSchemaCollection = new XmlSchemaCollection() -xmlSchemaCollection.setBaseUri(xsdPath.getParent.toString) +xmlSchemaCollection.setBaseUri(xsdPath.toString) val xmlSchema = xmlSchemaCollection.read(new InputStreamReader(in)) getStructType(xmlSchema) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala index 7df7c0d49d19..51e8cfc7f103 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala @@ -1206,14 +1206,16 @@ class XmlSuite } test("test XSD validation") { -val basketDF = spark.read - .option("rowTag", "basket") - .option("inferSchema", true) - .option("rowValidationXSDPath", getTestResourcePath(resDir + "basket.xsd") -.replace("file:/", "/")) - .xml(getTestResourcePath(resDir + "basket.xml")) -// Mostly checking it doesn't fail -assert(basketDF.selectExpr("entry[0].key").head().getLong(0) === 9027) +Seq("basket.xsd", "include-example/first.xsd").foreach { xsdFile => + val basketDF = spark.read +.option("rowTag", "basket") +.option("inferSchema", true) +.option("rowValidationXSDPath", getTestResourcePath(resDir + xsdFile) + .replace("file:/", "/")) +.xml(getTestResourcePath(resDir + "basket.xml")) + // Mostly checking it doesn't fail + assert(basketDF.selectExpr("entry[0].key").head().getLong(0) === 9027) +} } test("test XSD validation with validation error") { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
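The underlying bug was base-URI resolution: without a systemId on the `StreamSource`, and with the base URI set to the XSD's parent directory rather than the XSD itself, a relative `xs:include` referenced from a nested schema could resolve against the wrong base. How relative-reference resolution behaves can be shown with Python's stdlib; the paths below are illustrative:

```python
from urllib.parse import urljoin

# Relative references resolve against the *document's* URI by replacing
# its last path segment -- which is why the schema file itself (not just
# its directory) should be supplied as the base/systemId.
base = "file:///resources/error/include-example/first.xsd"
assert urljoin(base, "second.xsd") == \
    "file:///resources/error/include-example/second.xsd"

# If only the parent directory (no trailing slash) were used as the base,
# the same relative reference would land one level too high:
bad_base = "file:///resources/error/include-example"
assert urljoin(bad_base, "second.xsd") == \
    "file:///resources/error/second.xsd"
```

This is standard RFC 3986 resolution, so the same reasoning applies to the Scala side: passing `xsdPath.toString` (the file) rather than `xsdPath.getParent.toString` (the directory) makes nested includes resolve relative to the including schema.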
(spark) branch master updated (2e5825fb32c0 -> 3451e66fe71d)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 2e5825fb32c0 [SPARK-47858][PYTHON][FOLLOWUP] Excluding Python magic methods from error context target add 3451e66fe71d [SPARK-47993][PYTHON] Drop Python 3.8 No new revisions were added by this update. Summary of changes: .github/workflows/build_python.yml| 2 +- dev/infra/Dockerfile | 8 python/docs/source/development/contributing.rst | 4 ++-- python/docs/source/getting_started/install.rst| 4 ++-- python/docs/source/user_guide/pandas_on_spark/typehints.rst | 2 +- python/packaging/classic/setup.py | 3 +-- python/packaging/connect/setup.py | 3 +-- python/pyspark/sql/session.py | 3 +++ python/pyspark/sql/tests/connect/test_parity_arrow.py | 2 -- python/pyspark/sql/tests/pandas/test_pandas_udf_typehints.py | 2 -- .../pandas/test_pandas_udf_typehints_with_future_annotations.py | 5 - python/pyspark/sql/tests/test_arrow.py| 2 -- python/run-tests | 4 ++-- 13 files changed, 17 insertions(+), 27 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-47858][PYTHON][FOLLOWUP] Excluding Python magic methods from error context target
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2e5825fb32c0 [SPARK-47858][PYTHON][FOLLOWUP] Excluding Python magic methods from error context target 2e5825fb32c0 is described below commit 2e5825fb32c03faa854921c54feb6ad0e7b5d432 Author: Haejoon Lee AuthorDate: Fri Apr 26 16:53:22 2024 +0900 [SPARK-47858][PYTHON][FOLLOWUP] Excluding Python magic methods from error context target ### What changes were proposed in this pull request? This PR followups for https://github.com/apache/spark/pull/46063 to exclude Python magic methods from error context target ### Why are the changes needed? We only need to logging the Apache Spark Column API functions for DataFrameQueryContext, but not Python native functions. ### Does this PR introduce _any_ user-facing change? No API changes, but the error message only contain Column APIs that are supported by Apache Spark ### How was this patch tested? The existing CI should pass. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46215 from itholic/47858-followup. Authored-by: Haejoon Lee Signed-off-by: Hyukjin Kwon --- python/pyspark/errors/utils.py | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/python/pyspark/errors/utils.py b/python/pyspark/errors/utils.py index 08744dc4c264..180bdd681306 100644 --- a/python/pyspark/errors/utils.py +++ b/python/pyspark/errors/utils.py @@ -197,6 +197,14 @@ def with_origin_to_class(cls: Type[T]) -> Type[T]: """ if os.environ.get("PYSPARK_PIN_THREAD", "true").lower() == "true": for name, method in cls.__dict__.items(): -if callable(method) and name != "__init__": +# Excluding Python magic methods that do not utilize JVM functions. 
+if callable(method) and name not in ( +"__init__", +"__new__", +"__iter__", +"__nonzero__", +"__repr__", +"__bool__", +): setattr(cls, name, _with_origin(method)) return cls
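The change above swaps a single `__init__` check for a denylist of magic methods before wrapping each callable class attribute. A minimal standalone sketch of the same pattern is below; `with_call_log` is a hypothetical stand-in for PySpark's `_with_origin` (which records JVM call-site context), used here only so the effect of the denylist is observable:

```python
# Sketch (not the PySpark implementation) of wrapping every callable
# attribute of a class except a denylist of magic methods.
import functools

EXCLUDED = ("__init__", "__new__", "__iter__", "__nonzero__", "__repr__", "__bool__")

def with_call_log(func):
    """Hypothetical stand-in for PySpark's _with_origin wrapper."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        wrapper.calls += 1  # record that the wrapped API was invoked
        return func(*args, **kwargs)
    wrapper.calls = 0
    return wrapper

def wrap_class(cls):
    # Copy the items so setattr during iteration is safe.
    for name, method in list(cls.__dict__.items()):
        if callable(method) and name not in EXCLUDED:
            setattr(cls, name, with_call_log(method))
    return cls

@wrap_class
class Column:
    def __repr__(self):      # excluded: stays unwrapped
        return "Column()"
    def alias(self, name):   # wrapped: invocations are counted
        return name

c = Column()
print(c.alias("x"))                        # -> x
print(Column.alias.calls)                  # -> 1
print(hasattr(Column.__repr__, "calls"))   # -> False (left unwrapped)
```

Excluding magic methods matters because they are invoked implicitly (by `repr()`, `bool()`, iteration, and so on), so wrapping them would record contexts for calls the user never wrote.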
(spark) branch master updated (b4624bf4be28 -> dab4a044b647)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from b4624bf4be28 [SPARK-47414][SQL] Lowercase collation support for regexp expressions add dab4a044b647 [SPARK-47973][CORE] Log call site in SparkContext.stop() and later in SparkContext.assertNotStopped() No new revisions were added by this update. Summary of changes: core/src/main/scala/org/apache/spark/SparkContext.scala | 9 - core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 4 +++- 2 files changed, 11 insertions(+), 2 deletions(-)
(spark) branch master updated: Revert "[SPARK-45302][PYTHON] Remove PID communication between Pythonworkers when no demon is used"
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c6aaa18e6cfd Revert "[SPARK-45302][PYTHON] Remove PID communication between Pythonworkers when no demon is used" c6aaa18e6cfd is described below commit c6aaa18e6cfd49b434f782171e42778012672b80 Author: Hyukjin Kwon AuthorDate: Thu Apr 25 11:57:23 2024 +0900 Revert "[SPARK-45302][PYTHON] Remove PID communication between Pythonworkers when no demon is used" ### What changes were proposed in this pull request? This PR reverts https://github.com/apache/spark/pull/43087. ### Why are the changes needed? To clean up those workers. I will make a refactoring PR soon. I will bring them back again with a refactoring PR. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? CI ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46195 from HyukjinKwon/SPARK-45302-revert. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 6 +++--- .../scala/org/apache/spark/api/python/PythonRunner.scala | 10 +- .../org/apache/spark/api/python/PythonWorkerFactory.scala | 15 --- python/pyspark/daemon.py | 4 ++-- .../sql/connect/streaming/worker/foreach_batch_worker.py | 2 ++ .../sql/connect/streaming/worker/listener_worker.py | 2 ++ .../sql/streaming/python_streaming_source_runner.py | 2 ++ python/pyspark/sql/worker/analyze_udtf.py | 3 +++ python/pyspark/sql/worker/commit_data_source_write.py | 2 ++ python/pyspark/sql/worker/create_data_source.py | 2 ++ python/pyspark/sql/worker/lookup_data_sources.py | 2 ++ python/pyspark/sql/worker/plan_data_source_read.py| 2 ++ python/pyspark/sql/worker/python_streaming_sink_runner.py | 2 ++ python/pyspark/sql/worker/write_into_data_source.py | 2 ++ python/pyspark/worker.py | 3 +++ .../spark/sql/execution/python/PythonArrowOutput.scala| 2 +- .../spark/sql/execution/python/PythonUDFRunner.scala | 2 +- 17 files changed, 44 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 50d0358004d4..e1c84d181a2f 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -142,7 +142,7 @@ class SparkEnv ( workerModule: String, daemonModule: String, envVars: Map[String, String], - useDaemon: Boolean): (PythonWorker, Option[Long]) = { + useDaemon: Boolean): (PythonWorker, Option[Int]) = { synchronized { val key = PythonWorkersKey(pythonExec, workerModule, daemonModule, envVars) val workerFactory = pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory( @@ -161,7 +161,7 @@ class SparkEnv ( pythonExec: String, workerModule: String, envVars: Map[String, String], - useDaemon: Boolean): (PythonWorker, Option[Long]) = { + useDaemon: Boolean): (PythonWorker, Option[Int]) = { createPythonWorker( pythonExec, 
workerModule, PythonWorkerFactory.defaultDaemonModule, envVars, useDaemon) } @@ -170,7 +170,7 @@ class SparkEnv ( pythonExec: String, workerModule: String, daemonModule: String, - envVars: Map[String, String]): (PythonWorker, Option[Long]) = { + envVars: Map[String, String]): (PythonWorker, Option[Int]) = { val useDaemon = conf.get(Python.PYTHON_USE_DAEMON) createPythonWorker( pythonExec, workerModule, daemonModule, envVars, useDaemon) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 17cb0c5a55dd..7ff782db210d 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -88,7 +88,7 @@ private object BasePythonRunner { private lazy val faultHandlerLogDir = Utils.createTempDir(namePrefix = "faulthandler") - private def faultHandlerLogPath(pid: Long): Path = { + private def faultHandlerLogPath(pid: Int): Path = { new File(faultHandlerLogDir, pid.toString).toPath } } @@ -204,7 +204,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default")) -val (worker: PythonWorker, pid: Option[Long]) = env.createPythonWorker( +val (worker: PythonWorker, pid: Option
(spark) branch master updated (d23389252a7d -> ea37c860a1a8)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from d23389252a7d [SPARK-47967][SQL] Make `JdbcUtils.makeGetter` handle reading time type as NTZ correctly add ea37c860a1a8 [SPARK-47962][PYTHON][DOCS] PySpark Dataframe doc test improvement No new revisions were added by this update. Summary of changes: python/pyspark/sql/dataframe.py | 20 1 file changed, 16 insertions(+), 4 deletions(-)
(spark) branch master updated: [SPARK-47965][CORE] Avoid orNull in TypedConfigBuilder and OptionalConfigEntry
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0042b676c232 [SPARK-47965][CORE] Avoid orNull in TypedConfigBuilder and OptionalConfigEntry 0042b676c232 is described below commit 0042b676c23220a73f2672aa42d5306d3878bd05 Author: Hyukjin Kwon AuthorDate: Wed Apr 24 19:37:04 2024 +0900 [SPARK-47965][CORE] Avoid orNull in TypedConfigBuilder and OptionalConfigEntry ### What changes were proposed in this pull request? This PR proposes to avoid orNull in `TypedConfigBuilder`. Keys and values cannot be set `null` anyway, see `RuntimeConfig` and `SparkConf`. Also, uses `ConfigEntry.UNDEFINED` in `OptionalConfigEntry` instead of `null`. ### Why are the changes needed? For code cleanup. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? CI in this PR should verify them. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46197 from HyukjinKwon/SPARK-47965. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .../org/apache/spark/internal/config/ConfigBuilder.scala | 3 ++- .../org/apache/spark/internal/config/ConfigEntry.scala | 2 +- .../apache/spark/internal/config/ConfigEntrySuite.scala| 14 +- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 4 ++-- .../main/scala/org/apache/spark/deploy/yarn/config.scala | 4 ++-- .../test/scala/org/apache/spark/sql/SetCommandSuite.scala | 12 .../org/apache/spark/sql/internal/SQLConfEntrySuite.scala | 2 +- 7 files changed, 9 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala index 1f19e9444d38..f50cc0f88842 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala @@ -94,7 +94,7 @@ private[spark] class TypedConfigBuilder[T]( import ConfigHelpers._ def this(parent: ConfigBuilder, converter: String => T) = { -this(parent, converter, Option(_).map(_.toString).orNull) +this(parent, converter, { v: T => v.toString }) } /** Apply a transformation to the user-provided values of the config entry. */ @@ -157,6 +157,7 @@ private[spark] class TypedConfigBuilder[T]( /** Creates a [[ConfigEntry]] that has a default value. */ def createWithDefault(default: T): ConfigEntry[T] = { +assert(default != null, "Use createOptional.") // Treat "String" as a special case, so that both createWithDefault and createWithDefaultString // behave the same w.r.t. variable expansion of default values. 
default match { diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala index a295ef06a637..c07f2528ee70 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala @@ -227,7 +227,7 @@ private[spark] class OptionalConfigEntry[T]( prependSeparator, alternatives, s => Some(rawValueConverter(s)), -v => v.map(rawStringConverter).orNull, +v => v.map(rawStringConverter).getOrElse(ConfigEntry.UNDEFINED), doc, isPublic, version diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala index 38063c47ec96..ae9973508405 100644 --- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala +++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala @@ -196,12 +196,6 @@ class ConfigEntrySuite extends SparkFunSuite { assert(conversionError.getMessage === s"${conversionTest.key} should be double, but was abc") } - test("default value handling is null-safe") { -val conf = new SparkConf() -val stringConf = ConfigBuilder(testKey("string")).stringConf.createWithDefault(null) -assert(conf.get(stringConf) === null) - } - test("variable expansion of spark config entries") { val env = Map("ENV1" -> "env1") val conf = new SparkConfWithEnv(env) @@ -220,7 +214,7 @@ class ConfigEntrySuite extends SparkFunSuite { val refConf = ConfigBuilder(testKey("configReferenceTest")) .stringConf - .createWithDefault(null) + .createWithDefault("") def ref(entry: ConfigEntry[_]): String = "${" + entry.
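The diff above replaces `Option(_).map(_.toString).orNull` with a plain `v => v.toString` and adds `assert(default != null, "Use createOptional.")`: entries with a default must never hold null, and absence is modeled by a separate optional entry. A hypothetical Python sketch of that design choice (names and shapes are illustrative, not Spark's config API):

```python
# Sketch of the design choice in this commit: a config entry created with
# a default rejects None; "no value" is modeled by an optional entry.
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")

class ConfigEntry(Generic[T]):
    def __init__(self, key: str, default: Optional[T], converter: Callable[[str], T]):
        self.key = key
        self.default = default
        self.converter = converter

    def read(self, conf: dict) -> Optional[T]:
        raw = conf.get(self.key)
        return self.converter(raw) if raw is not None else self.default

def create_with_default(key: str, default: T, converter: Callable[[str], T]) -> ConfigEntry:
    # Mirrors the new assert in TypedConfigBuilder.createWithDefault.
    assert default is not None, "Use create_optional."
    return ConfigEntry(key, default, converter)

def create_optional(key: str, converter: Callable[[str], T]) -> ConfigEntry:
    return ConfigEntry(key, None, converter)

timeout = create_with_default("spark.test.timeout", 120, int)
label = create_optional("spark.test.label", str)

print(timeout.read({}))                             # -> 120
print(timeout.read({"spark.test.timeout": "60"}))   # -> 60
print(label.read({}))                               # -> None
```

This is also why the test `"default value handling is null-safe"` is deleted above: `createWithDefault(null)` is no longer a supported state to be null-safe about.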
(spark) branch master updated (fd695be19d3f -> 6f01982094f6)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from fd695be19d3f [SPARK-47903][PYTHON][FOLLOW-UP] Removed changes relating to try_parse_json add 6f01982094f6 [SPARK-47964][PYTHON][CONNECT] Hide SQLContext and HiveContext in pyspark-connect No new revisions were added by this update. Summary of changes: python/pyspark/__init__.py | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-)
(spark) branch master updated: [SPARK-47903][PYTHON][FOLLOW-UP] Removed changes relating to try_parse_json
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new fd695be19d3f [SPARK-47903][PYTHON][FOLLOW-UP] Removed changes relating to try_parse_json fd695be19d3f is described below commit fd695be19d3fcdca5503e8f1f4222732ef3ac6ce Author: Harsh Motwani AuthorDate: Wed Apr 24 15:41:51 2024 +0900 [SPARK-47903][PYTHON][FOLLOW-UP] Removed changes relating to try_parse_json ### What changes were proposed in this pull request? Removed changes relating to `try_parse_json` that were accidentally pushed during the late stages of this PR. ### Why are the changes needed? There is already another PR in progress adding support for `try_parse_json` and the implementation that was accidentally pushed is outdated. ### Does this PR introduce _any_ user-facing change? Yes, it removes the `try_parse_json` that was added just now. This feature will be added again soon. ### How was this patch tested? NA ### Was this patch authored or co-authored using generative AI tooling? No Closes #46170 from harshmotw-db/python_scalar_variant. 
Authored-by: Harsh Motwani Signed-off-by: Hyukjin Kwon --- .../sql/catalyst/analysis/FunctionRegistry.scala | 1 - .../catalyst/expressions/ExpectsInputTypes.scala | 1 + .../expressions/variant/variantExpressions.scala | 30 .../sql-functions/sql-expression-schema.md | 1 - .../apache/spark/sql/VariantEndToEndSuite.scala| 40 -- 5 files changed, 1 insertion(+), 72 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 5f43cc106e67..e4e663d15167 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -822,7 +822,6 @@ object FunctionRegistry { // Variant expression[ParseJson]("parse_json"), -expression[TryParseJson]("try_parse_json"), expression[IsVariantNull]("is_variant_null"), expressionBuilder("variant_get", VariantGetExpressionBuilder), expressionBuilder("try_variant_get", TryVariantGetExpressionBuilder), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala index 66c2f736f235..1a4a0271c54b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala @@ -48,6 +48,7 @@ trait ExpectsInputTypes extends Expression { } object ExpectsInputTypes extends QueryErrorsBase { + def checkInputDataTypes( inputs: Seq[Expression], inputTypes: Seq[AbstractDataType]): TypeCheckResult = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala index 07f08aa7e70e..6c4a8f90e3b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala @@ -75,36 +75,6 @@ case class ParseJson(child: Expression) copy(child = newChild) } -// scalastyle:off line.size.limit -@ExpressionDescription( - usage = "_FUNC_(jsonStr) - Parse a JSON string as an Variant value. Returns null when the string is not valid JSON value.", - examples = """ -Examples: - > SELECT _FUNC_('{"a":1,"b":0.8}'); - {"a":1,"b":0.8} - """, - since = "4.0.0", - group = "variant_funcs" -) -// scalastyle:on line.size.limit -case class TryParseJson(expr: Expression, replacement: Expression) - extends RuntimeReplaceable with InheritAnalysisRules { - def this(child: Expression) = this(child, TryEval(ParseJson(child))) - - override def parameters: Seq[Expression] = Seq(expr) - - override def dataType: DataType = VariantType - - override def prettyName: String = "try_parse_json" - - override protected def withNewChildInternal(newChild: Expression): Expression = -copy(replacement = newChild) - - override def checkInputDataTypes(): TypeCheckResult = { -ExpectsInputTypes.checkInputDataTypes(Seq(expr), Seq(StringType)) -
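The removed `TryParseJson` expression is defined as `TryEval(ParseJson(child))`: the strict parser wrapped so that a failure yields null instead of an error ("Returns null when the string is not valid JSON value", per the removed docstring). An illustrative Python sketch of those "try_" semantics, using the standard `json` module rather than Spark's variant parser:

```python
# Sketch (not the Spark implementation) of the TryEval-style semantics of
# the removed TryParseJson: same result as the strict parser on success,
# None instead of an error on failure.
import json
from typing import Any, Optional

def parse_json(s: str) -> Any:
    # Strict variant: raises on invalid input, like ParseJson.
    return json.loads(s)

def try_parse_json(s: str) -> Optional[Any]:
    # "try_" wrapper: swallow parse failures and return None.
    try:
        return parse_json(s)
    except (ValueError, TypeError):
        return None

print(try_parse_json('{"a": 1, "b": 0.8}'))  # -> {'a': 1, 'b': 0.8}
print(try_parse_json('not json'))            # -> None
```

Implementing this as `RuntimeReplaceable with InheritAnalysisRules` (as the removed Scala code did) means the wrapper reuses the analysis and type checking of the underlying expression instead of duplicating them.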
(spark) branch master updated: [SPARK-47933][PYTHON][CONNECT][FOLLOW-UP] Add a check of `__name__` at `_with_origin`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new aa4a84bd8474 [SPARK-47933][PYTHON][CONNECT][FOLLOW-UP] Add a check of `__name__` at `_with_origin` aa4a84bd8474 is described below commit aa4a84bd8474e697e3e3d7fa9135b09f1de27f2f Author: Hyukjin Kwon AuthorDate: Wed Apr 24 11:49:12 2024 +0900 [SPARK-47933][PYTHON][CONNECT][FOLLOW-UP] Add a check of `__name__` at `_with_origin` ### What changes were proposed in this pull request? This PR is a followup of https://github.com/apache/spark/pull/46155 that adds a check of `__name__` at `_with_origin`. ### Why are the changes needed? It is possible for a callable instance without a `__name__` and/or `__module__` attribute to be wrapped. For example, functools.partial. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? `./bin/pyspark` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46198 from HyukjinKwon/SPARK-47933-followup2. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/errors/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/errors/utils.py b/python/pyspark/errors/utils.py index 16fba7e272bc..08744dc4c264 100644 --- a/python/pyspark/errors/utils.py +++ b/python/pyspark/errors/utils.py @@ -174,7 +174,7 @@ def _with_origin(func: Callable[..., Any]) -> Callable[..., Any]: from pyspark.sql import SparkSession spark = SparkSession.getActiveSession() -if spark is not None: +if spark is not None and hasattr(func, "__name__"): assert spark._jvm is not None pyspark_origin = spark._jvm.org.apache.spark.sql.catalyst.trees.PySparkCurrentOrigin
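The `hasattr(func, "__name__")` guard exists because not every callable carries a `__name__`: `functools.partial` objects, the example the commit message cites, expose `func`, `args`, and `keywords` but neither `__name__` nor `__module__`. A small demonstration (the `describe` helper is hypothetical, mirroring the guarded-access pattern of the fix):

```python
# Why _with_origin needs a hasattr(func, "__name__") guard: some callables,
# such as functools.partial objects, have no __name__ attribute at all.
import functools

def add(a, b):
    return a + b

doubler = functools.partial(add, 1)   # a callable without __name__

print(hasattr(add, "__name__"))       # -> True
print(hasattr(doubler, "__name__"))   # -> False

def describe(func):
    # Guarded access, mirroring the fix: only read __name__ when present.
    if hasattr(func, "__name__"):
        return func.__name__
    return repr(type(func))

print(describe(add))      # -> add
print(describe(doubler))  # -> <class 'functools.partial'>
```

Without the guard, wrapping such a callable would raise `AttributeError` the moment the wrapper tried to report its origin.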
(spark) branch master updated: Revert "Revert "[SPARK-45302][PYTHON] Remove PID communication between Python workers when no demon is used""
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b3c11ef8e090 Revert "Revert "[SPARK-45302][PYTHON] Remove PID communication between Python workers when no demon is used"" b3c11ef8e090 is described below commit b3c11ef8e09082e0091ed9f459b432208a3ccd43 Author: Hyukjin Kwon AuthorDate: Wed Apr 24 09:54:41 2024 +0900 Revert "Revert "[SPARK-45302][PYTHON] Remove PID communication between Python workers when no demon is used"" This reverts commit e8f529bb89a6edfcedbca7a08993e6e8d9612009. --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 4 ++-- .../main/scala/org/apache/spark/api/python/PythonRunner.scala | 10 +- .../org/apache/spark/api/python/PythonWorkerFactory.scala | 11 +-- python/pyspark/daemon.py | 4 ++-- .../sql/connect/streaming/worker/foreach_batch_worker.py | 2 -- .../pyspark/sql/connect/streaming/worker/listener_worker.py | 2 -- python/pyspark/sql/worker/analyze_udtf.py | 3 --- python/pyspark/worker.py | 3 --- .../apache/spark/sql/execution/python/PythonArrowOutput.scala | 2 +- .../apache/spark/sql/execution/python/PythonUDFRunner.scala | 2 +- 10 files changed, 16 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 34c7d955fedd..50d0358004d4 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -142,7 +142,7 @@ class SparkEnv ( workerModule: String, daemonModule: String, envVars: Map[String, String], - useDaemon: Boolean): (PythonWorker, Option[Int]) = { + useDaemon: Boolean): (PythonWorker, Option[Long]) = { synchronized { val key = PythonWorkersKey(pythonExec, workerModule, daemonModule, envVars) val workerFactory = pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory( @@ -161,7 
+161,7 @@ class SparkEnv ( pythonExec: String, workerModule: String, envVars: Map[String, String], - useDaemon: Boolean): (PythonWorker, Option[Int]) = { + useDaemon: Boolean): (PythonWorker, Option[Long]) = { createPythonWorker( pythonExec, workerModule, PythonWorkerFactory.defaultDaemonModule, envVars, useDaemon) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 7ff782db210d..17cb0c5a55dd 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -88,7 +88,7 @@ private object BasePythonRunner { private lazy val faultHandlerLogDir = Utils.createTempDir(namePrefix = "faulthandler") - private def faultHandlerLogPath(pid: Int): Path = { + private def faultHandlerLogPath(pid: Long): Path = { new File(faultHandlerLogDir, pid.toString).toPath } } @@ -204,7 +204,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default")) -val (worker: PythonWorker, pid: Option[Int]) = env.createPythonWorker( +val (worker: PythonWorker, pid: Option[Long]) = env.createPythonWorker( pythonExec, workerModule, daemonModule, envVars.asScala.toMap) // Whether is the worker released into idle pool or closed. 
When any codes try to release or // close a worker, they should use `releasedOrClosed.compareAndSet` to flip the state to make @@ -257,7 +257,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( startTime: Long, env: SparkEnv, worker: PythonWorker, - pid: Option[Int], + pid: Option[Long], releasedOrClosed: AtomicBoolean, context: TaskContext): Iterator[OUT] @@ -465,7 +465,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( startTime: Long, env: SparkEnv, worker: PythonWorker, - pid: Option[Int], + pid: Option[Long], releasedOrClosed: AtomicBoolean, context: TaskContext) extends Iterator[OUT] { @@ -842,7 +842,7 @@ private[spark] class PythonRunner( startTime: Long, env: SparkEnv, worker: PythonWorker, - pid: Option[Int], + pid: Option[Long], releasedOrClosed: AtomicBoolean, context: TaskContext): Iterator[Array[Byte]] = { new ReaderIterator( diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 875cf6bc2770..eb7
(spark) branch master updated (390fb7429029 -> e8f529bb89a6)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 390fb7429029 [SPARK-47941][SS][CONNECT] Propagate ForeachBatch worker initialization errors to users for PySpark add e8f529bb89a6 Revert "[SPARK-45302][PYTHON] Remove PID communication between Python workers when no demon is used" No new revisions were added by this update. Summary of changes: core/src/main/scala/org/apache/spark/SparkEnv.scala | 4 ++-- .../main/scala/org/apache/spark/api/python/PythonRunner.scala | 10 +- .../org/apache/spark/api/python/PythonWorkerFactory.scala | 11 ++- python/pyspark/daemon.py | 4 ++-- .../sql/connect/streaming/worker/foreach_batch_worker.py | 2 ++ .../pyspark/sql/connect/streaming/worker/listener_worker.py | 2 ++ python/pyspark/sql/worker/analyze_udtf.py | 3 +++ python/pyspark/worker.py | 3 +++ .../apache/spark/sql/execution/python/PythonArrowOutput.scala | 2 +- .../apache/spark/sql/execution/python/PythonUDFRunner.scala | 2 +- 10 files changed, 27 insertions(+), 16 deletions(-)
(spark) branch master updated (c88fabfee41d -> 390fb7429029)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from c88fabfee41d [SPARK-47604][CORE] Resource managers: Migrate logInfo with variables to structured logging framework add 390fb7429029 [SPARK-47941][SS][CONNECT] Propagate ForeachBatch worker initialization errors to users for PySpark No new revisions were added by this update. Summary of changes: .../src/main/resources/error/error-conditions.json | 6 .../scala/org/apache/spark/SparkException.scala| 29 + .../spark/api/python/StreamingPythonRunner.scala | 18 ++- .../streaming/worker/foreach_batch_worker.py | 26 .../connect/streaming/test_parity_foreach_batch.py | 36 ++ 5 files changed, 101 insertions(+), 14 deletions(-)
(spark) branch master updated (9e7ee7601d38 -> b335dd366fb1)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 9e7ee7601d38 [SPARK-47903][PYTHON] Add support for remaining scalar types in the PySpark Variant library add b335dd366fb1 [SPARK-47909][CONNECT][PYTHON][TESTS][FOLLOW-UP] Move `pyspark.classic` references No new revisions were added by this update. Summary of changes: python/pyspark/ml/tests/connect/test_connect_function.py | 3 ++- python/pyspark/sql/tests/connect/test_connect_function.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-)
(spark) branch master updated: [SPARK-47903][PYTHON] Add support for remaining scalar types in the PySpark Variant library
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 9e7ee7601d38 [SPARK-47903][PYTHON] Add support for remaining scalar types in the PySpark Variant library 9e7ee7601d38 is described below commit 9e7ee7601d38bb76715df16c3bb8655c5667aac3 Author: Harsh Motwani AuthorDate: Tue Apr 23 08:36:34 2024 +0900 [SPARK-47903][PYTHON] Add support for remaining scalar types in the PySpark Variant library ### What changes were proposed in this pull request? Added support for the `date`, `timestamp`, `timestamp_ntz`, `float` and `binary` scalar types to the variant library in Python. Data of these types can also be extracted now from a variant. ### Why are the changes needed? Support for these types was added to the Scala side as part of a recent PR. This PR also adds support for these data types on the PySpark side. ### Does this PR introduce _any_ user-facing change? Yes, users can now use PySpark to extract data of more types from Variants. ### How was this patch tested? Unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #46122 from harshmotw-db/python_scalar_variant. 
Authored-by: Harsh Motwani Signed-off-by: Hyukjin Kwon --- .../source/reference/pyspark.sql/variant_val.rst | 1 + python/pyspark/sql/tests/test_types.py | 115 +++- python/pyspark/sql/types.py| 13 +++ python/pyspark/sql/variant_utils.py| 117 ++--- .../sql/catalyst/analysis/FunctionRegistry.scala | 1 + .../catalyst/expressions/ExpectsInputTypes.scala | 1 - .../expressions/variant/variantExpressions.scala | 30 ++ .../sql-functions/sql-expression-schema.md | 1 + .../apache/spark/sql/VariantEndToEndSuite.scala| 40 +++ 9 files changed, 301 insertions(+), 18 deletions(-) diff --git a/python/docs/source/reference/pyspark.sql/variant_val.rst b/python/docs/source/reference/pyspark.sql/variant_val.rst index a7f592c18e3a..8630ae8aace1 100644 --- a/python/docs/source/reference/pyspark.sql/variant_val.rst +++ b/python/docs/source/reference/pyspark.sql/variant_val.rst @@ -25,3 +25,4 @@ VariantVal :toctree: api/ VariantVal.toPython +VariantVal.toJson diff --git a/python/pyspark/sql/tests/test_types.py b/python/pyspark/sql/tests/test_types.py index af13adbc21bb..7d45adb832c8 100644 --- a/python/pyspark/sql/tests/test_types.py +++ b/python/pyspark/sql/tests/test_types.py @@ -1427,8 +1427,10 @@ class TypesTestsMixin: ("-int4", "-69633", -69633), ("int8", "4295033089", 4295033089), ("-int8", "-4294967297", -4294967297), -("float4", "1.23456789e-30", 1.23456789e-30), -("-float4", "-4.56789e+29", -4.56789e29), +("float4", "3.402e+38", 3.402e38), +("-float4", "-3.402e+38", -3.402e38), +("float8", "1.79769e+308", 1.79769e308), +("-float8", "-1.79769e+308", -1.79769e308), ("dec4", "123.456", Decimal("123.456")), ("-dec4", "-321.654", Decimal("-321.654")), ("dec8", "429.4967297", Decimal("429.4967297")), @@ -1447,17 +1449,77 @@ class TypesTestsMixin: F.struct([F.parse_json(F.lit('{"b": "2"}'))]).alias("s"), F.create_map([F.lit("k"), F.parse_json(F.lit('{"c": true}'))]).alias("m"), ).collect()[0] -variants = [row["v"], row["a"][0], row["s"]["col1"], row["m"]["k"]] + +# These data types are 
not supported by parse_json yet so they are being handled +# separately - Date, Timestamp, TimestampNTZ, Binary, Float (Single Precision) +date_columns = self.spark.sql( +"select cast(Date('2021-01-01')" ++ " as variant) as d0, cast(Date('1800-12-31')" ++ " as variant) as d1" +).collect()[0] +float_columns = self.spark.sql( +"select cast(Float(5.5)" + " as variant) as f0, cast(Float(-5.5) as variant) as f1" +).collect()[0] +binary_columns = self.spark.sql( +"select cast(binary(x'324FA69E')" + " as variant) as b" +).collect()[0] +timetamp_ntz_columns = self.spark.sql( +"select cast(cast('1940-
(spark) branch master updated (e1432ef6405a -> 79a1fa4b84dd)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from e1432ef6405a [SPARK-47413][SQL] - add support to substr/left/right for collations add 79a1fa4b84dd [SPARK-47890][CONNECT][PYTHON] Add variant functions to Scala and Python No new revisions were added by this update. Summary of changes: .../scala/org/apache/spark/sql/functions.scala | 69 - .../apache/spark/sql/PlanGenerationTestSuite.scala | 24 .../function_is_variant_null.explain | 2 + .../explain-results/function_parse_json.explain| 2 + .../function_schema_of_variant.explain | 2 + .../function_schema_of_variant_agg.explain | 2 + .../function_try_variant_get.explain | 2 + .../explain-results/function_variant_get.explain | 2 + .../queries/function_is_variant_null.json | 30 .../queries/function_is_variant_null.proto.bin | Bin 0 -> 200 bytes .../query-tests/queries/function_parse_json.json | 25 .../queries/function_parse_json.proto.bin | Bin 0 -> 179 bytes .../queries/function_schema_of_variant.json| 30 .../queries/function_schema_of_variant.proto.bin | Bin 0 -> 202 bytes .../queries/function_schema_of_variant_agg.json| 30 .../function_schema_of_variant_agg.proto.bin | Bin 0 -> 206 bytes .../queries/function_try_variant_get.json | 38 + .../queries/function_try_variant_get.proto.bin | Bin 0 -> 216 bytes .../query-tests/queries/function_variant_get.json | 38 + .../queries/function_variant_get.proto.bin | Bin 0 -> 212 bytes .../source/reference/pyspark.sql/functions.rst | 14 +- python/pyspark/sql/connect/functions/builtin.py| 35 + python/pyspark/sql/functions/builtin.py| 154 + python/pyspark/sql/tests/test_functions.py | 29 .../scala/org/apache/spark/sql/functions.scala | 72 +- .../scala/org/apache/spark/sql/VariantSuite.scala | 30 +++- 26 files changed, 623 insertions(+), 7 deletions(-) create mode 100644 
connector/connect/common/src/test/resources/query-tests/explain-results/function_is_variant_null.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_parse_json.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_variant_agg.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_try_variant_get.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_variant_get.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_is_variant_null.json create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_is_variant_null.proto.bin create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_parse_json.json create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_parse_json.proto.bin create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_variant.json create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_variant.proto.bin create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_variant_agg.json create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_variant_agg.proto.bin create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_try_variant_get.json create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_try_variant_get.proto.bin create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_variant_get.json 
create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_variant_get.proto.bin - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated (393a84fb074a -> 2d9d444b122d)
gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 393a84fb074a [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic add 2d9d444b122d [MINOR][DOCS] Change `SPARK_ANSI_SQL_MODE` in PlanStabilitySuite documentation No new revisions were added by this update. Summary of changes: sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
(spark) branch master updated (adf02d38061b -> 393a84fb074a)
gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from adf02d38061b [SPARK-47925][SQL][TESTS] Mark `BloomFilterAggregateQuerySuite` as `ExtendedSQLTest` add 393a84fb074a [SPARK-47909][PYTHON][CONNECT] Parent DataFrame class for Spark Connect and Spark Classic No new revisions were added by this update. Summary of changes: dev/sparktestsupport/modules.py|2 +- python/packaging/classic/setup.py |1 + .../ml/tests/connect/test_connect_function.py |2 +- python/pyspark/sql/{avro => classic}/__init__.py |2 +- python/pyspark/sql/classic/dataframe.py| 1974 python/pyspark/sql/connect/dataframe.py| 535 ++ python/pyspark/sql/connect/functions/builtin.py|4 +- python/pyspark/sql/connect/session.py | 16 +- python/pyspark/sql/dataframe.py| 1676 + python/pyspark/sql/pandas/conversion.py| 24 - python/pyspark/sql/pandas/map_ops.py | 169 -- .../sql/tests/connect/test_connect_column.py |2 +- .../sql/tests/connect/test_connect_function.py |2 +- .../pyspark/sql/tests/connect/test_connect_plan.py |5 + python/pyspark/sql/tests/test_dataframe.py |9 +- python/pyspark/sql/utils.py| 29 +- python/pyspark/testing/mlutils.py |5 + 17 files changed, 2718 insertions(+), 1739 deletions(-) copy python/pyspark/sql/{avro => classic}/__init__.py (96%) create mode 100644 python/pyspark/sql/classic/dataframe.py
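The "Parent DataFrame class" change above introduces one shared public class with separate Spark Classic and Spark Connect implementations behind it. As an illustration only — the names and factory below are hypothetical stand-ins, not Spark's actual internals — the dispatch pattern can be sketched in plain Python:

```python
# Hypothetical sketch of a shared parent class whose factory picks a
# backend-specific subclass, mirroring one public DataFrame API backed by
# classic and connect implementations. Names here are illustrative only.

class DataFrame:
    """Public-facing parent: shared API surface and docstrings."""

    @classmethod
    def _create(cls, data, backend):
        # Factory chooses the implementation; callers only see DataFrame.
        impl = ClassicDataFrame if backend == "classic" else ConnectDataFrame
        return impl(data)

    def collect(self):
        raise NotImplementedError  # each backend supplies its own execution


class ClassicDataFrame(DataFrame):
    def __init__(self, data):
        self._data = list(data)

    def collect(self):
        # Classic: executes in the local JVM-backed session (simulated here).
        return self._data


class ConnectDataFrame(DataFrame):
    def __init__(self, data):
        self._data = list(data)

    def collect(self):
        # Connect: would serialize a plan and send it to a server;
        # here we just simulate the round trip.
        return [row for row in self._data]


df = DataFrame._create([1, 2, 3], backend="classic")
print(isinstance(df, DataFrame), df.collect())  # True [1, 2, 3]
```

Both subclasses satisfy `isinstance(df, DataFrame)`, which is the property that lets existing user code keep type-checking against the single public class.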
(spark) branch master updated (e2e17fe6e038 -> bb5ded8f1bb0)
gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from e2e17fe6e038 [SPARK-47816][CONNECT][DOCS][FOLLOWUP] refine the description add bb5ded8f1bb0 [SPARK-47371][SQL][FOLLOWUP] XML: Stop ignoring CDATA within row tags No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala | 13 - .../resources/test-data/xml-resources/cdata-ending-eof.xml | 7 --- .../resources/test-data/xml-resources/cdata-no-close.xml| 10 +- .../resources/test-data/xml-resources/cdata-no-ignore.xml | 11 +++ .../test/resources/test-data/xml-resources/ignored-rows.xml | 7 --- .../spark/sql/execution/datasources/xml/XmlSuite.scala | 12 6 files changed, 36 insertions(+), 24 deletions(-) create mode 100644 sql/core/src/test/resources/test-data/xml-resources/cdata-no-ignore.xml
(spark) branch master updated: [SPARK-47816][CONNECT][DOCS][FOLLOWUP] refine the description
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e2e17fe6e038 [SPARK-47816][CONNECT][DOCS][FOLLOWUP] refine the description e2e17fe6e038 is described below commit e2e17fe6e038d05b78b2008a51a2941b1432d83c Author: Ruifeng Zheng AuthorDate: Fri Apr 19 08:54:09 2024 +0900 [SPARK-47816][CONNECT][DOCS][FOLLOWUP] refine the description ### What changes were proposed in this pull request? `lazily evaluated` -> `lazily analyzed` ### Why are the changes needed? to address https://github.com/apache/spark/pull/46007#discussion_r1568045101 Closes #46118 from zhengruifeng/doc_nit. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/session.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 1098c41a3f4c..e0c2c815c827 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -1633,7 +1633,7 @@ class SparkSession(SparkConversionMixin): Notes - In Spark Classic, a temporary view referenced in `spark.sql` is resolved immediately, -while in Spark Connect it is lazily evaluated. +while in Spark Connect it is lazily analyzed. So in Spark Connect if a view is dropped, modified or replaced after `spark.sql`, the execution may fail or generate different results. @@ -1766,7 +1766,7 @@ class SparkSession(SparkConversionMixin): Notes - In Spark Classic, a temporary view referenced in `spark.table` is resolved immediately, -while in Spark Connect it is lazily evaluated. +while in Spark Connect it is lazily analyzed. So in Spark Connect if a view is dropped, modified or replaced after `spark.table`, the execution may fail or generate different results. 
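The `lazily evaluated` → `lazily analyzed` wording change above matters because of a real behavioral difference: Spark Classic resolves a referenced temporary view when `spark.sql`/`spark.table` is called, while Spark Connect defers resolution until execution. Purely as an illustration (no Spark involved; a dict stands in for the view catalog), the difference can be sketched like this:

```python
# Illustrative-only sketch of eager vs. deferred view resolution.
# A plain dict stands in for the temporary-view catalog; neither function
# is Spark code.

views = {"t": [1, 2, 3]}

def sql_classic(view_name):
    data = list(views[view_name])          # view resolved immediately
    return lambda: data

def sql_connect(view_name):
    return lambda: list(views[view_name])  # resolution deferred to execution

classic_q = sql_classic("t")
connect_q = sql_connect("t")

del views["t"]                             # view dropped after "spark.sql"

print(classic_q())                         # [1, 2, 3] — already resolved
try:
    connect_q()                            # fails: the view no longer exists
except KeyError as e:
    print("deferred query failed:", e)
```

This is the scenario the docstring note warns about: dropping, modifying, or replacing a view between building a query and executing it can change results (or fail) only under the lazy-analysis model.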
(spark) branch master updated: [SPARK-44461][FOLLOWUP][SS][CONNECT] Remove unneeded TODOs
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d09230b6601c [SPARK-44461][FOLLOWUP][SS][CONNECT] Remove unneeded TODOs d09230b6601c is described below commit d09230b6601ce180213fd24d567ca36c38f90d6c Author: Wei Liu AuthorDate: Fri Apr 19 08:53:23 2024 +0900 [SPARK-44461][FOLLOWUP][SS][CONNECT] Remove unneeded TODOs ### What changes were proposed in this pull request? Remove unneeded todos ### Why are the changes needed? code cleanup ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No need ### Was this patch authored or co-authored using generative AI tooling? No Closes #46124 from WweiL/remove-todo. Authored-by: Wei Liu Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py | 2 -- python/pyspark/sql/connect/streaming/worker/listener_worker.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py b/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py index 92ed7a4aaff5..c5730dea4ca1 100644 --- a/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py +++ b/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py @@ -62,8 +62,6 @@ def main(infile: IO, outfile: IO) -> None: assert spark_connect_session.session_id == session_id spark = spark_connect_session -# TODO(SPARK-44461): Enable Process Isolation - func = worker.read_command(pickle_ser, infile) write_int(0, outfile) # Indicate successful initialization diff --git a/python/pyspark/sql/connect/streaming/worker/listener_worker.py b/python/pyspark/sql/connect/streaming/worker/listener_worker.py index d3efb5894fc0..3709e50ba026 100644 --- a/python/pyspark/sql/connect/streaming/worker/listener_worker.py +++ 
b/python/pyspark/sql/connect/streaming/worker/listener_worker.py @@ -70,8 +70,6 @@ def main(infile: IO, outfile: IO) -> None: assert spark_connect_session.session_id == session_id spark = spark_connect_session -# TODO(SPARK-44461): Enable Process Isolation - listener = worker.read_command(pickle_ser, infile) write_int(0, outfile) # Indicate successful initialization
(spark) branch master updated (216379df3543 -> bcafb43825ac)
gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 216379df3543 [SPARK-47858][SPARK-47852][PYTHON][SQL] Refactoring the structure for DataFrame error context add bcafb43825ac [SPARK-47767][SQL] Show offset value in TakeOrderedAndProjectExec No new revisions were added by this update. Summary of changes: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-)
(spark) branch master updated: [SPARK-47858][SPARK-47852][PYTHON][SQL] Refactoring the structure for DataFrame error context
gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 216379df3543 [SPARK-47858][SPARK-47852][PYTHON][SQL] Refactoring the structure for DataFrame error context 216379df3543 is described below commit 216379df35435961106c5a2aef35d5f60a6723bf Author: Haejoon Lee AuthorDate: Thu Apr 18 16:01:54 2024 +0900 [SPARK-47858][SPARK-47852][PYTHON][SQL] Refactoring the structure for DataFrame error context ### What changes were proposed in this pull request? This PR proposes to refactor the current structure for DataFrame error context. This change can cover the reverse binary operations, so it can cover SPARK-47852 as well. ### Why are the changes needed? To make future management and expansion more flexible ### Does this PR introduce _any_ user-facing change? No, it's internal code refactoring ### How was this patch tested? The existing `DataFrameTests.test_dataframe_error_context` should pass. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46063 from itholic/error_context_refactoring.
Authored-by: Haejoon Lee Signed-off-by: Hyukjin Kwon --- python/pyspark/errors/utils.py | 85 +++- python/pyspark/sql/column.py | 39 +- .../sql/tests/connect/test_parity_dataframe.py | 4 - ...e.py => test_parity_dataframe_query_context.py} | 18 +- python/pyspark/sql/tests/test_dataframe.py | 482 .../sql/tests/test_dataframe_query_context.py | 497 + .../apache/spark/sql/catalyst/trees/origin.scala | 17 + .../main/scala/org/apache/spark/sql/Column.scala | 23 - .../main/scala/org/apache/spark/sql/package.scala | 76 +--- 9 files changed, 617 insertions(+), 624 deletions(-) diff --git a/python/pyspark/errors/utils.py b/python/pyspark/errors/utils.py index e1f249506dd0..16fba7e272bc 100644 --- a/python/pyspark/errors/utils.py +++ b/python/pyspark/errors/utils.py @@ -16,11 +16,20 @@ # import re -from typing import Dict, Match - +import functools +import inspect +import os +from typing import Any, Callable, Dict, Match, TypeVar, Type, TYPE_CHECKING from pyspark.errors.error_classes import ERROR_CLASSES_MAP +if TYPE_CHECKING: +from pyspark.sql import SparkSession +from py4j.java_gateway import JavaClass + +T = TypeVar("T") + + class ErrorClassesReader: """ A reader to load error information from error_classes.py. @@ -119,3 +128,75 @@ class ErrorClassesReader: message_template = main_message_template + " " + sub_message_template return message_template + + +def _capture_call_site( +spark_session: "SparkSession", pyspark_origin: "JavaClass", fragment: str +) -> None: +""" +Capture the call site information including file name, line number, and function name. +This function updates the thread-local storage from JVM side (PySparkCurrentOrigin) +with the current call site information when a PySpark API function is called. + +Parameters +-- +spark_session : SparkSession +Current active Spark session. +pyspark_origin : py4j.JavaClass +PySparkCurrentOrigin from current active Spark session. +fragment : str +The name of the PySpark API function being captured. 
+ +Notes +- +The call site information is used to enhance error messages with the exact location +in the user code that led to the error. +""" +stack = list(reversed(inspect.stack())) +depth = int( +spark_session.conf.get("spark.sql.stackTracesInDataFrameContext") # type: ignore[arg-type] +) +selected_frames = stack[:depth] +call_sites = [f"{frame.filename}:{frame.lineno}" for frame in selected_frames] +call_sites_str = "\n".join(call_sites) + +pyspark_origin.set(fragment, call_sites_str) + + +def _with_origin(func: Callable[..., Any]) -> Callable[..., Any]: +""" +A decorator to capture and provide the call site information to the server side +when PySpark API functions are invoked. +""" + +@functools.wraps(func) +def wrapper(*args: Any, **kwargs: Any) -> Any: +from pyspark.sql import SparkSession + +spark = SparkSession.getActiveSession() +if spark is not None: +assert spark._jvm is not None +pyspark_origin = spark._jvm.org.apache.spark.sql.catalyst.trees.PySparkCurrentOrigin + +# Update call site when the function is called +_capture_call_site(spark, pyspark_origin,
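The `_capture_call_site`/`_with_origin` pattern in the diff above — capture `file:line` frames for the user's call site before invoking the wrapped API — can be illustrated with a self-contained sketch. Here a module-level dict and a fixed depth constant are stand-ins for the JVM-side `PySparkCurrentOrigin` storage and the `spark.sql.stackTracesInDataFrameContext` config; none of this is Spark code:

```python
# Framework-free sketch of decorator-based call-site capture.
# CAPTURED and STACK_DEPTH are hypothetical stand-ins for Spark's
# JVM-side thread-local storage and its configurable stack depth.
import functools
import inspect

CAPTURED = {}      # stand-in for PySparkCurrentOrigin
STACK_DEPTH = 1    # stand-in for spark.sql.stackTracesInDataFrameContext

def with_origin(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Outermost frames first, then keep only the configured depth,
        # mirroring the selected_frames logic in the diff above.
        stack = list(reversed(inspect.stack()))
        frames = stack[:STACK_DEPTH]
        CAPTURED[func.__name__] = "\n".join(
            f"{f.filename}:{f.lineno}" for f in frames
        )
        return func(*args, **kwargs)
    return wrapper

@with_origin
def divide(a, b):
    return a / b

print(divide(6, 3))          # 2.0
print(CAPTURED["divide"])    # the caller's "file:line" location
```

The captured location can then be attached to any error raised inside the call, which is exactly how the refactoring enriches DataFrame error messages with the offending line of user code.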
(spark) branch master updated (eb8688c2b6ce -> 564b2384c40d)
gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from eb8688c2b6ce [SPARK-47896][BUILD] Upgrade netty to `4.1.109.Final` add 564b2384c40d [SPARK-47864][PYTHON][DOCS] Enhance "Installation" page to cover all installable options No new revisions were added by this update. Summary of changes: python/docs/source/getting_started/install.rst | 118 ++--- 1 file changed, 107 insertions(+), 11 deletions(-)
(spark) branch master updated: [SPARK-46810][DOCS][FOLLOWUP] Make some reference file links clickable
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2e81f9a3b561 [SPARK-46810][DOCS][FOLLOWUP] Make some reference file links clickable 2e81f9a3b561 is described below commit 2e81f9a3b56107a171249e5ad898e76156f84f0f Author: panbingkun AuthorDate: Thu Apr 18 10:09:18 2024 +0900 [SPARK-46810][DOCS][FOLLOWUP] Make some reference file links clickable ### What changes were proposed in this pull request? The pr is following up https://github.com/apache/spark/pull/44902, to make some `reference files links` clickable. ### Why are the changes needed? Convenient for developers to navigate directly when read this file `README.md`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46105 from panbingkun/SPARK-46810_FOLLOWUP. Authored-by: panbingkun Signed-off-by: Hyukjin Kwon --- common/utils/src/main/resources/error/README.md | 14 +++--- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/common/utils/src/main/resources/error/README.md b/common/utils/src/main/resources/error/README.md index e2f68a1af9f4..adb631ccdca7 100644 --- a/common/utils/src/main/resources/error/README.md +++ b/common/utils/src/main/resources/error/README.md @@ -16,9 +16,9 @@ The error state / SQLSTATE itself is comprised of two parts: 2. Error sub-class Acceptable values for these various error parts are defined in the following files: -* `error-classes.json` -* `error-states.json` -* `error-conditions.json` +* [`error-classes.json`](error-classes.json) +* [`error-states.json`](error-states.json) +* [`error-conditions.json`](error-conditions.json) The terms error class, state, and condition come from the SQL standard. 
@@ -41,7 +41,7 @@ Unfortunately, we have historically used the term "error class" inconsistently t Fixing this will require renaming `SparkException.errorClass` to `SparkException.errorCondition` and making similar changes to `ErrorClassesJsonReader` and other parts of the codebase. We will address this in [SPARK-47429]. Until that is complete, we will have to live with the fact that a string like `DATATYPE_MISSING_SIZE` is called an "error condition" in our user-facing documentation but an "error class" in the code. -For more details, please see [SPARK-46810][SPARK-46810]. +For more details, please see [SPARK-46810]. [SPARK-46810]: https://issues.apache.org/jira/browse/SPARK-46810 [SPARK-47429]: https://issues.apache.org/jira/browse/SPARK-47429 @@ -51,9 +51,9 @@ For more details, please see [SPARK-46810][SPARK-46810]. 1. Check if the error is an internal error. Internal errors are bugs in the code that we do not expect users to encounter; this does not include unsupported operations. If true, use the error condition `INTERNAL_ERROR` and skip to step 4. -2. Check if an appropriate error condition already exists in `error-conditions.json`. +2. Check if an appropriate error condition already exists in [`error-conditions.json`](error-conditions.json). If true, use the error condition and skip to step 4. -3. Add a new condition to `error-conditions.json`. If the new condition requires a new error state, add the new error state to `error-states.json`. +3. Add a new condition to [`error-conditions.json`](error-conditions.json). If the new condition requires a new error state, add the new error state to [`error-states.json`](error-states.json). 4. Check if the exception type already extends `SparkThrowable`. If true, skip to step 6. 5. Mix `SparkThrowable` into the exception. @@ -165,7 +165,7 @@ For example: The existing `XXKD0` is used for an internal analyzer error. 
ANSI/ISO standard -The SQLSTATEs in `error-states.json` are collated from: +The SQLSTATEs in [`error-states.json`](error-states.json) are collated from: - SQL2016 - DB2 zOS/LUW - PostgreSQL 15
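The README excerpt above works with SQLSTATE values such as `XXKD0`: five characters, of which the first two identify the class and the last three the sub-class. As a small illustration of that split (a hedged sketch, not Spark code — the helper name is invented), the structure can be checked like this:

```python
# Hypothetical helper illustrating the two-part SQLSTATE structure the
# README describes: a 5-character alphanumeric code split into a
# 2-character class and a 3-character sub-class.
def split_sqlstate(state: str) -> tuple[str, str]:
    if len(state) != 5 or not state.isalnum():
        raise ValueError(f"not a SQLSTATE: {state!r}")
    return state[:2], state[2:]

# XXKD0 is the example the README gives for an internal analyzer error.
print(split_sqlstate("XXKD0"))   # ('XX', 'KD0')
print(split_sqlstate("22012"))   # ('22', '012')
```

Validating new entries against this shape before adding them to `error-states.json` is one way to keep the file consistent with the standard's format.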
(spark) branch master updated: [SPARK-47891][PYTHON][DOCS] Improve docstring of mapInPandas
gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a7f8ccef122a [SPARK-47891][PYTHON][DOCS] Improve docstring of mapInPandas a7f8ccef122a is described below commit a7f8ccef122a629559bae91e3847589c4cf1a46a Author: Xinrong Meng AuthorDate: Thu Apr 18 09:47:47 2024 +0900 [SPARK-47891][PYTHON][DOCS] Improve docstring of mapInPandas ### What changes were proposed in this pull request? Improve docstring of mapInPandas - "using a Python native function that takes and outputs a pandas DataFrame" is confusing because the function takes and outputs an "ITERATOR of pandas DataFrames" instead. - "All columns are passed together as an iterator of pandas DataFrames" easily misleads users into thinking the entire DataFrame will be passed together, so "a batch of rows" is used instead. ### Why are the changes needed? More accurate and clear docstring. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Doc change only. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46108 from xinrong-meng/doc_mapInPandas. Authored-by: Xinrong Meng Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/pandas/map_ops.py | 21 ++--- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/python/pyspark/sql/pandas/map_ops.py b/python/pyspark/sql/pandas/map_ops.py index 82bcd58b0c0e..6d8bb7c779b7 100644 --- a/python/pyspark/sql/pandas/map_ops.py +++ b/python/pyspark/sql/pandas/map_ops.py @@ -30,7 +30,7 @@ if TYPE_CHECKING: class PandasMapOpsMixin: """ -Min-in for pandas map operations. Currently, only :class:`DataFrame` +Mix-in for pandas map operations. Currently, only :class:`DataFrame` can use this class.
""" @@ -43,16 +43,14 @@ class PandasMapOpsMixin: ) -> "DataFrame": """ Maps an iterator of batches in the current :class:`DataFrame` using a Python native -function that takes and outputs a pandas DataFrame, and returns the result as a -:class:`DataFrame`. +function that is performed on pandas DataFrames both as input and output, +and returns the result as a :class:`DataFrame`. -The function should take an iterator of `pandas.DataFrame`\\s and return -another iterator of `pandas.DataFrame`\\s. All columns are passed -together as an iterator of `pandas.DataFrame`\\s to the function and the -returned iterator of `pandas.DataFrame`\\s are combined as a :class:`DataFrame`. -Each `pandas.DataFrame` size can be controlled by -`spark.sql.execution.arrow.maxRecordsPerBatch`. The size of the function's input and -output can be different. +This method applies the specified Python function to an iterator of +`pandas.DataFrame`\\s, each representing a batch of rows from the original DataFrame. +The returned iterator of `pandas.DataFrame`\\s are combined as a :class:`DataFrame`. +The size of the function's input and output can be different. Each `pandas.DataFrame` +size can be controlled by `spark.sql.execution.arrow.maxRecordsPerBatch`. .. versionadded:: 3.0.0 @@ -68,7 +66,8 @@ class PandasMapOpsMixin: the return type of the `func` in PySpark. The value can be either a :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string. barrier : bool, optional, default False -Use barrier mode execution. +Use barrier mode execution, ensuring that all Python workers in the stage will be +launched concurrently. .. versionadded: 3.5.0 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated (5da7b6878c20 -> 7dc88db0c8b6)
gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 5da7b6878c20 [SPARK-47763][CONNECT][TESTS] Enable local-cluster tests with pyspark-connect package add 7dc88db0c8b6 [SPARK-47885][PYTHON][CONNECT] Make pyspark.resource compatible with pyspark-connect No new revisions were added by this update. Summary of changes: python/pyspark/resource/profile.py | 11 ++- python/pyspark/resource/requests.py | 22 ++ 2 files changed, 20 insertions(+), 13 deletions(-)
(spark) branch master updated: [SPARK-47763][CONNECT][TESTS] Enable local-cluster tests with pyspark-connect package
gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5da7b6878c20 [SPARK-47763][CONNECT][TESTS] Enable local-cluster tests with pyspark-connect package 5da7b6878c20 is described below commit 5da7b6878c2083fc50cb345233e9dac03bf806ac Author: Hyukjin Kwon AuthorDate: Wed Apr 17 21:16:50 2024 +0900 [SPARK-47763][CONNECT][TESTS] Enable local-cluster tests with pyspark-connect package ### What changes were proposed in this pull request? This PR proposes to extend the `pyspark-connect` scheduled job to run `pyspark.resource` tests as well. ### Why are the changes needed? In order to make sure the pure Python library works with `pyspark.resource`. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Tested in my own fork: https://github.com/HyukjinKwon/spark/actions/runs/8718980385/job/23917348664 ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46090 from HyukjinKwon/enable-resources.
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .github/workflows/build_python_connect.yml | 31 +++--- python/packaging/connect/setup.py | 1 + .../resource/tests/test_connect_resources.py | 15 ++- .../sql/tests/connect/client/test_artifact.py | 13 + python/pyspark/sql/tests/connect/test_resources.py | 15 +-- python/pyspark/sql/tests/test_resources.py | 12 + 6 files changed, 56 insertions(+), 31 deletions(-) diff --git a/.github/workflows/build_python_connect.yml b/.github/workflows/build_python_connect.yml index 863980b0c2e5..3e11dec14741 100644 --- a/.github/workflows/build_python_connect.yml +++ b/.github/workflows/build_python_connect.yml @@ -29,6 +29,7 @@ jobs: name: "Build modules: pyspark-connect" runs-on: ubuntu-latest timeout-minutes: 300 +if: github.repository == 'apache/spark' steps: - name: Checkout Spark repository uses: actions/checkout@v4 @@ -80,19 +81,43 @@ jobs: # Make less noisy cp conf/log4j2.properties.template conf/log4j2.properties sed -i 's/rootLogger.level = info/rootLogger.level = warn/g' conf/log4j2.properties - # Start a Spark Connect server + + # Start a Spark Connect server for local PYTHONPATH="python/lib/pyspark.zip:python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" ./sbin/start-connect-server.sh \ --driver-java-options "-Dlog4j.configurationFile=file:$GITHUB_WORKSPACE/conf/log4j2.properties" \ --jars "`find connector/connect/server/target -name spark-connect-*SNAPSHOT.jar`,`find connector/protobuf/target -name spark-protobuf-*SNAPSHOT.jar`,`find connector/avro/target -name spark-avro*SNAPSHOT.jar`" + # Make sure running Python workers that contains pyspark.core once. They will be reused. 
python -c "from pyspark.sql import SparkSession; _ = SparkSession.builder.remote('sc://localhost').getOrCreate().range(100).repartition(100).mapInPandas(lambda x: x, 'id INT').collect()" + # Remove Py4J and PySpark zipped library to make sure there is no JVM connection - rm python/lib/* - rm -r python/pyspark + mv python/lib lib.back + mv python/pyspark pyspark.back + # Several tests related to catalog requires to run them sequencially, e.g., writing a table in a listener. ./python/run-tests --parallelism=1 --python-executables=python3 --modules pyspark-connect,pyspark-ml-connect # None of tests are dependent on each other in Pandas API on Spark so run them in parallel ./python/run-tests --parallelism=4 --python-executables=python3 --modules pyspark-pandas-connect-part0,pyspark-pandas-connect-part1,pyspark-pandas-connect-part2,pyspark-pandas-connect-part3 + + # Stop Spark Connect server. + ./sbin/stop-connect-server.sh + mv lib.back python/lib + mv pyspark.back python/pyspark + + # Start a Spark Connect server for local-cluster + PYTHONPATH="python/lib/pyspark.zip:python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" ./sbin/start-connect-server.sh \ +--master "local-cluster[2, 4, 1024]" \ +--driver-java-options "-Dlog4j.configurationFile=file:$GITHUB_WORKSPACE/conf/log4j2.properties" \ +--jars "`find connector/connect/server/target -name spark-connect-*SNAPSHOT.jar`,`find connector/protobuf/target -name spark-protobuf-*SNAPSHOT.jar`,`find connector/avro/target -name spark-avro*SNAPSHOT.
(spark) branch master updated: [SPARK-47884][INFRA] Switch ANSI SQL CI job to NON-ANSI SQL CI job
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e327c1220abf [SPARK-47884][INFRA] Switch ANSI SQL CI job to NON-ANSI SQL CI job e327c1220abf is described below commit e327c1220abf355dd15135fde07ff08df5ff237b Author: Dongjoon Hyun AuthorDate: Wed Apr 17 19:16:26 2024 +0900 [SPARK-47884][INFRA] Switch ANSI SQL CI job to NON-ANSI SQL CI job ### What changes were proposed in this pull request? This PR aims to switch `ANSI SQL` GitHub Action job to `NON-ANSI SQL` GitHub Action job. - Rename `build_ansi.yml` to `build_non_ansi.yml` - Change job name from `ANSI` to `NON-ANSI` - Change `SPARK_ANSI_SQL_MODE` to `false`. ### Why are the changes needed? Since SPARK-44111, Apache Spark uses ANSI mode by default. So, we need to switch this to keep `NON-ANSI SQL` mode test coverage. - https://github.com/apache/spark/pull/46013 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual review. This should be tested after merging. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46099 from dongjoon-hyun/SPARK-47884. Authored-by: Dongjoon Hyun Signed-off-by: Hyukjin Kwon --- .github/workflows/{build_ansi.yml => build_non_ansi.yml} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build_ansi.yml b/.github/workflows/build_non_ansi.yml similarity index 92% rename from .github/workflows/build_ansi.yml rename to .github/workflows/build_non_ansi.yml index d9f587ae203b..cf97cdd4bfa1 100644 --- a/.github/workflows/build_ansi.yml +++ b/.github/workflows/build_non_ansi.yml @@ -17,7 +17,7 @@ # under the License. 
# -name: "Build / ANSI (master, Hadoop 3, JDK 17, Scala 2.13)" +name: "Build / NON-ANSI (master, Hadoop 3, JDK 17, Scala 2.13)" on: schedule: @@ -36,7 +36,7 @@ jobs: hadoop: hadoop3 envs: >- { - "SPARK_ANSI_SQL_MODE": "true", + "SPARK_ANSI_SQL_MODE": "false", } jobs: >- {
(spark) branch master updated: [SPARK-46812][CONNECT][PYTHON][FOLLOW-UP] Add pyspark.pyspark.sql.connect.resource into PyPi packaging
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8c2c0fb43a9d [SPARK-46812][CONNECT][PYTHON][FOLLOW-UP] Add pyspark.pyspark.sql.connect.resource into PyPi packaging 8c2c0fb43a9d is described below commit 8c2c0fb43a9d3c5bffcf33aeb3354c01fe6b26cd Author: Hyukjin Kwon AuthorDate: Wed Apr 17 15:27:23 2024 +0900 [SPARK-46812][CONNECT][PYTHON][FOLLOW-UP] Add pyspark.pyspark.sql.connect.resource into PyPi packaging ### What changes were proposed in this pull request? This PR proposes to add `pyspark.pyspark.sql.connect.resource` into PyPi packaging. ### Why are the changes needed? In order for PyPI end users to download PySpark and leverage this feature. ### Does this PR introduce _any_ user-facing change? No, the main change has not been released. ### How was this patch tested? Being tested at https://github.com/apache/spark/pull/46090 ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46094 from HyukjinKwon/SPARK-46812-followup. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/packaging/classic/setup.py | 1 + python/packaging/connect/setup.py | 1 + 2 files changed, 2 insertions(+) diff --git a/python/packaging/classic/setup.py b/python/packaging/classic/setup.py index 8eefc17db700..f900fa6e15ee 100755 --- a/python/packaging/classic/setup.py +++ b/python/packaging/classic/setup.py @@ -276,6 +276,7 @@ try: "pyspark.sql.connect.functions", "pyspark.sql.connect.proto", "pyspark.sql.connect.protobuf", +"pyspark.sql.connect.resource", "pyspark.sql.connect.shell", "pyspark.sql.connect.streaming", "pyspark.sql.connect.streaming.worker", diff --git a/python/packaging/connect/setup.py b/python/packaging/connect/setup.py index fe1e7486faa9..19925962804b 100755 --- a/python/packaging/connect/setup.py +++ b/python/packaging/connect/setup.py @@ -145,6 +145,7 @@ try: "pyspark.sql.connect.functions", "pyspark.sql.connect.proto", "pyspark.sql.connect.protobuf", +"pyspark.sql.connect.resource", "pyspark.sql.connect.shell", "pyspark.sql.connect.streaming", "pyspark.sql.connect.streaming.worker", - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-46375][DOCS] Add user guide for Python data source API
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f9ebe1b3d24b [SPARK-46375][DOCS] Add user guide for Python data source API f9ebe1b3d24b is described below commit f9ebe1b3d24b126784b3bb65d1eb710a74cf63de Author: allisonwang-db AuthorDate: Wed Apr 17 09:54:42 2024 +0900 [SPARK-46375][DOCS] Add user guide for Python data source API ### What changes were proposed in this pull request? This PR adds a new user guide for the Python data source API with a simple example. More examples (including streaming) will be added in the future. ### Why are the changes needed? To improve the documentation ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? doctest ### Was this patch authored or co-authored using generative AI tooling? No Closes #46089 from allisonwang-db/spark-46375-pyds-user-guide. Authored-by: allisonwang-db Signed-off-by: Hyukjin Kwon --- python/docs/source/user_guide/sql/index.rst| 1 + .../source/user_guide/sql/python_data_source.rst | 139 + 2 files changed, 140 insertions(+) diff --git a/python/docs/source/user_guide/sql/index.rst b/python/docs/source/user_guide/sql/index.rst index 118cf139d9b3..d1b67f7eeb90 100644 --- a/python/docs/source/user_guide/sql/index.rst +++ b/python/docs/source/user_guide/sql/index.rst @@ -25,5 +25,6 @@ Spark SQL arrow_pandas python_udtf + python_data_source type_conversions diff --git a/python/docs/source/user_guide/sql/python_data_source.rst b/python/docs/source/user_guide/sql/python_data_source.rst new file mode 100644 index ..19ed016b82c2 --- /dev/null +++ b/python/docs/source/user_guide/sql/python_data_source.rst @@ -0,0 +1,139 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +..http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. + +== +Python Data Source API +== + +.. currentmodule:: pyspark.sql + +Overview + +The Python Data Source API is a new feature introduced in Spark 4.0, enabling developers to read from custom data sources and write to custom data sinks in Python. +This guide provides a comprehensive overview of the API and instructions on how to create, use, and manage Python data sources. + + +Creating a Python Data Source +- +To create a custom Python data source, you'll need to subclass the :class:`DataSource` base classes and implement the necessary methods for reading and writing data. + +This example demonstrates creating a simple data source to generate synthetic data using the `faker` library. Ensure the `faker` library is installed and accessible in your Python environment. + +**Step 1: Define the Data Source** + +Start by creating a new subclass of :class:`DataSource`. Define the source name, schema, and reader logic as follows: + +.. code-block:: python + +from pyspark.sql.datasource import DataSource, DataSourceReader +from pyspark.sql.types import StructType + +class FakeDataSource(DataSource): +""" +A fake data source for PySpark to generate synthetic data using the `faker` library. +Options: +- numRows: specify number of rows to generate. Default value is 3. 
+""" + +@classmethod +def name(cls): +return "fake" + +def schema(self): +return "name string, date string, zipcode string, state string" + +def reader(self, schema: StructType): +return FakeDataSourceReader(schema, self.options) + + +**Step 2: Implement the Reader** + +Define the reader logic to generate synthetic data. Use the `faker` library to populate each field in the schema. + +.. code-block:: python + +class FakeDataSourceReader(DataSourceReader): + +def __init__(self, schema, options): +self.schema: StructType = schema +self.options = option
(spark) branch master updated (5321353b24db -> 86837d3155b1)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 5321353b24db [SPARK-47875][CORE] Remove `spark.deploy.recoverySerializer` add 86837d3155b1 [SPARK-47877][SS][CONNECT] Speed up test_parity_listener No new revisions were added by this update. Summary of changes: .../connect/streaming/test_parity_listener.py | 119 +++-- .../sql/tests/streaming/test_streaming_listener.py | 21 ++-- 2 files changed, 71 insertions(+), 69 deletions(-)
(spark) branch master updated: [SPARK-47760][SPARK-47763][CONNECT][TESTS] Reeanble Avro and Protobuf function doctests
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 57c7db2c4c1d [SPARK-47760][SPARK-47763][CONNECT][TESTS] Reeanble Avro and Protobuf function doctests 57c7db2c4c1d is described below commit 57c7db2c4c1dbeeba062fe28ab58245e0a3098eb Author: Hyukjin Kwon AuthorDate: Wed Apr 17 08:47:01 2024 +0900 [SPARK-47760][SPARK-47763][CONNECT][TESTS] Reeanble Avro and Protobuf function doctests ### What changes were proposed in this pull request? This PR proposes to reeanble Avro and Protobuf function doctests by providing the required jars into Spark Connect server. ### Why are the changes needed? For test coverages of Avro and Protobuf functions. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Tested in my fork: https://github.com/HyukjinKwon/spark/actions/runs/8704014674/job/23871383802 ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46055 from HyukjinKwon/SPARK-47763-SPARK-47760. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .github/workflows/build_python_connect.yml | 11 ++- python/pyspark/sql/connect/avro/functions.py | 7 --- python/pyspark/sql/connect/protobuf/functions.py | 7 --- 3 files changed, 6 insertions(+), 19 deletions(-) diff --git a/.github/workflows/build_python_connect.yml b/.github/workflows/build_python_connect.yml index 965e839b6b2b..863980b0c2e5 100644 --- a/.github/workflows/build_python_connect.yml +++ b/.github/workflows/build_python_connect.yml @@ -29,7 +29,6 @@ jobs: name: "Build modules: pyspark-connect" runs-on: ubuntu-latest timeout-minutes: 300 -if: github.repository == 'apache/spark' steps: - name: Checkout Spark repository uses: actions/checkout@v4 @@ -63,7 +62,7 @@ jobs: architecture: x64 - name: Build Spark run: | - ./build/sbt -Phive test:package + ./build/sbt -Phive Test/package - name: Install pure Python package (pyspark-connect) env: SPARK_TESTING: 1 @@ -82,7 +81,9 @@ jobs: cp conf/log4j2.properties.template conf/log4j2.properties sed -i 's/rootLogger.level = info/rootLogger.level = warn/g' conf/log4j2.properties # Start a Spark Connect server - PYTHONPATH="python/lib/pyspark.zip:python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" ./sbin/start-connect-server.sh --driver-java-options "-Dlog4j.configurationFile=file:$GITHUB_WORKSPACE/conf/log4j2.properties" --jars `find connector/connect/server/target -name spark-connect*SNAPSHOT.jar` + PYTHONPATH="python/lib/pyspark.zip:python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH" ./sbin/start-connect-server.sh \ +--driver-java-options "-Dlog4j.configurationFile=file:$GITHUB_WORKSPACE/conf/log4j2.properties" \ +--jars "`find connector/connect/server/target -name spark-connect-*SNAPSHOT.jar`,`find connector/protobuf/target -name spark-protobuf-*SNAPSHOT.jar`,`find connector/avro/target -name spark-avro*SNAPSHOT.jar`" # Make sure running Python workers that contains pyspark.core once. They will be reused. 
python -c "from pyspark.sql import SparkSession; _ = SparkSession.builder.remote('sc://localhost').getOrCreate().range(100).repartition(100).mapInPandas(lambda x: x, 'id INT').collect()" # Remove Py4J and PySpark zipped library to make sure there is no JVM connection @@ -98,9 +99,9 @@ jobs: with: name: test-results-spark-connect-python-only path: "**/target/test-reports/*.xml" - - name: Upload unit tests log files + - name: Upload Spark Connect server log file if: failure() uses: actions/upload-artifact@v4 with: name: unit-tests-log-spark-connect-python-only - path: "**/target/unit-tests.log" + path: logs/*.out diff --git a/python/pyspark/sql/connect/avro/functions.py b/python/pyspark/sql/connect/avro/functions.py index 43088333b108..f153b17acf58 100644 --- a/python/pyspark/sql/connect/avro/functions.py +++ b/python/pyspark/sql/connect/avro/functions.py @@ -80,15 +80,8 @@ def _test() -> None: import doctest from pyspark.sql import SparkSession as PySparkSession import pyspark.sql.connect.avro.functions -from pyspark.util import is_remote_only globs = pyspark.sql.connect.avro.functions.__dict__.copy() - -# TODO(SPARK-47760): Reeanble Avro function doctests -if is_remote_only(): -del pyspark.sql.connect.avro.fu
(spark) branch master updated: [SPARK-47818][CONNECT] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a1fc6d57b27d [SPARK-47818][CONNECT] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests a1fc6d57b27d is described below commit a1fc6d57b27d24b832b2f2580e6acd64c4488c62 Author: Xi Lyu AuthorDate: Tue Apr 16 16:27:32 2024 +0900 [SPARK-47818][CONNECT] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests ### What changes were proposed in this pull request? While building the DataFrame step by step, each time a new DataFrame is generated with an empty schema, which is lazily computed on access. However, if a user's code frequently accesses the schema of these new DataFrames using methods such as `df.columns`, it will result in a large number of Analyze requests to the server. Each time, the entire plan needs to be reanalyzed, leading to poor performance, especially when constructing highly complex plans. Now, by introducing plan cache in SparkConnectPlanner, we aim to reduce the overhead of repeated analysis during this process. This is achieved by saving significant computation if the resolved logical plan of a subtree of can be cached. A minimal example of the problem: ``` import pyspark.sql.functions as F df = spark.range(10) for i in range(200): if str(i) not in df.columns: # <-- The df.columns call causes a new Analyze request in every iteration df = df.withColumn(str(i), F.col("id") + i) df.show() ``` With this patch, the performance of the above code improved from ~110s to ~5s. ### Why are the changes needed? The performance improvement is huge in the above cases. ### Does this PR introduce _any_ user-facing change? Yes, a static conf `spark.connect.session.planCache.maxSize` and a dynamic conf `spark.connect.session.planCache.enabled` are added. 
* `spark.connect.session.planCache.maxSize`: Sets the maximum number of cached resolved logical plans in Spark Connect Session. If set to a value less or equal than zero will disable the plan cache * `spark.connect.session.planCache.enabled`: When true, the cache of resolved logical plans is enabled if `spark.connect.session.planCache.maxSize` is greater than zero. When false, the cache is disabled even if `spark.connect.session.planCache.maxSize` is greater than zero. The caching is best-effort and not guaranteed. ### How was this patch tested? Some new tests are added in SparkConnectSessionHolderSuite.scala. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46012 from xi-db/SPARK-47818-plan-cache. Lead-authored-by: Xi Lyu Co-authored-by: Xi Lyu <159039256+xi...@users.noreply.github.com> Signed-off-by: Hyukjin Kwon --- .../apache/spark/sql/connect/config/Connect.scala | 18 ++ .../sql/connect/planner/SparkConnectPlanner.scala | 201 - .../spark/sql/connect/service/SessionHolder.scala | 79 +++- .../service/SparkConnectAnalyzeHandler.scala | 26 +-- .../service/SparkConnectSessionHolderSuite.scala | 125 - 5 files changed, 345 insertions(+), 104 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala index 6ba100af1bb9..e94e86587393 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala @@ -273,4 +273,22 @@ object Connect { .version("4.0.0") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("2s") + + val CONNECT_SESSION_PLAN_CACHE_SIZE = +buildStaticConf("spark.connect.session.planCache.maxSize") + .doc("Sets the maximum number of cached resolved logical plans in Spark Connect Session." 
+ +" If set to a value less or equal than zero will disable the plan cache.") + .version("4.0.0") + .intConf + .createWithDefault(5) + + val CONNECT_SESSION_PLAN_CACHE_ENABLED = +buildConf("spark.connect.session.planCache.enabled") + .doc("When true, the cache of resolved logical plans is enabled if" + +s" '${CONNECT_SESSION_PLAN_CACHE_SIZE.key}' is greater than zero." + +s" When false, the cache is disabled even if '${CONNECT_SESSION_PLAN_CACHE_SIZE.key}' is" + +" greater than zero. The caching is best-effort and not guaranteed.") +
(spark) branch master updated (3ff339362b75 -> 21cc89b68ba5)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 3ff339362b75 [SPARK-47861][BUILD] Upgrade `slf4j` to 2.0.13 add 21cc89b68ba5 [SPARK-47862][PYTHON][CONNECT] Fix generation of proto files No new revisions were added by this update. Summary of changes: dev/connect-gen-protos.sh | 3 +++ python/pyspark/sql/connect/proto/base_pb2.py| 2 +- python/pyspark/sql/connect/proto/catalog_pb2.py | 4 +++- python/pyspark/sql/connect/proto/commands_pb2.py| 4 +++- python/pyspark/sql/connect/proto/common_pb2.py | 4 +++- python/pyspark/sql/connect/proto/example_plugins_pb2.py | 4 +++- python/pyspark/sql/connect/proto/expressions_pb2.py | 4 +++- python/pyspark/sql/connect/proto/relations_pb2.py | 4 +++- python/pyspark/sql/connect/proto/types_pb2.py | 4 +++- python/pyspark/sql/tests/connect/test_connect_basic.py | 8 10 files changed, 33 insertions(+), 8 deletions(-)
(spark) branch master updated: [SPARK-47233][CONNECT][SS][2/2] Client & Server logic for Client side streaming query listener
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 51d3efcead5b [SPARK-47233][CONNECT][SS][2/2] Client & Server logic for Client side streaming query listener 51d3efcead5b is described below commit 51d3efcead5ba54b568a7be7f236179c6174e547 Author: Wei Liu AuthorDate: Tue Apr 16 12:18:01 2024 +0900 [SPARK-47233][CONNECT][SS][2/2] Client & Server logic for Client side streaming query listener ### What changes were proposed in this pull request? Server and client side for the client side listener. The client should start send a `add_listener_bus_listener` RPC for the first listener ever added. The server should start a long running thread and register a new "SparkConnectListenerBusListener" upon receiving the RPC, the listener should stream back the listener events to the client using the `responseObserver` created in the `executeHandler` of the `add_listener_bus_listener` call. On the client side, a spark client method: `execute_long_running_command` is created to continuously receive new events from the server with a long-running iterator. The client starts a new thread for handing such events. Please see the graphs below for a more detailed illustration. When either the last client side listener is removed, and the client sends "remove_listener_bus_listener" call, or the `send` method of `SparkConnectListenerBusListener` throws, the long-running server thread is stopped, as an effect, the final `ResultComplete` is sent to the client, closing the client's long-running iterator. ### Why are the changes needed? Development of spark connect streaming ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Added unit test. Removed old unit test that created for verifying server-side listener limitations. 
### Was this patch authored or co-authored using generative AI tooling? No Closes #46037 from WweiL/SPARK-47233-client-side-listener-2. Authored-by: Wei Liu Signed-off-by: Hyukjin Kwon --- .../service/SparkConnectListenerBusListener.scala | 40 +++-- python/pyspark/sql/connect/client/core.py | 25 +++ python/pyspark/sql/connect/streaming/query.py | 196 ++--- python/pyspark/sql/connect/streaming/readwriter.py | 12 +- .../connect/streaming/test_parity_listener.py | 183 ++- 5 files changed, 376 insertions(+), 80 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala index 1b6c5179871d..56d0d920e95b 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala @@ -51,7 +51,11 @@ private[sql] class ServerSideListenerHolder(val sessionHolder: SessionHolder) { val streamingQueryStartedEventCache : ConcurrentMap[String, StreamingQueryListener.QueryStartedEvent] = new ConcurrentHashMap() - def isServerSideListenerRegistered: Boolean = streamingQueryServerSideListener.isDefined + val lock = new Object() + + def isServerSideListenerRegistered: Boolean = lock.synchronized { +streamingQueryServerSideListener.isDefined + } /** * The initialization of the server side listener and related resources. This method is called @@ -62,7 +66,7 @@ private[sql] class ServerSideListenerHolder(val sessionHolder: SessionHolder) { * @param responseObserver * the responseObserver created from the first long running executeThread. 
*/ - def init(responseObserver: StreamObserver[ExecutePlanResponse]): Unit = { + def init(responseObserver: StreamObserver[ExecutePlanResponse]): Unit = lock.synchronized { val serverListener = new SparkConnectListenerBusListener(this, responseObserver) sessionHolder.session.streams.addListener(serverListener) streamingQueryServerSideListener = Some(serverListener) @@ -76,7 +80,7 @@ private[sql] class ServerSideListenerHolder(val sessionHolder: SessionHolder) { * the latch, so the long-running thread can proceed to send back the final ResultComplete * response. */ - def cleanUp(): Unit = { + def cleanUp(): Unit = lock.synchronized { streamingQueryServerSideListener.foreach { listener => sessionHolder.session.streams.removeListener(listener) } @@ -106,18 +110,18 @@ private[sql] class SparkConnectListenerBusListener( // all related sources are cleaned
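The Scala diff above wraps `init`, `cleanUp`, and `isServerSideListenerRegistered` in a shared `lock.synchronized` so a concurrent remove cannot race a registration. The same holder pattern sketched in Python with `threading.Lock` (names are illustrative, not the server's API):

```python
import threading

class ServerSideListenerHolder:
    """Sketch of a lock-guarded listener holder: every read and write of
    the listener reference happens under one lock, mirroring the
    lock.synchronized blocks in the Scala change above."""

    def __init__(self):
        self._lock = threading.Lock()
        self._listener = None

    def is_registered(self):
        with self._lock:
            return self._listener is not None

    def init(self, listener):
        with self._lock:
            self._listener = listener

    def clean_up(self):
        # Atomically detach and return the listener (or None).
        with self._lock:
            listener, self._listener = self._listener, None
            return listener

holder = ServerSideListenerHolder()
holder.init(object())
registered_before = holder.is_registered()
removed = holder.clean_up()
registered_after = holder.is_registered()
```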
(spark) branch master updated (61f8ec6c5175 -> e815012f26ab)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 61f8ec6c5175 [SPARK-47866][SQL][TESTS] Use explicit GC in `PythonForeachWriterSuite` add e815012f26ab [SPARK-47371][SQL] XML: Ignore row tags found in CDATA No new revisions were added by this update. Summary of changes: .../spark/sql/catalyst/xml/StaxXmlParser.scala | 47 + .../test-data/xml-resources/cdata-ending-eof.xml | 48 ++ .../test-data/xml-resources/cdata-no-close.xml | 48 ++ .../test-data/xml-resources/commented-row.xml | 25 --- .../test-data/xml-resources/ignored-rows.xml | 45 .../sql/execution/datasources/xml/XmlSuite.scala | 21 -- 6 files changed, 196 insertions(+), 38 deletions(-) create mode 100644 sql/core/src/test/resources/test-data/xml-resources/cdata-ending-eof.xml create mode 100644 sql/core/src/test/resources/test-data/xml-resources/cdata-no-close.xml delete mode 100644 sql/core/src/test/resources/test-data/xml-resources/commented-row.xml create mode 100644 sql/core/src/test/resources/test-data/xml-resources/ignored-rows.xml
(spark) branch master updated (c5b8e60e0d59 -> 61f8ec6c5175)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from c5b8e60e0d59 [SPARK-46810][DOCS] Align error class terminology with SQL standard add 61f8ec6c5175 [SPARK-47866][SQL][TESTS] Use explicit GC in `PythonForeachWriterSuite` No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala | 1 + 1 file changed, 1 insertion(+)
(spark) branch master updated (e6b7950f553c -> b8354bbe53c0)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from e6b7950f553c [SPARK-47788][SS] Ensure the same hash partitioning for streaming stateful ops add b8354bbe53c0 [SPARK-47851][CONNECT][DOCS] Document pyspark-connect package No new revisions were added by this update. Summary of changes: python/docs/source/getting_started/install.rst | 15 ++- 1 file changed, 14 insertions(+), 1 deletion(-)
(spark) branch master updated (a7cff5c7a383 -> 73aa4059cd5c)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from a7cff5c7a383 [SPARK-47757][SPARK-47756][CONNECT][PYTHON][TESTS] Make testing Spark Connect server having pyspark.core add 73aa4059cd5c [SPARK-47849][PYTHON][CONNECT] Change release script to release pyspark-connect No new revisions were added by this update. Summary of changes: dev/create-release/release-build.sh | 14 ++ dev/make-distribution.sh| 1 + 2 files changed, 15 insertions(+)
(spark) branch master updated (1096801ead29 -> a7cff5c7a383)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 1096801ead29 [SPARK-47828][CONNECT][PYTHON] DataFrameWriterV2.overwrite` fails with invalid plan add a7cff5c7a383 [SPARK-47757][SPARK-47756][CONNECT][PYTHON][TESTS] Make testing Spark Connect server having pyspark.core No new revisions were added by this update. Summary of changes: .github/workflows/build_python_connect.yml | 4 +++- .../pyspark/sql/tests/connect/test_parity_memory_profiler.py | 3 --- python/pyspark/sql/tests/connect/test_parity_udf_profiler.py | 3 --- python/pyspark/worker_util.py| 12 4 files changed, 7 insertions(+), 15 deletions(-)
(spark) branch master updated (85c4f053f25a -> e7d0ba783f43)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 85c4f053f25a [SPARK-47841][BUILD] Upgrade `postgresql` to 42.7.3 add e7d0ba783f43 [SPARK-47826][SQL][PYTHON][FOLLOW-UP] Do not use list[...] that is invalid type syntax for Python 3.8 No new revisions were added by this update. Summary of changes: python/pyspark/sql/variant_utils.py | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-)
(spark) branch master updated: [SPARK-47812][CONNECT] Support Serialization of SparkSession for ForEachBatch worker
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e7fc4003b246 [SPARK-47812][CONNECT] Support Serialization of SparkSession for ForEachBatch worker e7fc4003b246 is described below commit e7fc4003b246bab743ab82d9e7bb77c0e2e5946e Author: Martin Grund AuthorDate: Sat Apr 13 10:30:23 2024 +0900 [SPARK-47812][CONNECT] Support Serialization of SparkSession for ForEachBatch worker ### What changes were proposed in this pull request? This patch adds support to register custom dispatch handlers when serializing objects using the provided Cloudpickle library. This is necessary to provide compatibility when executing ForEachBatch functions in structured streaming. A typical example for this behavior is the following test case: ```python def curried_function(df): def inner(batch_df, batch_id): df.createOrReplaceTempView("updates") batch_df.createOrReplaceTempView("batch_updates") return inner df = spark.readStream.format("text").load("python/test_support/sql/streaming") other_df = self.spark.range(100) df.writeStream.foreachBatch(curried_function(other_df)).start() ``` Here we curry a DataFrame into the function called during ForEachBatch and effectively passing state. Until now, serializing DataFrames and SparkSessions in Spark Connect was not possible since the SparkSession carries the open GPRC connection and the DataFrame itself overrides certain magic methods that make pickling fail. To make serializing Spark Sessions possible, we register a custom session constructor, that simply returns the current active session, during the serialization of the ForEachBatch function. Now, when the ForEachBatch worker starts the execution it already creates and registers an active SparkSession. 
To serialize and reconstruct the DataFrame we simply have to pass in the session and the plan, the remaining attributes do not carry a permanent state. To avoid modifying any global behavior, the serialization handlers are not registered for all cases but only when the ForEachBatch and ForEach handlers are called. This is to make sure that we don't unexpectedly change behavior. ### Why are the changes needed? Compatibility and Ease of Use ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added and updated tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #46002 from grundprinzip/SPARK-47812. Lead-authored-by: Martin Grund Co-authored-by: Martin Grund Co-authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/dataframe.py| 22 +++ python/pyspark/sql/connect/session.py | 37 .../streaming/worker/foreach_batch_worker.py | 15 - .../connect/streaming/worker/listener_worker.py| 15 - .../connect/streaming/test_parity_foreach_batch.py | 70 +- .../connect/streaming/test_parity_listener.py | 23 ++- .../pyspark/sql/tests/connect/test_parity_udtf.py | 18 +- 7 files changed, 163 insertions(+), 37 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 1dddcc078810..f0dc412760a4 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -122,6 +122,28 @@ class DataFrame: self._support_repr_html = False self._cached_schema: Optional[StructType] = None +def __reduce__(self) -> Tuple: +""" +Custom method for serializing the DataFrame object using Pickle. Since the DataFrame +overrides "__getattr__" method, the default serialization method does not work. + +Returns +--- +The tuple containing the information needed to reconstruct the object. 
+ +""" +return ( +DataFrame, +( +self._plan, +self._session, +), +{ +"_support_repr_html": self._support_repr_html, +"_cached_schema": self._cached_schema, +}, +) + def __repr__(self) -> str: if not self._support_repr_html: ( diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py index 07fe8a62f082..3be6c83cf13b 100644 --- a/python/pyspark/sql/connect/session.py +++ b/python/pyspark/sql/connect/session.py @@ -96,6 +96,7 @@ from pyspark.errors import ( PySparkRuntimeError, PySparkValueEr
(spark) branch master updated (de00ac8a05ae -> f0ea0968d946)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from de00ac8a05ae [SPARK-47765][SQL] Add SET COLLATION to parser rules add f0ea0968d946 [MINOR][PYTHON] Enable parity test `test_different_group_key_cardinality` No new revisions were added by this update. Summary of changes: python/pyspark/sql/tests/connect/test_parity_pandas_cogrouped_map.py | 4 python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-)
(spark) branch master updated: [SPARK-47831][PS][CONNECT][TESTS] Run Pandas API on Spark for pyspark-connect package
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new daf260f74e12 [SPARK-47831][PS][CONNECT][TESTS] Run Pandas API on Spark for pyspark-connect package daf260f74e12 is described below commit daf260f74e12fc5e9fad6091f6230e71a9e6c9c1 Author: Hyukjin Kwon AuthorDate: Fri Apr 12 18:39:22 2024 +0900 [SPARK-47831][PS][CONNECT][TESTS] Run Pandas API on Spark for pyspark-connect package ### What changes were proposed in this pull request? This PR proposes to extends `pyspark-connect` scheduled job to run Pandas API on Spark tests as well. ### Why are the changes needed? In order to make sure pure Python library works with Pandas API on Spark. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? https://github.com/HyukjinKwon/spark/actions/runs/8659133747/job/23744381515 ### Was this patch authored or co-authored using generative AI tooling? No Closes #46001 from HyukjinKwon/test-ps-scheduledjob. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .github/workflows/build_python_connect.yml | 12 +++--- python/packaging/connect/setup.py | 26 ++ .../tests/connect/test_parity_memory_profiler.py | 3 +++ 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build_python_connect.yml b/.github/workflows/build_python_connect.yml index 8deee026131e..6bd1b4526b0d 100644 --- a/.github/workflows/build_python_connect.yml +++ b/.github/workflows/build_python_connect.yml @@ -72,18 +72,24 @@ jobs: python packaging/connect/setup.py sdist cd dist pip install pyspark-connect-*.tar.gz - pip install scikit-learn torch torchvision torcheval + pip install 'six==1.16.0' 'pandas<=2.2.2' scipy 'plotly>=4.8' 'mlflow>=2.8.1' coverage matplotlib openpyxl 'memory-profiler>=0.61.0' 'scikit-learn>=1.3.2' torch torchvision torcheval deepspeed unittest-xml-reporting - name: Run tests env: - SPARK_CONNECT_TESTING_REMOTE: sc://localhost SPARK_TESTING: 1 + SPARK_CONNECT_TESTING_REMOTE: sc://localhost run: | + # Make less noisy + cp conf/log4j2.properties.template conf/log4j2.properties + sed -i 's/rootLogger.level = info/rootLogger.level = warn/g' conf/log4j2.properties # Start a Spark Connect server - ./sbin/start-connect-server.sh --jars `find connector/connect/server/target -name spark-connect*SNAPSHOT.jar` + ./sbin/start-connect-server.sh --driver-java-options "-Dlog4j.configurationFile=file:$GITHUB_WORKSPACE/conf/log4j2.properties" --jars `find connector/connect/server/target -name spark-connect*SNAPSHOT.jar` # Remove Py4J and PySpark zipped library to make sure there is no JVM connection rm python/lib/* rm -r python/pyspark + # Several tests related to catalog require running them sequentially, e.g., writing a table in a listener. 
./python/run-tests --parallelism=1 --python-executables=python3 --modules pyspark-connect,pyspark-ml-connect + # None of the tests depend on each other in Pandas API on Spark, so run them in parallel + ./python/run-tests --parallelism=4 --python-executables=python3 --modules pyspark-pandas-connect-part0,pyspark-pandas-connect-part1,pyspark-pandas-connect-part2,pyspark-pandas-connect-part3 - name: Upload test results to report if: always() uses: actions/upload-artifact@v4 diff --git a/python/packaging/connect/setup.py b/python/packaging/connect/setup.py index 419ed36b4236..fe1e7486faa9 100755 --- a/python/packaging/connect/setup.py +++ b/python/packaging/connect/setup.py @@ -78,6 +78,32 @@ if "SPARK_TESTING" in os.environ: "pyspark.sql.tests.pandas", "pyspark.sql.tests.streaming", "pyspark.ml.tests.connect", +"pyspark.pandas.tests", +"pyspark.pandas.tests.computation", +"pyspark.pandas.tests.data_type_ops", +"pyspark.pandas.tests.diff_frames_ops", +"pyspark.pandas.tests.frame", +"pyspark.pandas.tests.groupby", +"pyspark.pandas.tests.indexes", +"pyspark.pandas.tests.io", +"pyspark.pandas.tests.plot", +"pyspark.pandas.tests.resample", +"pyspark.pandas.tests.reshape", +"pyspark.pandas.tests.series", +"pyspark.pandas.tests.window", +
(spark) branch master updated (1ee3496f4836 -> 4e46852d0880)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 1ee3496f4836 [SPARK-47792][CORE] Make the value of MDC can support `null` & cannot be `MessageWithContext` add 4e46852d0880 [SPARK-47827][PYTHON] Missing warnings for deprecated features No new revisions were added by this update. Summary of changes: python/pyspark/sql/connect/catalog.py | 18 ++ python/pyspark/sql/connect/functions/builtin.py | 5 + 2 files changed, 23 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-47174][CONNECT][SS][1/2] Server side SparkConnectListenerBusListener for Client side streaming query listener
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f086c2327c36 [SPARK-47174][CONNECT][SS][1/2] Server side SparkConnectListenerBusListener for Client side streaming query listener f086c2327c36 is described below commit f086c2327c36c396ae5d886afd3ef613650c6b0d Author: Wei Liu AuthorDate: Fri Apr 12 10:08:45 2024 +0900 [SPARK-47174][CONNECT][SS][1/2] Server side SparkConnectListenerBusListener for Client side streaming query listener ### What changes were proposed in this pull request? Server side `SparkConnectListenerBusListener` implementation for the client side listener. There would only be one such listener for each `SessionHolder`. ### Why are the changes needed? Move streaming query listener to client side ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes #45988 from WweiL/SPARK-47174-client-side-listener-1. 
Authored-by: Wei Liu Signed-off-by: Hyukjin Kwon --- .../sql/connect/planner/SparkConnectPlanner.scala | 34 ++- ...SparkConnectStreamingQueryListenerHandler.scala | 121 +++ .../spark/sql/connect/service/SessionHolder.scala | 7 + .../service/SparkConnectListenerBusListener.scala | 156 ++ .../SparkConnectListenerBusListenerSuite.scala | 240 + 5 files changed, 555 insertions(+), 3 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 96db45c5c63e..5e7f3b74c299 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -2551,6 +2551,11 @@ class SparkConnectPlanner( handleStreamingQueryManagerCommand( command.getStreamingQueryManagerCommand, responseObserver) + case proto.Command.CommandTypeCase.STREAMING_QUERY_LISTENER_BUS_COMMAND => +val handler = new SparkConnectStreamingQueryListenerHandler(executeHolder) +handler.handleListenerCommand( + command.getStreamingQueryListenerBusCommand, + responseObserver) case proto.Command.CommandTypeCase.GET_RESOURCES_COMMAND => handleGetResourcesCommand(responseObserver) case proto.Command.CommandTypeCase.CREATE_RESOURCE_PROFILE_COMMAND => @@ -3118,7 +3123,7 @@ class SparkConnectPlanner( } executeHolder.eventsManager.postFinished() -val result = WriteStreamOperationStartResult +val resultBuilder = WriteStreamOperationStartResult .newBuilder() .setQueryId( StreamingQueryInstanceId @@ -3127,14 +3132,37 @@ class SparkConnectPlanner( .setRunId(query.runId.toString) .build()) .setName(Option(query.name).getOrElse("")) - .build() + +// The query started event for this query is sent to the client, and is handled by +// the client side listeners before client's DataStreamWriter.start() 
returns. +// This is to ensure that the onQueryStarted callback is called before the start() call, which +// is defined in the onQueryStarted API. +// So the flow is: +// 1. On the server side, the query is started above. +// 2. Per the contract of the onQueryStarted API, the queryStartedEvent is added to the +//streamingServersideListenerHolder.streamingQueryStartedEventCache, by the onQueryStarted +//callback of streamingServersideListenerHolder.streamingQueryServerSideListener. +// 3. The queryStartedEvent is sent to the client. +// 4. The client side listener handles the queryStartedEvent and calls the onQueryStarted API, +//before the client side DataStreamWriter.start(). +// This way we ensure that the onQueryStarted API is called before the start() call in Connect. +val queryStartedEvent = Option( + sessionHolder.streamingServersideListenerHolder.streamingQueryStartedEventCache.remove( +query.runId.toString)) +queryStartedEvent.foreach { + logDebug( +s"[SessionId: $sessionId][UserId: $userId][operationId: " + + s"${executeHolder.operationId}][query id: ${query.id}][query runId: ${query.runId}] " + + s"Adding QueryStartedEvent to response") + e => resultBuilder.setQueryStartedEventJson(e.json) +} responseObserver.onNext( ExecutePlanResponse
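The four-step handoff described in the comments above — server caches the query-started event, then pops it and attaches it to the start result so the client can run its listeners before `start()` returns — can be sketched as a plain-Python analogue. The names below mirror the commit's `streamingQueryStartedEventCache`, but this is a simplified sketch, not Spark's actual Scala implementation.

```python
# Hypothetical analogue of the server-side event handoff (not Spark's code).
started_event_cache = {}  # run_id -> query-started event JSON (step 2's cache)

def on_query_started(run_id, event_json):
    # Step 2: the server-side listener caches the event when a query starts.
    started_event_cache[run_id] = event_json

def build_start_result(run_id):
    # Step 3: pop the cached event (if present) and attach it to the start
    # result, so the client can invoke its onQueryStarted listeners (step 4)
    # before its DataStreamWriter.start() returns.
    result = {"run_id": run_id}
    event_json = started_event_cache.pop(run_id, None)
    if event_json is not None:
        result["query_started_event_json"] = event_json
    return result

on_query_started("run-1", '{"id": "q1"}')
response = build_start_result("run-1")  # carries the cached event exactly once
```

Popping (rather than reading) the cache entry ensures each event is delivered to the client at most once, matching the `remove` call in the commit.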
(spark) branch branch-3.5 updated: [SPARK-47824][PS] Fix nondeterminism in pyspark.pandas.series.asof
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new d18659de626c [SPARK-47824][PS] Fix nondeterminism in pyspark.pandas.series.asof d18659de626c is described below commit d18659de626cc3743e7f6a5dceca0f2a25b006de Author: Mark Jarvin AuthorDate: Fri Apr 12 09:37:19 2024 +0900 [SPARK-47824][PS] Fix nondeterminism in pyspark.pandas.series.asof ### What changes were proposed in this pull request? Use the monotonically increasing ID as a sorting condition for `max_by` instead of a literal string. ### Why are the changes needed? https://github.com/apache/spark/pull/35191 had an error where the literal string `"__monotonically_increasing_id__"` was used as the tie-breaker in `max_by` instead of the actual ID. ### Does this PR introduce _any_ user-facing change? Fixes nondeterminism in `asof` ### How was this patch tested? In some circumstances `//python:pyspark.pandas.tests.connect.series.test_parity_as_of` is sufficient to reproduce ### Was this patch authored or co-authored using generative AI tooling? No Closes #46018 from markj-db/SPARK-47824. Authored-by: Mark Jarvin Signed-off-by: Hyukjin Kwon (cherry picked from commit a0ccdf27e5ff30817b8f058f08f98d5b44bad2db) Signed-off-by: Hyukjin Kwon --- python/pyspark/pandas/series.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index 95ca92e78787..b54ae88616fa 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -5910,7 +5910,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]): # then return monotonically_increasing_id. 
This will let max by # to return last index value, which is the behaviour of pandas else spark_column.isNotNull(), -monotonically_increasing_id_column, +F.col(monotonically_increasing_id_column), ), ) for index in where - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch branch-3.4 updated: [SPARK-47824][PS] Fix nondeterminism in pyspark.pandas.series.asof
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new d0fd730839d8 [SPARK-47824][PS] Fix nondeterminism in pyspark.pandas.series.asof d0fd730839d8 is described below commit d0fd730839d8c4351781efb6aee5ff8f7c342ecf Author: Mark Jarvin AuthorDate: Fri Apr 12 09:37:19 2024 +0900 [SPARK-47824][PS] Fix nondeterminism in pyspark.pandas.series.asof ### What changes were proposed in this pull request? Use the monotonically increasing ID as a sorting condition for `max_by` instead of a literal string. ### Why are the changes needed? https://github.com/apache/spark/pull/35191 had an error where the literal string `"__monotonically_increasing_id__"` was used as the tie-breaker in `max_by` instead of the actual ID. ### Does this PR introduce _any_ user-facing change? Fixes nondeterminism in `asof` ### How was this patch tested? In some circumstances `//python:pyspark.pandas.tests.connect.series.test_parity_as_of` is sufficient to reproduce ### Was this patch authored or co-authored using generative AI tooling? No Closes #46018 from markj-db/SPARK-47824. Authored-by: Mark Jarvin Signed-off-by: Hyukjin Kwon (cherry picked from commit a0ccdf27e5ff30817b8f058f08f98d5b44bad2db) Signed-off-by: Hyukjin Kwon --- python/pyspark/pandas/series.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index 5d6c25eca69e..4e2e3ffbb548 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -5878,7 +5878,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]): # then return monotonically_increasing_id. 
This will let max by # to return last index value, which is the behaviour of pandas else spark_column.isNotNull(), -monotonically_increasing_id_column, +F.col(monotonically_increasing_id_column), ), ) for index in where - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-47824][PS] Fix nondeterminism in pyspark.pandas.series.asof
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a0ccdf27e5ff [SPARK-47824][PS] Fix nondeterminism in pyspark.pandas.series.asof a0ccdf27e5ff is described below commit a0ccdf27e5ff30817b8f058f08f98d5b44bad2db Author: Mark Jarvin AuthorDate: Fri Apr 12 09:37:19 2024 +0900 [SPARK-47824][PS] Fix nondeterminism in pyspark.pandas.series.asof ### What changes were proposed in this pull request? Use the monotonically increasing ID as a sorting condition for `max_by` instead of a literal string. ### Why are the changes needed? https://github.com/apache/spark/pull/35191 had an error where the literal string `"__monotonically_increasing_id__"` was used as the tie-breaker in `max_by` instead of the actual ID. ### Does this PR introduce _any_ user-facing change? Fixes nondeterminism in `asof` ### How was this patch tested? In some circumstances `//python:pyspark.pandas.tests.connect.series.test_parity_as_of` is sufficient to reproduce ### Was this patch authored or co-authored using generative AI tooling? No Closes #46018 from markj-db/SPARK-47824. Authored-by: Mark Jarvin Signed-off-by: Hyukjin Kwon --- python/pyspark/pandas/series.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index 98818a368a9f..8edc2c531b51 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -5870,7 +5870,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]): # then return monotonically_increasing_id. 
This will let max by # to return last index value, which is the behaviour of pandas else spark_column.isNotNull(), -monotonically_increasing_id_column, +F.col(monotonically_increasing_id_column), ), ) for index in where - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
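The bug fixed in the three commits above — passing the literal string `"__monotonically_increasing_id__"` as a `max_by` tie-breaker instead of `F.col(...)` — can be illustrated with a plain-Python analogue (not PySpark): a constant tie-breaker makes the winner depend on input order, while tie-breaking on the actual ID value does not.

```python
# Plain-Python analogue of the max_by tie-breaker bug (not pyspark.pandas code).
rows = [
    {"key": 1, "value": "a", "__monotonically_increasing_id__": 10},
    {"key": 1, "value": "b", "__monotonically_increasing_id__": 20},
]
rev = list(reversed(rows))

# Buggy: the tie-breaker is the same literal string for every row, so ties on
# "key" are broken by input order -- like passing the column *name* as a value.
def literal_key(r):
    return (r["key"], "__monotonically_increasing_id__")

# Fixed: tie-break on the row's actual ID, like F.col("...") resolves to the
# column's value. The row with the largest ID (the last one) always wins.
def column_key(r):
    return (r["key"], r["__monotonically_increasing_id__"])

buggy_fwd = max(rows, key=literal_key)   # "a" -- depends on input order
buggy_rev = max(rev, key=literal_key)    # "b" -- different result, same data
fixed_fwd = max(rows, key=column_key)    # "b" -- deterministic
fixed_rev = max(rev, key=column_key)     # "b" -- deterministic
```

The fixed version always picks the row with the largest monotonically increasing ID, matching the pandas behaviour of returning the last index value.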
(spark) branch master updated: [SPARK-47811][PYTHON][CONNECT][TESTS] Run ML tests for pyspark-connect package
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new aa568354725c [SPARK-47811][PYTHON][CONNECT][TESTS] Run ML tests for pyspark-connect package aa568354725c is described below commit aa568354725ce44fc0261973b97597ab0986edb1 Author: Hyukjin Kwon AuthorDate: Fri Apr 12 09:02:47 2024 +0900 [SPARK-47811][PYTHON][CONNECT][TESTS] Run ML tests for pyspark-connect package ### What changes were proposed in this pull request? This PR proposes to extend the `pyspark-connect` scheduled job to run ML tests as well. ### Why are the changes needed? In order to make sure the pure Python library works with ML. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Tested in my fork: https://github.com/HyukjinKwon/spark/actions/runs/8643632135/job/23697401430 ### Was this patch authored or co-authored using generative AI tooling? No Closes #45941 from HyukjinKwon/test-ps-ci. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .github/workflows/build_python_connect.yml | 3 +- python/packaging/connect/setup.py | 1 + python/pyspark/ml/connect/classification.py| 1 - python/pyspark/ml/param/__init__.py| 7 +- .../tests/connect/test_connect_classification.py | 10 +- .../ml/tests/connect/test_connect_evaluation.py| 5 +- .../ml/tests/connect/test_connect_feature.py | 5 +- .../ml/tests/connect/test_connect_function.py | 2 + .../ml/tests/connect/test_connect_pipeline.py | 11 +- .../ml/tests/connect/test_connect_summarizer.py| 5 +- .../ml/tests/connect/test_connect_tuning.py| 9 +- .../connect/test_legacy_mode_classification.py | 8 +- .../tests/connect/test_legacy_mode_evaluation.py | 9 +- .../ml/tests/connect/test_legacy_mode_feature.py | 6 +- .../ml/tests/connect/test_legacy_mode_pipeline.py | 6 +- .../tests/connect/test_legacy_mode_summarizer.py | 6 +- .../ml/tests/connect/test_legacy_mode_tuning.py| 9 +- .../tests/connect/test_parity_torch_data_loader.py | 28 ++- .../tests/connect/test_parity_torch_distributor.py | 232 +++-- 19 files changed, 218 insertions(+), 145 deletions(-) diff --git a/.github/workflows/build_python_connect.yml b/.github/workflows/build_python_connect.yml index ec7103e5dbeb..8deee026131e 100644 --- a/.github/workflows/build_python_connect.yml +++ b/.github/workflows/build_python_connect.yml @@ -72,6 +72,7 @@ jobs: python packaging/connect/setup.py sdist cd dist pip install pyspark-connect-*.tar.gz + pip install scikit-learn torch torchvision torcheval - name: Run tests env: SPARK_CONNECT_TESTING_REMOTE: sc://localhost @@ -82,7 +83,7 @@ jobs: # Remove Py4J and PySpark zipped library to make sure there is no JVM connection rm python/lib/* rm -r python/pyspark - ./python/run-tests --parallelism=1 --python-executables=python3 --modules pyspark-connect + ./python/run-tests --parallelism=1 --python-executables=python3 --modules pyspark-connect,pyspark-ml-connect - name: Upload test results to report if: always() uses: 
actions/upload-artifact@v4 diff --git a/python/packaging/connect/setup.py b/python/packaging/connect/setup.py index 3514e5cdc422..419ed36b4236 100755 --- a/python/packaging/connect/setup.py +++ b/python/packaging/connect/setup.py @@ -77,6 +77,7 @@ if "SPARK_TESTING" in os.environ: "pyspark.sql.tests.connect.shell", "pyspark.sql.tests.pandas", "pyspark.sql.tests.streaming", +"pyspark.ml.tests.connect", ] try: diff --git a/python/pyspark/ml/connect/classification.py b/python/pyspark/ml/connect/classification.py index 8d8c6227eac3..fc7b5cda88a2 100644 --- a/python/pyspark/ml/connect/classification.py +++ b/python/pyspark/ml/connect/classification.py @@ -320,7 +320,6 @@ class LogisticRegressionModel( def _get_transform_fn(self) -> Callable[["pd.Series"], Any]: import torch - import torch.nn as torch_nn model_state_dict = self.torch_model.state_dict() diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py index 345b7f7a5964..f32ead2a580c 100644 --- a/python/pyspark/ml/param/__init__.py +++ b/python/pyspark/ml/param/__init__.py @@ -30,8 +30,8 @@ from typing import ( ) import numpy as np -from py4j.java_gateway import JavaObject +from pyspark.util import is_remote_only from pyspark.ml.linalg import DenseVector, Vector, Matrix from pyspark.ml.util impor
(spark) branch master updated (c303b042966b -> 6fdf9c9df545)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from c303b042966b [SPARK-47808][PYTHON][ML][TESTS] Make pyspark.ml.connect tests running without optional dependencies add 6fdf9c9df545 [SPARK-47807][PYTHON][ML] Make pyspark.ml compatible with pyspark-connect No new revisions were added by this update. Summary of changes: python/pyspark/ml/classification.py | 11 ++--- python/pyspark/ml/clustering.py | 3 ++- python/pyspark/ml/common.py | 45 + python/pyspark/ml/feature.py| 8 ++- python/pyspark/ml/functions.py | 5 - python/pyspark/ml/image.py | 6 - python/pyspark/ml/pipeline.py | 9 +--- python/pyspark/ml/stat.py | 10 - python/pyspark/ml/tuning.py | 17 +++--- python/pyspark/ml/util.py | 15 - python/pyspark/ml/wrapper.py| 17 +- 11 files changed, 111 insertions(+), 35 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-47808][PYTHON][ML][TESTS] Make pyspark.ml.connect tests running without optional dependencies
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c303b042966b [SPARK-47808][PYTHON][ML][TESTS] Make pyspark.ml.connect tests running without optional dependencies c303b042966b is described below commit c303b042966bb3851da6649fc1d7f03de5db20be Author: Hyukjin Kwon AuthorDate: Thu Apr 11 16:42:23 2024 +0900 [SPARK-47808][PYTHON][ML][TESTS] Make pyspark.ml.connect tests running without optional dependencies ### What changes were proposed in this pull request? This PR makes `pyspark.ml.connect` tests run without optional dependencies. ### Why are the changes needed? Optional dependencies should not stop the tests. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Will be tested together in https://github.com/apache/spark/pull/45941 ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45996 from HyukjinKwon/SPARK-47808. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/ml/connect/classification.py | 16 ++-- .../ml/tests/connect/test_connect_classification.py | 4 +++- python/pyspark/ml/tests/connect/test_connect_feature.py | 13 - python/pyspark/ml/tests/connect/test_connect_pipeline.py | 15 +-- 4 files changed, 42 insertions(+), 6 deletions(-) diff --git a/python/pyspark/ml/connect/classification.py b/python/pyspark/ml/connect/classification.py index 8b816f51ca27..8d8c6227eac3 100644 --- a/python/pyspark/ml/connect/classification.py +++ b/python/pyspark/ml/connect/classification.py @@ -17,8 +17,6 @@ from typing import Any, Dict, Union, List, Tuple, Callable, Optional import math -import torch -import torch.nn as torch_nn import numpy as np import pandas as pd @@ -87,6 +85,8 @@ def _train_logistic_regression_model_worker_fn( seed: int, ) -> Any: from pyspark.ml.torch.distributor import _get_spark_partition_data_loader +import torch +import torch.nn as torch_nn from torch.nn.parallel import DistributedDataParallel as DDP import torch.distributed import torch.optim as optim @@ -216,6 +216,9 @@ class LogisticRegression( self._set(**kwargs) def _fit(self, dataset: Union[DataFrame, pd.DataFrame]) -> "LogisticRegressionModel": +import torch +import torch.nn as torch_nn + if isinstance(dataset, pd.DataFrame): # TODO: support pandas dataframe fitting raise NotImplementedError("Fitting pandas dataframe is not supported yet.") @@ -316,6 +319,10 @@ class LogisticRegressionModel( return output_cols def _get_transform_fn(self) -> Callable[["pd.Series"], Any]: +import torch + +import torch.nn as torch_nn + model_state_dict = self.torch_model.state_dict() num_features = self.num_features num_classes = self.num_classes @@ -357,6 +364,9 @@ class LogisticRegressionModel( return self.__class__.__name__ + ".torch" def _save_core_model(self, path: str) -> None: +import torch +import torch.nn as torch_nn + lor_torch_model = torch_nn.Sequential( self.torch_model, 
torch_nn.Softmax(dim=1), @@ -364,6 +374,8 @@ class LogisticRegressionModel( torch.save(lor_torch_model, path) def _load_core_model(self, path: str) -> None: +import torch + lor_torch_model = torch.load(path) self.torch_model = lor_torch_model[0] diff --git a/python/pyspark/ml/tests/connect/test_connect_classification.py b/python/pyspark/ml/tests/connect/test_connect_classification.py index 1f811c774cbd..ebc1745874d9 100644 --- a/python/pyspark/ml/tests/connect/test_connect_classification.py +++ b/python/pyspark/ml/tests/connect/test_connect_classification.py @@ -21,6 +21,7 @@ import unittest from pyspark.sql import SparkSession from pyspark.testing.connectutils import should_test_connect, connect_requirement_message +torch_requirement_message = "torch is required" have_torch = True try: import torch # noqa: F401 @@ -32,7 +33,8 @@ if should_test_connect: @unittest.skipIf( -not should_test_connect or not have_torch, connect_requirement_message or "torch is required" +not should_test_connect or not have_torch, +connect_requirement_message or torch_requirement_message, ) class ClassificationTestsOnConnect(ClassificationTestsMixin, unittest.TestCase): def setUp(self) -> None: diff --git a/python/pyspark/ml/tests/connect/test_connect_feature.py b/python/pyspark/ml/tests/connect/test_connect_feature.py
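The diff above moves every `import torch` from module level into the methods that actually need it, so importing `pyspark.ml.connect` no longer fails when torch is absent. A minimal sketch of that deferred-import pattern follows; the helper name `require_optional` and the class are hypothetical illustrations, not Spark's API.

```python
import importlib


def require_optional(name):
    """Import an optional dependency lazily, with a clear error if missing."""
    try:
        return importlib.import_module(name)
    except ImportError as exc:
        raise ImportError(f"{name} is required for this feature") from exc


class LogisticRegressionSketch:
    # Note: no top-level `import torch` -- importing this module always
    # succeeds; only calling fit() requires torch to be installed.
    def fit(self, data):
        torch = require_optional("torch")  # deferred, like the commit's diff
        ...  # build and train a torch model here
```

Test suites can pair this with `unittest.skipIf(not have_torch, ...)` guards, as the diff does, so missing optional dependencies skip tests instead of breaking collection.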
(spark) branch branch-3.4 updated: [MINOR][DOCS] Clarify relation between grouping API and `spark.sql.execution.arrow.maxRecordsPerBatch`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new e94bb505b897 [MINOR][DOCS] Clarify relation between grouping API and `spark.sql.execution.arrow.maxRecordsPerBatch` e94bb505b897 is described below commit e94bb505b897fe5fd8f91fb680a4989cd1fe72fe Author: Hyukjin Kwon AuthorDate: Thu Apr 11 11:25:49 2024 +0900 [MINOR][DOCS] Clarify relation between grouping API and `spark.sql.execution.arrow.maxRecordsPerBatch` ### What changes were proposed in this pull request? This PR fixes the documentation of `spark.sql.execution.arrow.maxRecordsPerBatch` to clarify the relation between `spark.sql.execution.arrow.maxRecordsPerBatch` and grouping API such as `DataFrame(.cogroup).groupby.applyInPandas`. ### Why are the changes needed? To address confusion about them. ### Does this PR introduce _any_ user-facing change? Yes, it fixes the user-facing SQL configuration page https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration ### How was this patch tested? CI in this PR should verify them. I ran linters. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45993 from HyukjinKwon/minor-doc-change. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon (cherry picked from commit 6c8e4cfd6f3f95455b0d4479f2527d425349f1cf) Signed-off-by: Hyukjin Kwon --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala| 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 951a54a15cbc..be9a7c82828e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2800,7 +2800,9 @@ object SQLConf { val ARROW_EXECUTION_MAX_RECORDS_PER_BATCH = buildConf("spark.sql.execution.arrow.maxRecordsPerBatch") .doc("When using Apache Arrow, limit the maximum number of records that can be written " + -"to a single ArrowRecordBatch in memory. If set to zero or negative there is no limit.") +"to a single ArrowRecordBatch in memory. This configuration is not effective for the " + +"grouping API such as DataFrame(.cogroup).groupby.applyInPandas because each group " + +"becomes each ArrowRecordBatch. If set to zero or negative there is no limit.") .version("2.3.0") .intConf .createWithDefault(1) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch branch-3.5 updated: [MINOR][DOCS] Clarify relation between grouping API and `spark.sql.execution.arrow.maxRecordsPerBatch`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 0818ce7eac97 [MINOR][DOCS] Clarify relation between grouping API and `spark.sql.execution.arrow.maxRecordsPerBatch` 0818ce7eac97 is described below commit 0818ce7eac974a93c684760b0f46ac71a74f63e1 Author: Hyukjin Kwon AuthorDate: Thu Apr 11 11:25:49 2024 +0900 [MINOR][DOCS] Clarify relation between grouping API and `spark.sql.execution.arrow.maxRecordsPerBatch` ### What changes were proposed in this pull request? This PR fixes the documentation of `spark.sql.execution.arrow.maxRecordsPerBatch` to clarify the relation between `spark.sql.execution.arrow.maxRecordsPerBatch` and grouping API such as `DataFrame(.cogroup).groupby.applyInPandas`. ### Why are the changes needed? To address confusion about them. ### Does this PR introduce _any_ user-facing change? Yes, it fixes the user-facing SQL configuration page https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration ### How was this patch tested? CI in this PR should verify them. I ran linters. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45993 from HyukjinKwon/minor-doc-change. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon (cherry picked from commit 6c8e4cfd6f3f95455b0d4479f2527d425349f1cf) Signed-off-by: Hyukjin Kwon --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala| 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2e41374035c8..3e62f656ac9e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2878,7 +2878,9 @@ object SQLConf { val ARROW_EXECUTION_MAX_RECORDS_PER_BATCH = buildConf("spark.sql.execution.arrow.maxRecordsPerBatch") .doc("When using Apache Arrow, limit the maximum number of records that can be written " + -"to a single ArrowRecordBatch in memory. If set to zero or negative there is no limit.") +"to a single ArrowRecordBatch in memory. This configuration is not effective for the " + +"grouping API such as DataFrame(.cogroup).groupby.applyInPandas because each group " + +"becomes each ArrowRecordBatch. If set to zero or negative there is no limit.") .version("2.3.0") .intConf .createWithDefault(1) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [MINOR][DOCS] Clarify relation between grouping API and `spark.sql.execution.arrow.maxRecordsPerBatch`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6c8e4cfd6f3f [MINOR][DOCS] Clarify relation between grouping API and `spark.sql.execution.arrow.maxRecordsPerBatch` 6c8e4cfd6f3f is described below commit 6c8e4cfd6f3f95455b0d4479f2527d425349f1cf Author: Hyukjin Kwon AuthorDate: Thu Apr 11 11:25:49 2024 +0900 [MINOR][DOCS] Clarify relation between grouping API and `spark.sql.execution.arrow.maxRecordsPerBatch` ### What changes were proposed in this pull request? This PR fixes the documentation of `spark.sql.execution.arrow.maxRecordsPerBatch` to clarify the relation between `spark.sql.execution.arrow.maxRecordsPerBatch` and grouping API such as `DataFrame(.cogroup).groupby.applyInPandas`. ### Why are the changes needed? To address confusion about them. ### Does this PR introduce _any_ user-facing change? Yes, it fixes the user-facing SQL configuration page https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration ### How was this patch tested? CI in this PR should verify them. I ran linters. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45993 from HyukjinKwon/minor-doc-change. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala| 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 27fba0b19f48..55d8b61f8b94 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3016,7 +3016,9 @@ object SQLConf { val ARROW_EXECUTION_MAX_RECORDS_PER_BATCH = buildConf("spark.sql.execution.arrow.maxRecordsPerBatch") .doc("When using Apache Arrow, limit the maximum number of records that can be written " + -"to a single ArrowRecordBatch in memory. If set to zero or negative there is no limit.") +"to a single ArrowRecordBatch in memory. This configuration is not effective for the " + +"grouping API such as DataFrame(.cogroup).groupby.applyInPandas because each group " + +"becomes each ArrowRecordBatch. If set to zero or negative there is no limit.") .version("2.3.0") .intConf .createWithDefault(1) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
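The batching semantics described in the doc change above can be sketched in plain Python — a simplified model of the arithmetic only, not Spark's actual Arrow writer. The function names are hypothetical; the point is that ordinary Arrow conversion chunks records by `maxRecordsPerBatch`, while the grouping API (e.g. `applyInPandas`) emits one ArrowRecordBatch per group regardless of that setting:

```python
import math

def batches_for_collect(num_records, max_records_per_batch):
    """Ordinary Arrow conversion: records are chunked into fixed-size batches."""
    if max_records_per_batch <= 0:  # zero or negative means no limit
        return 1
    return math.ceil(num_records / max_records_per_batch)

def batches_for_grouped_api(group_sizes, max_records_per_batch):
    """Grouping API (e.g. applyInPandas): each group becomes one
    ArrowRecordBatch, so max_records_per_batch has no effect here."""
    return len(group_sizes)

print(batches_for_collect(10_000, 1000))              # 10 fixed-size batches
print(batches_for_grouped_api([9_000, 1_000], 1000))  # 2 batches: one per group, however large
```

This is why a single very large group can still produce a single very large batch in memory, which is exactly the confusion the doc change addresses.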
(spark) branch branch-3.5 updated: [SPARK-47704][SQL] JSON parsing fails with "java.lang.ClassCastException" when spark.sql.json.enablePartialResults is enabled
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 8a77a012cd6d [SPARK-47704][SQL] JSON parsing fails with "java.lang.ClassCastException" when spark.sql.json.enablePartialResults is enabled 8a77a012cd6d is described below commit 8a77a012cd6d1d3057bb7f1340850cf567b8a6ed Author: Ivan Sadikov AuthorDate: Thu Apr 11 10:50:11 2024 +0900 [SPARK-47704][SQL] JSON parsing fails with "java.lang.ClassCastException" when spark.sql.json.enablePartialResults is enabled This PR fixes a bug that was introduced in [SPARK-47704](https://issues.apache.org/jira/browse/SPARK-47704). To be precise, SPARK-47704 missed this corner case because I could not find a small stable repro for the problem at the time. When `spark.sql.json.enablePartialResults` is enabled (which is the default), if a user tries to read `{"a":[{"key":{"b":0}}]}` with the code: ```scala val df = spark.read .schema("a array<map<string, struct<b boolean>>>") .json(path) ``` an exception is thrown: ``` java.lang.ClassCastException: class org.apache.spark.sql.catalyst.util.ArrayBasedMapData cannot be cast to class org.apache.spark.sql.catalyst.util.ArrayData (org.apache.spark.sql.catalyst.util.ArrayBasedMapData and org.apache.spark.sql.catalyst.util.ArrayData are in unnamed module of loader 'app') at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:53) at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:53) at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:172) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:605) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$prepareNextFile$1(FileScanRDD.scala:884) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) ``` The same happens when map and array are reversed: `{"a":{"key":[{"b":0}]}}`: ```scala val df = spark.read .schema("a map<string, array<struct<b boolean>>>") .json(path) ``` In both cases, we should partially parse the record; only the struct with the boolean field cannot be parsed: - `Row(Array(Map("key" -> Row(null))))` in the first case. - `Row(Map("key" -> Array(Row(null))))` in the second case. We simply did not handle all of the partial-result exceptions when converting arrays and maps; we only caught `PartialResultException`, which covers just structs. Instead, we should catch `PartialValueException`, which covers structs, maps, and arrays. Fixes a bug where a user would encounter an exception instead of reading a partially parsed JSON record. No. I added unit tests that verify the fix. No. Closes #45833 from sadikovi/SPARK-47704. 
Authored-by: Ivan Sadikov Signed-off-by: Hyukjin Kwon (cherry picked from commit a2b7050e0fc5db6ac98db57309e4737acd26bf3a) Signed-off-by: Hyukjin Kwon --- .../spark/sql/catalyst/json/JacksonParser.scala| 12 +++--- .../sql/execution/datasources/json/JsonSuite.scala | 44 ++ 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index f14f70532e65..3f6ea9a174c0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -497,9 +497,9 @@ class JacksonParser( try { values += fieldConverter.apply(parser) } catch { -case PartialResultException(row, cause) if enablePartialResults => - badRecordException = badRecordException.orElse(Some(cause)) - values += row +case err: PartialValueException if enablePartialResults => + badRecordException = badRecordException.orElse(Some(err.cause)) + values += err.partialResult case NonFatal(e) if enablePartialResults => badRecordException = badRecordException.orElse(Some(e)) parser.skipChildren() @@ -534,9 +534,9 @@ class JacksonParse
(spark) branch master updated: [SPARK-47704][SQL] JSON parsing fails with "java.lang.ClassCastException" when spark.sql.json.enablePartialResults is enabled
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a2b7050e0fc5 [SPARK-47704][SQL] JSON parsing fails with "java.lang.ClassCastException" when spark.sql.json.enablePartialResults is enabled a2b7050e0fc5 is described below commit a2b7050e0fc5db6ac98db57309e4737acd26bf3a Author: Ivan Sadikov AuthorDate: Thu Apr 11 10:50:11 2024 +0900 [SPARK-47704][SQL] JSON parsing fails with "java.lang.ClassCastException" when spark.sql.json.enablePartialResults is enabled ### What changes were proposed in this pull request? This PR fixes a bug that was introduced in [SPARK-47704](https://issues.apache.org/jira/browse/SPARK-47704). To be precise, SPARK-47704 missed this corner case because I could not find a small stable repro for the problem at the time. When `spark.sql.json.enablePartialResults` is enabled (which is the default), if a user tries to read `{"a":[{"key":{"b":0}}]}` with the code: ```scala val df = spark.read .schema("a array<map<string, struct<b boolean>>>") .json(path) ``` an exception is thrown: ``` java.lang.ClassCastException: class org.apache.spark.sql.catalyst.util.ArrayBasedMapData cannot be cast to class org.apache.spark.sql.catalyst.util.ArrayData (org.apache.spark.sql.catalyst.util.ArrayBasedMapData and org.apache.spark.sql.catalyst.util.ArrayData are in unnamed module of loader 'app') at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:53) at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:53) at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:172) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at 
scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:605) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$prepareNextFile$1(FileScanRDD.scala:884) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) ``` The same happens when map and array are reversed: `{"a":{"key":[{"b":0}]}}`: ```scala val df = spark.read .schema("a map<string, array<struct<b boolean>>>") .json(path) ``` In both cases, we should partially parse the record; only the struct with the boolean field cannot be parsed: - `Row(Array(Map("key" -> Row(null))))` in the first case. - `Row(Map("key" -> Array(Row(null))))` in the second case. We simply did not handle all of the partial-result exceptions when converting arrays and maps; we only caught `PartialResultException`, which covers just structs. Instead, we should catch `PartialValueException`, which covers structs, maps, and arrays. ### Why are the changes needed? Fixes a bug where a user would encounter an exception instead of reading a partially parsed JSON record. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added unit tests that verify the fix. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45833 from sadikovi/SPARK-47704. 
Authored-by: Ivan Sadikov Signed-off-by: Hyukjin Kwon --- .../spark/sql/catalyst/json/JacksonParser.scala| 12 +++--- .../sql/execution/datasources/json/JsonSuite.scala | 44 ++ 2 files changed, 50 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index a16a23cf0049..d3f33a70323f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -497,9 +497,9 @@ class JacksonParser( try { values += fieldConverter.apply(parser) } catch { -case PartialResultException(row, cause) if enablePartialResults => - badRecordException = badRecordException.orElse(Some(cause)) - values += row +case err: PartialValueException if enablePartialResults => + badRecordException = badRecordException.orElse(Some(err.cause)) + values += err.partialResult case NonFatal(e) if enabl
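The partial-result behavior that the SPARK-47704 change above restores can be illustrated with a toy pure-Python model (this is not Spark's `JacksonParser`; the schema encoding here is invented for the sketch): when a leaf value fails to match its expected type, the surrounding arrays, maps, and structs are still materialized, with `None` in place of the bad field, instead of failing the whole record.

```python
import json

def parse_partial(value, schema):
    """Toy partial parser. schema is ('bool',), ('array', elem),
    ('map', value_type), or ('struct', {field: type}). Returns
    (result, ok) where ok=False means some leaf failed to parse."""
    kind = schema[0]
    if kind == "bool":
        # 0 is an int, not a bool, so it becomes None with ok=False
        return (value, True) if isinstance(value, bool) else (None, False)
    if kind == "array":
        out, ok = [], True
        for v in value:
            r, o = parse_partial(v, schema[1])
            out.append(r)
            ok = ok and o
        return out, ok
    if kind == "map":
        out, ok = {}, True
        for k, v in value.items():
            r, o = parse_partial(v, schema[1])
            out[k] = r
            ok = ok and o
        return out, ok
    if kind == "struct":
        out, ok = {}, True
        for f, t in schema[1].items():
            r, o = parse_partial(value.get(f), t)
            out[f] = r
            ok = ok and o
        return out, ok
    raise ValueError(f"unknown schema kind: {kind}")

record = json.loads('{"a":[{"key":{"b":0}}]}')
schema = ("struct", {"a": ("array", ("map", ("struct", {"b": ("bool",)})))})
result, ok = parse_partial(record, schema)
print(result, ok)  # {'a': [{'key': {'b': None}}]} False
```

The enclosing array and map survive with the struct's bad field nulled out — the analogue of catching `PartialValueException` at every nesting level rather than only `PartialResultException` for structs.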
(spark) branch master updated: [SPARK-47725][INFRA][FOLLOW-UP] Do not run scheduled job in forked repository
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8da9a70f701a [SPARK-47725][INFRA][FOLLOW-UP] Do not run scheduled job in forked repository 8da9a70f701a is described below commit 8da9a70f701a2860c55a86e39a2425f7f4a3afd9 Author: Hyukjin Kwon AuthorDate: Thu Apr 11 09:46:00 2024 +0900 [SPARK-47725][INFRA][FOLLOW-UP] Do not run scheduled job in forked repository ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/45870 that skips the run in forked repository. ### Why are the changes needed? For consistency, and to save resources in forked repository by default. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Should be tested in individual forked repository. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45992 from HyukjinKwon/SPARK-47725-followup. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .github/workflows/build_python_connect.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build_python_connect.yml b/.github/workflows/build_python_connect.yml index 2f80eac9624f..ec7103e5dbeb 100644 --- a/.github/workflows/build_python_connect.yml +++ b/.github/workflows/build_python_connect.yml @@ -29,6 +29,7 @@ jobs: name: "Build modules: pyspark-connect" runs-on: ubuntu-latest timeout-minutes: 300 +if: github.repository == 'apache/spark' steps: - name: Checkout Spark repository uses: actions/checkout@v4 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-47366][SQL][PYTHON] Add VariantVal for PySpark
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f6d5ad3ec75b [SPARK-47366][SQL][PYTHON] Add VariantVal for PySpark f6d5ad3ec75b is described below commit f6d5ad3ec75be63472c6b21dda959972f5360ef2 Author: Gene Pang AuthorDate: Thu Apr 11 09:16:10 2024 +0900 [SPARK-47366][SQL][PYTHON] Add VariantVal for PySpark ### What changes were proposed in this pull request? Add a `VariantVal` implementation for PySpark. It includes convenience methods to convert the Variant to a string, or to a Python object. ### Why are the changes needed? Allows users to work with Variant data more conveniently. ### Does this PR introduce _any_ user-facing change? This is new PySpark functionality to allow users to work with Variant data. ### How was this patch tested? Added unit tests. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45826 from gene-db/variant-pyspark. 
Lead-authored-by: Gene Pang Co-authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .../source/reference/pyspark.sql/core_classes.rst | 1 + python/docs/source/reference/pyspark.sql/index.rst | 1 + .../pyspark.sql/{index.rst => variant_val.rst} | 32 +- python/pyspark/sql/__init__.py | 3 +- python/pyspark/sql/connect/conversion.py | 40 +++ python/pyspark/sql/pandas/types.py | 22 ++ python/pyspark/sql/tests/test_types.py | 64 python/pyspark/sql/types.py| 59 +++- python/pyspark/sql/variant_utils.py| 388 + .../org/apache/spark/sql/util/ArrowUtils.scala | 10 + .../spark/sql/execution/arrow/ArrowWriter.scala| 27 +- 11 files changed, 614 insertions(+), 33 deletions(-) diff --git a/python/docs/source/reference/pyspark.sql/core_classes.rst b/python/docs/source/reference/pyspark.sql/core_classes.rst index 65096da21de5..d3dbbc129cb7 100644 --- a/python/docs/source/reference/pyspark.sql/core_classes.rst +++ b/python/docs/source/reference/pyspark.sql/core_classes.rst @@ -49,3 +49,4 @@ Core Classes datasource.DataSourceRegistration datasource.InputPartition datasource.WriterCommitMessage +VariantVal diff --git a/python/docs/source/reference/pyspark.sql/index.rst b/python/docs/source/reference/pyspark.sql/index.rst index 9322a91fba25..93901ab7ce12 100644 --- a/python/docs/source/reference/pyspark.sql/index.rst +++ b/python/docs/source/reference/pyspark.sql/index.rst @@ -41,5 +41,6 @@ This page gives an overview of all public Spark SQL API. observation udf udtf +variant_val protobuf datasource diff --git a/python/docs/source/reference/pyspark.sql/index.rst b/python/docs/source/reference/pyspark.sql/variant_val.rst similarity index 70% copy from python/docs/source/reference/pyspark.sql/index.rst copy to python/docs/source/reference/pyspark.sql/variant_val.rst index 9322a91fba25..a7f592c18e3a 100644 --- a/python/docs/source/reference/pyspark.sql/index.rst +++ b/python/docs/source/reference/pyspark.sql/variant_val.rst @@ -16,30 +16,12 @@ under the License. 
-= -Spark SQL -= +== +VariantVal +== +.. currentmodule:: pyspark.sql -This page gives an overview of all public Spark SQL API. +.. autosummary:: +:toctree: api/ -.. toctree:: -:maxdepth: 2 - -core_classes -spark_session -configuration -io -dataframe -column -data_types -row -functions -window -grouping -catalog -avro -observation -udf -udtf -protobuf -datasource +VariantVal.toPython diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py index dd82b037a6b9..bc046da81d27 100644 --- a/python/pyspark/sql/__init__.py +++ b/python/pyspark/sql/__init__.py @@ -39,7 +39,7 @@ Important classes of Spark SQL and DataFrames: - :class:`pyspark.sql.Window` For working with window functions. """ -from pyspark.sql.types import Row +from pyspark.sql.types import Row, VariantVal from pyspark.sql.context import SQLContext, HiveContext, UDFRegistration, UDTFRegistration from pyspark.sql.session import SparkSession from pyspark.sql.column import Column @@ -67,6 +67,7 @@ __all__ = [ "Row", "DataFrameNaFunctions", "DataFrameStatFunctions", +"VariantVal", "Window", "WindowSpec", "DataFrameReader", diff --git a/python/pyspark/sql/connect/conversion.py b/python/pyspark/sql/connect/conversion.py index c86ee9c75fec..9b1007c41f9c 100644 --- a/python/pyspark/sql/connect/conversion.py +++ b/python/pyspark/sql/
(spark) branch master updated: [SPARK-41811][PYTHON][CONNECT] Implement `SQLStringFormatter` with `WithRelations`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a100e11936bc [SPARK-41811][PYTHON][CONNECT] Implement `SQLStringFormatter` with `WithRelations` a100e11936bc is described below commit a100e11936bcd92ac091abe94221c1b669811efa Author: Ruifeng Zheng AuthorDate: Thu Apr 11 09:06:23 2024 +0900 [SPARK-41811][PYTHON][CONNECT] Implement `SQLStringFormatter` with `WithRelations` ### What changes were proposed in this pull request? Implement `SQLStringFormatter` for Python Client ### Why are the changes needed? for parity ### Does this PR introduce _any_ user-facing change? yes, new feature ``` In [1]: mydf = spark.range(10) In [2]: spark.sql("SELECT {col} FROM {mydf} WHERE id IN {x}", col=mydf.id, mydf=mydf, x=tuple(range(4))).show() +---+ | id| +---+ | 0| | 1| | 2| | 3| +---+ ``` ### How was this patch tested? enabled doc tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #45614 from zhengruifeng/connect_sql_str_fmt_with_relations. 
Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- .../src/main/protobuf/spark/connect/commands.proto | 9 +- .../main/protobuf/spark/connect/relations.proto| 18 ++ .../sql/connect/planner/SparkConnectPlanner.scala | 149 --- python/pyspark/sql/connect/plan.py | 56 +++- python/pyspark/sql/connect/proto/commands_pb2.py | 190 ++--- python/pyspark/sql/connect/proto/commands_pb2.pyi | 10 + python/pyspark/sql/connect/proto/relations_pb2.py | 298 +++-- python/pyspark/sql/connect/proto/relations_pb2.pyi | 48 python/pyspark/sql/connect/session.py | 23 +- python/pyspark/sql/{ => connect}/sql_formatter.py | 45 ++-- python/pyspark/sql/session.py | 4 +- python/pyspark/sql/sql_formatter.py| 4 +- .../pyspark/sql/tests/connect/test_connect_plan.py | 2 +- python/pyspark/sql/utils.py| 7 + 14 files changed, 539 insertions(+), 324 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto index e0ccf01fe92e..acff0a2089e9 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto @@ -61,7 +61,7 @@ message Command { // almost oblivious to the server-side behavior. message SqlCommand { // (Required) SQL Query. - string sql = 1; + string sql = 1 [deprecated=true]; // (Optional) A map of parameter names to literal expressions. map args = 2 [deprecated=true]; @@ -71,11 +71,14 @@ message SqlCommand { // (Optional) A map of parameter names to expressions. // It cannot coexist with `pos_arguments`. - map named_arguments = 4; + map named_arguments = 4 [deprecated=true]; // (Optional) A sequence of expressions for positional parameters in the SQL query text. // It cannot coexist with `named_arguments`. - repeated Expression pos_arguments = 5; + repeated Expression pos_arguments = 5 [deprecated=true]; + + // (Optional) The relation that this SQL command will be built on. 
+ Relation input = 6; } // A command that can create DataFrame global temp view or local temp view. diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto index 4d4324ed340b..5cbe6459d226 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto @@ -75,6 +75,7 @@ message Relation { CommonInlineUserDefinedTableFunction common_inline_user_defined_table_function = 38; AsOfJoin as_of_join = 39; CommonInlineUserDefinedDataSource common_inline_user_defined_data_source = 40; +WithRelations with_relations = 41; // NA functions NAFill fill_na = 90; @@ -133,6 +134,23 @@ message SQL { repeated Expression pos_arguments = 5; } +// Relation of type [[WithRelations]]. +// +// This relation contains a root plan, and one or more references that are used by the root plan. +// There are two ways of referencing a relation, by name (through a subquery alias), or by plan_id +// (using RelationCommon.plan_id). +// +// This relation can be used to implement CTEs, describe DAGs, or to reduce tree depth. +message WithRelations { + // (Required) Plan at the root of the query tree. This plan is expected to contain one or more + // references. Those references get expanded later on
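The `spark.sql("SELECT {col} FROM {mydf} WHERE id IN {x}", ...)` example in the SPARK-41811 commit message above hinges on a client-side substitution step before the query is sent. A rough plain-Python sketch of just that step, built on the standard library's `string.Formatter` (the class and its rendering rules here are illustrative assumptions — the real client registers each DataFrame as a subquery-alias reference inside the `WithRelations` plan rather than splicing text):

```python
import string

class SqlFormatter(string.Formatter):
    """Toy formatter: names registered as DataFrames are left as relation
    references; tuples render as SQL IN-lists; everything else via str()."""
    def __init__(self, dataframes):
        self.dataframes = dataframes  # set of names treated as relations

    def format_field(self, value, format_spec):
        if value in self.dataframes:
            return value  # referenced relation, resolved server-side
        if isinstance(value, tuple):
            return "(" + ", ".join(map(str, value)) + ")"
        return str(value)

fmt = SqlFormatter({"mydf"})
sql = fmt.format("SELECT {col} FROM {mydf} WHERE id IN {x}",
                 col="id", mydf="mydf", x=tuple(range(4)))
print(sql)  # SELECT id FROM mydf WHERE id IN (0, 1, 2, 3)
```

Keeping DataFrame references out of the query text (as `WithRelations` does) avoids injection-style pitfalls that naive string splicing would invite.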
(spark-connect-go) branch master updated: [MINOR] Make readme easier to follow
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git The following commit(s) were added to refs/heads/master by this push: new f7ad518 [MINOR] Make readme easier to follow f7ad518 is described below commit f7ad5188552c4f0c78c2dc1ad6f24c1977583d5c Author: Matthew Powers AuthorDate: Thu Apr 11 09:05:36 2024 +0900 [MINOR] Make readme easier to follow ### What changes were proposed in this pull request? Update the README to make it easier to follow. ### Why are the changes needed? I tried to get spark-connect-go running locally and it was a little confusing. This new layout should make the setup steps a lot clearer. ### Does this PR introduce _any_ user-facing change? Just updates the README. ### How was this patch tested? N/A. Closes #18 from MrPowers/update-readme. Authored-by: Matthew Powers Signed-off-by: Hyukjin Kwon --- README.md | 55 ++- 1 file changed, 22 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 8b15743..7832edb 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,6 @@ This project houses the **experimental** client for [Spark Connect](https://spark.apache.org/docs/latest/spark-connect-overview.html) for [Apache Spark](https://spark.apache.org/) written in [Golang](https://go.dev/). - ## Current State of the Project Currently, the Spark Connect client for Golang is highly experimental and should @@ -13,33 +12,42 @@ project reserves the right to withdraw and abandon the development of this proje if it is not sustainable. ## Getting started + +This section explains how to run Spark Connect Go locally. + +Step 1: Install Golang: https://go.dev/doc/install. + +Step 2: Ensure you have installed `buf CLI` installed, [more info here](https://buf.build/docs/installation/) + +Step 3: Run the following commands to setup the Spark Connect client. 
+ ``` git clone https://github.com/apache/spark-connect-go.git git submodule update --init --recursive make gen && make test ``` -> Ensure you have installed `buf CLI`; [more info](https://buf.build/docs/installation/) -## How to write Spark Connect Go Application in your own project +Step 4: Setup the Spark Driver on localhost. -See [Quick Start Guide](quick-start.md) +1. [Download Spark distribution](https://spark.apache.org/downloads.html) (3.4.0+), unzip the package. -## Spark Connect Go Application Example +2. Start the Spark Connect server with the following command (make sure to use a package version that matches your Spark distribution): -A very simple example in Go looks like following: +``` +sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0 +``` + +Step 5: Run the example Go application. ``` -func main() { - remote := "localhost:15002" - spark, _ := sql.SparkSession.Builder.Remote(remote).Build() - defer spark.Stop() - - df, _ := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count") - df.Show(100, false) -} +go run cmd/spark-connect-example-spark-session/main.go ``` +## How to write Spark Connect Go Application in your own project + +See [Quick Start Guide](quick-start.md) + ## High Level Design Following [diagram](https://textik.com/#ac299c8f32c4c342) shows main code in current prototype: @@ -66,7 +74,6 @@ Following [diagram](https://textik.com/#ac299c8f32c4c342) shows main code in cur | SparkConnectServiceClient |--+| Spark Driver | | | || +---+ ++ - ``` `SparkConnectServiceClient` is GRPC client which talks to Spark Driver. `sparkSessionImpl` generates `dataFrameImpl` @@ -75,24 +82,6 @@ instances. `dataFrameImpl` uses the GRPC client in `sparkSessionImpl` to communi We will mimic the logic in Spark Connect Scala implementation, and adopt Go common practices, e.g. returning `error` object for error handling. -## How to Run Spark Connect Go Application - -1. 
Install Golang: https://go.dev/doc/install. - -2. Download Spark distribution (3.4.0+), unzip the folder. - -3. Start Spark Connect server by running command: - -``` -sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0 -``` - -4. In this repo, run Go application: - -``` -go run cmd/spark-connect-example-spark-session/main.go -``` - ## Contributing Please review the [Contribution to Spark guide](https://spark.apache.org/contributing.html) - To unsubscribe, e-mail: commits-unsubscr...@
(spark) branch master updated: [SPARK-47081][CONNECT][FOLLOW] Improving the usability of the Progress Handler
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4a3fd8f7e69e [SPARK-47081][CONNECT][FOLLOW] Improving the usability of the Progress Handler 4a3fd8f7e69e is described below commit 4a3fd8f7e69e5d0cea52fae120348973bffbb738 Author: Martin Grund AuthorDate: Tue Apr 9 16:58:47 2024 +0900 [SPARK-47081][CONNECT][FOLLOW] Improving the usability of the Progress Handler ### What changes were proposed in this pull request? This patch improves the usability of the progress handler by making sure that an update to the client is sent on every wakeup interval from the server (and not only when a task is finished). The class managing the progress is now usable as a context manager and I've added the progress reporting to more RPC calls to the server. In addition, it adds the operation ID to the progress handler notify message so that the callback can differentiate between multiple concurrent queries. ```python def progress_handler(stages, inflight_tasks, operation_id): print(f"Operation {operation_id}: {inflight_tasks} inflight tasks") spark.registerProgressHandler(progress_handler) ``` ### Why are the changes needed? Usability ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added Tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #45907 from grundprinzip/SPARK-47081_2. 
Lead-authored-by: Martin Grund Co-authored-by: Martin Grund Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/sql/SparkSession.scala | 5 +- .../ConnectProgressExecutionListener.scala | 10 ++-- .../execution/ExecuteGrpcResponseSender.scala | 9 ++-- .../ConnectProgressExecutionListenerSuite.scala| 12 ++--- .../connect/planner/SparkConnectServiceSuite.scala | 39 +--- python/pyspark/sql/connect/client/core.py | 54 +++--- python/pyspark/sql/connect/shell/progress.py | 37 --- .../sql/tests/connect/shell/test_progress.py | 31 - 8 files changed, 131 insertions(+), 66 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 1e467a864442..5a2d9bc44c9f 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -567,8 +567,9 @@ class SparkSession private[sql] ( private[sql] def execute(command: proto.Command): Seq[ExecutePlanResponse] = { val plan = proto.Plan.newBuilder().setCommand(command).build() -// .toSeq forces that the iterator is consumed and closed -client.execute(plan).toSeq +// .toSeq forces that the iterator is consumed and closed. On top, ignore all +// progress messages. 
+client.execute(plan).filter(!_.hasExecutionProgress).toSeq } private[sql] def registerUdf(udf: proto.CommonInlineUserDefinedFunction): Unit = { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ConnectProgressExecutionListener.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ConnectProgressExecutionListener.scala index 954956363505..a1881765a416 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ConnectProgressExecutionListener.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ConnectProgressExecutionListener.scala @@ -82,10 +82,14 @@ private[connect] class ConnectProgressExecutionListener extends SparkListener wi * * If the tracker was marked as dirty, the state is reset after. */ -def yieldWhenDirty(thunk: (Seq[StageInfo], Long) => Unit): Unit = { - if (dirty.get()) { +def yieldWhenDirty(force: Boolean = false)(thunk: (Seq[StageInfo], Long) => Unit): Unit = { + if (force) { thunk(stages.values.toSeq, inFlightTasks.get()) -dirty.set(false) + } else { +if (dirty.get()) { + thunk(stages.values.toSeq, inFlightTasks.get()) + dirty.set(false) +} } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala index a9444862b3aa..4b95f38c6695 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala +++ b/c
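The `yieldWhenDirty(force = ...)` change in the listener diff above has simple semantics: emit an update when the tracked state changed since the last emission, or unconditionally when forced (so the client hears from the server on every wakeup interval, not only when a task finishes). A minimal Python model of that logic — not the actual Scala listener; note that, as in the diff, a forced emission does not reset the dirty flag:

```python
class ProgressTracker:
    def __init__(self):
        self.stages = []
        self.in_flight_tasks = 0
        self._dirty = False

    def update(self, stages, in_flight_tasks):
        self.stages = stages
        self.in_flight_tasks = in_flight_tasks
        self._dirty = True

    def yield_when_dirty(self, callback, force=False):
        """Invoke callback when forced; otherwise only when state changed,
        resetting the dirty flag after a non-forced emission."""
        if force:
            callback(self.stages, self.in_flight_tasks)
        elif self._dirty:
            callback(self.stages, self.in_flight_tasks)
            self._dirty = False

emitted = []
t = ProgressTracker()
t.update(["stage-1"], 3)
t.yield_when_dirty(lambda s, n: emitted.append(n))              # dirty -> emits
t.yield_when_dirty(lambda s, n: emitted.append(n))              # clean -> silent
t.yield_when_dirty(lambda s, n: emitted.append(n), force=True)  # forced -> emits anyway
print(emitted)  # [3, 3]
```

The forced path is what lets the server send a heartbeat-style progress message on every wakeup even when nothing has changed.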
(spark) branch master updated: [MINOR][TESTS] Make `check_invalid_args` reusable in parity test
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new fea1994fe756 [MINOR][TESTS] Make `check_invalid_args` reusable in parity test fea1994fe756 is described below commit fea1994fe756c6bc385c2f9769acc6c82f3a2a68 Author: Ruifeng Zheng AuthorDate: Tue Apr 9 10:51:04 2024 +0900 [MINOR][TESTS] Make `check_invalid_args` reusable in parity test ### What changes were proposed in this pull request? Make `check_invalid_args` reusable in parity test ### Why are the changes needed? test coverage ### Does this PR introduce _any_ user-facing change? no, test only ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes #45928 from zhengruifeng/enable_test_invalid_args. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- .../sql/tests/connect/test_parity_pandas_udf_window.py| 11 +-- python/pyspark/sql/tests/pandas/test_pandas_udf_window.py | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/tests/connect/test_parity_pandas_udf_window.py b/python/pyspark/sql/tests/connect/test_parity_pandas_udf_window.py index 98ed2a23df30..b2288c9d949e 100644 --- a/python/pyspark/sql/tests/connect/test_parity_pandas_udf_window.py +++ b/python/pyspark/sql/tests/connect/test_parity_pandas_udf_window.py @@ -20,12 +20,11 @@ from pyspark.sql.tests.pandas.test_pandas_udf_window import WindowPandasUDFTests from pyspark.testing.connectutils import ReusedConnectTestCase -class PandasUDFWindowParityTests(WindowPandasUDFTestsMixin, ReusedConnectTestCase): -# TODO(SPARK-43734): Expression "(v)" within a window function doesn't raise a -# AnalysisException -@unittest.skip("Fails in Spark Connect, should enable.") -def test_invalid_args(self): -super().test_invalid_args() +class PandasUDFWindowParityTests( 
+WindowPandasUDFTestsMixin, +ReusedConnectTestCase, +): +pass if __name__ == "__main__": diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py b/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py index 5ad136da726d..9b3673d80d22 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_window.py @@ -292,7 +292,7 @@ class WindowPandasUDFTestsMixin: with self.assertRaisesRegex(AnalysisException, ".*not supported within a window function"): foo_udf = pandas_udf(lambda x: x, "v double", PandasUDFType.GROUPED_MAP) -df.withColumn("v2", foo_udf(df["v"]).over(w)) +df.withColumn("v2", foo_udf(df["v"]).over(w)).schema def test_bounded_simple(self): from pyspark.sql.functions import mean, max, min, count - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
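The parity class above now inherits every test from `WindowPandasUDFTestsMixin` instead of re-skipping one. As a rough sketch of that mixin pattern (all names here are invented for illustration, not from PySpark), the test logic lives once in a plain mixin, and each backend-specific class only wires it to a concrete `TestCase`:

```python
import unittest

class ArithmeticTestsMixin:
    # Test logic written once; any concrete TestCase that mixes this in
    # and provides `self.add` inherits the whole suite.
    def test_add(self):
        self.assertEqual(self.add(2, 3), 5)

    def test_add_negative(self):
        self.assertEqual(self.add(-1, 1), 0)

class LocalArithmeticTests(ArithmeticTestsMixin, unittest.TestCase):
    # The "parity" subclass only supplies the backend -- here a trivial
    # local implementation -- and adds nothing else, much like the
    # `pass` body in the diff above.
    def add(self, a, b):
        return a + b
```

Once a previously failing test starts passing on the second backend, the override/skip is deleted and the parity class shrinks back toward an empty body.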
(spark) branch branch-3.5 updated: [SPARK-47762][PYTHON][CONNECT] Add pyspark.sql.connect.protobuf into setup.py
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 932f9c7df38d [SPARK-47762][PYTHON][CONNECT] Add pyspark.sql.connect.protobuf into setup.py 932f9c7df38d is described below commit 932f9c7df38dc766c2b7b05df764f24dc9b55acc Author: Hyukjin Kwon AuthorDate: Mon Apr 8 17:06:33 2024 +0900 [SPARK-47762][PYTHON][CONNECT] Add pyspark.sql.connect.protobuf into setup.py This PR is a followup of https://github.com/apache/spark/pull/42563 (but using new JIRA as it's already released), which adds `pyspark.sql.connect.protobuf` into `setup.py`. So PyPI packaged PySpark can support protobuf function with Spark Connect on. Yes. The new feature is now available with Spark Connect on if users install Spark Connect by `pip`. Being tested in https://github.com/apache/spark/pull/45870 No. Closes #45924 from HyukjinKwon/SPARK-47762. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon (cherry picked from commit f94d95d75886b1af3434cff0c50c99ea1e196092) Signed-off-by: Hyukjin Kwon --- python/setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/setup.py b/python/setup.py index b8e4c9a40e04..2110c2839ca7 100755 --- a/python/setup.py +++ b/python/setup.py @@ -249,6 +249,7 @@ try: "pyspark.sql.connect.avro", "pyspark.sql.connect.client", "pyspark.sql.connect.proto", +"pyspark.sql.connect.protobuf", "pyspark.sql.connect.streaming", "pyspark.sql.pandas", "pyspark.sql.protobuf", - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-47762][PYTHON][CONNECT] Add pyspark.sql.connect.protobuf into setup.py
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f94d95d75886 [SPARK-47762][PYTHON][CONNECT] Add pyspark.sql.connect.protobuf into setup.py f94d95d75886 is described below commit f94d95d75886b1af3434cff0c50c99ea1e196092 Author: Hyukjin Kwon AuthorDate: Mon Apr 8 17:06:33 2024 +0900 [SPARK-47762][PYTHON][CONNECT] Add pyspark.sql.connect.protobuf into setup.py ### What changes were proposed in this pull request? This PR is a follow-up of https://github.com/apache/spark/pull/42563 (using a new JIRA ticket since the original change was already released), which adds `pyspark.sql.connect.protobuf` into `setup.py`. ### Why are the changes needed? So that the PyPI-packaged PySpark can support the protobuf functions with Spark Connect enabled. ### Does this PR introduce _any_ user-facing change? Yes. The new feature is now available with Spark Connect enabled if users install Spark Connect via `pip`. ### How was this patch tested? Being tested in https://github.com/apache/spark/pull/45870 ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45924 from HyukjinKwon/SPARK-47762.
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/packaging/classic/setup.py | 1 + python/packaging/connect/setup.py | 1 + 2 files changed, 2 insertions(+) diff --git a/python/packaging/classic/setup.py b/python/packaging/classic/setup.py index 8844d225d1b8..ddd2448e1c18 100755 --- a/python/packaging/classic/setup.py +++ b/python/packaging/classic/setup.py @@ -275,6 +275,7 @@ try: "pyspark.sql.connect.client", "pyspark.sql.connect.functions", "pyspark.sql.connect.proto", +"pyspark.sql.connect.protobuf", "pyspark.sql.connect.shell", "pyspark.sql.connect.streaming", "pyspark.sql.connect.streaming.worker", diff --git a/python/packaging/connect/setup.py b/python/packaging/connect/setup.py index 157bc86c192e..01c5518d4451 100755 --- a/python/packaging/connect/setup.py +++ b/python/packaging/connect/setup.py @@ -103,6 +103,7 @@ try: "pyspark.sql.connect.client", "pyspark.sql.connect.functions", "pyspark.sql.connect.proto", +"pyspark.sql.connect.protobuf", "pyspark.sql.connect.shell", "pyspark.sql.connect.streaming", "pyspark.sql.connect.streaming.worker", - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [MINOR][PYTHON][SS][TESTS] Drop the tables after being used at `test_streaming_foreach_batch`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ad2367c55aeb [MINOR][PYTHON][SS][TESTS] Drop the tables after being used at `test_streaming_foreach_batch` ad2367c55aeb is described below commit ad2367c55aebf417183eda13e56c55364276f145 Author: Hyukjin Kwon AuthorDate: Mon Apr 8 11:00:10 2024 +0900 [MINOR][PYTHON][SS][TESTS] Drop the tables after being used at `test_streaming_foreach_batch` ### What changes were proposed in this pull request? This PR proposes to drop the tables after the tests finish. ### Why are the changes needed? - To clean up resources properly. - Leftover tables can affect other test cases when a single session is shared across tests. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Tested in https://github.com/apache/spark/pull/45870 ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45920 from HyukjinKwon/minor-cleanup-table.
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .../streaming/test_streaming_foreach_batch.py | 140 +++-- 1 file changed, 72 insertions(+), 68 deletions(-) diff --git a/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py b/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py index 5d2c1bbbf62c..ef286115a303 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py +++ b/python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py @@ -97,46 +97,48 @@ class StreamingTestsForeachBatchMixin: def test_streaming_foreach_batch_spark_session(self): table_name = "testTable_foreach_batch" +with self.table(table_name): -def func(df: DataFrame, batch_id: int): -if batch_id > 0: # only process once -return -spark = df.sparkSession -df1 = spark.createDataFrame([("structured",), ("streaming",)]) -df1.union(df).write.mode("append").saveAsTable(table_name) +def func(df: DataFrame, batch_id: int): +if batch_id > 0: # only process once +return +spark = df.sparkSession +df1 = spark.createDataFrame([("structured",), ("streaming",)]) +df1.union(df).write.mode("append").saveAsTable(table_name) -df = self.spark.readStream.format("text").load("python/test_support/sql/streaming") -q = df.writeStream.foreachBatch(func).start() -q.processAllAvailable() -q.stop() +df = self.spark.readStream.format("text").load("python/test_support/sql/streaming") +q = df.writeStream.foreachBatch(func).start() +q.processAllAvailable() +q.stop() -actual = self.spark.read.table(table_name) -df = ( -self.spark.read.format("text") -.load(path="python/test_support/sql/streaming/") -.union(self.spark.createDataFrame([("structured",), ("streaming",)])) -) -self.assertEqual(sorted(df.collect()), sorted(actual.collect())) +actual = self.spark.read.table(table_name) +df = ( +self.spark.read.format("text") +.load(path="python/test_support/sql/streaming/") +.union(self.spark.createDataFrame([("structured",), ("streaming",)])) +) 
+self.assertEqual(sorted(df.collect()), sorted(actual.collect())) def test_streaming_foreach_batch_path_access(self): table_name = "testTable_foreach_batch_path" +with self.table(table_name): -def func(df: DataFrame, batch_id: int): -if batch_id > 0: # only process once -return -spark = df.sparkSession -df1 = spark.read.format("text").load("python/test_support/sql/streaming") -df1.union(df).write.mode("append").saveAsTable(table_name) +def func(df: DataFrame, batch_id: int): +if batch_id > 0: # only process once +return +spark = df.sparkSession +df1 = spark.read.format("text").load("python/test_support/sql/streaming") +df1.union(df).write.mode("append").saveAsTable(table_name) -df = self.spark.readStream.format("text").load("python/test_support/sql/streaming") -
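The diff above wraps each test body in `with self.table(table_name):` so the table is dropped however the test exits. A minimal sketch of that kind of cleanup context manager, using an invented stand-in catalog rather than a real Spark session:

```python
from contextlib import contextmanager

class FakeCatalog:
    # Stand-in for a SQL catalog; illustrative only, not a PySpark API.
    def __init__(self):
        self.tables = set()

    def create(self, name):
        self.tables.add(name)

    def drop(self, name):
        self.tables.discard(name)

@contextmanager
def table(catalog, name):
    # Guarantee `name` is dropped when the block exits, even if the
    # test body inside the block raises.
    try:
        yield name
    finally:
        catalog.drop(name)

catalog = FakeCatalog()
with table(catalog, "testTable_foreach_batch") as t:
    catalog.create(t)
    assert t in catalog.tables
# On exit the table is gone, so it cannot leak into later tests
# that share the same session.
assert "testTable_foreach_batch" not in catalog.tables
```

The `finally` block is what makes the cleanup unconditional: an assertion failure inside the `with` body still drops the table before the exception propagates.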
(spark) branch master updated (b299b2bc06a9 -> cc6c0eb1bee6)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from b299b2bc06a9 [SPARK-47299][PYTHON][DOCS] Use the same `versions.json` in the dropdown of different versions of PySpark documents add cc6c0eb1bee6 [MINOR][TESTS] Deduplicate test cases `test_parse_datatype_string` No new revisions were added by this update. Summary of changes: python/pyspark/sql/tests/connect/test_parity_types.py | 4 1 file changed, 4 insertions(+)
(spark) branch master updated (e92e8f5441a7 -> 0c992b205946)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from e92e8f5441a7 [SPARK-47744] Add support for negative-valued bytes in range encoder add 0c992b205946 [SPARK-47755][CONNECT] Pivot should fail when the number of distinct values is too large No new revisions were added by this update. Summary of changes: .../sql/connect/planner/SparkConnectPlanner.scala | 23 +++ python/pyspark/sql/tests/test_group.py | 5 +++ .../spark/sql/RelationalGroupedDataset.scala | 47 -- 3 files changed, 36 insertions(+), 39 deletions(-)
(spark) branch master updated (d7430124191a -> f7dff4aa0c8f)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from d7430124191a [SPARK-47753][PYTHON][CONNECT][TESTS] Make pyspark.testing compatible with pyspark-connect add f7dff4aa0c8f [SPARK-47752][PS][CONNECT] Make pyspark.pandas compatible with pyspark-connect No new revisions were added by this update. Summary of changes: python/pyspark/pandas/plot/core.py | 6 -- python/pyspark/pandas/spark/functions.py | 21 - 2 files changed, 24 insertions(+), 3 deletions(-)
(spark) branch master updated: [SPARK-47753][PYTHON][CONNECT][TESTS] Make pyspark.testing compatible with pyspark-connect
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d7430124191a [SPARK-47753][PYTHON][CONNECT][TESTS] Make pyspark.testing compatible with pyspark-connect d7430124191a is described below commit d7430124191ab1f010b2ac873dbbeee5ff9caf52 Author: Hyukjin Kwon AuthorDate: Sun Apr 7 18:34:30 2024 +0900 [SPARK-47753][PYTHON][CONNECT][TESTS] Make pyspark.testing compatible with pyspark-connect ### What changes were proposed in this pull request? This PR proposes to make `pyspark.testing` compatible with `pyspark-connect` by using the no-op context manager `contextlib.nullcontext` instead of `QuietTest`, which requires JVM access. ### Why are the changes needed? In order for `pyspark-connect` to work without classic PySpark packages and dependencies. Also, the logs are hidden because they are written to a separate file, so the output is already effectively quiet. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Yes, at https://github.com/apache/spark/pull/45870. Once CI is set up there, it will be tested properly. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45916 from HyukjinKwon/SPARK-47753.
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/testing/connectutils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/testing/connectutils.py b/python/pyspark/testing/connectutils.py index 5cb553c4949a..191505741eb4 100644 --- a/python/pyspark/testing/connectutils.py +++ b/python/pyspark/testing/connectutils.py @@ -21,6 +21,7 @@ import os import functools import unittest import uuid +import contextlib grpc_requirement_message = None try: @@ -208,3 +209,5 @@ class ReusedConnectTestCase(unittest.TestCase, SQLTestUtils, PySparkErrorTestUti if self._legacy_sc is not None: return QuietTest(self._legacy_sc) +else: +return contextlib.nullcontext() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
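The diff above returns `QuietTest(self._legacy_sc)` when a classic JVM-backed context exists and `contextlib.nullcontext()` otherwise. Both objects satisfy the context-manager protocol, so callers keep a single `with` statement with no branching. A minimal sketch of the pattern, using a dummy `QuietTest` stand-in (the real one suppresses JVM log output):

```python
import contextlib

class QuietTest:
    # Stand-in for PySpark's JVM-backed log suppressor; illustrative only.
    def __init__(self, sc):
        self.sc = sc

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return False  # never swallow exceptions

def quiet(legacy_sc):
    # With a classic context, suppress noisy logs; without one
    # (pure Spark Connect), a no-op context behaves identically
    # from the caller's point of view.
    if legacy_sc is not None:
        return QuietTest(legacy_sc)
    return contextlib.nullcontext()

# Caller code is the same either way:
with quiet(None):
    result = 1 + 1
```

`contextlib.nullcontext` is exactly the stdlib tool for this "sometimes I need a context manager, sometimes I don't" situation, which is why it can replace `QuietTest` without touching any call sites.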
(spark) branch master updated: [SPARK-47751][PYTHON][CONNECT] Make pyspark.worker_utils compatible with pyspark-connect
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c11585ac296e [SPARK-47751][PYTHON][CONNECT] Make pyspark.worker_utils compatible with pyspark-connect c11585ac296e is described below commit c11585ac296eb726e6356bfcc7628a2c948e1d2f Author: Hyukjin Kwon AuthorDate: Sun Apr 7 18:11:12 2024 +0900 [SPARK-47751][PYTHON][CONNECT] Make pyspark.worker_utils compatible with pyspark-connect ### What changes were proposed in this pull request? This PR proposes to make `pyspark.worker_utils` compatible with `pyspark-connect`. ### Why are the changes needed? In order for `pyspark-connect` to work without classic PySpark packages and dependencies. Spark Connect does not support `Broadcast` and `Accumulator`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Yes, at https://github.com/apache/spark/pull/45870. Once CI is set up there, it will be tested properly. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45914 from HyukjinKwon/SPARK-47751.
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/worker_util.py | 31 ++- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/python/pyspark/worker_util.py b/python/pyspark/worker_util.py index f3c59c91ea2c..22389decac2f 100644 --- a/python/pyspark/worker_util.py +++ b/python/pyspark/worker_util.py @@ -32,10 +32,8 @@ try: except ImportError: has_resource_module = False -from pyspark.accumulators import _accumulatorRegistry -from pyspark.core.broadcast import Broadcast, _broadcastRegistry +from pyspark.util import is_remote_only from pyspark.errors import PySparkRuntimeError -from pyspark.core.files import SparkFiles from pyspark.util import local_connect_and_auth from pyspark.serializers import ( read_bool, @@ -59,8 +57,11 @@ def add_path(path: str) -> None: def read_command(serializer: FramedSerializer, file: IO) -> Any: +if not is_remote_only(): +from pyspark.core.broadcast import Broadcast + command = serializer._read_with_length(file) -if isinstance(command, Broadcast): +if not is_remote_only() and isinstance(command, Broadcast): command = serializer.loads(command.value) return command @@ -125,8 +126,12 @@ def setup_spark_files(infile: IO) -> None: """ # fetch name of workdir spark_files_dir = utf8_deserializer.loads(infile) -SparkFiles._root_directory = spark_files_dir -SparkFiles._is_running_on_worker = True + +if not is_remote_only(): +from pyspark.core.files import SparkFiles + +SparkFiles._root_directory = spark_files_dir +SparkFiles._is_running_on_worker = True # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH add_path(spark_files_dir) # *.py files that were added will be copied here @@ -142,6 +147,9 @@ def setup_broadcasts(infile: IO) -> None: """ Set up broadcasted variables. 
""" +if not is_remote_only(): +from pyspark.core.broadcast import Broadcast, _broadcastRegistry + # fetch names and values of broadcast variables needs_broadcast_decryption_server = read_bool(infile) num_broadcast_variables = read_int(infile) @@ -175,6 +183,11 @@ def send_accumulator_updates(outfile: IO) -> None: """ Send the accumulator updates back to JVM. """ -write_int(len(_accumulatorRegistry), outfile) -for aid, accum in _accumulatorRegistry.items(): -pickleSer._write_with_length((aid, accum._value), outfile) +if not is_remote_only(): +from pyspark.accumulators import _accumulatorRegistry + +write_int(len(_accumulatorRegistry), outfile) +for aid, accum in _accumulatorRegistry.items(): +pickleSer._write_with_length((aid, accum._value), outfile) +else: +write_int(0, outfile) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated (644687b66e1a -> 4d9dbb35aacb)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 644687b66e1a [SPARK-47709][BUILD] Upgrade tink to 1.13.0 add 4d9dbb35aacb [SPARK-46722][CONNECT][SS][TESTS][FOLLOW-UP] Drop the tables after tests finished No new revisions were added by this update. Summary of changes: .../sql/tests/connect/streaming/test_parity_listener.py | 12 ++-- 1 file changed, 10 insertions(+), 2 deletions(-)
(spark) branch master updated (d5620cbe9a2e -> aeb082e06091)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from d5620cbe9a2e [SPARK-47289][SQL] Allow extensions to log extended information in explain plan add aeb082e06091 [SPARK-47081][CONNECT][TESTS][FOLLOW-UP] Skip the flaky doctests for now No new revisions were added by this update. Summary of changes: python/pyspark/sql/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
(spark) branch branch-3.5 updated: [SPARK-47734][PYTHON][TESTS] Fix flaky DataFrame.writeStream doctest by stopping streaming query
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 3cb6a44a8d91 [SPARK-47734][PYTHON][TESTS] Fix flaky DataFrame.writeStream doctest by stopping streaming query 3cb6a44a8d91 is described below commit 3cb6a44a8d9112fb53a28ccaedf8bbc648cdf92a Author: Josh Rosen AuthorDate: Fri Apr 5 11:14:42 2024 +0900 [SPARK-47734][PYTHON][TESTS] Fix flaky DataFrame.writeStream doctest by stopping streaming query ### What changes were proposed in this pull request? This PR deflakes the `pyspark.sql.dataframe.DataFrame.writeStream` doctest. PR https://github.com/apache/spark/pull/45298 aimed to fix that test but misdiagnosed the root issue. The problem is not that concurrent tests were colliding on a temporary directory. Rather, the issue is specific to the `DataFrame.writeStream` test's logic: that test starts a streaming query that writes files to the temporary directory, then exits the temp directory context manager without first stopping the streaming query. That creates a race condition where the context manager [...] ``` File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line ?, in pyspark.sql.dataframe.DataFrame.writeStream Failed example: with tempfile.TemporaryDirectory() as d: # Create a table with Rate source.
df.writeStream.toTable( "my_table", checkpointLocation=d) Exception raised: Traceback (most recent call last): File "/usr/lib/python3.11/doctest.py", line 1353, in __run exec(compile(example.source, filename, "single", File "", line 1, in with tempfile.TemporaryDirectory() as d: File "/usr/lib/python3.11/tempfile.py", line 1043, in __exit__ self.cleanup() File "/usr/lib/python3.11/tempfile.py", line 1047, in cleanup self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors) File "/usr/lib/python3.11/tempfile.py", line 1029, in _rmtree _rmtree(name, onerror=onerror) File "/usr/lib/python3.11/shutil.py", line 738, in rmtree onerror(os.rmdir, path, sys.exc_info()) File "/usr/lib/python3.11/shutil.py", line 736, in rmtree os.rmdir(path, dir_fd=dir_fd) OSError: [Errno 39] Directory not empty: '/__w/spark/spark/python/target/4f062b09-213f-4ac2-a10a-2d704990141b/tmp29irqweq' ``` In this PR, I update the doctest to properly stop the streaming query. ### Why are the changes needed? Fix flaky test. ### Does this PR introduce _any_ user-facing change? No, test-only. Small user-facing doc change, but one that is consistent with other doctest examples. ### How was this patch tested? Manually ran updated test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45885 from JoshRosen/fix-flaky-writestream-doctest. 
Authored-by: Josh Rosen Signed-off-by: Hyukjin Kwon (cherry picked from commit 0107435cb39d68eccf8a6900c3c781665deca38b) Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/dataframe.py | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 7c382ab1c5a5..97f60967da70 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -529,6 +529,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): Examples +>>> import time >>> import tempfile >>> df = spark.readStream.format("rate").load() >>> type(df.writeStream) @@ -536,9 +537,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): >>> with tempfile.TemporaryDirectory() as d: ... # Create a table with Rate source. -... df.writeStream.toTable( +... query = df.writeStream.toTable( ... "my_table", checkpointLocation=d) -<...streaming.query.StreamingQuery object at 0x...> +... time.sleep(3) +... query.stop() """ return DataStreamWriter(self) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
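The fix above is an ordering rule: stop (and thereby quiesce) the background writer before `TemporaryDirectory` runs its `rmtree`, otherwise cleanup races with in-flight writes and fails with "Directory not empty". A minimal reproduction of the safe ordering, with a plain thread standing in for the streaming query:

```python
import os
import tempfile
import threading
import time

def start_writer(directory, stop_event):
    # Background "streaming query" stand-in that keeps writing files
    # until asked to stop.
    def run():
        i = 0
        while not stop_event.is_set():
            with open(os.path.join(directory, f"part-{i}"), "w") as f:
                f.write("x")
            i += 1
            time.sleep(0.01)
    t = threading.Thread(target=run)
    t.start()
    return t

with tempfile.TemporaryDirectory() as d:
    stop = threading.Event()
    writer = start_writer(d, stop)
    time.sleep(0.05)
    # Stop AND join the writer *inside* the with block, before the
    # context manager's cleanup runs; this is the ordering the doctest
    # fix enforces with query.stop().
    stop.set()
    writer.join()
```

Reversing the order (exiting the `with` while the thread still runs) is exactly the race the original doctest hit: `rmtree` can remove the directory between two writes, or find a freshly created file it did not enumerate.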
(spark) branch master updated (b9ca91dde94c -> 0107435cb39d)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from b9ca91dde94c [SPARK-47712][CONNECT] Allow connect plugins to create and process Datasets add 0107435cb39d [SPARK-47734][PYTHON][TESTS] Fix flaky DataFrame.writeStream doctest by stopping streaming query No new revisions were added by this update. Summary of changes: python/pyspark/sql/dataframe.py | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-)
(spark) branch branch-3.5 updated: [SPARK-47568][SS][3.5] Fix race condition between maintenance thread and load/commit for snapshot files
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 465a85375b1e [SPARK-47568][SS][3.5] Fix race condition between maintenance thread and load/commit for snapshot files 465a85375b1e is described below commit 465a85375b1e3b81b96eb365d4a68943478265c9 Author: Bhuwan Sahni AuthorDate: Fri Apr 5 10:51:24 2024 +0900 [SPARK-47568][SS][3.5] Fix race condition between maintenance thread and load/commit for snapshot files Backports https://github.com/apache/spark/pull/45724 to 3.5 ### What changes were proposed in this pull request? This PR fixes a race condition between the maintenance thread and the task thread when change-log checkpointing is enabled, and ensures all snapshots are valid. 1. The maintenance thread currently relies on the class variable `lastSnapshot` to find the latest checkpoint and uploads it to DFS. This checkpoint can be modified at commit time by the task thread if a new snapshot is created. 2. The task thread was not resetting `lastSnapshot` at load time, which can result in newer snapshots (if an old version is loaded) being considered valid and uploaded to DFS. This results in `VersionIdMismatch` errors. ### Why are the changes needed? These are logical bugs which can cause `VersionIdMismatch` errors, forcing the user to discard the snapshot and restart the query. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test cases. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45881 from sahnib/rocks-db-fix-3.5.
Authored-by: Bhuwan Sahni Signed-off-by: Hyukjin Kwon --- .../sql/execution/streaming/state/RocksDB.scala| 65 ++ .../streaming/state/RocksDBFileManager.scala | 3 +- .../execution/streaming/state/RocksDBSuite.scala | 37 3 files changed, 81 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 0c9738a6b081..301d978c9038 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state import java.io.File import java.util.Locale +import java.util.concurrent.TimeUnit import javax.annotation.concurrent.GuardedBy import scala.collection.{mutable, Map} @@ -152,19 +153,23 @@ class RocksDB( loadedVersion = latestSnapshotVersion // reset last snapshot version -lastSnapshotVersion = 0L +if (lastSnapshotVersion > latestSnapshotVersion) { + // discard any newer snapshots + lastSnapshotVersion = 0L + latestSnapshot = None +} openDB() numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) { - // we don't track the total number of rows - discard the number being track - -1L -} else if (metadata.numKeys < 0) { - // we track the total number of rows, but the snapshot doesn't have tracking number - // need to count keys now - countKeys() -} else { - metadata.numKeys -} +// we don't track the total number of rows - discard the number being track +-1L + } else if (metadata.numKeys < 0) { +// we track the total number of rows, but the snapshot doesn't have tracking number +// need to count keys now +countKeys() + } else { +metadata.numKeys + } if (loadedVersion != version) replayChangelog(version) // After changelog replay the numKeysOnWritingVersion will be updated to // the correct number of keys in the loaded version. 
@@ -359,16 +364,14 @@ class RocksDB( // background operations. val cp = Checkpoint.create(db) cp.createCheckpoint(checkpointDir.toString) - synchronized { -// if changelog checkpointing is disabled, the snapshot is uploaded synchronously -// inside the uploadSnapshot() called below. -// If changelog checkpointing is enabled, snapshot will be uploaded asynchronously -// during state store maintenance. -latestSnapshot.foreach(_.close()) -latestSnapshot = Some( - RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion)) -lastSnapshotVersion = newVersion - } + // if changelog chec
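The Scala fix above does two things: it resets stale snapshot state when an older version is loaded, and it lets the maintenance thread take ownership of the snapshot atomically rather than reading a field the task thread may be mutating. A simplified Python sketch of those two rules (the real code is Scala inside `RocksDB.scala`; all names below are illustrative):

```python
import threading

class SnapshotManager:
    # Sketch: every read/write of the shared "latest snapshot" state
    # happens under one lock, mirroring the synchronized blocks in the fix.
    def __init__(self):
        self._lock = threading.Lock()
        self._latest = None          # (version, payload) or None
        self.last_snapshot_version = 0

    def commit(self, version, payload):
        # Task thread: publish a new snapshot at commit time.
        with self._lock:
            self._latest = (version, payload)
            self.last_snapshot_version = version

    def load(self, version):
        # Task thread: if we load an older version, any "newer" snapshot
        # on hand is stale and must never be uploaded as if it were valid.
        with self._lock:
            if self.last_snapshot_version > version:
                self.last_snapshot_version = 0
                self._latest = None

    def take_for_upload(self):
        # Maintenance thread: atomically take the snapshot (swap-and-clear)
        # instead of peeking at a field the task thread might overwrite.
        with self._lock:
            snap, self._latest = self._latest, None
            return snap
```

With this shape, the maintenance thread either gets a complete, committed snapshot or `None`; it can never upload a half-replaced one, which is the class of bug behind the `VersionIdMismatch` errors.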
(spark) branch master updated: [SPARK-47712][CONNECT] Allow connect plugins to create and process Datasets
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b9ca91dde94c [SPARK-47712][CONNECT] Allow connect plugins to create and process Datasets b9ca91dde94c is described below commit b9ca91dde94c5ac6eeae9bb5818099adbc93006c Author: Tom van Bussel AuthorDate: Fri Apr 5 10:42:43 2024 +0900 [SPARK-47712][CONNECT] Allow connect plugins to create and process Datasets ### What changes were proposed in this pull request? This PR adds new versions of `SparkSession.createDataset` and `SparkSession.createDataFrame` that take an `Array[Byte]` as input. The older versions that take a `protobuf.Any` are deprecated. This PR also adds new versions of `SparkConnectPlanner.transformRelation` and `SparkConnectPlanner.transformExpression` that take an `Array[Byte]`. ### Why are the changes needed? Without these changes it's difficult to create plugins for Spark Connect. The methods above used to take a shaded protobuf class as input, meaning that plugins had to shade these classes in exactly the same way. Now they can simply serialize the protobuf object to bytes and pass that in instead. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Tests were added ### Was this patch authored or co-authored using generative AI tooling? No Closes #45850 from tomvanbussel/SPARK-47712.
Authored-by: Tom van Bussel Signed-off-by: Hyukjin Kwon --- .../main/scala/org/apache/spark/sql/Column.scala | 6 + .../scala/org/apache/spark/sql/SparkSession.scala | 14 ++- .../org/apache/spark/sql/ClientDatasetSuite.scala | 14 ++- .../apache/spark/sql/PlanGenerationTestSuite.scala | 26 +++-- .../expression_extension_deprecated.explain| 2 ++ .../relation_extension_deprecated.explain | 1 + .../queries/expression_extension_deprecated.json | 26 + .../expression_extension_deprecated.proto.bin | Bin 0 -> 127 bytes .../queries/relation_extension_deprecated.json | 16 + .../relation_extension_deprecated.proto.bin| Bin 0 -> 108 bytes .../sql/connect/planner/SparkConnectPlanner.scala | 11 + .../plugin/SparkConnectPluginRegistrySuite.scala | 5 ++-- 12 files changed, 114 insertions(+), 7 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala index dec699f4f1a8..c23d49440248 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala @@ -1351,10 +1351,16 @@ private[sql] object Column { } @DeveloperApi + @deprecated("Use forExtension(Array[Byte]) instead", "4.0.0") def apply(extension: com.google.protobuf.Any): Column = { apply(_.setExtension(extension)) } + @DeveloperApi + def forExtension(extension: Array[Byte]): Column = { +apply(_.setExtension(com.google.protobuf.Any.parseFrom(extension))) + } + private[sql] def fn(name: String, inputs: Column*): Column = { fn(name, isDistinct = false, inputs: _*) } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index adee5b33fb4e..1e467a864442 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -496,17 +496,29 @@ class SparkSession private[sql] ( } @DeveloperApi + @deprecated("Use newDataFrame(Array[Byte]) instead", "4.0.0") def newDataFrame(extension: com.google.protobuf.Any): DataFrame = { -newDataset(extension, UnboundRowEncoder) +newDataFrame(_.setExtension(extension)) } @DeveloperApi + @deprecated("Use newDataFrame(Array[Byte], AgnosticEncoder[T]) instead", "4.0.0") def newDataset[T]( extension: com.google.protobuf.Any, encoder: AgnosticEncoder[T]): Dataset[T] = { newDataset(encoder)(_.setExtension(extension)) } + @DeveloperApi + def newDataFrame(extension: Array[Byte]): DataFrame = { +newDataFrame(_.setExtension(com.google.protobuf.Any.parseFrom(extension))) + } + + @DeveloperApi + def newDataset[T](extension: Array[Byte], encoder: AgnosticEncoder[T]): Dataset[T] = { + newDataset(encoder)(_.setExtension(com.google.protobuf.Any.parseFrom(extension))) + } + private[sql] def newCommand[T](f: proto.Command.Builder =>
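The shading problem this commit works around can be illustrated with a small hedged sketch (all names below are invented, and stdlib `json` stands in for protobuf serialization): once the API boundary is plain bytes, the plugin side and the planner side only have to agree on the wire format, not on the exact (possibly shaded) class the message was compiled into.

```python
import json

def plugin_build_relation() -> bytes:
    # Plugin side: build a relation description and serialize it to
    # opaque bytes before handing it across the API boundary.
    relation = {"extension": "my.plugin.CustomRelation", "query": "SELECT 1"}
    return json.dumps(relation).encode("utf-8")

def planner_transform_relation(raw: bytes) -> dict:
    # Planner side: parse the bytes with its own copy of the format code,
    # analogous in spirit to com.google.protobuf.Any.parseFrom(extension).
    return json.loads(raw.decode("utf-8"))

plan = planner_transform_relation(plugin_build_relation())
print(plan["extension"])  # my.plugin.CustomRelation
```

The design choice is the same one the commit message describes: passing serialized bytes instead of a message object removes the requirement that both sides shade the protobuf classes identically.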
(spark) branch master updated: [SPARK-47081][CONNECT][FOLLOW-UP] Add the `shell` module into PyPI package
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 404d58c74e16 [SPARK-47081][CONNECT][FOLLOW-UP] Add the `shell` module into PyPI package 404d58c74e16 is described below commit 404d58c74e1623b29f51f3f6b967ac5d0185db13 Author: Hyukjin Kwon AuthorDate: Fri Apr 5 10:02:51 2024 +0900 [SPARK-47081][CONNECT][FOLLOW-UP] Add the `shell` module into PyPI package ### What changes were proposed in this pull request? This PR is a followup of https://github.com/apache/spark/pull/45150 that adds the new `shell` module into the PyPI package. ### Why are the changes needed? So the PyPI package contains the `shell` module. ### Does this PR introduce _any_ user-facing change? No, the main change has not been released yet. ### How was this patch tested? The test case will be added at https://github.com/apache/spark/pull/45870. It was found while working on that PR. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45882 from HyukjinKwon/SPARK-47081-followup. 
Lead-authored-by: Hyukjin Kwon Co-authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/packaging/classic/setup.py | 1 + python/packaging/connect/setup.py | 1 + 2 files changed, 2 insertions(+) diff --git a/python/packaging/classic/setup.py b/python/packaging/classic/setup.py index 5242f749d622..8844d225d1b8 100755 --- a/python/packaging/classic/setup.py +++ b/python/packaging/classic/setup.py @@ -275,6 +275,7 @@ try: "pyspark.sql.connect.client", "pyspark.sql.connect.functions", "pyspark.sql.connect.proto", +"pyspark.sql.connect.shell", "pyspark.sql.connect.streaming", "pyspark.sql.connect.streaming.worker", "pyspark.sql.functions", diff --git a/python/packaging/connect/setup.py b/python/packaging/connect/setup.py index f77074a1bb20..157bc86c192e 100755 --- a/python/packaging/connect/setup.py +++ b/python/packaging/connect/setup.py @@ -103,6 +103,7 @@ try: "pyspark.sql.connect.client", "pyspark.sql.connect.functions", "pyspark.sql.connect.proto", +"pyspark.sql.connect.shell", "pyspark.sql.connect.streaming", "pyspark.sql.connect.streaming.worker", "pyspark.sql.functions", - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated (f6999df0c7f0 -> bffb02d14fa4)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from f6999df0c7f0 [SPARK-47081][CONNECT] Support Query Execution Progress add bffb02d14fa4 [SPARK-47565][PYTHON] PySpark worker pool crash resilience No new revisions were added by this update. Summary of changes: .../spark/api/python/PythonWorkerFactory.scala | 29 +++--- python/pyspark/tests/test_worker.py| 16 2 files changed, 36 insertions(+), 9 deletions(-)
(spark) branch master updated: [SPARK-47724][PYTHON][TESTS] Add an environment variable for testing remote pure Python library
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d272a1b4367e [SPARK-47724][PYTHON][TESTS] Add an environment variable for testing remote pure Python library d272a1b4367e is described below commit d272a1b4367edac03ba1de91551573c8903dd8d8 Author: Hyukjin Kwon AuthorDate: Thu Apr 4 12:15:11 2024 +0900 [SPARK-47724][PYTHON][TESTS] Add an environment variable for testing remote pure Python library ### What changes were proposed in this pull request? This PR proposes to add an environment variable called `SPARK_CONNECT_TESTING_REMOTE` to set the `remote` URL. ### Why are the changes needed? In order to test the pure Python library with a remote server. ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Manually tested. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45868 from HyukjinKwon/SPARK-47724. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/testing/connectutils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/testing/connectutils.py b/python/pyspark/testing/connectutils.py index bfe54b33f569..da6b861e925a 100644 --- a/python/pyspark/testing/connectutils.py +++ b/python/pyspark/testing/connectutils.py @@ -174,7 +174,7 @@ class ReusedConnectTestCase(unittest.TestCase, SQLTestUtils, PySparkErrorTestUti @classmethod def master(cls): -return "local[4]" +return os.environ.get("SPARK_CONNECT_TESTING_REMOTE", "local[4]") @classmethod def setUpClass(cls):
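The pattern in this one-line change — an environment-variable override with a local fallback — can be sketched outside PySpark (the helper name `resolve_master` is ours, not Spark's):

```python
import os

def resolve_master(default: str = "local[4]") -> str:
    # Mirrors the patched classmethod: prefer a remote URL taken from the
    # environment, falling back to the local master for ordinary test runs.
    return os.environ.get("SPARK_CONNECT_TESTING_REMOTE", default)

os.environ.pop("SPARK_CONNECT_TESTING_REMOTE", None)
print(resolve_master())  # local[4]

os.environ["SPARK_CONNECT_TESTING_REMOTE"] = "sc://remote-host:15002"
print(resolve_master())  # sc://remote-host:15002
```

Because the default is only used when the variable is unset, existing local test runs keep their behavior while CI can point the same suite at a remote server.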
(spark) branch master updated (447f8aff6c26 -> 678aeb7ef708)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 447f8aff6c26 [SPARK-47720][CORE] Update `spark.speculation.multiplier` to 3 and `spark.speculation.quantile` to 0.9 add 678aeb7ef708 [SPARK-47683][PYTHON][BUILD] Decouple PySpark core API to pyspark.core package No new revisions were added by this update. Summary of changes: dev/lint-python| 2 +- dev/make-distribution.sh | 2 +- dev/reformat-python| 2 +- dev/run-pip-tests | 6 +- dev/sparktestsupport/modules.py| 10 +- docs/building-spark.md | 2 +- docs/rdd-programming-guide.md | 2 +- examples/src/main/python/avro_inputformat.py | 2 +- examples/src/main/python/parquet_inputformat.py| 2 +- examples/src/main/python/sort.py | 2 +- .../python/streaming/network_wordjoinsentiments.py | 2 +- .../streaming/recoverable_network_wordcount.py | 5 +- .../main/python/streaming/sql_network_wordcount.py | 3 +- pom.xml| 3 +- python/.gitignore | 3 + python/{ => packaging/classic}/setup.cfg | 0 python/{ => packaging/classic}/setup.py| 34 ++- python/{ => packaging/connect}/setup.cfg | 0 python/packaging/connect/setup.py | 166 + python/pyspark/__init__.py | 30 ++- python/pyspark/accumulators.py | 2 +- .../python => python/pyspark/core}/__init__.py | 0 python/pyspark/{ => core}/broadcast.py | 11 +- python/pyspark/{ => core}/conf.py | 14 +- python/pyspark/{ => core}/context.py | 15 +- python/pyspark/{ => core}/files.py | 2 +- python/pyspark/{ => core}/rdd.py | 167 + python/pyspark/{ => core}/status.py| 0 python/pyspark/errors/exceptions/captured.py | 54 +++- python/pyspark/java_gateway.py | 65 + python/pyspark/ml/common.py| 4 +- python/pyspark/ml/torch/distributor.py | 2 +- python/pyspark/mllib/clustering.py | 2 +- python/pyspark/mllib/common.py | 4 +- python/pyspark/mllib/evaluation.py | 2 +- python/pyspark/mllib/feature.py| 4 +- python/pyspark/mllib/fpm.py| 2 +- python/pyspark/mllib/random.py | 4 +- 
python/pyspark/mllib/recommendation.py | 2 +- python/pyspark/mllib/regression.py | 4 +- python/pyspark/mllib/stat/KernelDensity.py | 2 +- python/pyspark/mllib/stat/_statistics.py | 2 +- python/pyspark/mllib/tree.py | 2 +- python/pyspark/mllib/util.py | 4 +- python/pyspark/profiler.py | 2 +- python/pyspark/resource/profile.py | 13 +- python/pyspark/resource/requests.py| 17 +- python/pyspark/serializers.py | 5 +- python/pyspark/shell.py| 2 +- python/pyspark/sql/avro/functions.py | 4 +- python/pyspark/sql/column.py | 41 ++-- python/pyspark/sql/conf.py | 9 +- python/pyspark/sql/connect/client/core.py | 2 +- python/pyspark/sql/connect/dataframe.py| 2 +- python/pyspark/sql/connect/group.py| 2 +- python/pyspark/sql/connect/session.py | 3 +- .../streaming/worker/foreach_batch_worker.py | 2 +- .../connect/streaming/worker/listener_worker.py| 2 +- python/pyspark/sql/connect/udf.py | 2 +- python/pyspark/sql/connect/udtf.py | 2 +- python/pyspark/sql/context.py | 29 ++- python/pyspark/sql/dataframe.py| 122 - python/pyspark/sql/functions/builtin.py| 15 +- python/pyspark/sql/group.py| 5 +- python/pyspark/sql/observation.py | 12 +- python/pyspark/sql/pandas/conversion.py| 2 +- python/pyspark/sql/pandas/functions.py | 2 +- python/pyspark/sql/pandas/functions.pyi| 2 +- python/pyspark/sql/pandas/group_ops.py | 2 +- python/pyspark/sql/pandas/map_ops.py | 7 +- python/pyspark/sql/pandas/typehints.py | 2 +- python/pyspark/sql/protobuf/functions.py | 4 +- python/pyspark/sql/readwriter.py | 38 ++-
(spark) branch master updated: [SPARK-47708][CONNECT] Do not log gRPC exception to stderr in PySpark
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d87ac8ef49db [SPARK-47708][CONNECT] Do not log gRPC exception to stderr in PySpark d87ac8ef49db is described below commit d87ac8ef49dbd7a14d7a774b4ace1ab681a1bb01 Author: Nemanja Boric AuthorDate: Thu Apr 4 10:28:34 2024 +0900 [SPARK-47708][CONNECT] Do not log gRPC exception to stderr in PySpark ### What changes were proposed in this pull request? Currently, if there's any gRPC exception, instead of just handling it, PySpark's gRPC error handler prints it out to stderr, not allowing the user to cleanly ignore the exception by using a try/except control flow statement. In this PR we remove the logger.exception call and rely on the downstream exception mechanism to report the error to the user. ### Why are the changes needed? Without this change, there's no way for the user to ignore the gRPC error without piping stderr to /dev/null or equivalent. ### Does this PR introduce _any_ user-facing change? Yes, the stderr will not have the exception trace written twice. ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45840 from nemanja-boric-databricks/no-log. Authored-by: Nemanja Boric Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/client/core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index c8cf12f40708..b8ba8bd21dec 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -1668,7 +1668,6 @@ class SparkConnectClient(object): --- Throws the appropriate internal Python exception. 
""" -logger.exception("GRPC Error received") # We have to cast the value here because, a RpcError is a Call as well. # https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.__call__ status = rpc_status.from_call(cast(grpc.Call, rpc_error))
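The behavior being fixed can be sketched with the stdlib `logging` module (a toy stand-in, not the real Spark Connect client): when a library logs an error before raising it, a caller's try/except can swallow the exception but not the log line, so the trace still lands on stderr.

```python
import io
import logging

class RpcError(Exception):
    """Toy stand-in for grpc.RpcError."""

log_buffer = io.StringIO()
logger = logging.getLogger("sketch.grpc.client")
logger.addHandler(logging.StreamHandler(log_buffer))
logger.propagate = False  # keep the sketch's output in our buffer only

def call_server(log_before_raise: bool) -> None:
    if log_before_raise:
        # Old behaviour: the error hits the log even when the caller
        # catches the exception below, so it is reported twice.
        logger.error("GRPC Error received")
    raise RpcError("UNAVAILABLE: connection refused")

try:
    call_server(log_before_raise=False)  # new behaviour
except RpcError:
    pass  # the caller can now silence the error completely

assert log_buffer.getvalue() == ""  # nothing was logged behind the caller's back
```

Dropping the log call and letting the raised exception carry the error is what makes the try/except in user code sufficient.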
(spark) branch master updated: [SPARK-47722][SS] Wait until RocksDB background work finish before closing
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 49eefc5d1d92 [SPARK-47722][SS] Wait until RocksDB background work finish before closing 49eefc5d1d92 is described below commit 49eefc5d1d9255ca8db624925d813cc29460f4c7 Author: Wei Liu AuthorDate: Thu Apr 4 08:45:27 2024 +0900 [SPARK-47722][SS] Wait until RocksDB background work finish before closing ### What changes were proposed in this pull request? When closing the RocksDB instance, we need to wait until all background work finishes. If not, the following error could be observed: ``` 24/03/29 06:47:11 INFO RocksDB StateStoreId(opId=0,partId=0,name=default): [NativeRocksDB-2] [/error_handler.cc:396] Background IO error IO error: No such file or directory: While open a file for appending: /ephemeral/tmp/spark-efd53c17-2b8a-4f80-aca0-b767dc06be3d/StateStoreId(opId=0,partId=0,name=default)-732271d8-03b3-4046-a911-5797804df25c/workingDir-3a85e625-e4fb-4a78-b668-941ca16cc7a2/08.sst: No such file or directory 24/03/29 06:47:11 ERROR RocksDB StateStoreId(opId=0,partId=0,name=default): [NativeRocksDB-3] [/db_impl/db_impl_compaction_flush.cc:3021] Waiting after background flush error: IO error: No such file or directory: While open a file for appending: /ephemeral/tmp/spark-efd53c17-2b8a-4f80-aca0-b767dc06be3d/StateStoreId(opId=0,partId=0,name=default)-732271d8-03b3-4046-a911-5797804df25c/workingDir-3a85e625-e4fb-4a78-b668-941ca16cc7a2/08.sst: No such file or directoryAccumulated backgrou [...] 
24/03/29 11:54:09 INFO ShutdownHookManager: Deleting directory /ephemeral/tmp/spark-b5dac908-59cc-4276-80f7-34dab79716b7/StateStoreId(opId=0,partId=0,name=default)-702d3c8f-245e-4119-a763-b8e963d07e7b 24/03/29 06:47:12 INFO ShutdownHookManager: Deleting directory /ephemeral/tmp/spark-efd53c17-2b8a-4f80-aca0-b767dc06be3d/StateStoreId(opId=0,partId=4,name=default)-0eb30b1b-b92f-4744-aff6-85f9efd2bcf2 24/03/29 06:47:12 INFO ShutdownHookManager: Deleting directory /ephemeral/tmp/streaming.metadata-d281c16c-89c7-49b3-b65a-6eb2de6ddb6f pthread lock: Invalid argument ``` In the source code, after this error is thrown, there is a sleep for 1 second and then the original mutex is re-locked: https://github.com/facebook/rocksdb/blob/e46ab9d4f0a0e63bfc668421e2994efa918d6570/db/db_impl/db_impl_compaction_flush.cc#L2613 From the logs of RocksDB and ShutdownHookManager, we can see that exactly 1 second after RocksDB throws, the `pthread lock: Invalid argument` error is thrown. So it is likely that this mutex throws. ### Why are the changes needed? Bug fix for a transient issue ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests should be enough. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45863 from WweiL/SPARK-47722-rocksdb-cleanup. 
Authored-by: Wei Liu Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index fcefc1666f3a..c6fb9699cf33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -886,6 +886,8 @@ class RocksDB( colFamilyNameToHandleMap.values.map(handle => handle.close) colFamilyNameToHandleMap.clear() + // Cancel and wait until all background work finishes + db.cancelAllBackgroundWork(true) // Close the DB instance db.close() db = null
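The ordering bug and its fix can be sketched with a toy Python resource (a loose analogy, not the RocksDB API): background work must be waited on before the resource it touches is invalidated.

```python
import threading
import time

class SketchStore:
    """Toy analogue of the close-ordering fix: join background work
    before tearing down the resource it writes to."""

    def __init__(self) -> None:
        self._closed = False
        self._flush_ok = None
        self._bg = threading.Thread(target=self._background_flush)
        self._bg.start()

    def _background_flush(self) -> None:
        time.sleep(0.05)  # simulated flush latency
        # Touching the store after close() would be the analogue of
        # RocksDB's "No such file or directory" on a deleted working dir.
        self._flush_ok = not self._closed

    def close(self) -> None:
        # Like db.cancelAllBackgroundWork(true) before db.close(): block
        # until background work has finished, then invalidate the store.
        self._bg.join()
        self._closed = True

store = SketchStore()
store.close()
print(store._flush_ok)  # True: the flush ran before the store was closed
```

Reversing the two statements in `close()` would reintroduce the race: the flush could observe an already-closed store, which is the shape of the bug the commit fixes.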
(spark) branch master updated: [SPARK-47721][DOC] Guidelines for the Structured Logging Framework
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new fbe6b1dba2ab [SPARK-47721][DOC] Guidelines for the Structured Logging Framework fbe6b1dba2ab is described below commit fbe6b1dba2abab03fef2fbbac4640c4c41153e71 Author: Gengliang Wang AuthorDate: Thu Apr 4 08:46:21 2024 +0900 [SPARK-47721][DOC] Guidelines for the Structured Logging Framework ### What changes were proposed in this pull request? As suggested in https://github.com/apache/spark/pull/45834/files#r1549565157, I am creating initial guidelines for the structured logging framework. ### Why are the changes needed? We need guidelines to align the logging migration work across the community. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? It's just a doc change. ### Was this patch authored or co-authored using generative AI tooling? Yes. Generated-by: GitHub Copilot 1.2.17.2887 Closes #45862 from gengliangwang/logREADME. Authored-by: Gengliang Wang Signed-off-by: Hyukjin Kwon --- .../src/main/scala/org/apache/spark/internal/README.md | 13 + 1 file changed, 13 insertions(+) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/README.md b/common/utils/src/main/scala/org/apache/spark/internal/README.md new file mode 100644 index ..ed3d77333806 --- /dev/null +++ b/common/utils/src/main/scala/org/apache/spark/internal/README.md @@ -0,0 +1,13 @@ +# Guidelines for the Structured Logging Framework + +## LogKey + +LogKeys serve as identifiers for mapped diagnostic contexts (MDC) within logs. Follow these guidelines when adding new LogKeys: +* Define all structured logging keys in `LogKey.scala`, and sort them alphabetically for ease of search. +* Use `UPPER_SNAKE_CASE` for key names. 
+* Key names should be both simple and broad, yet include specific identifiers like `STAGE_ID`, `TASK_ID`, and `JOB_ID` when needed for clarity. For instance, use `MAX_ATTEMPTS` as a general key instead of creating separate keys for each scenario such as `EXECUTOR_STATE_SYNC_MAX_ATTEMPTS` and `MAX_TASK_FAILURES`. This balances simplicity with the detail needed for effective logging. +* Use abbreviations in names if they are widely understood, such as `APP_ID` for APPLICATION_ID, and `K8S` for KUBERNETES. + +## Exceptions + +To ensure logs are compatible with Spark SQL and log analysis tools, avoid `Exception.printStackTrace()`. Use `logError`, `logWarning`, and `logInfo` methods from the `Logging` trait to log exceptions, maintaining structured and parsable logs.
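The LogKey idea — stable `UPPER_SNAKE_CASE` keys carried as structured context rather than buried in free-form message text — can be sketched with the stdlib `logging` module (a hedged stand-in for Spark's `Logging` trait and its MDC support):

```python
import io
import logging

# General keys per the guideline (e.g. MAX_ATTEMPTS rather than a
# per-scenario key like EXECUTOR_STATE_SYNC_MAX_ATTEMPTS).
MAX_ATTEMPTS = "MAX_ATTEMPTS"
TASK_ID = "TASK_ID"

class MdcFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        # Render any attached MDC dict as sorted key=value pairs so the
        # output stays machine-parsable alongside the human message.
        mdc = getattr(record, "mdc", {})
        ctx = " ".join(f"{k}={v}" for k, v in sorted(mdc.items()))
        return f"{record.getMessage()} [{ctx}]" if ctx else record.getMessage()

buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(MdcFormatter())
logger = logging.getLogger("sketch.structured")
logger.addHandler(handler)
logger.propagate = False

logger.warning("Task retry limit reached",
               extra={"mdc": {TASK_ID: 17, MAX_ATTEMPTS: 4}})
print(buffer.getvalue().strip())  # Task retry limit reached [MAX_ATTEMPTS=4 TASK_ID=17]
```

Because the keys are stable identifiers rather than interpolated prose, a log analysis tool can query `TASK_ID=17` without parsing sentence structure, which is the point of the MDC guideline.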
(spark) branch master updated (7dec5eb14644 -> 6a0555c39b2d)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 7dec5eb14644 [SPARK-47705][INFRA] Sort LogKey alphabetically and build a test to ensure it add 6a0555c39b2d [SPARK-47700][SQL] Fix formatting of error messages with treeNode No new revisions were added by this update. Summary of changes: .../utils/src/main/resources/error/error-classes.json | 18 -- .../scala/org/apache/spark/SparkThrowableSuite.scala | 2 ++ ...pported-subquery-expression-category-error-class.md | 18 -- 3 files changed, 26 insertions(+), 12 deletions(-)
(spark) branch master updated (62f90ec6d32f -> 6e3a42515067)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 62f90ec6d32f [SPARK-47452][INFRA][FOLLOWUP] Enforce to install `six` to `Python 3.10` add 6e3a42515067 [MINOR][DOCS] replace `-formatted code with tags inside configuration.md tables No new revisions were added by this update. Summary of changes: docs/configuration.md | 26 +- 1 file changed, 13 insertions(+), 13 deletions(-)