[spark] branch master updated: [SPARK-44554][INFRA][FOLLOWUP] Install `IPython` for the Python linter check in the daily test for branch-3.3
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 55b07b1367a [SPARK-44554][INFRA][FOLLOWUP] Install `IPython` for the Python linter check in the daily test for branch-3.3
55b07b1367a is described below

commit 55b07b1367a78d5431ce2043fcd590fc4409d566
Author: yangjie01
AuthorDate: Thu Aug 10 12:48:20 2023 +0800

    [SPARK-44554][INFRA][FOLLOWUP] Install `IPython` for the Python linter check in the daily test for branch-3.3

    ### What changes were proposed in this pull request?
    This PR aims to install `IPython` for the Python linter check in the daily test for branch-3.3.

    ### Why are the changes needed?
    To fix the Python linter check in the daily test of branch-3.3.
    https://github.com/apache/spark/actions/runs/5805901293/job/15737802378
    https://github.com/apache/spark/assets/1475305/59896b6a-b710-4772-9139-189c7ea73fe5

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Monitor GA

    Closes #42413 from LuciferYang/SPARK-44554-FOLLOWUP.

    Authored-by: yangjie01
    Signed-off-by: yangjie01
---
 .github/workflows/build_and_test.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 5f0505ab98f..6321fd37b1a 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -664,7 +664,7 @@ jobs:
         # SPARK-44554: Copy from https://github.com/apache/spark/blob/073d0b60d31bf68ebacdc005f59b928a5902670f/.github/workflows/build_and_test.yml#L501-L508
         # Should delete this section after SPARK 3.3 EOL.
         python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.920' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc 'jinja2<3.0.0' 'black==21.12b0'
-        python3.9 -m pip install 'pandas-stubs==1.2.0.53'
+        python3.9 -m pip install 'pandas-stubs==1.2.0.53' ipython
     - name: Install Python linter dependencies for branch-3.4
       if: inputs.branch == 'branch-3.4'
       run: |

---
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (97cfe45c157 -> 5c94565e75c)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

    from 97cfe45c157 [SPARK-44701][PYTHON][TESTS][FOLLOWUP] Keep `torch` in 3.5 daily GA
     add 5c94565e75c [MINOR][BUILD] Skip `deepspeed` in requirements on MacOS

No new revisions were added by this update.

Summary of changes:
 dev/requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
[spark] branch master updated: [SPARK-44701][PYTHON][TESTS][FOLLOWUP] Keep `torch` in 3.5 daily GA
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 97cfe45c157 [SPARK-44701][PYTHON][TESTS][FOLLOWUP] Keep `torch` in 3.5 daily GA
97cfe45c157 is described below

commit 97cfe45c157e0b0c179b887617d46567eec6b46a
Author: Ruifeng Zheng
AuthorDate: Thu Aug 10 12:11:48 2023 +0900

    [SPARK-44701][PYTHON][TESTS][FOLLOWUP] Keep `torch` in 3.5 daily GA

    ### What changes were proposed in this pull request?
    After https://github.com/apache/spark/pull/42375, the `pyspark-connect` job in the 3.5 daily GA still fails (see https://github.com/apache/spark/actions/runs/5805901293). There is no similar issue in 3.3 and 3.4.

    ### Why are the changes needed?
    From a cursory look, `torch` is now a required dependency in pyspark-ml; fixing this failure would require touching many files to change the imports. That is not worthwhile, so this change keeps `torch` installed instead.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    CI

    Closes #42412 from zhengruifeng/test_without_torch.
Authored-by: Ruifeng Zheng
Signed-off-by: Hyukjin Kwon
---
 .github/workflows/build_and_test.yml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index b4559dea42b..5f0505ab98f 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -378,6 +378,7 @@ jobs:
       SKIP_MIMA: true
       SKIP_PACKAGING: true
       METASPACE_SIZE: 1g
+      BRANCH: ${{ inputs.branch }}
     steps:
     - name: Checkout Spark repository
       uses: actions/checkout@v3
@@ -418,7 +419,7 @@ jobs:
     - name: Free up disk space
       shell: 'script -q -e -c "bash {0}"'
       run: |
-        if [[ "$MODULES_TO_TEST" != *"pyspark-ml"* ]]; then
+        if [[ "$MODULES_TO_TEST" != *"pyspark-ml"* ]] && [[ "$BRANCH" != "branch-3.5" ]]; then
           # uninstall libraries dedicated for ML testing
           python3.9 -m pip uninstall -y torch torchvision torcheval torchtnt tensorboard mlflow
         fi
[spark] branch branch-3.5 updated: [SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect streaming workers
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 6b55d618d36 [SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect streaming workers 6b55d618d36 is described below commit 6b55d618d36bdd296b3883916328d26863e94b8a Author: Wei Liu AuthorDate: Thu Aug 10 12:08:13 2023 +0900 [SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect streaming workers ### What changes were proposed in this pull request? Add python version check for spark connect streaming `foreachBatch_worker` and `listener_worker` ### Why are the changes needed? Necessary check ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? I believe it can be skipped here Closes #42421 from WweiL/SPARK-44461-verify-python-ver. Authored-by: Wei Liu Signed-off-by: Hyukjin Kwon (cherry picked from commit 0eeb6042859e82cedaa0807abe5d6be0229ecd09) Signed-off-by: Hyukjin Kwon --- .../main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala | 2 +- python/pyspark/sql/connect/streaming/query.py | 3 ++- python/pyspark/sql/connect/streaming/readwriter.py | 3 ++- python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py | 3 +++ python/pyspark/sql/connect/streaming/worker/listener_worker.py | 3 +++ 5 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala index 1a75965eb92..cddda6fb7a7 100644 --- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala @@ -83,7 +83,7 @@ private[spark] class StreamingPythonRunner( val stream = new 
BufferedOutputStream(pythonWorker.get.getOutputStream, bufferSize) val dataOut = new DataOutputStream(stream) -// TODO(SPARK-44461): verify python version +PythonWorkerUtils.writePythonVersion(pythonVer, dataOut) // Send sessionId PythonRDD.writeUTF(sessionId, dataOut) diff --git a/python/pyspark/sql/connect/streaming/query.py b/python/pyspark/sql/connect/streaming/query.py index 59e98e7bc30..021d27e939d 100644 --- a/python/pyspark/sql/connect/streaming/query.py +++ b/python/pyspark/sql/connect/streaming/query.py @@ -23,6 +23,7 @@ from pyspark.errors import StreamingQueryException, PySparkValueError import pyspark.sql.connect.proto as pb2 from pyspark.serializers import CloudPickleSerializer from pyspark.sql.connect import proto +from pyspark.sql.connect.utils import get_python_ver from pyspark.sql.streaming import StreamingQueryListener from pyspark.sql.streaming.query import ( StreamingQuery as PySparkStreamingQuery, @@ -237,7 +238,7 @@ class StreamingQueryManager: cmd = pb2.StreamingQueryManagerCommand() expr = proto.PythonUDF() expr.command = CloudPickleSerializer().dumps(listener) -expr.python_ver = "%d.%d" % sys.version_info[:2] +expr.python_ver = get_python_ver() cmd.add_listener.python_listener_payload.CopyFrom(expr) cmd.add_listener.id = listener._id self._execute_streaming_query_manager_cmd(cmd) diff --git a/python/pyspark/sql/connect/streaming/readwriter.py b/python/pyspark/sql/connect/streaming/readwriter.py index c8cd408404f..89097fcf43a 100644 --- a/python/pyspark/sql/connect/streaming/readwriter.py +++ b/python/pyspark/sql/connect/streaming/readwriter.py @@ -31,6 +31,7 @@ from pyspark.sql.streaming.readwriter import ( DataStreamReader as PySparkDataStreamReader, DataStreamWriter as PySparkDataStreamWriter, ) +from pyspark.sql.connect.utils import get_python_ver from pyspark.sql.types import Row, StructType from pyspark.errors import PySparkTypeError, PySparkValueError @@ -499,7 +500,7 @@ class DataStreamWriter: 
self._write_proto.foreach_batch.python_function.command = CloudPickleSerializer().dumps( func ) -self._write_proto.foreach_batch.python_function.python_ver = "%d.%d" % sys.version_info[:2] +self._write_proto.foreach_batch.python_function.python_ver = get_python_ver() return self foreachBatch.__doc__ = PySparkDataStreamWriter.foreachBatch.__doc__ diff --git a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py index 48a9848de40..cf61463cd68 100644 --- a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py +++ b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py @@ -31,12 +31,15 @@
[spark] branch master updated: [SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect streaming workers
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0eeb6042859 [SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect streaming workers 0eeb6042859 is described below commit 0eeb6042859e82cedaa0807abe5d6be0229ecd09 Author: Wei Liu AuthorDate: Thu Aug 10 12:08:13 2023 +0900 [SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect streaming workers ### What changes were proposed in this pull request? Add python version check for spark connect streaming `foreachBatch_worker` and `listener_worker` ### Why are the changes needed? Necessary check ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? I believe it can be skipped here Closes #42421 from WweiL/SPARK-44461-verify-python-ver. Authored-by: Wei Liu Signed-off-by: Hyukjin Kwon --- .../main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala | 2 +- python/pyspark/sql/connect/streaming/query.py | 3 ++- python/pyspark/sql/connect/streaming/readwriter.py | 3 ++- python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py | 3 +++ python/pyspark/sql/connect/streaming/worker/listener_worker.py | 3 +++ 5 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala index a079743c847..fdfe388db2d 100644 --- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala @@ -81,7 +81,7 @@ private[spark] class StreamingPythonRunner( val stream = new BufferedOutputStream(pythonWorker.get.getOutputStream, bufferSize) val dataOut = new DataOutputStream(stream) -// TODO(SPARK-44461): verify python version 
+PythonWorkerUtils.writePythonVersion(pythonVer, dataOut) // Send sessionId PythonRDD.writeUTF(sessionId, dataOut) diff --git a/python/pyspark/sql/connect/streaming/query.py b/python/pyspark/sql/connect/streaming/query.py index 59e98e7bc30..021d27e939d 100644 --- a/python/pyspark/sql/connect/streaming/query.py +++ b/python/pyspark/sql/connect/streaming/query.py @@ -23,6 +23,7 @@ from pyspark.errors import StreamingQueryException, PySparkValueError import pyspark.sql.connect.proto as pb2 from pyspark.serializers import CloudPickleSerializer from pyspark.sql.connect import proto +from pyspark.sql.connect.utils import get_python_ver from pyspark.sql.streaming import StreamingQueryListener from pyspark.sql.streaming.query import ( StreamingQuery as PySparkStreamingQuery, @@ -237,7 +238,7 @@ class StreamingQueryManager: cmd = pb2.StreamingQueryManagerCommand() expr = proto.PythonUDF() expr.command = CloudPickleSerializer().dumps(listener) -expr.python_ver = "%d.%d" % sys.version_info[:2] +expr.python_ver = get_python_ver() cmd.add_listener.python_listener_payload.CopyFrom(expr) cmd.add_listener.id = listener._id self._execute_streaming_query_manager_cmd(cmd) diff --git a/python/pyspark/sql/connect/streaming/readwriter.py b/python/pyspark/sql/connect/streaming/readwriter.py index c8cd408404f..89097fcf43a 100644 --- a/python/pyspark/sql/connect/streaming/readwriter.py +++ b/python/pyspark/sql/connect/streaming/readwriter.py @@ -31,6 +31,7 @@ from pyspark.sql.streaming.readwriter import ( DataStreamReader as PySparkDataStreamReader, DataStreamWriter as PySparkDataStreamWriter, ) +from pyspark.sql.connect.utils import get_python_ver from pyspark.sql.types import Row, StructType from pyspark.errors import PySparkTypeError, PySparkValueError @@ -499,7 +500,7 @@ class DataStreamWriter: self._write_proto.foreach_batch.python_function.command = CloudPickleSerializer().dumps( func ) -self._write_proto.foreach_batch.python_function.python_ver = "%d.%d" % sys.version_info[:2] 
+self._write_proto.foreach_batch.python_function.python_ver = get_python_ver() return self foreachBatch.__doc__ = PySparkDataStreamWriter.foreachBatch.__doc__ diff --git a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py index 48a9848de40..cf61463cd68 100644 --- a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py +++ b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py @@ -31,12 +31,15 @@ from pyspark.serializers import ( from pyspark import worker from pyspark.sql import SparkSession from typing
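The diff above replaces the `TODO(SPARK-44461)` with `PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)` on the JVM side, and replaces the inline `"%d.%d" % sys.version_info[:2]` with a `get_python_ver()` helper on the Python side. A minimal sketch of that helper plus a worker-side check (the `verify_python_version` name is hypothetical; only the version-string expression comes from the diff):

```python
import sys


def get_python_ver() -> str:
    # The same expression the diff factors out: "major.minor", e.g. "3.9".
    return "%d.%d" % sys.version_info[:2]


def verify_python_version(expected: str) -> None:
    # Hypothetical worker-side counterpart: fail fast when the client
    # that pickled the function and this worker disagree on the version.
    actual = get_python_ver()
    if actual != expected:
        raise RuntimeError(
            f"worker runs Python {actual}, but the driver pickled with {expected}"
        )
```

Pickled Python functions are version-sensitive, so checking the client's `major.minor` against the worker's before deserializing anything is what makes the mismatch error actionable.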
[spark] branch master updated (3b3e0fdc61c -> 1846991b20a)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 3b3e0fdc61c [SPARK-42621][PS] Add inclusive parameter for pd.date_range add 1846991b20a [SPARK-44732][SQL] Built-in XML data source support No new revisions were added by this update. Summary of changes: dev/.rat-excludes |1 + dev/deps/spark-deps-hadoop-3-hive-2.3 |2 + pom.xml| 12 + sql/core/pom.xml |8 + ...org.apache.spark.sql.sources.DataSourceRegister |1 + .../execution/datasources/xml/DefaultSource.scala | 113 + .../datasources/xml/XmlDataToCatalyst.scala| 62 + .../execution/datasources/xml/XmlInputFormat.scala | 341 ++ .../sql/execution/datasources/xml/XmlOptions.scala | 85 + .../sql/execution/datasources/xml/XmlReader.scala | 205 ++ .../execution/datasources/xml/XmlRelation.scala| 86 + .../sql/execution/datasources/xml/functions.scala | 42 + .../sql/execution/datasources/xml/package.scala| 162 + .../datasources/xml/parsers/StaxXmlGenerator.scala | 159 + .../datasources/xml/parsers/StaxXmlParser.scala| 375 ++ .../xml/parsers/StaxXmlParserUtils.scala | 179 + .../datasources/xml/util/InferSchema.scala | 336 ++ .../xml/util/PartialResultException.scala | 29 + .../execution/datasources/xml/util/TypeCast.scala | 297 ++ .../datasources/xml/util/ValidatorUtil.scala | 55 + .../datasources/xml/util/XSDToSchema.scala | 280 ++ .../execution/datasources/xml/util/XmlFile.scala | 163 + .../execution/datasources/xml/JavaXmlSuite.java| 111 + .../test-data/xml-resources/ages-mixed-types.xml | 15 + .../test-data/xml-resources/ages-with-spaces.xml | 20 + .../resources/test-data/xml-resources/ages.xml | 14 + .../xml-resources/attributesStartWithNewLine.xml | 11 + .../xml-resources/attributesStartWithNewLineCR.xml |1 + .../xml-resources/attributesStartWithNewLineLF.xml | 11 + .../resources/test-data/xml-resources/basket.xml | 12 + .../resources/test-data/xml-resources/basket.xsd | 17 + 
.../test-data/xml-resources/basket_invalid.xml | 14 + .../xml-resources/books-attributes-in-no-child.xml | 75 + .../books-complicated-null-attribute.xml | 60 + .../test-data/xml-resources/books-complicated.xml | 60 + .../xml-resources/books-malformed-attributes.xml | 43 + .../test-data/xml-resources/books-namespaces.xml | 12 + .../test-data/xml-resources/books-nested-array.xml | 130 + .../xml-resources/books-nested-object.xml | 144 + .../xml-resources/books-unicode-in-tag-name.xml| 24 + .../resources/test-data/xml-resources/books.xml| 136 + .../test-data/xml-resources/cars-attribute.xml |9 + .../test-data/xml-resources/cars-iso-8859-1.xml| 21 + .../test-data/xml-resources/cars-malformed.xml | 20 + .../xml-resources/cars-mixed-attr-no-child.xml | 25 + .../xml-resources/cars-no-indentation.xml |2 + .../xml-resources/cars-unbalanced-elements.xml | 19 + .../resources/test-data/xml-resources/cars.xml | 21 + .../resources/test-data/xml-resources/cars.xml.bz2 | Bin 0 -> 229 bytes .../resources/test-data/xml-resources/cars.xml.gz | Bin 0 -> 210 bytes .../resources/test-data/xml-resources/catalog.xsd | 41 + .../resources/test-data/xml-resources/choice.xsd | 12 + .../xml-resources/complex-content-extension.xsd| 25 + .../xml-resources/datatypes-valid-and-invalid.xml | 31 + .../resources/test-data/xml-resources/date.xml |5 + .../xml-resources/decimal-with-restriction.xsd | 18 + .../resources/test-data/xml-resources/empty.xml|0 .../test-data/xml-resources/feed-with-spaces.xml | 15 + .../test-data/xml-resources/fias_house.large.xml | 3621 .../xml-resources/fias_house.large.xml.bz2 | Bin 0 -> 30761 bytes .../xml-resources/fias_house.large.xml.gz | Bin 0 -> 8568 bytes .../test-data/xml-resources/fias_house.xml | 182 + .../test-data/xml-resources/fias_house.xml.bz2 | Bin 0 -> 4571 bytes .../test-data/xml-resources/fias_house.xml.gz | Bin 0 -> 5069 bytes .../test-data/xml-resources/gps-empty-field.xml| 20 + .../xml-resources/include-example/first.xsd|5 + 
.../xml-resources/include-example/second.xsd | 15 + .../resources/test-data/xml-resources/long.xsd | 10 + .../xml-resources/manual_schema_corrupt_record.xml | 30 + .../test-data/xml-resources/map-attribute.xml |7 + .../test-data/xml-resources/mixed_children.xml |5 + .../test-data/xml-resources/mixed_children_2.xml |5 + .../xml-resources/mixed_children_as_string.xml |9 +
[spark] branch master updated: [SPARK-42621][PS] Add inclusive parameter for pd.date_range
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3b3e0fdc61c [SPARK-42621][PS] Add inclusive parameter for pd.date_range 3b3e0fdc61c is described below commit 3b3e0fdc61cf659bb97f94d9b12b8dcdad999e62 Author: Zhyhimont Dmitry AuthorDate: Thu Aug 10 11:05:11 2023 +0900 [SPARK-42621][PS] Add inclusive parameter for pd.date_range ### What changes were proposed in this pull request? Add inclusive parameter for pd.date_range to support the pandas 2.0.0 ### Why are the changes needed? When pandas 2.0.0 is released, we should match the behavior in pandas API on Spark. ### Does this PR introduce any user-facing change? yes, the API changes Before: ps.date_range(start='2017-01-01', end='2017-01-04', closed=None) After: ps.date_range(start='2017-01-01', end='2017-01-04', inclusive="both") ### How was this patch tested? Unit tests were updated Closes #40665 from dzhigimont/SPARK-42621_ZH. Lead-authored-by: Zhyhimont Dmitry Co-authored-by: Zhyhimont Dmitry Signed-off-by: Hyukjin Kwon --- python/pyspark/pandas/namespace.py| 32 +-- python/pyspark/pandas/tests/test_namespace.py | 25 + 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/python/pyspark/pandas/namespace.py b/python/pyspark/pandas/namespace.py index ba93e5a3ee5..fddf1bec63f 100644 --- a/python/pyspark/pandas/namespace.py +++ b/python/pyspark/pandas/namespace.py @@ -1751,8 +1751,6 @@ def to_datetime( ) -# TODO(SPARK-42621): Add `inclusive` parameter. 
-# See https://github.com/pandas-dev/pandas/issues/40245 def date_range( start: Union[str, Any] = None, end: Union[str, Any] = None, @@ -1761,6 +1759,7 @@ def date_range( tz: Optional[Union[str, tzinfo]] = None, normalize: bool = False, name: Optional[str] = None, +inclusive: str = "both", **kwargs: Any, ) -> DatetimeIndex: """ @@ -1784,6 +1783,11 @@ def date_range( Normalize start/end dates to midnight before generating date range. name : str, default None Name of the resulting DatetimeIndex. +inclusive : {"both", "neither", "left", "right"}, default "both" +Include boundaries; Whether to set each bound as closed or open. + +.. versionadded:: 4.0.0 + **kwargs For compatibility. Has no effect on the result. @@ -1867,6 +1871,29 @@ def date_range( DatetimeIndex(['2018-01-31', '2018-04-30', '2018-07-31', '2018-10-31', '2019-01-31'], dtype='datetime64[ns]', freq=None) + +`inclusive` controls whether to include `start` and `end` that are on the +boundary. The default includes boundary points on either end. + +>>> ps.date_range( +... start='2017-01-01', end='2017-01-04', inclusive="both" +... ) # doctest: +NORMALIZE_WHITESPACE +DatetimeIndex(['2017-01-01', '2017-01-02', '2017-01-03', '2017-01-04'], + dtype='datetime64[ns]', freq=None) + +Use ``inclusive='left'`` to exclude `end` if it falls on the boundary. + +>>> ps.date_range( +... start='2017-01-01', end='2017-01-04', inclusive='left' +... ) # doctest: +NORMALIZE_WHITESPACE +DatetimeIndex(['2017-01-01', '2017-01-02', '2017-01-03'], dtype='datetime64[ns]', freq=None) + +Use ``inclusive='right'`` to exclude `start` if it falls on the boundary. + +>>> ps.date_range( +... start='2017-01-01', end='2017-01-04', inclusive='right' +... 
) # doctest: +NORMALIZE_WHITESPACE +DatetimeIndex(['2017-01-02', '2017-01-03', '2017-01-04'], dtype='datetime64[ns]', freq=None) """ assert freq not in ["N", "ns"], "nanoseconds is not supported" assert tz is None, "Localized DatetimeIndex is not supported" @@ -1882,6 +1909,7 @@ def date_range( tz=tz, normalize=normalize, name=name, +inclusive=inclusive, **kwargs, ) ), diff --git a/python/pyspark/pandas/tests/test_namespace.py b/python/pyspark/pandas/tests/test_namespace.py index d1d1e1af935..5c202046717 100644 --- a/python/pyspark/pandas/tests/test_namespace.py +++ b/python/pyspark/pandas/tests/test_namespace.py @@ -221,6 +221,31 @@ class NamespaceTestsMixin: pd.date_range(start="1/1/2018", periods=5, freq=pd.offsets.MonthEnd(3)), ) +self.assert_eq( +ps.date_range(start="2017-01-01", end="2017-01-04", inclusive="left"), +pd.date_range(start="2017-01-01", end="2017-01-04", inclusive="left"), +) + +self.assert_eq( +ps.date_range(start="2017-01-01", end="2017-01-04", inclusive="right"), +
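The new `inclusive` parameter is forwarded directly to `pd.date_range`. Its boundary semantics can be sketched as a plain filter over the generated dates (a hypothetical stand-alone helper, not the pandas-on-Spark implementation):

```python
from datetime import date, timedelta


def date_range(start: date, end: date, inclusive: str = "both") -> list:
    # Generate every day from start through end, then trim the boundaries
    # according to `inclusive`, mirroring the pandas semantics.
    if inclusive not in ("both", "neither", "left", "right"):
        raise ValueError("inclusive must be 'both', 'neither', 'left' or 'right'")
    days = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    if inclusive in ("neither", "right"):
        days = days[1:]   # drop `start`
    if inclusive in ("neither", "left"):
        days = days[:-1]  # drop `end`
    return days
```

For example, `date_range(date(2017, 1, 1), date(2017, 1, 4), "left")` keeps Jan 1–3, matching the `inclusive='left'` doctest in the commit.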
[spark] branch branch-3.5 updated: [SPARK-44747][CONNECT] Add missing SparkSession.Builder methods
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 6e4abe11442 [SPARK-44747][CONNECT] Add missing SparkSession.Builder methods
6e4abe11442 is described below

commit 6e4abe11442f2986412909b4ebb4c13487df27b6
Author: Herman van Hovell
AuthorDate: Thu Aug 10 09:49:45 2023 +0900

    [SPARK-44747][CONNECT] Add missing SparkSession.Builder methods

    ### What changes were proposed in this pull request?
    This PR adds a couple of methods to SparkSession.Builder:
    - `conf` - this group of methods allows you to set runtime configurations on the Spark Connect session.
    - `master` - this is a no-op; it is only added for compatibility.
    - `appName` - this is a no-op; it is only added for compatibility.
    - `enableHiveSupport` - this is a no-op; it is only added for compatibility.

    ### Why are the changes needed?
    We want to maximize compatibility with the existing API in sql/core.

    ### Does this PR introduce _any_ user-facing change?
    Yes. It adds a couple of builder methods.

    ### How was this patch tested?
    Added tests to `SparkSessionSuite` and `SparkSessionE2ESuite`.

    Closes #42419 from hvanhovell/SPARK-44747.
Authored-by: Herman van Hovell Signed-off-by: Hyukjin Kwon (cherry picked from commit d27496eb3bf962981e37f989ba486d847745444f) Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/sql/SparkSession.scala | 91 +- .../apache/spark/sql/SparkSessionE2ESuite.scala| 46 +++ .../org/apache/spark/sql/SparkSessionSuite.scala | 10 +++ .../CheckConnectJvmClientCompatibility.scala | 6 -- 4 files changed, 146 insertions(+), 7 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 7367ed153f7..e902e04e246 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -781,6 +781,7 @@ object SparkSession extends Logging { class Builder() extends Logging { private val builder = SparkConnectClient.builder() private var client: SparkConnectClient = _ +private[this] val options = new scala.collection.mutable.HashMap[String, String] def remote(connectionString: String): Builder = { builder.connectionString(connectionString) @@ -804,6 +805,84 @@ object SparkSession extends Logging { this } +/** + * Sets a config option. Options set using this method are automatically propagated to the + * Spark Connect session. Only runtime options are supported. + * + * @since 3.5.0 + */ +def config(key: String, value: String): Builder = synchronized { + options += key -> value + this +} + +/** + * Sets a config option. Options set using this method are automatically propagated to the + * Spark Connect session. Only runtime options are supported. + * + * @since 3.5.0 + */ +def config(key: String, value: Long): Builder = synchronized { + options += key -> value.toString + this +} + +/** + * Sets a config option. Options set using this method are automatically propagated to the + * Spark Connect session. 
Only runtime options are supported. + * + * @since 3.5.0 + */ +def config(key: String, value: Double): Builder = synchronized { + options += key -> value.toString + this +} + +/** + * Sets a config option. Options set using this method are automatically propagated to the + * Spark Connect session. Only runtime options are supported. + * + * @since 3.5.0 + */ +def config(key: String, value: Boolean): Builder = synchronized { + options += key -> value.toString + this +} + +/** + * Sets a config a map of options. Options set using this method are automatically propagated + * to the Spark Connect session. Only runtime options are supported. + * + * @since 3.5.0 + */ +def config(map: Map[String, Any]): Builder = synchronized { + map.foreach { kv: (String, Any) => +{ + options += kv._1 -> kv._2.toString +} + } + this +} + +/** + * Sets a config option. Options set using this method are automatically propagated to both + * `SparkConf` and SparkSession's own configuration. + * + * @since 3.5.0 + */ +def config(map: java.util.Map[String, Any]): Builder = synchronized { + config(map.asScala.toMap) +} + +@deprecated("enableHiveSupport does not work in Spark Connect") +def enableHiveSupport(): Builder = this + +
[spark] branch master updated: [SPARK-44747][CONNECT] Add missing SparkSession.Builder methods
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d27496eb3bf [SPARK-44747][CONNECT] Add missing SparkSession.Builder methods
d27496eb3bf is described below

commit d27496eb3bf962981e37f989ba486d847745444f
Author: Herman van Hovell
AuthorDate: Thu Aug 10 09:49:45 2023 +0900

    [SPARK-44747][CONNECT] Add missing SparkSession.Builder methods

    ### What changes were proposed in this pull request?
    This PR adds a couple of methods to SparkSession.Builder:
    - `conf` - this group of methods allows you to set runtime configurations on the Spark Connect session.
    - `master` - this is a no-op; it is only added for compatibility.
    - `appName` - this is a no-op; it is only added for compatibility.
    - `enableHiveSupport` - this is a no-op; it is only added for compatibility.

    ### Why are the changes needed?
    We want to maximize compatibility with the existing API in sql/core.

    ### Does this PR introduce _any_ user-facing change?
    Yes. It adds a couple of builder methods.

    ### How was this patch tested?
    Added tests to `SparkSessionSuite` and `SparkSessionE2ESuite`.

    Closes #42419 from hvanhovell/SPARK-44747.
Authored-by: Herman van Hovell Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/sql/SparkSession.scala | 91 +- .../apache/spark/sql/SparkSessionE2ESuite.scala| 46 +++ .../org/apache/spark/sql/SparkSessionSuite.scala | 10 +++ .../CheckConnectJvmClientCompatibility.scala | 6 -- 4 files changed, 146 insertions(+), 7 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 7367ed153f7..e902e04e246 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -781,6 +781,7 @@ object SparkSession extends Logging { class Builder() extends Logging { private val builder = SparkConnectClient.builder() private var client: SparkConnectClient = _ +private[this] val options = new scala.collection.mutable.HashMap[String, String] def remote(connectionString: String): Builder = { builder.connectionString(connectionString) @@ -804,6 +805,84 @@ object SparkSession extends Logging { this } +/** + * Sets a config option. Options set using this method are automatically propagated to the + * Spark Connect session. Only runtime options are supported. + * + * @since 3.5.0 + */ +def config(key: String, value: String): Builder = synchronized { + options += key -> value + this +} + +/** + * Sets a config option. Options set using this method are automatically propagated to the + * Spark Connect session. Only runtime options are supported. + * + * @since 3.5.0 + */ +def config(key: String, value: Long): Builder = synchronized { + options += key -> value.toString + this +} + +/** + * Sets a config option. Options set using this method are automatically propagated to the + * Spark Connect session. Only runtime options are supported. 
+ * + * @since 3.5.0 + */ +def config(key: String, value: Double): Builder = synchronized { + options += key -> value.toString + this +} + +/** + * Sets a config option. Options set using this method are automatically propagated to the + * Spark Connect session. Only runtime options are supported. + * + * @since 3.5.0 + */ +def config(key: String, value: Boolean): Builder = synchronized { + options += key -> value.toString + this +} + +/** + * Sets a config a map of options. Options set using this method are automatically propagated + * to the Spark Connect session. Only runtime options are supported. + * + * @since 3.5.0 + */ +def config(map: Map[String, Any]): Builder = synchronized { + map.foreach { kv: (String, Any) => +{ + options += kv._1 -> kv._2.toString +} + } + this +} + +/** + * Sets a config option. Options set using this method are automatically propagated to both + * `SparkConf` and SparkSession's own configuration. + * + * @since 3.5.0 + */ +def config(map: java.util.Map[String, Any]): Builder = synchronized { + config(map.asScala.toMap) +} + +@deprecated("enableHiveSupport does not work in Spark Connect") +def enableHiveSupport(): Builder = this + +@deprecated("master does not work in Spark Connect, please use remote instead") +def master(master:
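The Scala diff above collects `config` calls in a mutable map, stringifies `Long`/`Double`/`Boolean` values, and keeps `master`/`enableHiveSupport` as compatibility no-ops. A minimal Python sketch of that accumulate-then-apply pattern (the class and method bodies are hypothetical, written only to mirror the diff's behavior):

```python
class Builder:
    """Sketch of a Spark Connect session builder: config() calls are
    collected here and would only be applied as runtime options once
    the session is actually built."""

    def __init__(self) -> None:
        self._options: dict = {}

    def config(self, key: str, value) -> "Builder":
        # Like the Scala overloads for Long/Double/Boolean,
        # every value is stored as a string.
        self._options[key] = str(value)
        return self

    def master(self, _: str) -> "Builder":
        # No-op, kept only for compatibility with the sql/core API.
        return self

    def enableHiveSupport(self) -> "Builder":
        # No-op, kept only for compatibility with the sql/core API.
        return self
```

Returning `self` from every method preserves the fluent chaining that existing `SparkSession.builder` call sites rely on, which is the whole point of adding the no-ops.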
[spark] branch branch-3.5 updated: [SPARK-44740][CONNECT] Support specifying `session_id` in SPARK_REMOTE connection string
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 576d1698c7b [SPARK-44740][CONNECT] Support specifying `session_id` in SPARK_REMOTE connection string 576d1698c7b is described below commit 576d1698c7b52b2d9ce00fa2fc5912c92e8bbe67 Author: Martin Grund AuthorDate: Thu Aug 10 08:41:13 2023 +0900 [SPARK-44740][CONNECT] Support specifying `session_id` in SPARK_REMOTE connection string ### What changes were proposed in this pull request? To support cross-language session sharing in Spark connect, we need to be able to inject the session ID into the connection string because on the server side, the client-provided session ID is used already together with the user id. ``` SparkSession.builder.remote("sc://localhost/;session_id=abcdefg").getOrCreate() ``` ### Why are the changes needed? ease of use ### Does this PR introduce _any_ user-facing change? Adds a way to configure the Spark Connect connection string with `session_id` ### How was this patch tested? Added UT for the parameter. Closes #42415 from grundprinzip/SPARK-44740. 
Authored-by: Martin Grund Signed-off-by: Hyukjin Kwon (cherry picked from commit 7af4e358f3f4902cc9601e56c2662b8921a925d6) Signed-off-by: Hyukjin Kwon --- .../sql/connect/client/SparkConnectClient.scala| 22 ++-- .../connect/client/SparkConnectClientParser.scala | 3 +++ .../SparkConnectClientBuilderParseTestSuite.scala | 4 +++ .../connect/client/SparkConnectClientSuite.scala | 6 + connector/connect/docs/client-connection-string.md | 11 python/pyspark/sql/connect/client/core.py | 30 +++--- .../sql/tests/connect/client/test_client.py| 7 + .../sql/tests/connect/test_connect_basic.py| 18 - 8 files changed, 94 insertions(+), 7 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala index a028df536cf..637499f090c 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala @@ -56,7 +56,7 @@ private[sql] class SparkConnectClient( // Generate a unique session ID for this client. This UUID must be unique to allow // concurrent Spark sessions of the same user. If the channel is closed, creating // a new client will create a new session ID. 
- private[sql] val sessionId: String = UUID.randomUUID.toString + private[sql] val sessionId: String = configuration.sessionId.getOrElse(UUID.randomUUID.toString) private[client] val artifactManager: ArtifactManager = { new ArtifactManager(configuration, sessionId, bstub, stub) @@ -432,6 +432,7 @@ object SparkConnectClient { val PARAM_USE_SSL = "use_ssl" val PARAM_TOKEN = "token" val PARAM_USER_AGENT = "user_agent" + val PARAM_SESSION_ID = "session_id" } private def verifyURI(uri: URI): Unit = { @@ -463,6 +464,21 @@ object SparkConnectClient { this } +def sessionId(value: String): Builder = { + try { +UUID.fromString(value).toString + } catch { +case e: IllegalArgumentException => + throw new IllegalArgumentException( +"Parameter value 'session_id' must be a valid UUID format.", +e) + } + _configuration = _configuration.copy(sessionId = Some(value)) + this +} + +def sessionId: Option[String] = _configuration.sessionId + def userAgent: String = _configuration.userAgent def option(key: String, value: String): Builder = { @@ -490,6 +506,7 @@ object SparkConnectClient { case URIParams.PARAM_TOKEN => token(value) case URIParams.PARAM_USE_SSL => if (java.lang.Boolean.valueOf(value)) enableSsl() else disableSsl() + case URIParams.PARAM_SESSION_ID => sessionId(value) case _ => option(key, value) } } @@ -576,7 +593,8 @@ object SparkConnectClient { userAgent: String = DEFAULT_USER_AGENT, retryPolicy: GrpcRetryHandler.RetryPolicy = GrpcRetryHandler.RetryPolicy(), useReattachableExecute: Boolean = true, - interceptors: List[ClientInterceptor] = List.empty) { + interceptors: List[ClientInterceptor] = List.empty, + sessionId: Option[String] = None) { def userContext: proto.UserContext = { val builder = proto.UserContext.newBuilder() diff --git
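The `sessionId(value)` builder method quoted above fails fast on a malformed ID before storing it in the configuration. A self-contained sketch of just that validation step (the helper name `validateSessionId` is illustrative, not Spark API):

```scala
import java.util.UUID

// Sketch of the check behind sc://host/;session_id=<uuid>: the value must
// parse as a UUID, otherwise the builder rejects the connection string.
def validateSessionId(value: String): String = {
  try {
    UUID.fromString(value).toString // throws IllegalArgumentException if malformed
  } catch {
    case e: IllegalArgumentException =>
      throw new IllegalArgumentException(
        "Parameter value 'session_id' must be a valid UUID format.", e)
  }
}
```

Note that the server keys sessions on the pair (user id, session id), which is why a stable, well-formed UUID is required for cross-language session sharing.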
[spark] branch master updated: [SPARK-44740][CONNECT] Support specifying `session_id` in SPARK_REMOTE connection string
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7af4e358f3f [SPARK-44740][CONNECT] Support specifying `session_id` in SPARK_REMOTE connection string 7af4e358f3f is described below commit 7af4e358f3f4902cc9601e56c2662b8921a925d6 Author: Martin Grund AuthorDate: Thu Aug 10 08:41:13 2023 +0900 [SPARK-44740][CONNECT] Support specifying `session_id` in SPARK_REMOTE connection string ### What changes were proposed in this pull request? To support cross-language session sharing in Spark connect, we need to be able to inject the session ID into the connection string because on the server side, the client-provided session ID is used already together with the user id. ``` SparkSession.builder.remote("sc://localhost/;session_id=abcdefg").getOrCreate() ``` ### Why are the changes needed? ease of use ### Does this PR introduce _any_ user-facing change? Adds a way to configure the Spark Connect connection string with `session_id` ### How was this patch tested? Added UT for the parameter. Closes #42415 from grundprinzip/SPARK-44740. 
Authored-by: Martin Grund Signed-off-by: Hyukjin Kwon --- .../sql/connect/client/SparkConnectClient.scala| 22 ++-- .../connect/client/SparkConnectClientParser.scala | 3 +++ .../SparkConnectClientBuilderParseTestSuite.scala | 4 +++ .../connect/client/SparkConnectClientSuite.scala | 6 + connector/connect/docs/client-connection-string.md | 11 python/pyspark/sql/connect/client/core.py | 30 +++--- .../sql/tests/connect/client/test_client.py| 7 + .../sql/tests/connect/test_connect_basic.py| 18 - 8 files changed, 94 insertions(+), 7 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala index a028df536cf..637499f090c 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala @@ -56,7 +56,7 @@ private[sql] class SparkConnectClient( // Generate a unique session ID for this client. This UUID must be unique to allow // concurrent Spark sessions of the same user. If the channel is closed, creating // a new client will create a new session ID. 
- private[sql] val sessionId: String = UUID.randomUUID.toString + private[sql] val sessionId: String = configuration.sessionId.getOrElse(UUID.randomUUID.toString) private[client] val artifactManager: ArtifactManager = { new ArtifactManager(configuration, sessionId, bstub, stub) @@ -432,6 +432,7 @@ object SparkConnectClient { val PARAM_USE_SSL = "use_ssl" val PARAM_TOKEN = "token" val PARAM_USER_AGENT = "user_agent" + val PARAM_SESSION_ID = "session_id" } private def verifyURI(uri: URI): Unit = { @@ -463,6 +464,21 @@ object SparkConnectClient { this } +def sessionId(value: String): Builder = { + try { +UUID.fromString(value).toString + } catch { +case e: IllegalArgumentException => + throw new IllegalArgumentException( +"Parameter value 'session_id' must be a valid UUID format.", +e) + } + _configuration = _configuration.copy(sessionId = Some(value)) + this +} + +def sessionId: Option[String] = _configuration.sessionId + def userAgent: String = _configuration.userAgent def option(key: String, value: String): Builder = { @@ -490,6 +506,7 @@ object SparkConnectClient { case URIParams.PARAM_TOKEN => token(value) case URIParams.PARAM_USE_SSL => if (java.lang.Boolean.valueOf(value)) enableSsl() else disableSsl() + case URIParams.PARAM_SESSION_ID => sessionId(value) case _ => option(key, value) } } @@ -576,7 +593,8 @@ object SparkConnectClient { userAgent: String = DEFAULT_USER_AGENT, retryPolicy: GrpcRetryHandler.RetryPolicy = GrpcRetryHandler.RetryPolicy(), useReattachableExecute: Boolean = true, - interceptors: List[ClientInterceptor] = List.empty) { + interceptors: List[ClientInterceptor] = List.empty, + sessionId: Option[String] = None) { def userContext: proto.UserContext = { val builder = proto.UserContext.newBuilder() diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClientParser.scala
[spark] branch branch-3.4 updated: [SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted K8s PVCs
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new a846a225ced [SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted K8s PVCs a846a225ced is described below commit a846a225cedf61187ece611143b45806837bf0bc Author: Dongjoon Hyun AuthorDate: Wed Aug 9 15:25:33 2023 -0700 [SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted K8s PVCs ### What changes were proposed in this pull request? This PR aims to document an example of shuffle data recovery configuration from the remounted K8s PVCs. ### Why are the changes needed? This will help the users use this feature more easily. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual review because this is a doc-only change. ![Screenshot 2023-08-09 at 1 39 48 PM](https://github.com/apache/spark/assets/9700541/8cc7240b-570d-4c2e-b90a-54795c18df0a) ``` $ kubectl logs -f xxx-exec-16 | grep Kube ... 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover shuffle data. 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Found 192 files 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/11/shuffle_0_11160_0.data 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/0e/shuffle_0_10063_0.data 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/0e/shuffle_0_10283_0.data 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Ignore a non-shuffle block file. 
``` Closes #42417 from dongjoon-hyun/SPARK-44745. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun (cherry picked from commit 4db378fae30733cbd2be41e95a3cd8ad2184e06f) Signed-off-by: Dongjoon Hyun --- docs/running-on-kubernetes.md | 7 +++ 1 file changed, 7 insertions(+) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 71754dbc6f9..7a35236afa0 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -394,6 +394,13 @@ spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount. spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false ``` +To enable shuffle data recovery feature via the built-in `KubernetesLocalDiskShuffleDataIO` plugin, we need to have the followings. You may want to enable `spark.kubernetes.driver.waitToReusePersistentVolumeClaim` additionally. + +``` +spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data/spark-x/executor-x +spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO +``` + If no volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `spark.local.dir` or the environment variable `SPARK_LOCAL_DIRS` . If no directories are explicitly specified then a default directory is created and configured [...] `emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted K8s PVCs
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 4d67f2c59c6 [SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted K8s PVCs 4d67f2c59c6 is described below commit 4d67f2c59c660cd1081f6c1bd3002fffca28e936 Author: Dongjoon Hyun AuthorDate: Wed Aug 9 15:25:33 2023 -0700 [SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted K8s PVCs ### What changes were proposed in this pull request? This PR aims to document an example of shuffle data recovery configuration from the remounted K8s PVCs. ### Why are the changes needed? This will help the users use this feature more easily. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual review because this is a doc-only change. ![Screenshot 2023-08-09 at 1 39 48 PM](https://github.com/apache/spark/assets/9700541/8cc7240b-570d-4c2e-b90a-54795c18df0a) ``` $ kubectl logs -f xxx-exec-16 | grep Kube ... 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover shuffle data. 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Found 192 files 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/11/shuffle_0_11160_0.data 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/0e/shuffle_0_10063_0.data 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/0e/shuffle_0_10283_0.data 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Ignore a non-shuffle block file. 
``` Closes #42417 from dongjoon-hyun/SPARK-44745. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun (cherry picked from commit 4db378fae30733cbd2be41e95a3cd8ad2184e06f) Signed-off-by: Dongjoon Hyun --- docs/running-on-kubernetes.md | 7 +++ 1 file changed, 7 insertions(+) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index d3953592c4e..707a76196f3 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -394,6 +394,13 @@ spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount. spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false ``` +To enable shuffle data recovery feature via the built-in `KubernetesLocalDiskShuffleDataIO` plugin, we need to have the followings. You may want to enable `spark.kubernetes.driver.waitToReusePersistentVolumeClaim` additionally. + +``` +spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data/spark-x/executor-x +spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO +``` + If no volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `spark.local.dir` or the environment variable `SPARK_LOCAL_DIRS` . If no directories are explicitly specified then a default directory is created and configured [...] `emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted K8s PVCs
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4db378fae30 [SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted K8s PVCs 4db378fae30 is described below commit 4db378fae30733cbd2be41e95a3cd8ad2184e06f Author: Dongjoon Hyun AuthorDate: Wed Aug 9 15:25:33 2023 -0700 [SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted K8s PVCs ### What changes were proposed in this pull request? This PR aims to document an example of shuffle data recovery configuration from the remounted K8s PVCs. ### Why are the changes needed? This will help the users use this feature more easily. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual review because this is a doc-only change. ![Screenshot 2023-08-09 at 1 39 48 PM](https://github.com/apache/spark/assets/9700541/8cc7240b-570d-4c2e-b90a-54795c18df0a) ``` $ kubectl logs -f xxx-exec-16 | grep Kube ... 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover shuffle data. 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Found 192 files 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/11/shuffle_0_11160_0.data 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/0e/shuffle_0_10063_0.data 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to recover /data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/0e/shuffle_0_10283_0.data 23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Ignore a non-shuffle block file. ``` Closes #42417 from dongjoon-hyun/SPARK-44745. 
Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- docs/running-on-kubernetes.md | 7 +++ 1 file changed, 7 insertions(+) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index d3953592c4e..707a76196f3 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -394,6 +394,13 @@ spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount. spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false ``` +To enable shuffle data recovery feature via the built-in `KubernetesLocalDiskShuffleDataIO` plugin, we need to have the followings. You may want to enable `spark.kubernetes.driver.waitToReusePersistentVolumeClaim` additionally. + +``` +spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data/spark-x/executor-x +spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO +``` + If no volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `spark.local.dir` or the environment variable `SPARK_LOCAL_DIRS` . If no directories are explicitly specified then a default directory is created and configured [...] `emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
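The executor log above shows the recovery scan keeping shuffle files and ignoring "a non-shuffle block file". A rough sketch of that filtering idea — the file-name pattern here is an assumption inferred from the logged paths (`shuffle_0_11160_0.data`), not taken from Spark's sources:

```scala
// Hypothetical filter for the recovery walk over a remounted PVC:
// keep only files named like shuffle_<shuffleId>_<mapId>_<reduceId>.{data,index}.
def isShuffleDataFile(name: String): Boolean =
  name.matches("""shuffle_\d+_\d+_\d+\.(data|index)""")

val recovered = isShuffleDataFile("shuffle_0_11160_0.data") // a file from the log above
val ignored   = !isShuffleDataFile("rdd_42_7")              // e.g. a cached-RDD block
```

This is why the two configs in the doc change must agree: the PVC mount path places the block manager directories on durable storage, and the `KubernetesLocalDiskShuffleDataIO` plugin is what rescans them on executor restart.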
[spark] branch master updated: [SPARK-44503][SQL] Project any PARTITION BY expressions not already returned from Python UDTF TABLE arguments
This is an automated email from the ASF dual-hosted git repository. ueshin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 87298db43d9 [SPARK-44503][SQL] Project any PARTITION BY expressions not already returned from Python UDTF TABLE arguments 87298db43d9 is described below commit 87298db43d9a33fa3a3986f274442a17aad74dc3 Author: Daniel Tenedorio AuthorDate: Wed Aug 9 10:27:07 2023 -0700 [SPARK-44503][SQL] Project any PARTITION BY expressions not already returned from Python UDTF TABLE arguments ### What changes were proposed in this pull request? This PR adds a projection when any Python UDTF TABLE argument contains PARTITION BY expressions that are not simple attributes that are already present in the output of the relation. For example: ``` CREATE TABLE t(d DATE, y INT) USING PARQUET; INSERT INTO t VALUES ... SELECT * FROM UDTF(TABLE(t) PARTITION BY EXTRACT(YEAR FROM d) ORDER BY y ASC); ``` This will generate a plan like: ``` +- Sort (y ASC) +- RepartitionByExpressions (partition_by_0) +- Project (t.d, t.y, EXTRACT(YEAR FROM t.d) AS partition_by_0) +- LogicalRelation "t" ``` ### Why are the changes needed? We project the PARTITION BY expressions so that their resulting values appear in attributes that the Python UDTF interpreter can simply inspect in order to know when the partition boundaries have changed. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? This PR adds unit test coverage. Closes #42351 from dtenedor/partition-by-execution. 
Authored-by: Daniel Tenedorio Signed-off-by: Takuya UESHIN --- .../FunctionTableSubqueryArgumentExpression.scala | 77 +++-- .../sql/execution/python/PythonUDTFSuite.scala | 127 +++-- 2 files changed, 184 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala index e7a4888125d..daa0751eedf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala @@ -104,23 +104,80 @@ case class FunctionTableSubqueryArgumentExpression( // the query plan. var subquery = plan if (partitionByExpressions.nonEmpty) { - subquery = RepartitionByExpression( -partitionExpressions = partitionByExpressions, -child = subquery, -optNumPartitions = None) + // Add a projection to project each of the partitioning expressions that it is not a simple + // attribute that is already present in the plan output. Then add a sort operation by the + // partition keys (plus any explicit ORDER BY items) since after the hash-based shuffle + // operation, the rows from several partitions may arrive interleaved. In this way, the Python + // UDTF evaluator is able to inspect the values of the partitioning expressions for adjacent + // rows in order to determine when each partition ends and the next one begins. 
+ subquery = Project( +projectList = subquery.output ++ extraProjectedPartitioningExpressions, +child = subquery) + val partitioningAttributes = partitioningExpressionIndexes.map(i => subquery.output(i)) + subquery = Sort( +order = partitioningAttributes.map(e => SortOrder(e, Ascending)) ++ orderByExpressions, +global = false, +child = RepartitionByExpression( + partitionExpressions = partitioningAttributes, + optNumPartitions = None, + child = subquery)) } if (withSinglePartition) { subquery = Repartition( numPartitions = 1, shuffle = true, child = subquery) -} -if (orderByExpressions.nonEmpty) { - subquery = Sort( -order = orderByExpressions, -global = false, -child = subquery) + if (orderByExpressions.nonEmpty) { +subquery = Sort( + order = orderByExpressions, + global = false, + child = subquery) + } } Project(Seq(Alias(CreateStruct(subquery.output), "c")()), subquery) } + + /** + * These are the indexes of the PARTITION BY expressions within the concatenation of the child's + * output attributes and the [[extraProjectedPartitioningExpressions]]. We send these indexes to + * the Python UDTF evaluator so it knows which expressions to compare on adjacent rows to know + * when the partition has
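The reason for the Sort on the partitioning attributes is spelled out in the comment above: after the hash-based shuffle, rows from several partitions may arrive interleaved, but once sorted, the evaluator can find partition boundaries by comparing each row's projected partition key with the previous row's. A small sketch of that boundary check (types and helper name are illustrative, not the actual Python UDTF evaluator):

```scala
// Given the projected partition keys of already-sorted rows, mark the rows
// that start a new partition: the first row, or any row whose key differs
// from the previous row's key.
def partitionStarts[K](partitionKeys: Seq[K]): Seq[Boolean] =
  partitionKeys.zipWithIndex.map { case (key, i) =>
    i == 0 || partitionKeys(i - 1) != key
  }

// e.g. EXTRACT(YEAR FROM d) values arriving sorted after the shuffle:
val starts = partitionStarts(Seq(2022, 2022, 2023, 2023, 2023, 2024))
```

This is also why the plan projects non-attribute PARTITION BY expressions into `partition_by_N` columns first: the comparison needs a concrete attribute to inspect per row.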
[spark] branch branch-3.5 updated: [SPARK-44720][CONNECT] Make Dataset use Encoder instead of AgnosticEncoder
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 6eca5da8d3f [SPARK-44720][CONNECT] Make Dataset use Encoder instead of AgnosticEncoder 6eca5da8d3f is described below commit 6eca5da8d3fba6d1e385f06494030996241937fa Author: Herman van Hovell AuthorDate: Wed Aug 9 15:58:18 2023 +0200 [SPARK-44720][CONNECT] Make Dataset use Encoder instead of AgnosticEncoder ### What changes were proposed in this pull request? Make the Spark Connect Dataset use Encoder instead of AgnosticEncoder ### Why are the changes needed? We want to improve binary compatibility between the Spark Connect Scala Client and the original sql/core APIs. ### Does this PR introduce _any_ user-facing change? Yes. It changes the type of `Dataset.encoder` from `AgnosticEncoder` to `Encoder`. ### How was this patch tested? Existing tests. Closes #42396 from hvanhovell/SPARK-44720. 
Authored-by: Herman van Hovell Signed-off-by: Herman van Hovell (cherry picked from commit be9ffb37585fe421705ceaa52fe49b89c50703a3) Signed-off-by: Herman van Hovell --- .../main/scala/org/apache/spark/sql/Dataset.scala | 87 -- .../apache/spark/sql/KeyValueGroupedDataset.scala | 6 +- .../spark/sql/streaming/DataStreamWriter.scala | 2 +- .../CheckConnectJvmClientCompatibility.scala | 3 - 4 files changed, 50 insertions(+), 48 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 5f263903c8b..2d72ea6bda8 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -128,11 +128,13 @@ import org.apache.spark.util.SparkClassUtils class Dataset[T] private[sql] ( val sparkSession: SparkSession, @DeveloperApi val plan: proto.Plan, -val encoder: AgnosticEncoder[T]) +val encoder: Encoder[T]) extends Serializable { // Make sure we don't forget to set plan id. 
assert(plan.getRoot.getCommon.hasPlanId) + private[sql] val agnosticEncoder: AgnosticEncoder[T] = encoderFor(encoder) + override def toString: String = { try { val builder = new mutable.StringBuilder @@ -828,7 +830,7 @@ class Dataset[T] private[sql] ( } private def buildSort(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = { -sparkSession.newDataset(encoder) { builder => +sparkSession.newDataset(agnosticEncoder) { builder => builder.getSortBuilder .setInput(plan.getRoot) .setIsGlobal(global) @@ -878,8 +880,8 @@ class Dataset[T] private[sql] ( ProductEncoder[(T, U)]( ClassTag(SparkClassUtils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple2")), Seq( - EncoderField(s"_1", this.encoder, leftNullable, Metadata.empty), - EncoderField(s"_2", other.encoder, rightNullable, Metadata.empty))) + EncoderField(s"_1", this.agnosticEncoder, leftNullable, Metadata.empty), + EncoderField(s"_2", other.agnosticEncoder, rightNullable, Metadata.empty))) sparkSession.newDataset(tupleEncoder) { builder => val joinBuilder = builder.getJoinBuilder @@ -889,8 +891,8 @@ class Dataset[T] private[sql] ( .setJoinType(joinTypeValue) .setJoinCondition(condition.expr) .setJoinDataType(joinBuilder.getJoinDataTypeBuilder - .setIsLeftStruct(this.encoder.isStruct) - .setIsRightStruct(other.encoder.isStruct)) + .setIsLeftStruct(this.agnosticEncoder.isStruct) + .setIsRightStruct(other.agnosticEncoder.isStruct)) } } @@ -1010,13 +1012,13 @@ class Dataset[T] private[sql] ( * @since 3.4.0 */ @scala.annotation.varargs - def hint(name: String, parameters: Any*): Dataset[T] = sparkSession.newDataset(encoder) { -builder => + def hint(name: String, parameters: Any*): Dataset[T] = +sparkSession.newDataset(agnosticEncoder) { builder => builder.getHintBuilder .setInput(plan.getRoot) .setName(name) .addAllParameters(parameters.map(p => functions.lit(p).expr).asJava) - } +} private def getPlanId: Option[Long] = if (plan.getRoot.hasCommon && plan.getRoot.getCommon.hasPlanId) { @@ -1056,7 +1058,7 @@ class 
Dataset[T] private[sql] ( * @group typedrel * @since 3.4.0 */ - def as(alias: String): Dataset[T] = sparkSession.newDataset(encoder) { builder => + def as(alias: String): Dataset[T] = sparkSession.newDataset(agnosticEncoder) { builder => builder.getSubqueryAliasBuilder .setInput(plan.getRoot) .setAlias(alias) @@ -1238,8 +1240,9 @@
[spark] branch master updated: [SPARK-44720][CONNECT] Make Dataset use Encoder instead of AgnosticEncoder
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new be9ffb37585 [SPARK-44720][CONNECT] Make Dataset use Encoder instead of AgnosticEncoder be9ffb37585 is described below commit be9ffb37585fe421705ceaa52fe49b89c50703a3 Author: Herman van Hovell AuthorDate: Wed Aug 9 15:58:18 2023 +0200 [SPARK-44720][CONNECT] Make Dataset use Encoder instead of AgnosticEncoder ### What changes were proposed in this pull request? Make the Spark Connect Dataset use Encoder instead of AgnosticEncoder ### Why are the changes needed? We want to improve binary compatibility between the Spark Connect Scala Client and the original sql/core APIs. ### Does this PR introduce _any_ user-facing change? Yes. It changes the type of `Dataset.encoder` from `AgnosticEncoder` to `Encoder`. ### How was this patch tested? Existing tests. Closes #42396 from hvanhovell/SPARK-44720. 
Authored-by: Herman van Hovell Signed-off-by: Herman van Hovell --- .../main/scala/org/apache/spark/sql/Dataset.scala | 87 -- .../apache/spark/sql/KeyValueGroupedDataset.scala | 6 +- .../spark/sql/streaming/DataStreamWriter.scala | 2 +- .../CheckConnectJvmClientCompatibility.scala | 3 - 4 files changed, 50 insertions(+), 48 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 5f263903c8b..2d72ea6bda8 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -128,11 +128,13 @@ import org.apache.spark.util.SparkClassUtils class Dataset[T] private[sql] ( val sparkSession: SparkSession, @DeveloperApi val plan: proto.Plan, -val encoder: AgnosticEncoder[T]) +val encoder: Encoder[T]) extends Serializable { // Make sure we don't forget to set plan id. 
assert(plan.getRoot.getCommon.hasPlanId) + private[sql] val agnosticEncoder: AgnosticEncoder[T] = encoderFor(encoder) + override def toString: String = { try { val builder = new mutable.StringBuilder @@ -828,7 +830,7 @@ class Dataset[T] private[sql] ( } private def buildSort(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = { -sparkSession.newDataset(encoder) { builder => +sparkSession.newDataset(agnosticEncoder) { builder => builder.getSortBuilder .setInput(plan.getRoot) .setIsGlobal(global) @@ -878,8 +880,8 @@ class Dataset[T] private[sql] ( ProductEncoder[(T, U)]( ClassTag(SparkClassUtils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple2")), Seq( - EncoderField(s"_1", this.encoder, leftNullable, Metadata.empty), - EncoderField(s"_2", other.encoder, rightNullable, Metadata.empty))) + EncoderField(s"_1", this.agnosticEncoder, leftNullable, Metadata.empty), + EncoderField(s"_2", other.agnosticEncoder, rightNullable, Metadata.empty))) sparkSession.newDataset(tupleEncoder) { builder => val joinBuilder = builder.getJoinBuilder @@ -889,8 +891,8 @@ class Dataset[T] private[sql] ( .setJoinType(joinTypeValue) .setJoinCondition(condition.expr) .setJoinDataType(joinBuilder.getJoinDataTypeBuilder - .setIsLeftStruct(this.encoder.isStruct) - .setIsRightStruct(other.encoder.isStruct)) + .setIsLeftStruct(this.agnosticEncoder.isStruct) + .setIsRightStruct(other.agnosticEncoder.isStruct)) } } @@ -1010,13 +1012,13 @@ class Dataset[T] private[sql] ( * @since 3.4.0 */ @scala.annotation.varargs - def hint(name: String, parameters: Any*): Dataset[T] = sparkSession.newDataset(encoder) { -builder => + def hint(name: String, parameters: Any*): Dataset[T] = +sparkSession.newDataset(agnosticEncoder) { builder => builder.getHintBuilder .setInput(plan.getRoot) .setName(name) .addAllParameters(parameters.map(p => functions.lit(p).expr).asJava) - } +} private def getPlanId: Option[Long] = if (plan.getRoot.hasCommon && plan.getRoot.getCommon.hasPlanId) { @@ -1056,7 +1058,7 @@ class 
Dataset[T] private[sql] ( * @group typedrel * @since 3.4.0 */ - def as(alias: String): Dataset[T] = sparkSession.newDataset(encoder) { builder => + def as(alias: String): Dataset[T] = sparkSession.newDataset(agnosticEncoder) { builder => builder.getSubqueryAliasBuilder .setInput(plan.getRoot) .setAlias(alias) @@ -1238,8 +1240,9 @@ class Dataset[T] private[sql] ( * @group typedrel * @since 3.4.0 */ - def filter(condition: Column):
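The SPARK-44720 diff above follows a common pattern: the constructor accepts the public-facing type (`Encoder`), resolves it once to the internal type (`AgnosticEncoder` via `encoderFor`), and routes all internal call sites through the resolved value while the public field keeps the wider type for binary compatibility. A minimal Python sketch of that pattern — class and function names here are illustrative stand-ins, not Spark's actual classes:

```python
# Hypothetical sketch of the "accept wide type, resolve narrow type once"
# pattern from SPARK-44720. None of these classes are Spark's real ones.

class Encoder:
    """Public, user-facing encoder interface (wide type)."""

class AgnosticEncoder(Encoder):
    """Internal encoder with extra capabilities (e.g. is_struct)."""
    def __init__(self, is_struct: bool = False):
        self.is_struct = is_struct

def encoder_for(enc: Encoder) -> AgnosticEncoder:
    # Resolve a public encoder to the internal representation.
    if isinstance(enc, AgnosticEncoder):
        return enc
    raise TypeError(f"cannot resolve {type(enc).__name__} to AgnosticEncoder")

class Dataset:
    def __init__(self, encoder: Encoder):
        self.encoder = encoder                          # public API: wide type
        self._agnostic_encoder = encoder_for(encoder)   # internal: narrow type

ds = Dataset(AgnosticEncoder(is_struct=True))
assert isinstance(ds.encoder, Encoder)       # callers see the public type
assert ds._agnostic_encoder.is_struct        # internals use the resolved one
```

Resolving once in the constructor (rather than at every call site) is what lets the dozens of `encoder` → `agnosticEncoder` substitutions in the diff stay mechanical.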
[spark] branch branch-3.5 updated: [SPARK-43429][CONNECT] Deflake SparkSessionSuite
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 767b2b5a8dc [SPARK-43429][CONNECT] Deflake SparkSessionSuite 767b2b5a8dc is described below commit 767b2b5a8dc8d655ab6787845a87556f15456aaa Author: Herman van Hovell AuthorDate: Wed Aug 9 20:42:20 2023 +0900 [SPARK-43429][CONNECT] Deflake SparkSessionSuite ### What changes were proposed in this pull request? This PR tries to fix flakiness in the `SparkSessionSuite.active session in multiple threads` test. There was a chance that a modification could happen before the other thread could check the state. This PR decouples modifications from checks. ### Why are the changes needed? Flaky tests are no bueno. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? It is a test. Closes #42406 from hvanhovell/SPARK-43429-deflake. 
Authored-by: Herman van Hovell Signed-off-by: Hyukjin Kwon (cherry picked from commit 27c5a1f9f0e322fad0da300afdb75eadd8224b15) Signed-off-by: Hyukjin Kwon --- .../org/apache/spark/sql/SparkSessionSuite.scala | 32 ++ 1 file changed, 32 insertions(+) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala index f06744399f8..2d7ded2d688 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala @@ -171,42 +171,74 @@ class SparkSessionSuite extends ConnectFunSuite { try { val script1 = execute { phaser => +// Step 0 - check initial state phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.contains(session1)) assert(SparkSession.getActiveSession.contains(session2)) +// Step 1 - new active session in script 2 +phaser.arriveAndAwaitAdvance() + +// Step2 - script 1 is unchanged, script 2 has new active session phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.contains(session1)) assert(SparkSession.getActiveSession.contains(session2)) + +// Step 3 - close session 1, no more default session in both scripts +phaser.arriveAndAwaitAdvance() session1.close() +// Step 4 - no default session, same active session. phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.isEmpty) assert(SparkSession.getActiveSession.contains(session2)) + +// Step 5 - clear active session in script 1 +phaser.arriveAndAwaitAdvance() SparkSession.clearActiveSession() +// Step 6 - no default/no active session in script 1, script2 unchanged. 
phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.isEmpty) assert(SparkSession.getActiveSession.isEmpty) + +// Step 7 - close active session in script2 +phaser.arriveAndAwaitAdvance() } val script2 = execute { phaser => +// Step 0 - check initial state phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.contains(session1)) assert(SparkSession.getActiveSession.contains(session2)) + +// Step 1 - new active session in script 2 +phaser.arriveAndAwaitAdvance() SparkSession.clearActiveSession() val internalSession = SparkSession.builder().remote(connectionString3).getOrCreate() +// Step2 - script 1 is unchanged, script 2 has new active session phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.contains(session1)) assert(SparkSession.getActiveSession.contains(internalSession)) +// Step 3 - close session 1, no more default session in both scripts +phaser.arriveAndAwaitAdvance() + +// Step 4 - no default session, same active session. phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.isEmpty) assert(SparkSession.getActiveSession.contains(internalSession)) +// Step 5 - clear active session in script 1 +phaser.arriveAndAwaitAdvance() + +// Step 6 - no default/no active session in script 1, script2 unchanged. phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.isEmpty) assert(SparkSession.getActiveSession.contains(internalSession)) + +// Step 7 - close active session in script2 +phaser.arriveAndAwaitAdvance() internalSession.close() assert(SparkSession.getActiveSession.isEmpty) }
[spark] branch master updated: [SPARK-43429][CONNECT] Deflake SparkSessionSuite
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 27c5a1f9f0e [SPARK-43429][CONNECT] Deflake SparkSessionSuite 27c5a1f9f0e is described below commit 27c5a1f9f0e322fad0da300afdb75eadd8224b15 Author: Herman van Hovell AuthorDate: Wed Aug 9 20:42:20 2023 +0900 [SPARK-43429][CONNECT] Deflake SparkSessionSuite ### What changes were proposed in this pull request? This PR tries to fix flakiness in the `SparkSessionSuite.active session in multiple threads` test. There was a chance that a modification could happen before the other thread could check the state. This PR decouples modifications from checks. ### Why are the changes needed? Flaky tests are no bueno. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? It is a test. Closes #42406 from hvanhovell/SPARK-43429-deflake. 
Authored-by: Herman van Hovell Signed-off-by: Hyukjin Kwon --- .../org/apache/spark/sql/SparkSessionSuite.scala | 32 ++ 1 file changed, 32 insertions(+) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala index f06744399f8..2d7ded2d688 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala @@ -171,42 +171,74 @@ class SparkSessionSuite extends ConnectFunSuite { try { val script1 = execute { phaser => +// Step 0 - check initial state phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.contains(session1)) assert(SparkSession.getActiveSession.contains(session2)) +// Step 1 - new active session in script 2 +phaser.arriveAndAwaitAdvance() + +// Step2 - script 1 is unchanged, script 2 has new active session phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.contains(session1)) assert(SparkSession.getActiveSession.contains(session2)) + +// Step 3 - close session 1, no more default session in both scripts +phaser.arriveAndAwaitAdvance() session1.close() +// Step 4 - no default session, same active session. phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.isEmpty) assert(SparkSession.getActiveSession.contains(session2)) + +// Step 5 - clear active session in script 1 +phaser.arriveAndAwaitAdvance() SparkSession.clearActiveSession() +// Step 6 - no default/no active session in script 1, script2 unchanged. 
phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.isEmpty) assert(SparkSession.getActiveSession.isEmpty) + +// Step 7 - close active session in script2 +phaser.arriveAndAwaitAdvance() } val script2 = execute { phaser => +// Step 0 - check initial state phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.contains(session1)) assert(SparkSession.getActiveSession.contains(session2)) + +// Step 1 - new active session in script 2 +phaser.arriveAndAwaitAdvance() SparkSession.clearActiveSession() val internalSession = SparkSession.builder().remote(connectionString3).getOrCreate() +// Step2 - script 1 is unchanged, script 2 has new active session phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.contains(session1)) assert(SparkSession.getActiveSession.contains(internalSession)) +// Step 3 - close session 1, no more default session in both scripts +phaser.arriveAndAwaitAdvance() + +// Step 4 - no default session, same active session. phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.isEmpty) assert(SparkSession.getActiveSession.contains(internalSession)) +// Step 5 - clear active session in script 1 +phaser.arriveAndAwaitAdvance() + +// Step 6 - no default/no active session in script 1, script2 unchanged. phaser.arriveAndAwaitAdvance() assert(SparkSession.getDefaultSession.isEmpty) assert(SparkSession.getActiveSession.contains(internalSession)) + +// Step 7 - close active session in script2 +phaser.arriveAndAwaitAdvance() internalSession.close() assert(SparkSession.getActiveSession.isEmpty) } - To unsubscribe, e-mail:
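The deflake above works by having both scripts arrive at the `Phaser` before every mutation, so neither thread can modify shared session state until the other has finished checking the previous step. A minimal Python analogue of that lockstep pattern, using `threading.Barrier` in place of Java's `Phaser` (the session names are illustrative, not the suite's actual objects):

```python
# Two "scripts" advance in lockstep: every mutation is preceded by a barrier
# arrival, so checks in the other thread can never race against it.
import threading

state = {"active": "session2"}
barrier = threading.Barrier(2)
observed = []

def script1():
    barrier.wait()                 # step 0: check initial state
    observed.append(state["active"])
    barrier.wait()                 # step 1: script2 will mutate after this
    barrier.wait()                 # step 2: mutation is done; observe it
    observed.append(state["active"])

def script2():
    barrier.wait()                 # step 0
    barrier.wait()                 # step 1: now safe to mutate
    state["active"] = "internal_session"
    barrier.wait()                 # step 2

t1 = threading.Thread(target=script1)
t2 = threading.Thread(target=script2)
t1.start(); t2.start(); t1.join(); t2.join()
assert observed == ["session2", "internal_session"]
```

Because `script2` only mutates after the step-1 rendezvous, and `script1` only reads after the step-2 rendezvous, the check can no longer observe a half-finished modification — which is exactly the flakiness the commit removes.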
[spark] branch master updated: [SPARK-42620][PS] Add `inclusive` parameter for (DataFrame|Series).between_time
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0b757d3b610 [SPARK-42620][PS] Add `inclusive` parameter for (DataFrame|Series).between_time 0b757d3b610 is described below commit 0b757d3b610a90e70828d5e21521810f923c6aed Author: zhyhimont AuthorDate: Wed Aug 9 20:35:15 2023 +0900 [SPARK-42620][PS] Add `inclusive` parameter for (DataFrame|Series).between_time ### What changes were proposed in this pull request? Add `inclusive` parameter for (DataFrame|Series).between_time to support the pandas 2.0.0 ### Why are the changes needed? When pandas 2.0.0 is released, we should match the behavior in pandas API on Spark. ### Does this PR introduce _any_ user-facing change? yes, the API changes Before: ` (DataFrame|Series).between_time(start_time, end_time, include_start, include_end, axis)` After: ` (DataFrame|Series).between_time(start_time, end_time, inclusive, axis)` ### How was this patch tested? Unit tests were updated Closes #40370 from dzhigimont/SPARK-42620-ZH. 
Lead-authored-by: zhyhimont Co-authored-by: Zhyhimont Dmitry Co-authored-by: Zhyhimont Dmitry Signed-off-by: Hyukjin Kwon --- .../source/migration_guide/pyspark_upgrade.rst | 2 ++ python/pyspark/errors/error_classes.py | 5 python/pyspark/pandas/frame.py | 30 +++- python/pyspark/pandas/series.py| 18 .../pyspark/pandas/tests/frame/test_reindexing.py | 33 +++--- python/pyspark/pandas/tests/series/test_compute.py | 32 ++--- 6 files changed, 86 insertions(+), 34 deletions(-) diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst b/python/docs/source/migration_guide/pyspark_upgrade.rst index 1b247d46227..da49719579a 100644 --- a/python/docs/source/migration_guide/pyspark_upgrade.rst +++ b/python/docs/source/migration_guide/pyspark_upgrade.rst @@ -32,6 +32,8 @@ Upgrading from PySpark 3.5 to 4.0 * In Spark 4.0, ``na_sentinel`` parameter from ``Index.factorize`` and `Series.factorize`` has been removed from pandas API on Spark, use ``use_na_sentinel`` instead. * In Spark 4.0, ``inplace`` parameter from ``Categorical.add_categories``, ``Categorical.remove_categories``, ``Categorical.set_categories``, ``Categorical.rename_categories``, ``Categorical.reorder_categories``, ``Categorical.as_ordered``, ``Categorical.as_unordered`` have been removed from pandas API on Spark. * In Spark 4.0, ``closed`` parameter from ``ps.date_range`` has been removed from pandas API on Spark. +* In Spark 4.0, ``include_start`` and ``include_end`` parameters from ``DataFrame.between_time`` have been removed from pandas API on Spark, use ``inclusive`` instead. +* In Spark 4.0, ``include_start`` and ``include_end`` parameters from ``Series.between_time`` have been removed from pandas API on Spark, use ``inclusive`` instead. 
Upgrading from PySpark 3.3 to 3.4 diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index bc32afeb87a..c8033478362 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.py @@ -853,6 +853,11 @@ ERROR_CLASSES_JSON = """ "Value `` cannot be accessed inside tasks." ] }, + "VALUE_NOT_ALLOWED" : { +"message" : [ + "Value for `` has to be amongst the following values: ." +] + }, "VALUE_NOT_ANY_OR_ALL" : { "message" : [ "Value for `` must be 'any' or 'all', got ''." diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index 65c43eb7cf4..8fbe1b8f926 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -59,6 +59,8 @@ from pandas.api.types import ( # type: ignore[attr-defined] ) from pandas.tseries.frequencies import DateOffset, to_offset +from pyspark.errors import PySparkValueError + if TYPE_CHECKING: from pandas.io.formats.style import Styler @@ -3501,14 +3503,11 @@ defaultdict(, {'col..., 'col...})] ).resolved_copy return DataFrame(internal) -# TODO(SPARK-42620): Add `inclusive` parameter and replace `include_start` & `include_end`. -# See https://github.com/pandas-dev/pandas/issues/43248 def between_time( self, start_time: Union[datetime.time, str], end_time: Union[datetime.time, str], -include_start: bool = True, -include_end: bool = True, +inclusive: str = "both", axis: Axis = 0, ) -> "DataFrame": """ @@ -3523,15 +3522,10 @@ defaultdict(, {'col..., 'col...})] Initial time as a time filter limit. end_time : datetime.time or str
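The new `inclusive` parameter collapses the old `include_start`/`include_end` booleans into one of four boundary modes, and invalid values are rejected via the new `VALUE_NOT_ALLOWED` error class. A pure-Python sketch of those boundary semantics — this mapping is an illustration of the pandas 2.0 contract, not pandas-on-Spark's internal code:

```python
# Boundary semantics of inclusive={"both","left","right","neither"}.
import operator

def in_interval(value, start, end, inclusive="both"):
    ops = {
        "both":    (operator.ge, operator.le),   # start <= v <= end
        "left":    (operator.ge, operator.lt),   # start <= v <  end
        "right":   (operator.gt, operator.le),   # start <  v <= end
        "neither": (operator.gt, operator.lt),   # start <  v <  end
    }
    if inclusive not in ops:
        # Mirrors the VALUE_NOT_ALLOWED error added in this commit.
        raise ValueError(
            f"Value for `inclusive` has to be amongst {sorted(ops)}, "
            f"got {inclusive!r}"
        )
    lower, upper = ops[inclusive]
    return lower(value, start) and upper(value, end)

assert in_interval(9, 9, 17, "both")
assert not in_interval(9, 9, 17, "right")
assert in_interval(17, 9, 17, "right")
assert not in_interval(17, 9, 17, "neither")
```

The old API expressed the same four cases as `(include_start, include_end)` pairs; the single keyword matches pandas 2.0's `between_time` signature.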
[spark] branch branch-3.5 updated: Revert "[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample"
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 39b3dd72bec Revert "[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample" 39b3dd72bec is described below commit 39b3dd72bec3f78b55348007dbeaab67d15165f6 Author: Hyukjin Kwon AuthorDate: Wed Aug 9 20:34:19 2023 +0900 Revert "[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample" This reverts commit 2e566a5b6d3871315b9115753ebb1aab75cdbe13. --- python/pyspark/pandas/resample.py | 26 +- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/python/pyspark/pandas/resample.py b/python/pyspark/pandas/resample.py index 0d2c3cc753c..30f8c9d3169 100644 --- a/python/pyspark/pandas/resample.py +++ b/python/pyspark/pandas/resample.py @@ -67,6 +67,7 @@ from pyspark.pandas.utils import ( scol_for, verify_temp_column_name, ) +from pyspark.sql.utils import is_remote from pyspark.pandas.spark.functions import timestampdiff @@ -144,15 +145,22 @@ class Resampler(Generic[FrameLike], metaclass=ABCMeta): def get_make_interval( # type: ignore[return] self, unit: str, col: Union[Column, int, float] ) -> Column: -col = col if not isinstance(col, (int, float)) else F.lit(col) # type: ignore[assignment] -if unit == "MONTH": -return F.make_interval(months=col) # type: ignore -if unit == "HOUR": -return F.make_interval(hours=col) # type: ignore -if unit == "MINUTE": -return F.make_interval(mins=col) # type: ignore -if unit == "SECOND": -return F.make_interval(secs=col) # type: ignore +if is_remote(): +from pyspark.sql.connect.functions import lit, make_interval + +col = col if not isinstance(col, (int, float)) else lit(col) # type: ignore[assignment] +if unit == "MONTH": +return make_interval(months=col) # type: ignore +if unit == "HOUR": +return make_interval(hours=col) # type: ignore +if unit == 
"MINUTE": +return make_interval(mins=col) # type: ignore +if unit == "SECOND": +return make_interval(secs=col) # type: ignore +else: +sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils +col = col._jc if isinstance(col, Column) else F.lit(col)._jc +return sql_utils.makeInterval(unit, col) def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column: key_type = self._resamplekey_type - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: Revert "[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample"
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e5e0f389dad Revert "[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample" e5e0f389dad is described below commit e5e0f389dad678e664d5e9897d40a8765a0dc2e5 Author: Hyukjin Kwon AuthorDate: Wed Aug 9 20:33:50 2023 +0900 Revert "[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample" This reverts commit b19a83e236b2732c2af6d0a4d79ebcbca4105c59. --- python/pyspark/pandas/resample.py | 26 +- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/python/pyspark/pandas/resample.py b/python/pyspark/pandas/resample.py index 0d2c3cc753c..30f8c9d3169 100644 --- a/python/pyspark/pandas/resample.py +++ b/python/pyspark/pandas/resample.py @@ -67,6 +67,7 @@ from pyspark.pandas.utils import ( scol_for, verify_temp_column_name, ) +from pyspark.sql.utils import is_remote from pyspark.pandas.spark.functions import timestampdiff @@ -144,15 +145,22 @@ class Resampler(Generic[FrameLike], metaclass=ABCMeta): def get_make_interval( # type: ignore[return] self, unit: str, col: Union[Column, int, float] ) -> Column: -col = col if not isinstance(col, (int, float)) else F.lit(col) # type: ignore[assignment] -if unit == "MONTH": -return F.make_interval(months=col) # type: ignore -if unit == "HOUR": -return F.make_interval(hours=col) # type: ignore -if unit == "MINUTE": -return F.make_interval(mins=col) # type: ignore -if unit == "SECOND": -return F.make_interval(secs=col) # type: ignore +if is_remote(): +from pyspark.sql.connect.functions import lit, make_interval + +col = col if not isinstance(col, (int, float)) else lit(col) # type: ignore[assignment] +if unit == "MONTH": +return make_interval(months=col) # type: ignore +if unit == "HOUR": +return make_interval(hours=col) # type: ignore +if unit == "MINUTE": 
+return make_interval(mins=col) # type: ignore +if unit == "SECOND": +return make_interval(secs=col) # type: ignore +else: +sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils +col = col._jc if isinstance(col, Column) else F.lit(col)._jc +return sql_utils.makeInterval(unit, col) def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column: key_type = self._resamplekey_type - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 2e566a5b6d3 [SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample 2e566a5b6d3 is described below commit 2e566a5b6d3871315b9115753ebb1aab75cdbe13 Author: itholic AuthorDate: Wed Aug 9 20:32:48 2023 +0900 [SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample ### What changes were proposed in this pull request? This is follow-up for https://github.com/apache/spark/pull/41877 to remove JVM dependency. ### Why are the changes needed? To remove JVM dependency from Pandas API on Spark with Spark Connect. ### Does this PR introduce _any_ user-facing change? No, it's internal handling. ### How was this patch tested? The existing UT. Closes #42410 from itholic/resample_followup. Authored-by: itholic Signed-off-by: Hyukjin Kwon (cherry picked from commit b19a83e236b2732c2af6d0a4d79ebcbca4105c59) Signed-off-by: Hyukjin Kwon --- python/pyspark/pandas/resample.py | 26 +- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/python/pyspark/pandas/resample.py b/python/pyspark/pandas/resample.py index 30f8c9d3169..0d2c3cc753c 100644 --- a/python/pyspark/pandas/resample.py +++ b/python/pyspark/pandas/resample.py @@ -67,7 +67,6 @@ from pyspark.pandas.utils import ( scol_for, verify_temp_column_name, ) -from pyspark.sql.utils import is_remote from pyspark.pandas.spark.functions import timestampdiff @@ -145,22 +144,15 @@ class Resampler(Generic[FrameLike], metaclass=ABCMeta): def get_make_interval( # type: ignore[return] self, unit: str, col: Union[Column, int, float] ) -> Column: -if is_remote(): -from pyspark.sql.connect.functions import lit, make_interval - -col = col if not isinstance(col, (int, float)) else lit(col) # type: ignore[assignment] -if unit == "MONTH": -return 
make_interval(months=col) # type: ignore -if unit == "HOUR": -return make_interval(hours=col) # type: ignore -if unit == "MINUTE": -return make_interval(mins=col) # type: ignore -if unit == "SECOND": -return make_interval(secs=col) # type: ignore -else: -sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils -col = col._jc if isinstance(col, Column) else F.lit(col)._jc -return sql_utils.makeInterval(unit, col) +col = col if not isinstance(col, (int, float)) else F.lit(col) # type: ignore[assignment] +if unit == "MONTH": +return F.make_interval(months=col) # type: ignore +if unit == "HOUR": +return F.make_interval(hours=col) # type: ignore +if unit == "MINUTE": +return F.make_interval(mins=col) # type: ignore +if unit == "SECOND": +return F.make_interval(secs=col) # type: ignore def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column: key_type = self._resamplekey_type - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
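The `get_make_interval` helper above is an if-chain mapping a unit name to a `make_interval` keyword argument. A table-driven sketch of the same dispatch — `make_interval` here is a local stand-in that returns seconds (assuming a 30-day month) so the example runs anywhere, whereas the real `F.make_interval` builds a Spark `Column`:

```python
# Table-driven version of the unit -> make_interval keyword dispatch.

def make_interval(months=0, hours=0, mins=0, secs=0):
    # Stand-in for pyspark.sql.functions.make_interval: returns total
    # seconds (30-day months assumed) instead of a Column.
    return months * 2_592_000 + hours * 3_600 + mins * 60 + secs

_UNIT_KWARG = {"MONTH": "months", "HOUR": "hours",
               "MINUTE": "mins", "SECOND": "secs"}

def get_make_interval(unit, amount):
    try:
        return make_interval(**{_UNIT_KWARG[unit]: amount})
    except KeyError:
        raise ValueError(f"unsupported unit: {unit}") from None

assert get_make_interval("MINUTE", 2) == 120
assert get_make_interval("HOUR", 1) == 3_600
```

A dict also makes the unsupported-unit case explicit, which the original if-chain handles implicitly by falling off the end (its `# type: ignore[return]` annotation).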
[spark] branch master updated (4f25df1dc25 -> b19a83e236b)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 4f25df1dc25 [SPARK-43979][SQL][FOLLOWUP] `transformUpWithNewOutput` should only be used with new outputs add b19a83e236b [SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample No new revisions were added by this update. Summary of changes: python/pyspark/pandas/resample.py | 26 +- 1 file changed, 9 insertions(+), 17 deletions(-)
[spark] branch master updated: [SPARK-43979][SQL][FOLLOWUP] `transformUpWithNewOutput` should only be used with new outputs
This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4f25df1dc25 [SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` should only be used with new outputs 4f25df1dc25 is described below commit 4f25df1dc25cc4f002107821cc67e35c1fe0e42c Author: Wenchen Fan AuthorDate: Wed Aug 9 19:16:37 2023 +0800 [SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` should only be used with new outputs ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/41475 . It's risky to use `transformUpWithNewOutput` with existing attribute ids. If the plan contains duplicated attribute ids somewhere, then we will hit conflicting attributes and an assertion error will be thrown by `QueryPlan#transformUpWithNewOutput`. This PR takes a different approach. We canonicalize the plan first and then remove the alias-only project. Then we don't need `transformUpWithNewOutput` anymore as all attribute ids are normalized in the canonicalized plan. ### Why are the changes needed? fix potential bugs ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing tests Closes #42408 from cloud-fan/collect-metrics. 
Authored-by: Wenchen Fan Signed-off-by: Kent Yao --- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index fee5660017c..0b953fc2b61 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -1080,13 +1080,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB def check(plan: LogicalPlan): Unit = plan.foreach { node => node match { case metrics @ CollectMetrics(name, _, _) => - val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics) + val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics.canonicalized) metricsMap.get(name) match { case Some(other) => - val simplifiedOther = simplifyPlanForCollectedMetrics(other) + val simplifiedOther = simplifyPlanForCollectedMetrics(other.canonicalized) // Exact duplicates are allowed. They can be the result // of a CTE that is used multiple times or a self join. - if (!simplifiedMetrics.sameResult(simplifiedOther)) { + if (simplifiedMetrics != simplifiedOther) { failAnalysis( errorClass = "DUPLICATED_METRICS_NAME", messageParameters = Map("metricName" -> name)) @@ -,7 +,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB * duplicates metric definition. 
*/ private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan = { -plan.transformUpWithNewOutput { +plan.resolveOperators { case p: Project if p.projectList.size == p.child.output.size => val assignExprIdOnly = p.projectList.zip(p.child.output).forall { case (left: Alias, right: Attribute) => @@ -1119,9 +1119,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB case _ => false } if (assignExprIdOnly) { - (p.child, p.output.zip(p.child.output)) + p.child } else { - (p, Nil) + p } } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
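The fix above replaces id-rewriting (`transformUpWithNewOutput`, which asserts when attribute ids are duplicated) with canonicalize-then-compare: normalize both plans' ids deterministically first, then use plain equality. A toy Python illustration of that idea — these are hypothetical plan nodes, not Spark's `LogicalPlan`:

```python
# Canonicalize-then-compare: renumber attribute ids in first-seen traversal
# order so two structurally identical plans compare equal regardless of the
# concrete ids they were built with.
from dataclasses import dataclass

@dataclass(frozen=True)
class Node:
    name: str
    attr_id: int
    children: tuple = ()

def canonicalize(node, mapping=None):
    if mapping is None:
        mapping = {}
    # First-seen order makes the renumbering deterministic; duplicated ids
    # simply map to the same normalized id instead of raising.
    norm_id = mapping.setdefault(node.attr_id, len(mapping))
    return Node(node.name, norm_id,
                tuple(canonicalize(c, mapping) for c in node.children))

a = Node("Project", 57, (Node("Scan", 12),))
b = Node("Project", 90, (Node("Scan", 3),))   # same shape, different ids
assert canonicalize(a) == canonicalize(b)
```

This is why the diff can downgrade `sameResult` to `!=`/`==`: once both sides are canonicalized, structural equality is exact.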
[spark] branch branch-3.5 updated: [SPARK-43979][SQL][FOLLOWUP] `transformUpWithNewOutput` should only be used with new outputs
This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new b8204d1b89e [SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` should only be used with new outputs b8204d1b89e is described below commit b8204d1b89eea0c32e5269fb155651157e2c96e8 Author: Wenchen Fan AuthorDate: Wed Aug 9 19:16:37 2023 +0800 [SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` should only be used with new outputs ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/41475 . It's risky to use `transformUpWithNewOutput` with existing attribute ids. If the plan contains duplicated attribute ids somewhere, then we will hit conflicting attributes and an assertion error will be thrown by `QueryPlan#transformUpWithNewOutput`. This PR takes a different approach. We canonicalize the plan first and then remove the alias-only project. Then we don't need `transformUpWithNewOutput` anymore as all attribute ids are normalized in the canonicalized plan. ### Why are the changes needed? fix potential bugs ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing tests Closes #42408 from cloud-fan/collect-metrics. 
Authored-by: Wenchen Fan Signed-off-by: Kent Yao (cherry picked from commit 4f25df1dc25cc4f002107821cc67e35c1fe0e42c) Signed-off-by: Kent Yao --- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index e198fd58953..848749f9e3b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -1071,13 +1071,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB def check(plan: LogicalPlan): Unit = plan.foreach { node => node match { case metrics @ CollectMetrics(name, _, _) => - val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics) + val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics.canonicalized) metricsMap.get(name) match { case Some(other) => - val simplifiedOther = simplifyPlanForCollectedMetrics(other) + val simplifiedOther = simplifyPlanForCollectedMetrics(other.canonicalized) // Exact duplicates are allowed. They can be the result // of a CTE that is used multiple times or a self join. - if (!simplifiedMetrics.sameResult(simplifiedOther)) { + if (simplifiedMetrics != simplifiedOther) { failAnalysis( errorClass = "DUPLICATED_METRICS_NAME", messageParameters = Map("metricName" -> name)) @@ -1102,7 +1102,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB * duplicates metric definition. 
*/ private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan = { -plan.transformUpWithNewOutput { +plan.resolveOperators { case p: Project if p.projectList.size == p.child.output.size => val assignExprIdOnly = p.projectList.zip(p.child.output).forall { case (left: Alias, right: Attribute) => @@ -1110,9 +1110,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB case _ => false } if (assignExprIdOnly) { - (p.child, p.output.zip(p.child.output)) + p.child } else { - (p, Nil) + p } } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [SPARK-44738][PYTHON][CONNECT] Add missing client metadata to calls
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 03dfd4747ae [SPARK-44738][PYTHON][CONNECT] Add missing client metadata to calls
03dfd4747ae is described below

commit 03dfd4747ae3b59e7e3ac348c30e82b63e1ed269
Author: Martin Grund
AuthorDate: Wed Aug 9 18:08:23 2023 +0900

    [SPARK-44738][PYTHON][CONNECT] Add missing client metadata to calls

    ### What changes were proposed in this pull request?
    The refactoring for the re-attachable execution missed properly propagating the client metadata for the individual RPC calls.

    ### Why are the changes needed?
    Compatibility.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Existing UT

    Closes #42409 from grundprinzip/SPARK-44738.

    Authored-by: Martin Grund
    Signed-off-by: Hyukjin Kwon
    (cherry picked from commit c73660c3e7279f61fe6e2f6bbf88f410f7ce25a1)
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/connect/client/reattach.py | 11 ++++++-----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py
index c5c45904c9b..c6b1beaa121 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -135,7 +135,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
             if not attempt.is_first_try():
                 # on retry, the iterator is borked, so we need a new one
                 self._iterator = iter(
-                    self._stub.ReattachExecute(self._create_reattach_execute_request())
+                    self._stub.ReattachExecute(
+                        self._create_reattach_execute_request(), metadata=self._metadata
+                    )
                 )

             if self._current is None:
@@ -154,7 +156,8 @@ class ExecutePlanResponseReattachableIterator(Generator):
                 while not has_next:
                     self._iterator = iter(
                         self._stub.ReattachExecute(
-                            self._create_reattach_execute_request()
+                            self._create_reattach_execute_request(),
+                            metadata=self._metadata,
                         )
                     )
                     # shouldn't change
@@ -192,7 +195,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
                 can_retry=SparkConnectClient.retry_exception, **self._retry_policy
             ):
                 with attempt:
-                    self._stub.ReleaseExecute(request)
+                    self._stub.ReleaseExecute(request, metadata=self._metadata)
         except Exception as e:
             warnings.warn(f"ReleaseExecute failed with exception: {e}.")
@@ -220,7 +223,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
                 can_retry=SparkConnectClient.retry_exception, **self._retry_policy
             ):
                 with attempt:
-                    self._stub.ReleaseExecute(request)
+                    self._stub.ReleaseExecute(request, metadata=self._metadata)
         except Exception as e:
             warnings.warn(f"ReleaseExecute failed with exception: {e}.")
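The pattern behind this fix — every RPC on the stub must carry the same client metadata, and individual call sites had forgotten it — can be sketched generically with a hypothetical wrapper (not part of the Spark Connect client) that injects the metadata into each call so no call site can omit it:

```python
# Hypothetical sketch (not the actual Spark Connect client): wrap a gRPC-style
# stub so that every method call automatically carries the client metadata,
# instead of relying on each call site to pass metadata=... by hand.
class MetadataStub:
    def __init__(self, stub, metadata):
        self._stub = stub
        self._metadata = metadata

    def __getattr__(self, name):
        method = getattr(self._stub, name)

        def call(request, **kwargs):
            # Inject the shared metadata unless the caller overrides it.
            kwargs.setdefault("metadata", self._metadata)
            return method(request, **kwargs)

        return call


# A fake stub that records its arguments, standing in for the generated gRPC stub.
class FakeStub:
    def ReattachExecute(self, request, metadata=None):
        return ("ReattachExecute", request, metadata)

    def ReleaseExecute(self, request, metadata=None):
        return ("ReleaseExecute", request, metadata)


stub = MetadataStub(FakeStub(), [("x-custom-header", "demo")])
print(stub.ReattachExecute("req"))
# → ('ReattachExecute', 'req', [('x-custom-header', 'demo')])
```

The actual patch takes the simpler route of adding `metadata=self._metadata` at each call site, which keeps the generated stub untouched; a wrapper like the above would centralize the invariant instead.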
[spark] branch master updated: [SPARK-44738][PYTHON][CONNECT] Add missing client metadata to calls
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new c73660c3e72 [SPARK-44738][PYTHON][CONNECT] Add missing client metadata to calls
c73660c3e72 is described below

commit c73660c3e7279f61fe6e2f6bbf88f410f7ce25a1
Author: Martin Grund
AuthorDate: Wed Aug 9 18:08:23 2023 +0900

    [SPARK-44738][PYTHON][CONNECT] Add missing client metadata to calls

    ### What changes were proposed in this pull request?
    The refactoring for the re-attachable execution missed properly propagating the client metadata for the individual RPC calls.

    ### Why are the changes needed?
    Compatibility.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Existing UT

    Closes #42409 from grundprinzip/SPARK-44738.

    Authored-by: Martin Grund
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/connect/client/reattach.py | 11 ++++++-----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py
index c5c45904c9b..c6b1beaa121 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -135,7 +135,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
             if not attempt.is_first_try():
                 # on retry, the iterator is borked, so we need a new one
                 self._iterator = iter(
-                    self._stub.ReattachExecute(self._create_reattach_execute_request())
+                    self._stub.ReattachExecute(
+                        self._create_reattach_execute_request(), metadata=self._metadata
+                    )
                 )

             if self._current is None:
@@ -154,7 +156,8 @@ class ExecutePlanResponseReattachableIterator(Generator):
                 while not has_next:
                     self._iterator = iter(
                         self._stub.ReattachExecute(
-                            self._create_reattach_execute_request()
+                            self._create_reattach_execute_request(),
+                            metadata=self._metadata,
                         )
                     )
                     # shouldn't change
@@ -192,7 +195,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
                 can_retry=SparkConnectClient.retry_exception, **self._retry_policy
             ):
                 with attempt:
-                    self._stub.ReleaseExecute(request)
+                    self._stub.ReleaseExecute(request, metadata=self._metadata)
         except Exception as e:
             warnings.warn(f"ReleaseExecute failed with exception: {e}.")
@@ -220,7 +223,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
                 can_retry=SparkConnectClient.retry_exception, **self._retry_policy
             ):
                 with attempt:
-                    self._stub.ReleaseExecute(request)
+                    self._stub.ReleaseExecute(request, metadata=self._metadata)
         except Exception as e:
             warnings.warn(f"ReleaseExecute failed with exception: {e}.")
[spark] branch branch-3.5 updated: [SPARK-44551][SQL] Fix behavior of null IN (empty list) in expression execution
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 7520fc160aa [SPARK-44551][SQL] Fix behavior of null IN (empty list) in expression execution
7520fc160aa is described below

commit 7520fc160aa75360e29d5527e2a59c5605f144d4
Author: Jack Chen
AuthorDate: Wed Aug 9 15:10:51 2023 +0800

    [SPARK-44551][SQL] Fix behavior of null IN (empty list) in expression execution

    ### What changes were proposed in this pull request?
    `null IN (empty list)` incorrectly evaluates to null, when it should evaluate to false. (The reason it should be false is because a IN (b1, b2) is defined as a = b1 OR a = b2, and an empty IN list is treated as an empty OR which is false. This is specified by ANSI SQL.)

    Many places in Spark execution (In, InSet, InSubquery) and optimization (OptimizeIn, NullPropagation) implemented this wrong behavior. This is a longstanding correctness issue which has existed since null support for IN expressions was first added to Spark.

    This PR fixes Spark execution (In, InSet, InSubquery). See previous PR https://github.com/apache/spark/pull/42007 for optimization fixes.

    The behavior is under a flag, which will be available to revert to the legacy behavior if needed. This flag is set to disable the new behavior until all of the fix PRs are complete. See [this doc](https://docs.google.com/document/d/1k8AY8oyT-GI04SnP7eXttPDnDj-Ek-c3luF2zL6DPNU/edit) for more information.

    ### Why are the changes needed?
    Fix wrong SQL semantics

    ### Does this PR introduce _any_ user-facing change?
    Not yet, but will fix wrong SQL semantics when enabled

    ### How was this patch tested?
    Unit tests PredicateSuite and tests added in previous PR https://github.com/apache/spark/pull/42007

    Closes #42163 from jchen5/null-in-empty-exec.

    Authored-by: Jack Chen
    Signed-off-by: Wenchen Fan
    (cherry picked from commit f9058d69b2af774e78677ac4cad55c7c91eb42ae)
    Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/expressions/predicates.scala      | 196 -
 .../spark/sql/catalyst/optimizer/expressions.scala |   4 +-
 .../sql/catalyst/expressions/PredicateSuite.scala  |  35 ++--
 .../subquery/in-subquery/in-null-semantics.sql     |   3 +-
 .../subquery/in-subquery/in-null-semantics.sql.out |   4 +-
 .../scala/org/apache/spark/sql/EmptyInSuite.scala  |   3 +-
 6 files changed, 144 insertions(+), 101 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index ee2ba7c73d1..31b872e04ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -468,6 +468,8 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate {
   override def foldable: Boolean = children.forall(_.foldable)
   final override val nodePatterns: Seq[TreePattern] = Seq(IN)

+  private val legacyNullInEmptyBehavior =
+    SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR)

   override lazy val canonicalized: Expression = {
     val basic = withNewChildren(children.map(_.canonicalized)).asInstanceOf[In]
@@ -481,88 +483,104 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate {
   override def toString: String = s"$value IN ${list.mkString("(", ",", ")")}"

   override def eval(input: InternalRow): Any = {
-    val evaluatedValue = value.eval(input)
-    if (evaluatedValue == null) {
-      null
+    if (list.isEmpty && !legacyNullInEmptyBehavior) {
+      // IN (empty list) is always false under current behavior.
+      // Under legacy behavior it's null if the left side is null, otherwise false (SPARK-44550).
+      false
     } else {
-      var hasNull = false
-      list.foreach { e =>
-        val v = e.eval(input)
-        if (v == null) {
-          hasNull = true
-        } else if (ordering.equiv(v, evaluatedValue)) {
-          return true
-        }
-      }
-      if (hasNull) {
+      val evaluatedValue = value.eval(input)
+      if (evaluatedValue == null) {
         null
       } else {
-        false
+        var hasNull = false
+        list.foreach { e =>
+          val v = e.eval(input)
+          if (v == null) {
+            hasNull = true
+          } else if (ordering.equiv(v, evaluatedValue)) {
+            return true
+          }
+        }
+        if (hasNull) {
+          null
+        } else {
+          false
+        }
       }
     }
   }

   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    val javaDataType =
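The fixed evaluation logic can be sketched outside Spark as a small three-valued function. This is an illustrative sketch of the semantics only, not Spark code; Python's None stands in for SQL NULL, and the keyword argument mirrors the legacy flag guarded by SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR in the patch:

```python
# Illustrative sketch of the IN semantics fixed above (not Spark code).
# None models SQL NULL; the return value is True, False, or None (unknown).
def sql_in(value, in_list, legacy_null_in_empty=False):
    if not in_list and not legacy_null_in_empty:
        # ANSI SQL: IN (empty list) is an empty OR, i.e. false,
        # even when the left-hand side is NULL.
        return False
    if value is None:
        return None  # NULL IN (non-empty list) is unknown
    has_null = False
    for e in in_list:
        if e is None:
            has_null = True
        elif e == value:
            return True
    # No match: unknown if a NULL element could have matched, else false.
    return None if has_null else False


print(sql_in(None, []))                             # False under the fixed semantics
print(sql_in(None, [], legacy_null_in_empty=True))  # None (the old, incorrect result)
print(sql_in(2, [1, None]))                         # None: the NULL element makes it unknown
```

The first call is exactly the case this PR corrects: under the old behavior the NULL left-hand side short-circuited to NULL before the empty list was ever considered.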
[spark] branch master updated: [SPARK-44551][SQL] Fix behavior of null IN (empty list) in expression execution
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f9058d69b2a [SPARK-44551][SQL] Fix behavior of null IN (empty list) in expression execution
f9058d69b2a is described below

commit f9058d69b2af774e78677ac4cad55c7c91eb42ae
Author: Jack Chen
AuthorDate: Wed Aug 9 15:10:51 2023 +0800

    [SPARK-44551][SQL] Fix behavior of null IN (empty list) in expression execution

    ### What changes were proposed in this pull request?
    `null IN (empty list)` incorrectly evaluates to null, when it should evaluate to false. (The reason it should be false is because a IN (b1, b2) is defined as a = b1 OR a = b2, and an empty IN list is treated as an empty OR which is false. This is specified by ANSI SQL.)

    Many places in Spark execution (In, InSet, InSubquery) and optimization (OptimizeIn, NullPropagation) implemented this wrong behavior. This is a longstanding correctness issue which has existed since null support for IN expressions was first added to Spark.

    This PR fixes Spark execution (In, InSet, InSubquery). See previous PR https://github.com/apache/spark/pull/42007 for optimization fixes.

    The behavior is under a flag, which will be available to revert to the legacy behavior if needed. This flag is set to disable the new behavior until all of the fix PRs are complete. See [this doc](https://docs.google.com/document/d/1k8AY8oyT-GI04SnP7eXttPDnDj-Ek-c3luF2zL6DPNU/edit) for more information.

    ### Why are the changes needed?
    Fix wrong SQL semantics

    ### Does this PR introduce _any_ user-facing change?
    Not yet, but will fix wrong SQL semantics when enabled

    ### How was this patch tested?
    Unit tests PredicateSuite and tests added in previous PR https://github.com/apache/spark/pull/42007

    Closes #42163 from jchen5/null-in-empty-exec.

    Authored-by: Jack Chen
    Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/expressions/predicates.scala      | 196 -
 .../spark/sql/catalyst/optimizer/expressions.scala |   4 +-
 .../sql/catalyst/expressions/PredicateSuite.scala  |  35 ++--
 .../subquery/in-subquery/in-null-semantics.sql     |   3 +-
 .../subquery/in-subquery/in-null-semantics.sql.out |   4 +-
 .../scala/org/apache/spark/sql/EmptyInSuite.scala  |   3 +-
 6 files changed, 144 insertions(+), 101 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index ee2ba7c73d1..31b872e04ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -468,6 +468,8 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate {
   override def foldable: Boolean = children.forall(_.foldable)
   final override val nodePatterns: Seq[TreePattern] = Seq(IN)

+  private val legacyNullInEmptyBehavior =
+    SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR)

   override lazy val canonicalized: Expression = {
     val basic = withNewChildren(children.map(_.canonicalized)).asInstanceOf[In]
@@ -481,88 +483,104 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate {
   override def toString: String = s"$value IN ${list.mkString("(", ",", ")")}"

   override def eval(input: InternalRow): Any = {
-    val evaluatedValue = value.eval(input)
-    if (evaluatedValue == null) {
-      null
+    if (list.isEmpty && !legacyNullInEmptyBehavior) {
+      // IN (empty list) is always false under current behavior.
+      // Under legacy behavior it's null if the left side is null, otherwise false (SPARK-44550).
+      false
     } else {
-      var hasNull = false
-      list.foreach { e =>
-        val v = e.eval(input)
-        if (v == null) {
-          hasNull = true
-        } else if (ordering.equiv(v, evaluatedValue)) {
-          return true
-        }
-      }
-      if (hasNull) {
+      val evaluatedValue = value.eval(input)
+      if (evaluatedValue == null) {
         null
       } else {
-        false
+        var hasNull = false
+        list.foreach { e =>
+          val v = e.eval(input)
+          if (v == null) {
+            hasNull = true
+          } else if (ordering.equiv(v, evaluatedValue)) {
+            return true
+          }
+        }
+        if (hasNull) {
+          null
+        } else {
+          false
+        }
      }
    }
  }

   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    val javaDataType = CodeGenerator.javaType(value.dataType)
-    val valueGen = value.genCode(ctx)
-    val listGen = list.map(_.genCode(ctx))
-    //