[spark] branch master updated: [SPARK-44554][INFRA][FOLLOWUP] Install `IPython` for the Python linter check in the daily test for branch-3.3

2023-08-09 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 55b07b1367a [SPARK-44554][INFRA][FOLLOWUP] Install `IPython` for the 
Python linter check in the daily test for branch-3.3
55b07b1367a is described below

commit 55b07b1367a78d5431ce2043fcd590fc4409d566
Author: yangjie01 
AuthorDate: Thu Aug 10 12:48:20 2023 +0800

[SPARK-44554][INFRA][FOLLOWUP] Install `IPython` for the Python linter 
check in the daily test for branch-3.3

### What changes were proposed in this pull request?
This PR aims to install `IPython` for the Python linter check in the daily test for branch-3.3.
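
As a quick, hedged sanity check (not part of the PR), the presence of the linter dependencies, including the newly added `IPython`, can be confirmed with a few imports; the module names below are taken from the install line in the diff:

```
# Hedged sanity check: confirm the Python linter dependencies resolve in the
# CI image. Module names match the packages installed in the workflow below.
import importlib

for module in ("IPython", "flake8", "mypy", "numpydoc"):
    importlib.import_module(module)  # raises ImportError if a package is missing
    print(f"{module} OK")
```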

### Why are the changes needed?
To fix the Python linter check in the daily test of branch-3.3.

https://github.com/apache/spark/actions/runs/5805901293/job/15737802378

https://github.com/apache/spark/assets/1475305/59896b6a-b710-4772-9139-189c7ea73fe5

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Monitor GA

Closes #42413 from LuciferYang/SPARK-44554-FOLLOWUP.

Authored-by: yangjie01 
Signed-off-by: yangjie01 
---
 .github/workflows/build_and_test.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index 5f0505ab98f..6321fd37b1a 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -664,7 +664,7 @@ jobs:
 # SPARK-44554: Copy from 
https://github.com/apache/spark/blob/073d0b60d31bf68ebacdc005f59b928a5902670f/.github/workflows/build_and_test.yml#L501-L508
 # Should delete this section after SPARK 3.3 EOL.
 python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 
'mypy==0.920' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc 
'jinja2<3.0.0' 'black==21.12b0'
-python3.9 -m pip install 'pandas-stubs==1.2.0.53'
+python3.9 -m pip install 'pandas-stubs==1.2.0.53' ipython
 - name: Install Python linter dependencies for branch-3.4
   if: inputs.branch == 'branch-3.4'
   run: |


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (97cfe45c157 -> 5c94565e75c)

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 97cfe45c157 [SPARK-44701][PYTHON][TESTS][FOLLOWUP] Keep `torch` in 3.5 
daily GA
 add 5c94565e75c [MINOR][BUILD] Skip `deepspeed` in requirements on MacOS

No new revisions were added by this update.

Summary of changes:
 dev/requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-44701][PYTHON][TESTS][FOLLOWUP] Keep `torch` in 3.5 daily GA

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 97cfe45c157 [SPARK-44701][PYTHON][TESTS][FOLLOWUP] Keep `torch` in 3.5 
daily GA
97cfe45c157 is described below

commit 97cfe45c157e0b0c179b887617d46567eec6b46a
Author: Ruifeng Zheng 
AuthorDate: Thu Aug 10 12:11:48 2023 +0900

[SPARK-44701][PYTHON][TESTS][FOLLOWUP] Keep `torch` in 3.5 daily GA

### What changes were proposed in this pull request?
After https://github.com/apache/spark/pull/42375, the `pyspark-connect` tests in the 3.5 daily GA still fail (see https://github.com/apache/spark/actions/runs/5805901293).

We don't have a similar issue in 3.3 and 3.4.

### Why are the changes needed?
From a cursory look, `torch` is now a hard requirement of pyspark-ml; fixing this failure would require touching many files to change the imports.

That does not seem worthwhile, so this change keeps `torch` installed in the 3.5 daily GA instead.
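
For context, a hedged sketch of the rejected alternative, guarding the `torch` import so that ML code paths degrade when it is absent; the helper below is hypothetical and not part of this PR:

```
# Hypothetical sketch of the alternative discussed above: make `torch`
# optional at import time. Applying this consistently would touch many files,
# which is why the PR keeps torch installed in the 3.5 daily GA instead.
try:
    import torch
    have_torch = True
except ImportError:
    torch = None
    have_torch = False


def require_torch() -> None:
    """Raise a clear error when a torch-backed code path is reached."""
    if not have_torch:
        raise ImportError("torch is required for this pyspark-ml code path")
```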

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
CI

Closes #42412 from zhengruifeng/test_without_torch.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 .github/workflows/build_and_test.yml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index b4559dea42b..5f0505ab98f 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -378,6 +378,7 @@ jobs:
   SKIP_MIMA: true
   SKIP_PACKAGING: true
   METASPACE_SIZE: 1g
+  BRANCH: ${{ inputs.branch }}
 steps:
 - name: Checkout Spark repository
   uses: actions/checkout@v3
@@ -418,7 +419,7 @@ jobs:
 - name: Free up disk space
   shell: 'script -q -e -c "bash {0}"'
   run: |
-if [[ "$MODULES_TO_TEST" != *"pyspark-ml"* ]]; then
+if [[ "$MODULES_TO_TEST" != *"pyspark-ml"* ]] && [[ "$BRANCH" != 
"branch-3.5" ]]; then
   # uninstall libraries dedicated for ML testing
   python3.9 -m pip uninstall -y torch torchvision torcheval torchtnt 
tensorboard mlflow
 fi


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect streaming workers

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 6b55d618d36 [SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version 
for spark connect streaming workers
6b55d618d36 is described below

commit 6b55d618d36bdd296b3883916328d26863e94b8a
Author: Wei Liu 
AuthorDate: Thu Aug 10 12:08:13 2023 +0900

[SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect 
streaming workers

### What changes were proposed in this pull request?

Add a Python version check for the Spark Connect streaming `foreachBatch_worker` and `listener_worker`.
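
A minimal sketch of what such a check can look like on the worker side (the exact wire framing here is an assumption for illustration, not the PR's code): the JVM writes its expected Python version string, and the worker compares it with its own interpreter version.

```
# Hedged sketch of a worker-side Python version check; the length-prefixed
# UTF-8 framing is an assumption, not the actual protocol code.
import struct
import sys


def check_python_version(infile) -> None:
    (length,) = struct.unpack(">i", infile.read(4))
    expected = infile.read(length).decode("utf-8")
    actual = "%d.%d" % sys.version_info[:2]
    if expected != actual:
        raise RuntimeError(
            f"Python in worker has version {actual}, but the driver expects {expected}"
        )
```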

### Why are the changes needed?

Necessary check

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

I believe it can be skipped here

Closes #42421 from WweiL/SPARK-44461-verify-python-ver.

Authored-by: Wei Liu 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 0eeb6042859e82cedaa0807abe5d6be0229ecd09)
Signed-off-by: Hyukjin Kwon 
---
 .../main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala | 2 +-
 python/pyspark/sql/connect/streaming/query.py  | 3 ++-
 python/pyspark/sql/connect/streaming/readwriter.py | 3 ++-
 python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py | 3 +++
 python/pyspark/sql/connect/streaming/worker/listener_worker.py | 3 +++
 5 files changed, 11 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index 1a75965eb92..cddda6fb7a7 100644
--- 
a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ 
b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -83,7 +83,7 @@ private[spark] class StreamingPythonRunner(
 val stream = new BufferedOutputStream(pythonWorker.get.getOutputStream, 
bufferSize)
 val dataOut = new DataOutputStream(stream)
 
-// TODO(SPARK-44461): verify python version
+PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
 
 // Send sessionId
 PythonRDD.writeUTF(sessionId, dataOut)
diff --git a/python/pyspark/sql/connect/streaming/query.py 
b/python/pyspark/sql/connect/streaming/query.py
index 59e98e7bc30..021d27e939d 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -23,6 +23,7 @@ from pyspark.errors import StreamingQueryException, 
PySparkValueError
 import pyspark.sql.connect.proto as pb2
 from pyspark.serializers import CloudPickleSerializer
 from pyspark.sql.connect import proto
+from pyspark.sql.connect.utils import get_python_ver
 from pyspark.sql.streaming import StreamingQueryListener
 from pyspark.sql.streaming.query import (
 StreamingQuery as PySparkStreamingQuery,
@@ -237,7 +238,7 @@ class StreamingQueryManager:
 cmd = pb2.StreamingQueryManagerCommand()
 expr = proto.PythonUDF()
 expr.command = CloudPickleSerializer().dumps(listener)
-expr.python_ver = "%d.%d" % sys.version_info[:2]
+expr.python_ver = get_python_ver()
 cmd.add_listener.python_listener_payload.CopyFrom(expr)
 cmd.add_listener.id = listener._id
 self._execute_streaming_query_manager_cmd(cmd)
diff --git a/python/pyspark/sql/connect/streaming/readwriter.py 
b/python/pyspark/sql/connect/streaming/readwriter.py
index c8cd408404f..89097fcf43a 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -31,6 +31,7 @@ from pyspark.sql.streaming.readwriter import (
 DataStreamReader as PySparkDataStreamReader,
 DataStreamWriter as PySparkDataStreamWriter,
 )
+from pyspark.sql.connect.utils import get_python_ver
 from pyspark.sql.types import Row, StructType
 from pyspark.errors import PySparkTypeError, PySparkValueError
 
@@ -499,7 +500,7 @@ class DataStreamWriter:
 self._write_proto.foreach_batch.python_function.command = 
CloudPickleSerializer().dumps(
 func
 )
-self._write_proto.foreach_batch.python_function.python_ver = "%d.%d" % 
sys.version_info[:2]
+self._write_proto.foreach_batch.python_function.python_ver = 
get_python_ver()
 return self
 
 foreachBatch.__doc__ = PySparkDataStreamWriter.foreachBatch.__doc__
diff --git a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py 
b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
index 48a9848de40..cf61463cd68 100644
--- a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
@@ -31,12 +31,15 @@ 

[spark] branch master updated: [SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect streaming workers

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0eeb6042859 [SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version 
for spark connect streaming workers
0eeb6042859 is described below

commit 0eeb6042859e82cedaa0807abe5d6be0229ecd09
Author: Wei Liu 
AuthorDate: Thu Aug 10 12:08:13 2023 +0900

[SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect 
streaming workers

### What changes were proposed in this pull request?

Add a Python version check for the Spark Connect streaming `foreachBatch_worker` and `listener_worker`.

### Why are the changes needed?

Necessary check

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

I believe it can be skipped here

Closes #42421 from WweiL/SPARK-44461-verify-python-ver.

Authored-by: Wei Liu 
Signed-off-by: Hyukjin Kwon 
---
 .../main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala | 2 +-
 python/pyspark/sql/connect/streaming/query.py  | 3 ++-
 python/pyspark/sql/connect/streaming/readwriter.py | 3 ++-
 python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py | 3 +++
 python/pyspark/sql/connect/streaming/worker/listener_worker.py | 3 +++
 5 files changed, 11 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index a079743c847..fdfe388db2d 100644
--- 
a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ 
b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -81,7 +81,7 @@ private[spark] class StreamingPythonRunner(
 val stream = new BufferedOutputStream(pythonWorker.get.getOutputStream, 
bufferSize)
 val dataOut = new DataOutputStream(stream)
 
-// TODO(SPARK-44461): verify python version
+PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
 
 // Send sessionId
 PythonRDD.writeUTF(sessionId, dataOut)
diff --git a/python/pyspark/sql/connect/streaming/query.py 
b/python/pyspark/sql/connect/streaming/query.py
index 59e98e7bc30..021d27e939d 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -23,6 +23,7 @@ from pyspark.errors import StreamingQueryException, 
PySparkValueError
 import pyspark.sql.connect.proto as pb2
 from pyspark.serializers import CloudPickleSerializer
 from pyspark.sql.connect import proto
+from pyspark.sql.connect.utils import get_python_ver
 from pyspark.sql.streaming import StreamingQueryListener
 from pyspark.sql.streaming.query import (
 StreamingQuery as PySparkStreamingQuery,
@@ -237,7 +238,7 @@ class StreamingQueryManager:
 cmd = pb2.StreamingQueryManagerCommand()
 expr = proto.PythonUDF()
 expr.command = CloudPickleSerializer().dumps(listener)
-expr.python_ver = "%d.%d" % sys.version_info[:2]
+expr.python_ver = get_python_ver()
 cmd.add_listener.python_listener_payload.CopyFrom(expr)
 cmd.add_listener.id = listener._id
 self._execute_streaming_query_manager_cmd(cmd)
diff --git a/python/pyspark/sql/connect/streaming/readwriter.py 
b/python/pyspark/sql/connect/streaming/readwriter.py
index c8cd408404f..89097fcf43a 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -31,6 +31,7 @@ from pyspark.sql.streaming.readwriter import (
 DataStreamReader as PySparkDataStreamReader,
 DataStreamWriter as PySparkDataStreamWriter,
 )
+from pyspark.sql.connect.utils import get_python_ver
 from pyspark.sql.types import Row, StructType
 from pyspark.errors import PySparkTypeError, PySparkValueError
 
@@ -499,7 +500,7 @@ class DataStreamWriter:
 self._write_proto.foreach_batch.python_function.command = 
CloudPickleSerializer().dumps(
 func
 )
-self._write_proto.foreach_batch.python_function.python_ver = "%d.%d" % 
sys.version_info[:2]
+self._write_proto.foreach_batch.python_function.python_ver = 
get_python_ver()
 return self
 
 foreachBatch.__doc__ = PySparkDataStreamWriter.foreachBatch.__doc__
diff --git a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py 
b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
index 48a9848de40..cf61463cd68 100644
--- a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
@@ -31,12 +31,15 @@ from pyspark.serializers import (
 from pyspark import worker
 from pyspark.sql import SparkSession
 from typing 

[spark] branch master updated (3b3e0fdc61c -> 1846991b20a)

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 3b3e0fdc61c [SPARK-42621][PS] Add inclusive parameter for pd.date_range
 add 1846991b20a [SPARK-44732][SQL] Built-in XML data source support

No new revisions were added by this update.

Summary of changes:
 dev/.rat-excludes  |1 +
 dev/deps/spark-deps-hadoop-3-hive-2.3  |2 +
 pom.xml|   12 +
 sql/core/pom.xml   |8 +
 ...org.apache.spark.sql.sources.DataSourceRegister |1 +
 .../execution/datasources/xml/DefaultSource.scala  |  113 +
 .../datasources/xml/XmlDataToCatalyst.scala|   62 +
 .../execution/datasources/xml/XmlInputFormat.scala |  341 ++
 .../sql/execution/datasources/xml/XmlOptions.scala |   85 +
 .../sql/execution/datasources/xml/XmlReader.scala  |  205 ++
 .../execution/datasources/xml/XmlRelation.scala|   86 +
 .../sql/execution/datasources/xml/functions.scala  |   42 +
 .../sql/execution/datasources/xml/package.scala|  162 +
 .../datasources/xml/parsers/StaxXmlGenerator.scala |  159 +
 .../datasources/xml/parsers/StaxXmlParser.scala|  375 ++
 .../xml/parsers/StaxXmlParserUtils.scala   |  179 +
 .../datasources/xml/util/InferSchema.scala |  336 ++
 .../xml/util/PartialResultException.scala  |   29 +
 .../execution/datasources/xml/util/TypeCast.scala  |  297 ++
 .../datasources/xml/util/ValidatorUtil.scala   |   55 +
 .../datasources/xml/util/XSDToSchema.scala |  280 ++
 .../execution/datasources/xml/util/XmlFile.scala   |  163 +
 .../execution/datasources/xml/JavaXmlSuite.java|  111 +
 .../test-data/xml-resources/ages-mixed-types.xml   |   15 +
 .../test-data/xml-resources/ages-with-spaces.xml   |   20 +
 .../resources/test-data/xml-resources/ages.xml |   14 +
 .../xml-resources/attributesStartWithNewLine.xml   |   11 +
 .../xml-resources/attributesStartWithNewLineCR.xml |1 +
 .../xml-resources/attributesStartWithNewLineLF.xml |   11 +
 .../resources/test-data/xml-resources/basket.xml   |   12 +
 .../resources/test-data/xml-resources/basket.xsd   |   17 +
 .../test-data/xml-resources/basket_invalid.xml |   14 +
 .../xml-resources/books-attributes-in-no-child.xml |   75 +
 .../books-complicated-null-attribute.xml   |   60 +
 .../test-data/xml-resources/books-complicated.xml  |   60 +
 .../xml-resources/books-malformed-attributes.xml   |   43 +
 .../test-data/xml-resources/books-namespaces.xml   |   12 +
 .../test-data/xml-resources/books-nested-array.xml |  130 +
 .../xml-resources/books-nested-object.xml  |  144 +
 .../xml-resources/books-unicode-in-tag-name.xml|   24 +
 .../resources/test-data/xml-resources/books.xml|  136 +
 .../test-data/xml-resources/cars-attribute.xml |9 +
 .../test-data/xml-resources/cars-iso-8859-1.xml|   21 +
 .../test-data/xml-resources/cars-malformed.xml |   20 +
 .../xml-resources/cars-mixed-attr-no-child.xml |   25 +
 .../xml-resources/cars-no-indentation.xml  |2 +
 .../xml-resources/cars-unbalanced-elements.xml |   19 +
 .../resources/test-data/xml-resources/cars.xml |   21 +
 .../resources/test-data/xml-resources/cars.xml.bz2 |  Bin 0 -> 229 bytes
 .../resources/test-data/xml-resources/cars.xml.gz  |  Bin 0 -> 210 bytes
 .../resources/test-data/xml-resources/catalog.xsd  |   41 +
 .../resources/test-data/xml-resources/choice.xsd   |   12 +
 .../xml-resources/complex-content-extension.xsd|   25 +
 .../xml-resources/datatypes-valid-and-invalid.xml  |   31 +
 .../resources/test-data/xml-resources/date.xml |5 +
 .../xml-resources/decimal-with-restriction.xsd |   18 +
 .../resources/test-data/xml-resources/empty.xml|0
 .../test-data/xml-resources/feed-with-spaces.xml   |   15 +
 .../test-data/xml-resources/fias_house.large.xml   | 3621 
 .../xml-resources/fias_house.large.xml.bz2 |  Bin 0 -> 30761 bytes
 .../xml-resources/fias_house.large.xml.gz  |  Bin 0 -> 8568 bytes
 .../test-data/xml-resources/fias_house.xml |  182 +
 .../test-data/xml-resources/fias_house.xml.bz2 |  Bin 0 -> 4571 bytes
 .../test-data/xml-resources/fias_house.xml.gz  |  Bin 0 -> 5069 bytes
 .../test-data/xml-resources/gps-empty-field.xml|   20 +
 .../xml-resources/include-example/first.xsd|5 +
 .../xml-resources/include-example/second.xsd   |   15 +
 .../resources/test-data/xml-resources/long.xsd |   10 +
 .../xml-resources/manual_schema_corrupt_record.xml |   30 +
 .../test-data/xml-resources/map-attribute.xml  |7 +
 .../test-data/xml-resources/mixed_children.xml |5 +
 .../test-data/xml-resources/mixed_children_2.xml   |5 +
 .../xml-resources/mixed_children_as_string.xml |9 +
 

[spark] branch master updated: [SPARK-42621][PS] Add inclusive parameter for pd.date_range

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3b3e0fdc61c [SPARK-42621][PS] Add inclusive parameter for pd.date_range
3b3e0fdc61c is described below

commit 3b3e0fdc61cf659bb97f94d9b12b8dcdad999e62
Author: Zhyhimont Dmitry 
AuthorDate: Thu Aug 10 11:05:11 2023 +0900

[SPARK-42621][PS] Add inclusive parameter for pd.date_range

### What changes were proposed in this pull request?
Add the `inclusive` parameter to `pd.date_range` to support pandas 2.0.0.

### Why are the changes needed?
With pandas 2.0.0 released, pandas API on Spark should match its behavior.

### Does this PR introduce any user-facing change?
Yes, the API changes:

Before:
ps.date_range(start='2017-01-01', end='2017-01-04', closed=None)

After:
ps.date_range(start='2017-01-01', end='2017-01-04', inclusive="both")
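
A usage sketch mirroring the doctests added in this PR (assuming a Spark build that includes this change, which is marked `versionadded:: 4.0.0` in the docstring):

```
# `inclusive` controls whether the start/end boundaries are part of the range.
import pyspark.pandas as ps

both = ps.date_range(start="2017-01-01", end="2017-01-04", inclusive="both")
left = ps.date_range(start="2017-01-01", end="2017-01-04", inclusive="left")    # drops the end
right = ps.date_range(start="2017-01-01", end="2017-01-04", inclusive="right")  # drops the start
print(len(both), len(left), len(right))  # 4 3 3
```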

### How was this patch tested?
Unit tests were updated

Closes #40665 from dzhigimont/SPARK-42621_ZH.

Lead-authored-by: Zhyhimont Dmitry 
Co-authored-by: Zhyhimont Dmitry 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/namespace.py| 32 +--
 python/pyspark/pandas/tests/test_namespace.py | 25 +
 2 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/pandas/namespace.py 
b/python/pyspark/pandas/namespace.py
index ba93e5a3ee5..fddf1bec63f 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -1751,8 +1751,6 @@ def to_datetime(
 )
 
 
-# TODO(SPARK-42621): Add `inclusive` parameter.
-# See https://github.com/pandas-dev/pandas/issues/40245
 def date_range(
 start: Union[str, Any] = None,
 end: Union[str, Any] = None,
@@ -1761,6 +1759,7 @@ def date_range(
 tz: Optional[Union[str, tzinfo]] = None,
 normalize: bool = False,
 name: Optional[str] = None,
+inclusive: str = "both",
 **kwargs: Any,
 ) -> DatetimeIndex:
 """
@@ -1784,6 +1783,11 @@ def date_range(
 Normalize start/end dates to midnight before generating date range.
 name : str, default None
 Name of the resulting DatetimeIndex.
+inclusive : {"both", "neither", "left", "right"}, default "both"
+Include boundaries; Whether to set each bound as closed or open.
+
+.. versionadded:: 4.0.0
+
 **kwargs
 For compatibility. Has no effect on the result.
 
@@ -1867,6 +1871,29 @@ def date_range(
 DatetimeIndex(['2018-01-31', '2018-04-30', '2018-07-31', '2018-10-31',
'2019-01-31'],
   dtype='datetime64[ns]', freq=None)
+
+`inclusive` controls whether to include `start` and `end` that are on the
+boundary. The default includes boundary points on either end.
+
+>>> ps.date_range(
+... start='2017-01-01', end='2017-01-04', inclusive="both"
+... )  # doctest: +NORMALIZE_WHITESPACE
+DatetimeIndex(['2017-01-01', '2017-01-02', '2017-01-03', '2017-01-04'],
+   dtype='datetime64[ns]', freq=None)
+
+Use ``inclusive='left'`` to exclude `end` if it falls on the boundary.
+
+>>> ps.date_range(
+... start='2017-01-01', end='2017-01-04', inclusive='left'
+... )  # doctest: +NORMALIZE_WHITESPACE
+DatetimeIndex(['2017-01-01', '2017-01-02', '2017-01-03'], 
dtype='datetime64[ns]', freq=None)
+
+Use ``inclusive='right'`` to exclude `start` if it falls on the boundary.
+
+>>> ps.date_range(
+... start='2017-01-01', end='2017-01-04', inclusive='right'
+... )  # doctest: +NORMALIZE_WHITESPACE
+DatetimeIndex(['2017-01-02', '2017-01-03', '2017-01-04'], 
dtype='datetime64[ns]', freq=None)
 """
 assert freq not in ["N", "ns"], "nanoseconds is not supported"
 assert tz is None, "Localized DatetimeIndex is not supported"
@@ -1882,6 +1909,7 @@ def date_range(
 tz=tz,
 normalize=normalize,
 name=name,
+inclusive=inclusive,
 **kwargs,
 )
 ),
diff --git a/python/pyspark/pandas/tests/test_namespace.py 
b/python/pyspark/pandas/tests/test_namespace.py
index d1d1e1af935..5c202046717 100644
--- a/python/pyspark/pandas/tests/test_namespace.py
+++ b/python/pyspark/pandas/tests/test_namespace.py
@@ -221,6 +221,31 @@ class NamespaceTestsMixin:
 pd.date_range(start="1/1/2018", periods=5, 
freq=pd.offsets.MonthEnd(3)),
 )
 
+self.assert_eq(
+ps.date_range(start="2017-01-01", end="2017-01-04", 
inclusive="left"),
+pd.date_range(start="2017-01-01", end="2017-01-04", 
inclusive="left"),
+)
+
+self.assert_eq(
+ps.date_range(start="2017-01-01", end="2017-01-04", 
inclusive="right"),
+

[spark] branch branch-3.5 updated: [SPARK-44747][CONNECT] Add missing SparkSession.Builder methods

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 6e4abe11442 [SPARK-44747][CONNECT] Add missing SparkSession.Builder 
methods
6e4abe11442 is described below

commit 6e4abe11442f2986412909b4ebb4c13487df27b6
Author: Herman van Hovell 
AuthorDate: Thu Aug 10 09:49:45 2023 +0900

[SPARK-44747][CONNECT] Add missing SparkSession.Builder methods

### What changes were proposed in this pull request?
This PR adds a couple of methods to `SparkSession.Builder`:
- `config` - this group of methods allows you to set runtime configurations on the Spark Connect session.
- `master` - this is a no-op, only added for compatibility.
- `appName` - this is a no-op, only added for compatibility.
- `enableHiveSupport` - this is a no-op, only added for compatibility.

### Why are the changes needed?
We want to maximize compatibility with the existing API in sql/core.

### Does this PR introduce _any_ user-facing change?
Yes. It adds a couple of builder methods.

### How was this patch tested?
Add tests to `SparkSessionSuite` and `SparkSessionE2ESuite`.

Closes #42419 from hvanhovell/SPARK-44747.

Authored-by: Herman van Hovell 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit d27496eb3bf962981e37f989ba486d847745444f)
Signed-off-by: Hyukjin Kwon 
---
 .../scala/org/apache/spark/sql/SparkSession.scala  | 91 +-
 .../apache/spark/sql/SparkSessionE2ESuite.scala| 46 +++
 .../org/apache/spark/sql/SparkSessionSuite.scala   | 10 +++
 .../CheckConnectJvmClientCompatibility.scala   |  6 --
 4 files changed, 146 insertions(+), 7 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 7367ed153f7..e902e04e246 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -781,6 +781,7 @@ object SparkSession extends Logging {
   class Builder() extends Logging {
 private val builder = SparkConnectClient.builder()
 private var client: SparkConnectClient = _
+private[this] val options = new scala.collection.mutable.HashMap[String, 
String]
 
 def remote(connectionString: String): Builder = {
   builder.connectionString(connectionString)
@@ -804,6 +805,84 @@ object SparkSession extends Logging {
   this
 }
 
+/**
+ * Sets a config option. Options set using this method are automatically 
propagated to the
+ * Spark Connect session. Only runtime options are supported.
+ *
+ * @since 3.5.0
+ */
+def config(key: String, value: String): Builder = synchronized {
+  options += key -> value
+  this
+}
+
+/**
+ * Sets a config option. Options set using this method are automatically 
propagated to the
+ * Spark Connect session. Only runtime options are supported.
+ *
+ * @since 3.5.0
+ */
+def config(key: String, value: Long): Builder = synchronized {
+  options += key -> value.toString
+  this
+}
+
+/**
+ * Sets a config option. Options set using this method are automatically 
propagated to the
+ * Spark Connect session. Only runtime options are supported.
+ *
+ * @since 3.5.0
+ */
+def config(key: String, value: Double): Builder = synchronized {
+  options += key -> value.toString
+  this
+}
+
+/**
+ * Sets a config option. Options set using this method are automatically 
propagated to the
+ * Spark Connect session. Only runtime options are supported.
+ *
+ * @since 3.5.0
+ */
+def config(key: String, value: Boolean): Builder = synchronized {
+  options += key -> value.toString
+  this
+}
+
+/**
+ * Sets a config a map of options. Options set using this method are 
automatically propagated
+ * to the Spark Connect session. Only runtime options are supported.
+ *
+ * @since 3.5.0
+ */
+def config(map: Map[String, Any]): Builder = synchronized {
+  map.foreach { kv: (String, Any) =>
+{
+  options += kv._1 -> kv._2.toString
+}
+  }
+  this
+}
+
+/**
+ * Sets a config option. Options set using this method are automatically 
propagated to both
+ * `SparkConf` and SparkSession's own configuration.
+ *
+ * @since 3.5.0
+ */
+def config(map: java.util.Map[String, Any]): Builder = synchronized {
+  config(map.asScala.toMap)
+}
+
+@deprecated("enableHiveSupport does not work in Spark Connect")
+def enableHiveSupport(): Builder = this
+
+   

[spark] branch master updated: [SPARK-44747][CONNECT] Add missing SparkSession.Builder methods

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d27496eb3bf [SPARK-44747][CONNECT] Add missing SparkSession.Builder 
methods
d27496eb3bf is described below

commit d27496eb3bf962981e37f989ba486d847745444f
Author: Herman van Hovell 
AuthorDate: Thu Aug 10 09:49:45 2023 +0900

[SPARK-44747][CONNECT] Add missing SparkSession.Builder methods

### What changes were proposed in this pull request?
This PR adds a couple of methods to `SparkSession.Builder`:
- `config` - this group of methods allows you to set runtime configurations on the Spark Connect session.
- `master` - this is a no-op, only added for compatibility.
- `appName` - this is a no-op, only added for compatibility.
- `enableHiveSupport` - this is a no-op, only added for compatibility.

### Why are the changes needed?
We want to maximize compatibility with the existing API in sql/core.

### Does this PR introduce _any_ user-facing change?
Yes. It adds a couple of builder methods.

### How was this patch tested?
Add tests to `SparkSessionSuite` and `SparkSessionE2ESuite`.

Closes #42419 from hvanhovell/SPARK-44747.

Authored-by: Herman van Hovell 
Signed-off-by: Hyukjin Kwon 
---
 .../scala/org/apache/spark/sql/SparkSession.scala  | 91 +-
 .../apache/spark/sql/SparkSessionE2ESuite.scala| 46 +++
 .../org/apache/spark/sql/SparkSessionSuite.scala   | 10 +++
 .../CheckConnectJvmClientCompatibility.scala   |  6 --
 4 files changed, 146 insertions(+), 7 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 7367ed153f7..e902e04e246 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -781,6 +781,7 @@ object SparkSession extends Logging {
   class Builder() extends Logging {
 private val builder = SparkConnectClient.builder()
 private var client: SparkConnectClient = _
+private[this] val options = new scala.collection.mutable.HashMap[String, 
String]
 
 def remote(connectionString: String): Builder = {
   builder.connectionString(connectionString)
@@ -804,6 +805,84 @@ object SparkSession extends Logging {
   this
 }
 
+/**
+ * Sets a config option. Options set using this method are automatically 
propagated to the
+ * Spark Connect session. Only runtime options are supported.
+ *
+ * @since 3.5.0
+ */
+def config(key: String, value: String): Builder = synchronized {
+  options += key -> value
+  this
+}
+
+/**
+ * Sets a config option. Options set using this method are automatically 
propagated to the
+ * Spark Connect session. Only runtime options are supported.
+ *
+ * @since 3.5.0
+ */
+def config(key: String, value: Long): Builder = synchronized {
+  options += key -> value.toString
+  this
+}
+
+/**
+ * Sets a config option. Options set using this method are automatically 
propagated to the
+ * Spark Connect session. Only runtime options are supported.
+ *
+ * @since 3.5.0
+ */
+def config(key: String, value: Double): Builder = synchronized {
+  options += key -> value.toString
+  this
+}
+
+/**
+ * Sets a config option. Options set using this method are automatically 
propagated to the
+ * Spark Connect session. Only runtime options are supported.
+ *
+ * @since 3.5.0
+ */
+def config(key: String, value: Boolean): Builder = synchronized {
+  options += key -> value.toString
+  this
+}
+
+/**
+ * Sets a config a map of options. Options set using this method are 
automatically propagated
+ * to the Spark Connect session. Only runtime options are supported.
+ *
+ * @since 3.5.0
+ */
+def config(map: Map[String, Any]): Builder = synchronized {
+  map.foreach { kv: (String, Any) =>
+{
+  options += kv._1 -> kv._2.toString
+}
+  }
+  this
+}
+
+/**
+ * Sets a config option. Options set using this method are automatically 
propagated to both
+ * `SparkConf` and SparkSession's own configuration.
+ *
+ * @since 3.5.0
+ */
+def config(map: java.util.Map[String, Any]): Builder = synchronized {
+  config(map.asScala.toMap)
+}
+
+@deprecated("enableHiveSupport does not work in Spark Connect")
+def enableHiveSupport(): Builder = this
+
+@deprecated("master does not work in Spark Connect, please use remote 
instead")
+def master(master: 

[spark] branch branch-3.5 updated: [SPARK-44740][CONNECT] Support specifying `session_id` in SPARK_REMOTE connection string

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 576d1698c7b [SPARK-44740][CONNECT] Support specifying `session_id` in 
SPARK_REMOTE connection string
576d1698c7b is described below

commit 576d1698c7b52b2d9ce00fa2fc5912c92e8bbe67
Author: Martin Grund 
AuthorDate: Thu Aug 10 08:41:13 2023 +0900

[SPARK-44740][CONNECT] Support specifying `session_id` in SPARK_REMOTE 
connection string

### What changes were proposed in this pull request?
To support cross-language session sharing in Spark Connect, we need to be able to inject the session ID into the connection string, because on the server side the client-provided session ID is already used together with the user ID.

```

SparkSession.builder.remote("sc://localhost/;session_id=abcdefg").getOrCreate()
```
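
A hedged PySpark variant of the same idea (the UUID below is only an example value; per the JVM builder in this diff, `session_id` must be a valid UUID):

```
# Hedged sketch: pass an explicit session_id in the Spark Connect
# connection string. The JVM client in this commit rejects values that
# are not valid UUIDs, so a UUID is generated here.
import uuid

from pyspark.sql import SparkSession

session_id = str(uuid.uuid4())
spark = (
    SparkSession.builder
    .remote(f"sc://localhost/;session_id={session_id}")
    .getOrCreate()
)
```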

### Why are the changes needed?
Ease of use.

### Does this PR introduce _any_ user-facing change?
Adds a way to configure the Spark Connect connection string with 
`session_id`

### How was this patch tested?
Added UT for the parameter.

Closes #42415 from grundprinzip/SPARK-44740.

Authored-by: Martin Grund 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 7af4e358f3f4902cc9601e56c2662b8921a925d6)
Signed-off-by: Hyukjin Kwon 
---
 .../sql/connect/client/SparkConnectClient.scala| 22 ++--
 .../connect/client/SparkConnectClientParser.scala  |  3 +++
 .../SparkConnectClientBuilderParseTestSuite.scala  |  4 +++
 .../connect/client/SparkConnectClientSuite.scala   |  6 +
 connector/connect/docs/client-connection-string.md | 11 
 python/pyspark/sql/connect/client/core.py  | 30 +++---
 .../sql/tests/connect/client/test_client.py|  7 +
 .../sql/tests/connect/test_connect_basic.py| 18 -
 8 files changed, 94 insertions(+), 7 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index a028df536cf..637499f090c 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -56,7 +56,7 @@ private[sql] class SparkConnectClient(
   // Generate a unique session ID for this client. This UUID must be unique to 
allow
   // concurrent Spark sessions of the same user. If the channel is closed, 
creating
   // a new client will create a new session ID.
-  private[sql] val sessionId: String = UUID.randomUUID.toString
+  private[sql] val sessionId: String = 
configuration.sessionId.getOrElse(UUID.randomUUID.toString)
 
   private[client] val artifactManager: ArtifactManager = {
 new ArtifactManager(configuration, sessionId, bstub, stub)
@@ -432,6 +432,7 @@ object SparkConnectClient {
   val PARAM_USE_SSL = "use_ssl"
   val PARAM_TOKEN = "token"
   val PARAM_USER_AGENT = "user_agent"
+  val PARAM_SESSION_ID = "session_id"
 }
 
 private def verifyURI(uri: URI): Unit = {
@@ -463,6 +464,21 @@ object SparkConnectClient {
   this
 }
 
+def sessionId(value: String): Builder = {
+  try {
+UUID.fromString(value).toString
+  } catch {
+case e: IllegalArgumentException =>
+  throw new IllegalArgumentException(
+"Parameter value 'session_id' must be a valid UUID format.",
+e)
+  }
+  _configuration = _configuration.copy(sessionId = Some(value))
+  this
+}
+
+def sessionId: Option[String] = _configuration.sessionId
+
 def userAgent: String = _configuration.userAgent
 
 def option(key: String, value: String): Builder = {
@@ -490,6 +506,7 @@ object SparkConnectClient {
   case URIParams.PARAM_TOKEN => token(value)
   case URIParams.PARAM_USE_SSL =>
 if (java.lang.Boolean.valueOf(value)) enableSsl() else disableSsl()
+  case URIParams.PARAM_SESSION_ID => sessionId(value)
   case _ => option(key, value)
 }
   }
@@ -576,7 +593,8 @@ object SparkConnectClient {
   userAgent: String = DEFAULT_USER_AGENT,
   retryPolicy: GrpcRetryHandler.RetryPolicy = 
GrpcRetryHandler.RetryPolicy(),
   useReattachableExecute: Boolean = true,
-  interceptors: List[ClientInterceptor] = List.empty) {
+  interceptors: List[ClientInterceptor] = List.empty,
+  sessionId: Option[String] = None) {
 
 def userContext: proto.UserContext = {
   val builder = proto.UserContext.newBuilder()
diff --git 

[spark] branch master updated: [SPARK-44740][CONNECT] Support specifying `session_id` in SPARK_REMOTE connection string

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7af4e358f3f [SPARK-44740][CONNECT] Support specifying `session_id` in 
SPARK_REMOTE connection string
7af4e358f3f is described below

commit 7af4e358f3f4902cc9601e56c2662b8921a925d6
Author: Martin Grund 
AuthorDate: Thu Aug 10 08:41:13 2023 +0900

[SPARK-44740][CONNECT] Support specifying `session_id` in SPARK_REMOTE 
connection string

### What changes were proposed in this pull request?
To support cross-language session sharing in Spark Connect, we need to be able to inject the session ID into the connection string, because on the server side the client-provided session ID is already used together with the user ID.

```

SparkSession.builder.remote("sc://localhost/;session_id=abcdefg").getOrCreate()
```

### Why are the changes needed?
Ease of use.

### Does this PR introduce _any_ user-facing change?
Adds a way to configure the Spark Connect connection string with 
`session_id`

### How was this patch tested?
Added UT for the parameter.

Closes #42415 from grundprinzip/SPARK-44740.

Authored-by: Martin Grund 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/connect/client/SparkConnectClient.scala| 22 ++--
 .../connect/client/SparkConnectClientParser.scala  |  3 +++
 .../SparkConnectClientBuilderParseTestSuite.scala  |  4 +++
 .../connect/client/SparkConnectClientSuite.scala   |  6 +
 connector/connect/docs/client-connection-string.md | 11 
 python/pyspark/sql/connect/client/core.py  | 30 +++---
 .../sql/tests/connect/client/test_client.py|  7 +
 .../sql/tests/connect/test_connect_basic.py| 18 -
 8 files changed, 94 insertions(+), 7 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index a028df536cf..637499f090c 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -56,7 +56,7 @@ private[sql] class SparkConnectClient(
   // Generate a unique session ID for this client. This UUID must be unique to 
allow
   // concurrent Spark sessions of the same user. If the channel is closed, 
creating
   // a new client will create a new session ID.
-  private[sql] val sessionId: String = UUID.randomUUID.toString
+  private[sql] val sessionId: String = 
configuration.sessionId.getOrElse(UUID.randomUUID.toString)
 
   private[client] val artifactManager: ArtifactManager = {
 new ArtifactManager(configuration, sessionId, bstub, stub)
@@ -432,6 +432,7 @@ object SparkConnectClient {
   val PARAM_USE_SSL = "use_ssl"
   val PARAM_TOKEN = "token"
   val PARAM_USER_AGENT = "user_agent"
+  val PARAM_SESSION_ID = "session_id"
 }
 
 private def verifyURI(uri: URI): Unit = {
@@ -463,6 +464,21 @@ object SparkConnectClient {
   this
 }
 
+def sessionId(value: String): Builder = {
+  try {
+UUID.fromString(value).toString
+  } catch {
+case e: IllegalArgumentException =>
+  throw new IllegalArgumentException(
+"Parameter value 'session_id' must be a valid UUID format.",
+e)
+  }
+  _configuration = _configuration.copy(sessionId = Some(value))
+  this
+}
+
+def sessionId: Option[String] = _configuration.sessionId
+
 def userAgent: String = _configuration.userAgent
 
 def option(key: String, value: String): Builder = {
@@ -490,6 +506,7 @@ object SparkConnectClient {
   case URIParams.PARAM_TOKEN => token(value)
   case URIParams.PARAM_USE_SSL =>
 if (java.lang.Boolean.valueOf(value)) enableSsl() else disableSsl()
+  case URIParams.PARAM_SESSION_ID => sessionId(value)
   case _ => option(key, value)
 }
   }
@@ -576,7 +593,8 @@ object SparkConnectClient {
   userAgent: String = DEFAULT_USER_AGENT,
   retryPolicy: GrpcRetryHandler.RetryPolicy = 
GrpcRetryHandler.RetryPolicy(),
   useReattachableExecute: Boolean = true,
-  interceptors: List[ClientInterceptor] = List.empty) {
+  interceptors: List[ClientInterceptor] = List.empty,
+  sessionId: Option[String] = None) {
 
 def userContext: proto.UserContext = {
   val builder = proto.UserContext.newBuilder()
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClientParser.scala
 

[spark] branch branch-3.4 updated: [SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted K8s PVCs

2023-08-09 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new a846a225ced [SPARK-44745][DOCS][K8S] Document shuffle data recovery 
from the remounted K8s PVCs
a846a225ced is described below

commit a846a225cedf61187ece611143b45806837bf0bc
Author: Dongjoon Hyun 
AuthorDate: Wed Aug 9 15:25:33 2023 -0700

[SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted 
K8s PVCs

### What changes were proposed in this pull request?

This PR aims to document an example of shuffle data recovery configuration 
from the remounted K8s PVCs.
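
For illustration, a hedged sketch of the two settings documented in this change, set programmatically; in practice they are usually supplied to `spark-submit` via `--conf`, and the mount path is the placeholder value used in the docs:

```
# Hedged sketch: the two configurations from the documented example.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config(
        "spark.kubernetes.executor.volumes.persistentVolumeClaim"
        ".spark-local-dir-1.mount.path",
        "/data/spark-x/executor-x",
    )
    .config(
        "spark.shuffle.sort.io.plugin.class",
        "org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO",
    )
    .getOrCreate()
)
```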

### Why are the changes needed?

This will help users adopt this feature more easily.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual review because this is a doc-only change.

![Screenshot 2023-08-09 at 1 39 48 
PM](https://github.com/apache/spark/assets/9700541/8cc7240b-570d-4c2e-b90a-54795c18df0a)

```
$ kubectl logs -f xxx-exec-16 | grep Kube
...
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to 
recover shuffle data.
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Found 
192 files
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to 
recover 
/data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/11/shuffle_0_11160_0.data
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to 
recover 
/data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/0e/shuffle_0_10063_0.data
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to 
recover 
/data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/0e/shuffle_0_10283_0.data
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Ignore 
a non-shuffle block file.
```

Closes #42417 from dongjoon-hyun/SPARK-44745.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 4db378fae30733cbd2be41e95a3cd8ad2184e06f)
Signed-off-by: Dongjoon Hyun 
---
 docs/running-on-kubernetes.md | 7 +++
 1 file changed, 7 insertions(+)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 71754dbc6f9..7a35236afa0 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -394,6 +394,13 @@ 
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.
 
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false
 ```
 
+To enable shuffle data recovery feature via the built-in 
`KubernetesLocalDiskShuffleDataIO` plugin, we need to have the followings. You 
may want to enable `spark.kubernetes.driver.waitToReusePersistentVolumeClaim` 
additionally.
+
+```
+spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data/spark-x/executor-x
+spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
+```
+
 If no volume is set as local storage, Spark uses temporary scratch space to 
spill data to disk during shuffles and other operations. When using Kubernetes 
as the resource manager the pods will be created with an 
[emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) 
volume mounted for each directory listed in `spark.local.dir` or the 
environment variable `SPARK_LOCAL_DIRS` .  If no directories are explicitly 
specified then a default directory is created and configured  [...]
 
 `emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not 
persist beyond the life of the pod.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted K8s PVCs

2023-08-09 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 4d67f2c59c6 [SPARK-44745][DOCS][K8S] Document shuffle data recovery 
from the remounted K8s PVCs
4d67f2c59c6 is described below

commit 4d67f2c59c660cd1081f6c1bd3002fffca28e936
Author: Dongjoon Hyun 
AuthorDate: Wed Aug 9 15:25:33 2023 -0700

[SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted 
K8s PVCs

### What changes were proposed in this pull request?

This PR aims to document an example of shuffle data recovery configuration 
from the remounted K8s PVCs.

### Why are the changes needed?

This will help users adopt this feature more easily.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual review because this is a doc-only change.

![Screenshot 2023-08-09 at 1 39 48 
PM](https://github.com/apache/spark/assets/9700541/8cc7240b-570d-4c2e-b90a-54795c18df0a)

```
$ kubectl logs -f xxx-exec-16 | grep Kube
...
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to 
recover shuffle data.
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Found 
192 files
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to 
recover 
/data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/11/shuffle_0_11160_0.data
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to 
recover 
/data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/0e/shuffle_0_10063_0.data
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to 
recover 
/data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/0e/shuffle_0_10283_0.data
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Ignore 
a non-shuffle block file.
```

Closes #42417 from dongjoon-hyun/SPARK-44745.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 4db378fae30733cbd2be41e95a3cd8ad2184e06f)
Signed-off-by: Dongjoon Hyun 
---
 docs/running-on-kubernetes.md | 7 +++
 1 file changed, 7 insertions(+)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index d3953592c4e..707a76196f3 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -394,6 +394,13 @@ 
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.
 
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false
 ```
 
+To enable shuffle data recovery feature via the built-in 
`KubernetesLocalDiskShuffleDataIO` plugin, we need to have the followings. You 
may want to enable `spark.kubernetes.driver.waitToReusePersistentVolumeClaim` 
additionally.
+
+```
+spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data/spark-x/executor-x
+spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
+```
+
 If no volume is set as local storage, Spark uses temporary scratch space to 
spill data to disk during shuffles and other operations. When using Kubernetes 
as the resource manager the pods will be created with an 
[emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) 
volume mounted for each directory listed in `spark.local.dir` or the 
environment variable `SPARK_LOCAL_DIRS` .  If no directories are explicitly 
specified then a default directory is created and configured  [...]
 
 `emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not 
persist beyond the life of the pod.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted K8s PVCs

2023-08-09 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4db378fae30 [SPARK-44745][DOCS][K8S] Document shuffle data recovery 
from the remounted K8s PVCs
4db378fae30 is described below

commit 4db378fae30733cbd2be41e95a3cd8ad2184e06f
Author: Dongjoon Hyun 
AuthorDate: Wed Aug 9 15:25:33 2023 -0700

[SPARK-44745][DOCS][K8S] Document shuffle data recovery from the remounted 
K8s PVCs

### What changes were proposed in this pull request?

This PR aims to document an example of shuffle data recovery configuration 
from the remounted K8s PVCs.

### Why are the changes needed?

This will help users adopt this feature more easily.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual review because this is a doc-only change.

![Screenshot 2023-08-09 at 1 39 48 
PM](https://github.com/apache/spark/assets/9700541/8cc7240b-570d-4c2e-b90a-54795c18df0a)

```
$ kubectl logs -f xxx-exec-16 | grep Kube
...
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to 
recover shuffle data.
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Found 
192 files
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to 
recover 
/data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/11/shuffle_0_11160_0.data
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to 
recover 
/data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/0e/shuffle_0_10063_0.data
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Try to 
recover 
/data/spark-x/executor-x/blockmgr-41a810ea-9503-447b-afc7-1cb104cd03cf/0e/shuffle_0_10283_0.data
23/08/09 21:09:21 INFO KubernetesLocalDiskShuffleExecutorComponents: Ignore 
a non-shuffle block file.
```

Closes #42417 from dongjoon-hyun/SPARK-44745.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 docs/running-on-kubernetes.md | 7 +++
 1 file changed, 7 insertions(+)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index d3953592c4e..707a76196f3 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -394,6 +394,13 @@ 
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.
 
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false
 ```
 
+To enable shuffle data recovery feature via the built-in 
`KubernetesLocalDiskShuffleDataIO` plugin, we need to have the followings. You 
may want to enable `spark.kubernetes.driver.waitToReusePersistentVolumeClaim` 
additionally.
+
+```
+spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data/spark-x/executor-x
+spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
+```
+
 If no volume is set as local storage, Spark uses temporary scratch space to 
spill data to disk during shuffles and other operations. When using Kubernetes 
as the resource manager the pods will be created with an 
[emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) 
volume mounted for each directory listed in `spark.local.dir` or the 
environment variable `SPARK_LOCAL_DIRS` .  If no directories are explicitly 
specified then a default directory is created and configured  [...]
 
 `emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not 
persist beyond the life of the pod.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-44503][SQL] Project any PARTITION BY expressions not already returned from Python UDTF TABLE arguments

2023-08-09 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 87298db43d9 [SPARK-44503][SQL] Project any PARTITION BY expressions 
not already returned from Python UDTF TABLE arguments
87298db43d9 is described below

commit 87298db43d9a33fa3a3986f274442a17aad74dc3
Author: Daniel Tenedorio 
AuthorDate: Wed Aug 9 10:27:07 2023 -0700

[SPARK-44503][SQL] Project any PARTITION BY expressions not already 
returned from Python UDTF TABLE arguments

### What changes were proposed in this pull request?

This PR adds a projection when any Python UDTF TABLE argument contains PARTITION BY expressions that are not simple attributes already present in the output of the relation.

For example:

```
CREATE TABLE t(d DATE, y INT) USING PARQUET;
INSERT INTO t VALUES ...
SELECT * FROM UDTF(TABLE(t) PARTITION BY EXTRACT(YEAR FROM d) ORDER BY y 
ASC);
```

This will generate a plan like:

```
+- Sort (y ASC)
  +- RepartitionByExpressions (partition_by_0)
+- Project (t.d, t.y, EXTRACT(YEAR FROM t.d) AS partition_by_0)
  +- LogicalRelation "t"
```

### Why are the changes needed?

We project the PARTITION BY expressions so that their resulting values 
appear in attributes that the Python UDTF interpreter can simply inspect in 
order to know when the partition boundaries have changed.
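
A hedged PySpark sketch of the kind of UDTF this serves: because rows arrive grouped (and sorted) by the projected PARTITION BY values, the evaluator can process one partition at a time. The class, column names, and registration call below are illustrative and assume the Spark 3.5+ Python UDTF API; they are not code from this PR.

```
# Hypothetical partition-aware Python UDTF: eval() sees each partition's rows
# contiguously, and terminate() is assumed to run at each partition boundary,
# which is what the boundary detection described above enables.
from pyspark.sql.functions import udtf


@udtf(returnType="year: int, cnt: int")
class CountPerYear:
    def __init__(self):
        self._year = None
        self._count = 0

    def eval(self, row):
        self._year = row["d"].year  # constant within a PARTITION BY group
        self._count += 1

    def terminate(self):
        yield self._year, self._count


# Illustrative usage, matching the SQL shape in the description:
#   spark.udtf.register("count_per_year", CountPerYear)
#   spark.sql("SELECT * FROM count_per_year(TABLE(t) "
#             "PARTITION BY EXTRACT(YEAR FROM d) ORDER BY y ASC)")
```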

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR adds unit test coverage.

Closes #42351 from dtenedor/partition-by-execution.

Authored-by: Daniel Tenedorio 
Signed-off-by: Takuya UESHIN 
---
 .../FunctionTableSubqueryArgumentExpression.scala  |  77 +++--
 .../sql/execution/python/PythonUDTFSuite.scala | 127 +++--
 2 files changed, 184 insertions(+), 20 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala
index e7a4888125d..daa0751eedf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala
@@ -104,23 +104,80 @@ case class FunctionTableSubqueryArgumentExpression(
 // the query plan.
 var subquery = plan
 if (partitionByExpressions.nonEmpty) {
-  subquery = RepartitionByExpression(
-partitionExpressions = partitionByExpressions,
-child = subquery,
-optNumPartitions = None)
+  // Add a projection to project each of the partitioning expressions that is not a simple
+  // attribute that is already present in the plan output. Then add a sort 
operation by the
+  // partition keys (plus any explicit ORDER BY items) since after the 
hash-based shuffle
+  // operation, the rows from several partitions may arrive interleaved. 
In this way, the Python
+  // UDTF evaluator is able to inspect the values of the partitioning 
expressions for adjacent
+  // rows in order to determine when each partition ends and the next one 
begins.
+  subquery = Project(
+projectList = subquery.output ++ extraProjectedPartitioningExpressions,
+child = subquery)
+  val partitioningAttributes = partitioningExpressionIndexes.map(i => 
subquery.output(i))
+  subquery = Sort(
+order = partitioningAttributes.map(e => SortOrder(e, Ascending)) ++ 
orderByExpressions,
+global = false,
+child = RepartitionByExpression(
+  partitionExpressions = partitioningAttributes,
+  optNumPartitions = None,
+  child = subquery))
 }
 if (withSinglePartition) {
   subquery = Repartition(
 numPartitions = 1,
 shuffle = true,
 child = subquery)
-}
-if (orderByExpressions.nonEmpty) {
-  subquery = Sort(
-order = orderByExpressions,
-global = false,
-child = subquery)
+  if (orderByExpressions.nonEmpty) {
+subquery = Sort(
+  order = orderByExpressions,
+  global = false,
+  child = subquery)
+  }
 }
 Project(Seq(Alias(CreateStruct(subquery.output), "c")()), subquery)
   }
+
+  /**
+   * These are the indexes of the PARTITION BY expressions within the 
concatenation of the child's
+   * output attributes and the [[extraProjectedPartitioningExpressions]]. We 
send these indexes to
+   * the Python UDTF evaluator so it knows which expressions to compare on 
adjacent rows to know
+   * when the partition has 

[spark] branch branch-3.5 updated: [SPARK-44720][CONNECT] Make Dataset use Encoder instead of AgnosticEncoder

2023-08-09 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 6eca5da8d3f [SPARK-44720][CONNECT] Make Dataset use Encoder instead of 
AgnosticEncoder
6eca5da8d3f is described below

commit 6eca5da8d3fba6d1e385f06494030996241937fa
Author: Herman van Hovell 
AuthorDate: Wed Aug 9 15:58:18 2023 +0200

[SPARK-44720][CONNECT] Make Dataset use Encoder instead of AgnosticEncoder

### What changes were proposed in this pull request?
Make the Spark Connect Dataset use Encoder instead of AgnosticEncoder

### Why are the changes needed?
We want to improve binary compatibility between the Spark Connect Scala 
Client and the original sql/core APIs.

### Does this PR introduce _any_ user-facing change?
Yes. It changes the type of `Dataset.encoder` from `AgnosticEncoder` to 
`Encoder`.

### How was this patch tested?
Existing tests.

Closes #42396 from hvanhovell/SPARK-44720.

Authored-by: Herman van Hovell 
Signed-off-by: Herman van Hovell 
(cherry picked from commit be9ffb37585fe421705ceaa52fe49b89c50703a3)
Signed-off-by: Herman van Hovell 
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 87 --
 .../apache/spark/sql/KeyValueGroupedDataset.scala  |  6 +-
 .../spark/sql/streaming/DataStreamWriter.scala |  2 +-
 .../CheckConnectJvmClientCompatibility.scala   |  3 -
 4 files changed, 50 insertions(+), 48 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5f263903c8b..2d72ea6bda8 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -128,11 +128,13 @@ import org.apache.spark.util.SparkClassUtils
 class Dataset[T] private[sql] (
 val sparkSession: SparkSession,
 @DeveloperApi val plan: proto.Plan,
-val encoder: AgnosticEncoder[T])
+val encoder: Encoder[T])
 extends Serializable {
   // Make sure we don't forget to set plan id.
   assert(plan.getRoot.getCommon.hasPlanId)
 
+  private[sql] val agnosticEncoder: AgnosticEncoder[T] = encoderFor(encoder)
+
   override def toString: String = {
 try {
   val builder = new mutable.StringBuilder
@@ -828,7 +830,7 @@ class Dataset[T] private[sql] (
   }
 
   private def buildSort(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = 
{
-sparkSession.newDataset(encoder) { builder =>
+sparkSession.newDataset(agnosticEncoder) { builder =>
   builder.getSortBuilder
 .setInput(plan.getRoot)
 .setIsGlobal(global)
@@ -878,8 +880,8 @@ class Dataset[T] private[sql] (
   ProductEncoder[(T, U)](
 
ClassTag(SparkClassUtils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple2")),
 Seq(
-  EncoderField(s"_1", this.encoder, leftNullable, Metadata.empty),
-  EncoderField(s"_2", other.encoder, rightNullable, Metadata.empty)))
+  EncoderField(s"_1", this.agnosticEncoder, leftNullable, 
Metadata.empty),
+  EncoderField(s"_2", other.agnosticEncoder, rightNullable, 
Metadata.empty)))
 
 sparkSession.newDataset(tupleEncoder) { builder =>
   val joinBuilder = builder.getJoinBuilder
@@ -889,8 +891,8 @@ class Dataset[T] private[sql] (
 .setJoinType(joinTypeValue)
 .setJoinCondition(condition.expr)
 .setJoinDataType(joinBuilder.getJoinDataTypeBuilder
-  .setIsLeftStruct(this.encoder.isStruct)
-  .setIsRightStruct(other.encoder.isStruct))
+  .setIsLeftStruct(this.agnosticEncoder.isStruct)
+  .setIsRightStruct(other.agnosticEncoder.isStruct))
 }
   }
 
@@ -1010,13 +1012,13 @@ class Dataset[T] private[sql] (
* @since 3.4.0
*/
   @scala.annotation.varargs
-  def hint(name: String, parameters: Any*): Dataset[T] = 
sparkSession.newDataset(encoder) {
-builder =>
+  def hint(name: String, parameters: Any*): Dataset[T] =
+sparkSession.newDataset(agnosticEncoder) { builder =>
   builder.getHintBuilder
 .setInput(plan.getRoot)
 .setName(name)
 .addAllParameters(parameters.map(p => functions.lit(p).expr).asJava)
-  }
+}
 
   private def getPlanId: Option[Long] =
 if (plan.getRoot.hasCommon && plan.getRoot.getCommon.hasPlanId) {
@@ -1056,7 +1058,7 @@ class Dataset[T] private[sql] (
* @group typedrel
* @since 3.4.0
*/
-  def as(alias: String): Dataset[T] = sparkSession.newDataset(encoder) { 
builder =>
+  def as(alias: String): Dataset[T] = sparkSession.newDataset(agnosticEncoder) 
{ builder =>
 builder.getSubqueryAliasBuilder
   .setInput(plan.getRoot)
   .setAlias(alias)
@@ -1238,8 +1240,9 @@ 

[spark] branch master updated: [SPARK-44720][CONNECT] Make Dataset use Encoder instead of AgnosticEncoder

2023-08-09 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new be9ffb37585 [SPARK-44720][CONNECT] Make Dataset use Encoder instead of 
AgnosticEncoder
be9ffb37585 is described below

commit be9ffb37585fe421705ceaa52fe49b89c50703a3
Author: Herman van Hovell 
AuthorDate: Wed Aug 9 15:58:18 2023 +0200

[SPARK-44720][CONNECT] Make Dataset use Encoder instead of AgnosticEncoder

### What changes were proposed in this pull request?
Make the Spark Connect Dataset use Encoder instead of AgnosticEncoder

### Why are the changes needed?
We want to improve binary compatibility between the Spark Connect Scala 
Client and the original sql/core APIs.

### Does this PR introduce _any_ user-facing change?
Yes. It changes the type of `Dataset.encoder` from `AgnosticEncoder` to 
`Encoder`.

### How was this patch tested?
Existing tests.

Closes #42396 from hvanhovell/SPARK-44720.

Authored-by: Herman van Hovell 
Signed-off-by: Herman van Hovell 
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 87 --
 .../apache/spark/sql/KeyValueGroupedDataset.scala  |  6 +-
 .../spark/sql/streaming/DataStreamWriter.scala |  2 +-
 .../CheckConnectJvmClientCompatibility.scala   |  3 -
 4 files changed, 50 insertions(+), 48 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5f263903c8b..2d72ea6bda8 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -128,11 +128,13 @@ import org.apache.spark.util.SparkClassUtils
 class Dataset[T] private[sql] (
 val sparkSession: SparkSession,
 @DeveloperApi val plan: proto.Plan,
-val encoder: AgnosticEncoder[T])
+val encoder: Encoder[T])
 extends Serializable {
   // Make sure we don't forget to set plan id.
   assert(plan.getRoot.getCommon.hasPlanId)
 
+  private[sql] val agnosticEncoder: AgnosticEncoder[T] = encoderFor(encoder)
+
   override def toString: String = {
 try {
   val builder = new mutable.StringBuilder
@@ -828,7 +830,7 @@ class Dataset[T] private[sql] (
   }
 
   private def buildSort(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = 
{
-sparkSession.newDataset(encoder) { builder =>
+sparkSession.newDataset(agnosticEncoder) { builder =>
   builder.getSortBuilder
 .setInput(plan.getRoot)
 .setIsGlobal(global)
@@ -878,8 +880,8 @@ class Dataset[T] private[sql] (
   ProductEncoder[(T, U)](
 
ClassTag(SparkClassUtils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple2")),
 Seq(
-  EncoderField(s"_1", this.encoder, leftNullable, Metadata.empty),
-  EncoderField(s"_2", other.encoder, rightNullable, Metadata.empty)))
+  EncoderField(s"_1", this.agnosticEncoder, leftNullable, 
Metadata.empty),
+  EncoderField(s"_2", other.agnosticEncoder, rightNullable, 
Metadata.empty)))
 
 sparkSession.newDataset(tupleEncoder) { builder =>
   val joinBuilder = builder.getJoinBuilder
@@ -889,8 +891,8 @@ class Dataset[T] private[sql] (
 .setJoinType(joinTypeValue)
 .setJoinCondition(condition.expr)
 .setJoinDataType(joinBuilder.getJoinDataTypeBuilder
-  .setIsLeftStruct(this.encoder.isStruct)
-  .setIsRightStruct(other.encoder.isStruct))
+  .setIsLeftStruct(this.agnosticEncoder.isStruct)
+  .setIsRightStruct(other.agnosticEncoder.isStruct))
 }
   }
 
@@ -1010,13 +1012,13 @@ class Dataset[T] private[sql] (
* @since 3.4.0
*/
   @scala.annotation.varargs
-  def hint(name: String, parameters: Any*): Dataset[T] = 
sparkSession.newDataset(encoder) {
-builder =>
+  def hint(name: String, parameters: Any*): Dataset[T] =
+sparkSession.newDataset(agnosticEncoder) { builder =>
   builder.getHintBuilder
 .setInput(plan.getRoot)
 .setName(name)
 .addAllParameters(parameters.map(p => functions.lit(p).expr).asJava)
-  }
+}
 
   private def getPlanId: Option[Long] =
 if (plan.getRoot.hasCommon && plan.getRoot.getCommon.hasPlanId) {
@@ -1056,7 +1058,7 @@ class Dataset[T] private[sql] (
* @group typedrel
* @since 3.4.0
*/
-  def as(alias: String): Dataset[T] = sparkSession.newDataset(encoder) { 
builder =>
+  def as(alias: String): Dataset[T] = sparkSession.newDataset(agnosticEncoder) 
{ builder =>
 builder.getSubqueryAliasBuilder
   .setInput(plan.getRoot)
   .setAlias(alias)
@@ -1238,8 +1240,9 @@ class Dataset[T] private[sql] (
* @group typedrel
* @since 3.4.0
*/
-  def filter(condition: Column): 

[spark] branch branch-3.5 updated: [SPARK-43429][CONNECT] Deflake SparkSessionSuite

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 767b2b5a8dc [SPARK-43429][CONNECT] Deflake SparkSessionSuite
767b2b5a8dc is described below

commit 767b2b5a8dc8d655ab6787845a87556f15456aaa
Author: Herman van Hovell 
AuthorDate: Wed Aug 9 20:42:20 2023 +0900

[SPARK-43429][CONNECT] Deflake SparkSessionSuite

### What changes were proposed in this pull request?
This PR tries to fix flakiness in the `SparkSessionSuite.active session in 
multiple threads` test. There was a chance that a modification could happen 
before the other thread could check the state. This PR decouples modifications 
from checks.
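
The same decoupling, reduced to a hedged plain-Python sketch with a
`threading.Barrier` standing in for the test's `Phaser` and a dict standing in
for the session registry: a step that mutates shared state is separated from
the step that checks it, so the check can never race the mutation.

```
import threading

barrier = threading.Barrier(2)      # plays the role of the Phaser
state = {"active": "session2"}      # plays the role of the active-session registry
failures = []

def script1():
    barrier.wait()                  # step 0: both sides check the initial state
    if state["active"] != "session2":
        failures.append("script1 step 0")
    barrier.wait()                  # step 1: script2 mutates, script1 does nothing
    barrier.wait()                  # step 2: only now does script1 check the change
    if state["active"] != "session3":
        failures.append("script1 step 2")

def script2():
    barrier.wait()                  # step 0: check the initial state
    if state["active"] != "session2":
        failures.append("script2 step 0")
    barrier.wait()                  # step 1: perform the mutation
    state["active"] = "session3"
    barrier.wait()                  # step 2: the mutation is now visible to both

threads = [threading.Thread(target=s) for s in (script1, script2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert not failures, failures
```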

### Why are the changes needed?
Flaky tests are no bueno.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
It is a test.

Closes #42406 from hvanhovell/SPARK-43429-deflake.

Authored-by: Herman van Hovell 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 27c5a1f9f0e322fad0da300afdb75eadd8224b15)
Signed-off-by: Hyukjin Kwon 
---
 .../org/apache/spark/sql/SparkSessionSuite.scala   | 32 ++
 1 file changed, 32 insertions(+)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
index f06744399f8..2d7ded2d688 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
@@ -171,42 +171,74 @@ class SparkSessionSuite extends ConnectFunSuite {
 
 try {
   val script1 = execute { phaser =>
+// Step 0 - check initial state
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.contains(session1))
 assert(SparkSession.getActiveSession.contains(session2))
 
+// Step 1 - new active session in script 2
+phaser.arriveAndAwaitAdvance()
+
+// Step2 - script 1 is unchanged, script 2 has new active session
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.contains(session1))
 assert(SparkSession.getActiveSession.contains(session2))
+
+// Step 3 - close session 1, no more default session in both scripts
+phaser.arriveAndAwaitAdvance()
 session1.close()
 
+// Step 4 - no default session, same active session.
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.isEmpty)
 assert(SparkSession.getActiveSession.contains(session2))
+
+// Step 5 - clear active session in script 1
+phaser.arriveAndAwaitAdvance()
 SparkSession.clearActiveSession()
 
+// Step 6 - no default/no active session in script 1, script2 
unchanged.
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.isEmpty)
 assert(SparkSession.getActiveSession.isEmpty)
+
+// Step 7 - close active session in script2
+phaser.arriveAndAwaitAdvance()
   }
   val script2 = execute { phaser =>
+// Step 0 - check initial state
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.contains(session1))
 assert(SparkSession.getActiveSession.contains(session2))
+
+// Step 1 - new active session in script 2
+phaser.arriveAndAwaitAdvance()
 SparkSession.clearActiveSession()
 val internalSession = 
SparkSession.builder().remote(connectionString3).getOrCreate()
 
+// Step2 - script 1 is unchanged, script 2 has new active session
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.contains(session1))
 assert(SparkSession.getActiveSession.contains(internalSession))
 
+// Step 3 - close session 1, no more default session in both scripts
+phaser.arriveAndAwaitAdvance()
+
+// Step 4 - no default session, same active session.
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.isEmpty)
 assert(SparkSession.getActiveSession.contains(internalSession))
 
+// Step 5 - clear active session in script 1
+phaser.arriveAndAwaitAdvance()
+
+// Step 6 - no default/no active session in script 1, script2 
unchanged.
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.isEmpty)
 assert(SparkSession.getActiveSession.contains(internalSession))
+
+// Step 7 - close active session in script2
+phaser.arriveAndAwaitAdvance()
 internalSession.close()
 assert(SparkSession.getActiveSession.isEmpty)
   }



[spark] branch master updated: [SPARK-43429][CONNECT] Deflake SparkSessionSuite

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 27c5a1f9f0e [SPARK-43429][CONNECT] Deflake SparkSessionSuite
27c5a1f9f0e is described below

commit 27c5a1f9f0e322fad0da300afdb75eadd8224b15
Author: Herman van Hovell 
AuthorDate: Wed Aug 9 20:42:20 2023 +0900

[SPARK-43429][CONNECT] Deflake SparkSessionSuite

### What changes were proposed in this pull request?
This PR tries to fix flakiness in the `SparkSessionSuite.active session in 
multiple threads` test. There was a chance that a modification could happen 
before the other thread could check the state. This PR decouples modifications 
from checks.

### Why are the changes needed?
Flaky tests are no bueno.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
It is a test.

Closes #42406 from hvanhovell/SPARK-43429-deflake.

Authored-by: Herman van Hovell 
Signed-off-by: Hyukjin Kwon 
---
 .../org/apache/spark/sql/SparkSessionSuite.scala   | 32 ++
 1 file changed, 32 insertions(+)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
index f06744399f8..2d7ded2d688 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
@@ -171,42 +171,74 @@ class SparkSessionSuite extends ConnectFunSuite {
 
 try {
   val script1 = execute { phaser =>
+// Step 0 - check initial state
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.contains(session1))
 assert(SparkSession.getActiveSession.contains(session2))
 
+// Step 1 - new active session in script 2
+phaser.arriveAndAwaitAdvance()
+
+// Step2 - script 1 is unchanged, script 2 has new active session
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.contains(session1))
 assert(SparkSession.getActiveSession.contains(session2))
+
+// Step 3 - close session 1, no more default session in both scripts
+phaser.arriveAndAwaitAdvance()
 session1.close()
 
+// Step 4 - no default session, same active session.
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.isEmpty)
 assert(SparkSession.getActiveSession.contains(session2))
+
+// Step 5 - clear active session in script 1
+phaser.arriveAndAwaitAdvance()
 SparkSession.clearActiveSession()
 
+// Step 6 - no default/no active session in script 1, script2 
unchanged.
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.isEmpty)
 assert(SparkSession.getActiveSession.isEmpty)
+
+// Step 7 - close active session in script2
+phaser.arriveAndAwaitAdvance()
   }
   val script2 = execute { phaser =>
+// Step 0 - check initial state
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.contains(session1))
 assert(SparkSession.getActiveSession.contains(session2))
+
+// Step 1 - new active session in script 2
+phaser.arriveAndAwaitAdvance()
 SparkSession.clearActiveSession()
 val internalSession = 
SparkSession.builder().remote(connectionString3).getOrCreate()
 
+// Step2 - script 1 is unchanged, script 2 has new active session
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.contains(session1))
 assert(SparkSession.getActiveSession.contains(internalSession))
 
+// Step 3 - close session 1, no more default session in both scripts
+phaser.arriveAndAwaitAdvance()
+
+// Step 4 - no default session, same active session.
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.isEmpty)
 assert(SparkSession.getActiveSession.contains(internalSession))
 
+// Step 5 - clear active session in script 1
+phaser.arriveAndAwaitAdvance()
+
+// Step 6 - no default/no active session in script 1, script2 
unchanged.
 phaser.arriveAndAwaitAdvance()
 assert(SparkSession.getDefaultSession.isEmpty)
 assert(SparkSession.getActiveSession.contains(internalSession))
+
+// Step 7 - close active session in script2
+phaser.arriveAndAwaitAdvance()
 internalSession.close()
 assert(SparkSession.getActiveSession.isEmpty)
   }


-
To unsubscribe, e-mail: 

[spark] branch master updated: [SPARK-42620][PS] Add `inclusive` parameter for (DataFrame|Series).between_time

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0b757d3b610 [SPARK-42620][PS] Add `inclusive` parameter for 
(DataFrame|Series).between_time
0b757d3b610 is described below

commit 0b757d3b610a90e70828d5e21521810f923c6aed
Author: zhyhimont 
AuthorDate: Wed Aug 9 20:35:15 2023 +0900

[SPARK-42620][PS] Add `inclusive` parameter for 
(DataFrame|Series).between_time

### What changes were proposed in this pull request?

Add `inclusive` parameter for (DataFrame|Series).between_time to support 
pandas 2.0.0.

### Why are the changes needed?

When pandas 2.0.0 is released, we should match the behavior in pandas API 
on Spark.

### Does this PR introduce _any_ user-facing change?

yes, the API changes

Before:
` (DataFrame|Series).between_time(start_time, end_time, include_start, 
include_end, axis)`

After:
` (DataFrame|Series).between_time(start_time, end_time, inclusive, axis)`
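
A hedged usage sketch of the new parameter (index and values are illustrative;
the accepted values are assumed to mirror pandas 2.x, e.g. "both", "neither",
"left", "right"):

```
import pandas as pd
import pyspark.pandas as ps

idx = pd.date_range("2018-04-09", periods=4, freq="1D20min")
psdf = ps.DataFrame({"A": [1, 2, 3, 4]}, index=idx)

# Previously: psdf.between_time("0:15", "0:45", include_start=True, include_end=True)
# Now a single parameter controls both endpoints:
psdf.between_time("0:15", "0:45", inclusive="both").to_pandas()
psdf.between_time("0:15", "0:45", inclusive="neither").to_pandas()
```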

### How was this patch tested?

Unit tests were updated

Closes #40370 from dzhigimont/SPARK-42620-ZH.

Lead-authored-by: zhyhimont 
Co-authored-by: Zhyhimont Dmitry 
Co-authored-by: Zhyhimont Dmitry 
Signed-off-by: Hyukjin Kwon 
---
 .../source/migration_guide/pyspark_upgrade.rst |  2 ++
 python/pyspark/errors/error_classes.py |  5 
 python/pyspark/pandas/frame.py | 30 +++-
 python/pyspark/pandas/series.py| 18 
 .../pyspark/pandas/tests/frame/test_reindexing.py  | 33 +++---
 python/pyspark/pandas/tests/series/test_compute.py | 32 ++---
 6 files changed, 86 insertions(+), 34 deletions(-)

diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst 
b/python/docs/source/migration_guide/pyspark_upgrade.rst
index 1b247d46227..da49719579a 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -32,6 +32,8 @@ Upgrading from PySpark 3.5 to 4.0
 * In Spark 4.0, ``na_sentinel`` parameter from ``Index.factorize`` and 
`Series.factorize`` has been removed from pandas API on Spark, use 
``use_na_sentinel`` instead.
 * In Spark 4.0, ``inplace`` parameter from ``Categorical.add_categories``, 
``Categorical.remove_categories``, ``Categorical.set_categories``, 
``Categorical.rename_categories``, ``Categorical.reorder_categories``, 
``Categorical.as_ordered``, ``Categorical.as_unordered`` have been removed from 
pandas API on Spark.
 * In Spark 4.0, ``closed`` parameter from ``ps.date_range`` has been removed 
from pandas API on Spark.
+* In Spark 4.0, ``include_start`` and ``include_end`` parameters from 
``DataFrame.between_time`` have been removed from pandas API on Spark, use 
``inclusive`` instead.
+* In Spark 4.0, ``include_start`` and ``include_end`` parameters from 
``Series.between_time`` have been removed from pandas API on Spark, use 
``inclusive`` instead.
 
 
 Upgrading from PySpark 3.3 to 3.4
diff --git a/python/pyspark/errors/error_classes.py 
b/python/pyspark/errors/error_classes.py
index bc32afeb87a..c8033478362 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -853,6 +853,11 @@ ERROR_CLASSES_JSON = """
   "Value `` cannot be accessed inside tasks."
 ]
   },
+  "VALUE_NOT_ALLOWED" : {
+"message" : [
+  "Value for `` has to be amongst the following values: 
."
+]
+  },
   "VALUE_NOT_ANY_OR_ALL" : {
 "message" : [
   "Value for `` must be 'any' or 'all', got ''."
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 65c43eb7cf4..8fbe1b8f926 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -59,6 +59,8 @@ from pandas.api.types import (  # type: ignore[attr-defined]
 )
 from pandas.tseries.frequencies import DateOffset, to_offset
 
+from pyspark.errors import PySparkValueError
+
 if TYPE_CHECKING:
 from pandas.io.formats.style import Styler
 
@@ -3501,14 +3503,11 @@ defaultdict(, {'col..., 'col...})]
 ).resolved_copy
 return DataFrame(internal)
 
-# TODO(SPARK-42620): Add `inclusive` parameter and replace `include_start` 
& `include_end`.
-# See https://github.com/pandas-dev/pandas/issues/43248
 def between_time(
 self,
 start_time: Union[datetime.time, str],
 end_time: Union[datetime.time, str],
-include_start: bool = True,
-include_end: bool = True,
+inclusive: str = "both",
 axis: Axis = 0,
 ) -> "DataFrame":
 """
@@ -3523,15 +3522,10 @@ defaultdict(, {'col..., 'col...})]
 Initial time as a time filter limit.
 end_time : datetime.time or str
  

[spark] branch branch-3.5 updated: Revert "[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample"

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 39b3dd72bec Revert "[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM 
dependency for resample"
39b3dd72bec is described below

commit 39b3dd72bec3f78b55348007dbeaab67d15165f6
Author: Hyukjin Kwon 
AuthorDate: Wed Aug 9 20:34:19 2023 +0900

Revert "[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for 
resample"

This reverts commit 2e566a5b6d3871315b9115753ebb1aab75cdbe13.
---
 python/pyspark/pandas/resample.py | 26 +-
 1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/pandas/resample.py 
b/python/pyspark/pandas/resample.py
index 0d2c3cc753c..30f8c9d3169 100644
--- a/python/pyspark/pandas/resample.py
+++ b/python/pyspark/pandas/resample.py
@@ -67,6 +67,7 @@ from pyspark.pandas.utils import (
 scol_for,
 verify_temp_column_name,
 )
+from pyspark.sql.utils import is_remote
 from pyspark.pandas.spark.functions import timestampdiff
 
 
@@ -144,15 +145,22 @@ class Resampler(Generic[FrameLike], metaclass=ABCMeta):
 def get_make_interval(  # type: ignore[return]
 self, unit: str, col: Union[Column, int, float]
 ) -> Column:
-col = col if not isinstance(col, (int, float)) else F.lit(col)  # 
type: ignore[assignment]
-if unit == "MONTH":
-return F.make_interval(months=col)  # type: ignore
-if unit == "HOUR":
-return F.make_interval(hours=col)  # type: ignore
-if unit == "MINUTE":
-return F.make_interval(mins=col)  # type: ignore
-if unit == "SECOND":
-return F.make_interval(secs=col)  # type: ignore
+if is_remote():
+from pyspark.sql.connect.functions import lit, make_interval
+
+col = col if not isinstance(col, (int, float)) else lit(col)  # 
type: ignore[assignment]
+if unit == "MONTH":
+return make_interval(months=col)  # type: ignore
+if unit == "HOUR":
+return make_interval(hours=col)  # type: ignore
+if unit == "MINUTE":
+return make_interval(mins=col)  # type: ignore
+if unit == "SECOND":
+return make_interval(secs=col)  # type: ignore
+else:
+sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils
+col = col._jc if isinstance(col, Column) else F.lit(col)._jc
+return sql_utils.makeInterval(unit, col)
 
 def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column:
 key_type = self._resamplekey_type


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: Revert "[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample"

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e5e0f389dad Revert "[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM 
dependency for resample"
e5e0f389dad is described below

commit e5e0f389dad678e664d5e9897d40a8765a0dc2e5
Author: Hyukjin Kwon 
AuthorDate: Wed Aug 9 20:33:50 2023 +0900

Revert "[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for 
resample"

This reverts commit b19a83e236b2732c2af6d0a4d79ebcbca4105c59.
---
 python/pyspark/pandas/resample.py | 26 +-
 1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/pandas/resample.py 
b/python/pyspark/pandas/resample.py
index 0d2c3cc753c..30f8c9d3169 100644
--- a/python/pyspark/pandas/resample.py
+++ b/python/pyspark/pandas/resample.py
@@ -67,6 +67,7 @@ from pyspark.pandas.utils import (
 scol_for,
 verify_temp_column_name,
 )
+from pyspark.sql.utils import is_remote
 from pyspark.pandas.spark.functions import timestampdiff
 
 
@@ -144,15 +145,22 @@ class Resampler(Generic[FrameLike], metaclass=ABCMeta):
 def get_make_interval(  # type: ignore[return]
 self, unit: str, col: Union[Column, int, float]
 ) -> Column:
-col = col if not isinstance(col, (int, float)) else F.lit(col)  # 
type: ignore[assignment]
-if unit == "MONTH":
-return F.make_interval(months=col)  # type: ignore
-if unit == "HOUR":
-return F.make_interval(hours=col)  # type: ignore
-if unit == "MINUTE":
-return F.make_interval(mins=col)  # type: ignore
-if unit == "SECOND":
-return F.make_interval(secs=col)  # type: ignore
+if is_remote():
+from pyspark.sql.connect.functions import lit, make_interval
+
+col = col if not isinstance(col, (int, float)) else lit(col)  # 
type: ignore[assignment]
+if unit == "MONTH":
+return make_interval(months=col)  # type: ignore
+if unit == "HOUR":
+return make_interval(hours=col)  # type: ignore
+if unit == "MINUTE":
+return make_interval(mins=col)  # type: ignore
+if unit == "SECOND":
+return make_interval(secs=col)  # type: ignore
+else:
+sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils
+col = col._jc if isinstance(col, Column) else F.lit(col)._jc
+return sql_utils.makeInterval(unit, col)
 
 def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column:
 key_type = self._resamplekey_type


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 2e566a5b6d3 [SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency 
for resample
2e566a5b6d3 is described below

commit 2e566a5b6d3871315b9115753ebb1aab75cdbe13
Author: itholic 
AuthorDate: Wed Aug 9 20:32:48 2023 +0900

[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample

### What changes were proposed in this pull request?

This is a follow-up to https://github.com/apache/spark/pull/41877 to remove 
JVM dependency.

### Why are the changes needed?

To remove JVM dependency from Pandas API on Spark with Spark Connect.

### Does this PR introduce _any_ user-facing change?

No, it's internal handling.

### How was this patch tested?

The existing UT.

Closes #42410 from itholic/resample_followup.

Authored-by: itholic 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit b19a83e236b2732c2af6d0a4d79ebcbca4105c59)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/resample.py | 26 +-
 1 file changed, 9 insertions(+), 17 deletions(-)

diff --git a/python/pyspark/pandas/resample.py 
b/python/pyspark/pandas/resample.py
index 30f8c9d3169..0d2c3cc753c 100644
--- a/python/pyspark/pandas/resample.py
+++ b/python/pyspark/pandas/resample.py
@@ -67,7 +67,6 @@ from pyspark.pandas.utils import (
 scol_for,
 verify_temp_column_name,
 )
-from pyspark.sql.utils import is_remote
 from pyspark.pandas.spark.functions import timestampdiff
 
 
@@ -145,22 +144,15 @@ class Resampler(Generic[FrameLike], metaclass=ABCMeta):
 def get_make_interval(  # type: ignore[return]
 self, unit: str, col: Union[Column, int, float]
 ) -> Column:
-if is_remote():
-from pyspark.sql.connect.functions import lit, make_interval
-
-col = col if not isinstance(col, (int, float)) else lit(col)  # 
type: ignore[assignment]
-if unit == "MONTH":
-return make_interval(months=col)  # type: ignore
-if unit == "HOUR":
-return make_interval(hours=col)  # type: ignore
-if unit == "MINUTE":
-return make_interval(mins=col)  # type: ignore
-if unit == "SECOND":
-return make_interval(secs=col)  # type: ignore
-else:
-sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils
-col = col._jc if isinstance(col, Column) else F.lit(col)._jc
-return sql_utils.makeInterval(unit, col)
+col = col if not isinstance(col, (int, float)) else F.lit(col)  # 
type: ignore[assignment]
+if unit == "MONTH":
+return F.make_interval(months=col)  # type: ignore
+if unit == "HOUR":
+return F.make_interval(hours=col)  # type: ignore
+if unit == "MINUTE":
+return F.make_interval(mins=col)  # type: ignore
+if unit == "SECOND":
+return F.make_interval(secs=col)  # type: ignore
 
 def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column:
 key_type = self._resamplekey_type


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (4f25df1dc25 -> b19a83e236b)

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 4f25df1dc25 [SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` 
should only be used with new outputs
 add b19a83e236b [SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency 
for resample

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/resample.py | 26 +-
 1 file changed, 9 insertions(+), 17 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` should only be used with new outputs

2023-08-09 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4f25df1dc25 [SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` 
should only be used with new outputs
4f25df1dc25 is described below

commit 4f25df1dc25cc4f002107821cc67e35c1fe0e42c
Author: Wenchen Fan 
AuthorDate: Wed Aug 9 19:16:37 2023 +0800

[SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` should only be used 
with new outputs

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/41475 . It's 
risky to use `transformUpWithNewOutput` with existing attribute ids. If the 
plan contains duplicated attribute ids somewhere, then we will hit conflicting 
attributes and an assertion error will be thrown by 
`QueryPlan#transformUpWithNewOutput`.

This PR takes a different approach. We canonicalize the plan first and then 
remove the alias-only project. Then we don't need `transformUpWithNewOutput` 
anymore as all attribute ids are normalized in the canonicalized plan.
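
A behavior-level PySpark sketch (not part of this patch) of the rule that
`simplifyPlanForCollectedMetrics` supports: exact duplicates of a named metric,
such as those produced by a self join, are expected to pass analysis, while two
different definitions under one name are rejected with DUPLICATED_METRICS_NAME.

```
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)

# Self join of one observed plan: both CollectMetrics nodes are exact duplicates
# (modulo re-aliased attribute ids), so analysis is expected to accept them.
observed = df.observe("stats", F.count(F.lit(1)).alias("rows"))
observed.join(observed, "id").collect()

# Two different metric definitions under the same name would be rejected
# at analysis time with DUPLICATED_METRICS_NAME:
# a = df.observe("stats", F.count(F.lit(1)).alias("rows"))
# b = df.observe("stats", F.sum("id").alias("total"))
# a.union(b).collect()
```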

### Why are the changes needed?

fix potential bugs

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing tests

Closes #42408 from cloud-fan/collect-metrics.

Authored-by: Wenchen Fan 
Signed-off-by: Kent Yao 
---
 .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala   | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index fee5660017c..0b953fc2b61 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1080,13 +1080,13 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
 def check(plan: LogicalPlan): Unit = plan.foreach { node =>
   node match {
 case metrics @ CollectMetrics(name, _, _) =>
-  val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics)
+  val simplifiedMetrics = 
simplifyPlanForCollectedMetrics(metrics.canonicalized)
   metricsMap.get(name) match {
 case Some(other) =>
-  val simplifiedOther = simplifyPlanForCollectedMetrics(other)
+  val simplifiedOther = 
simplifyPlanForCollectedMetrics(other.canonicalized)
   // Exact duplicates are allowed. They can be the result
   // of a CTE that is used multiple times or a self join.
-  if (!simplifiedMetrics.sameResult(simplifiedOther)) {
+  if (simplifiedMetrics != simplifiedOther) {
 failAnalysis(
   errorClass = "DUPLICATED_METRICS_NAME",
   messageParameters = Map("metricName" -> name))
@@ -1111,7 +1111,7 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
* duplicates metric definition.
*/
   private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan 
= {
-plan.transformUpWithNewOutput {
+plan.resolveOperators {
   case p: Project if p.projectList.size == p.child.output.size =>
 val assignExprIdOnly = p.projectList.zip(p.child.output).forall {
   case (left: Alias, right: Attribute) =>
@@ -1119,9 +1119,9 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
   case _ => false
 }
 if (assignExprIdOnly) {
-  (p.child, p.output.zip(p.child.output))
+  p.child
 } else {
-  (p, Nil)
+  p
 }
 }
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` should only be used with new outputs

2023-08-09 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new b8204d1b89e [SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` 
should only be used with new outputs
b8204d1b89e is described below

commit b8204d1b89eea0c32e5269fb155651157e2c96e8
Author: Wenchen Fan 
AuthorDate: Wed Aug 9 19:16:37 2023 +0800

[SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` should only be used 
with new outputs

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/41475 . It's 
risky to use `transformUpWithNewOutput` with existing attribute ids. If the 
plan contains duplicated attribute ids somewhere, then we will hit conflicting 
attributes and an assertion error will be thrown by 
`QueryPlan#transformUpWithNewOutput`.

This PR takes a different approach. We canonicalize the plan first and then 
remove the alias-only project. Then we don't need `transformUpWithNewOutput` 
anymore as all attribute ids are normalized in the canonicalized plan.

### Why are the changes needed?

fix potential bugs

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing tests

Closes #42408 from cloud-fan/collect-metrics.

Authored-by: Wenchen Fan 
Signed-off-by: Kent Yao 
(cherry picked from commit 4f25df1dc25cc4f002107821cc67e35c1fe0e42c)
Signed-off-by: Kent Yao 
---
 .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala   | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index e198fd58953..848749f9e3b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1071,13 +1071,13 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
 def check(plan: LogicalPlan): Unit = plan.foreach { node =>
   node match {
 case metrics @ CollectMetrics(name, _, _) =>
-  val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics)
+  val simplifiedMetrics = 
simplifyPlanForCollectedMetrics(metrics.canonicalized)
   metricsMap.get(name) match {
 case Some(other) =>
-  val simplifiedOther = simplifyPlanForCollectedMetrics(other)
+  val simplifiedOther = 
simplifyPlanForCollectedMetrics(other.canonicalized)
   // Exact duplicates are allowed. They can be the result
   // of a CTE that is used multiple times or a self join.
-  if (!simplifiedMetrics.sameResult(simplifiedOther)) {
+  if (simplifiedMetrics != simplifiedOther) {
 failAnalysis(
   errorClass = "DUPLICATED_METRICS_NAME",
   messageParameters = Map("metricName" -> name))
@@ -1102,7 +1102,7 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
* duplicates metric definition.
*/
   private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan 
= {
-plan.transformUpWithNewOutput {
+plan.resolveOperators {
   case p: Project if p.projectList.size == p.child.output.size =>
 val assignExprIdOnly = p.projectList.zip(p.child.output).forall {
   case (left: Alias, right: Attribute) =>
@@ -1110,9 +1110,9 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
   case _ => false
 }
 if (assignExprIdOnly) {
-  (p.child, p.output.zip(p.child.output))
+  p.child
 } else {
-  (p, Nil)
+  p
 }
 }
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-44738][PYTHON][CONNECT] Add missing client metadata to calls

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 03dfd4747ae [SPARK-44738][PYTHON][CONNECT] Add missing client metadata 
to calls
03dfd4747ae is described below

commit 03dfd4747ae3b59e7e3ac348c30e82b63e1ed269
Author: Martin Grund 
AuthorDate: Wed Aug 9 18:08:23 2023 +0900

[SPARK-44738][PYTHON][CONNECT] Add missing client metadata to calls

### What changes were proposed in this pull request?

The refactoring for the re-attachable execution missed properly propagating 
the client metadata for the individual RPC calls.
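
For context, in grpc-python every stub call accepts a `metadata` keyword, and that
keyword is what the retried calls now forward. A hedged sketch (the stub and request
are assumed to come from the generated Spark Connect classes; the header name is a
placeholder):

```
import grpc

client_metadata = [("x-example-header", "my-app")]  # placeholder header name/value

def reattach_with_metadata(stub, request, metadata):
    """Hedged sketch: forward the same client metadata on the first attempt
    and again on every retry, which is the gap this patch closes."""
    try:
        return iter(stub.ReattachExecute(request, metadata=metadata))
    except grpc.RpcError:
        # A retry rebuilds the iterator; the metadata must be passed again.
        return iter(stub.ReattachExecute(request, metadata=metadata))
```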

### Why are the changes needed?
Compatibility.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UT

Closes #42409 from grundprinzip/SPARK-44738.

Authored-by: Martin Grund 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit c73660c3e7279f61fe6e2f6bbf88f410f7ce25a1)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/client/reattach.py | 11 +++
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/connect/client/reattach.py 
b/python/pyspark/sql/connect/client/reattach.py
index c5c45904c9b..c6b1beaa121 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -135,7 +135,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
 if not attempt.is_first_try():
 # on retry, the iterator is borked, so we need a 
new one
 self._iterator = iter(
-
self._stub.ReattachExecute(self._create_reattach_execute_request())
+self._stub.ReattachExecute(
+self._create_reattach_execute_request(), 
metadata=self._metadata
+)
 )
 
 if self._current is None:
@@ -154,7 +156,8 @@ class ExecutePlanResponseReattachableIterator(Generator):
 while not has_next:
 self._iterator = iter(
 self._stub.ReattachExecute(
-self._create_reattach_execute_request()
+
self._create_reattach_execute_request(),
+metadata=self._metadata,
 )
 )
 # shouldn't change
@@ -192,7 +195,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
 can_retry=SparkConnectClient.retry_exception, 
**self._retry_policy
 ):
 with attempt:
-self._stub.ReleaseExecute(request)
+self._stub.ReleaseExecute(request, 
metadata=self._metadata)
 except Exception as e:
 warnings.warn(f"ReleaseExecute failed with exception: {e}.")
 
@@ -220,7 +223,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
 can_retry=SparkConnectClient.retry_exception, 
**self._retry_policy
 ):
 with attempt:
-self._stub.ReleaseExecute(request)
+self._stub.ReleaseExecute(request, 
metadata=self._metadata)
 except Exception as e:
 warnings.warn(f"ReleaseExecute failed with exception: {e}.")
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-44738][PYTHON][CONNECT] Add missing client metadata to calls

2023-08-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c73660c3e72 [SPARK-44738][PYTHON][CONNECT] Add missing client metadata 
to calls
c73660c3e72 is described below

commit c73660c3e7279f61fe6e2f6bbf88f410f7ce25a1
Author: Martin Grund 
AuthorDate: Wed Aug 9 18:08:23 2023 +0900

[SPARK-44738][PYTHON][CONNECT] Add missing client metadata to calls

### What changes were proposed in this pull request?

The refactoring for the re-attachable execution missed properly propagating 
the client metadata for the individual RPC calls.

### Why are the changes needed?
Compatibility.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UT

Closes #42409 from grundprinzip/SPARK-44738.

Authored-by: Martin Grund 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/client/reattach.py | 11 +++
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/connect/client/reattach.py 
b/python/pyspark/sql/connect/client/reattach.py
index c5c45904c9b..c6b1beaa121 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -135,7 +135,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
 if not attempt.is_first_try():
 # on retry, the iterator is borked, so we need a 
new one
 self._iterator = iter(
-
self._stub.ReattachExecute(self._create_reattach_execute_request())
+self._stub.ReattachExecute(
+self._create_reattach_execute_request(), 
metadata=self._metadata
+)
 )
 
 if self._current is None:
@@ -154,7 +156,8 @@ class ExecutePlanResponseReattachableIterator(Generator):
 while not has_next:
 self._iterator = iter(
 self._stub.ReattachExecute(
-self._create_reattach_execute_request()
+
self._create_reattach_execute_request(),
+metadata=self._metadata,
 )
 )
 # shouldn't change
@@ -192,7 +195,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
 can_retry=SparkConnectClient.retry_exception, 
**self._retry_policy
 ):
 with attempt:
-self._stub.ReleaseExecute(request)
+self._stub.ReleaseExecute(request, 
metadata=self._metadata)
 except Exception as e:
 warnings.warn(f"ReleaseExecute failed with exception: {e}.")
 
@@ -220,7 +223,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
 can_retry=SparkConnectClient.retry_exception, 
**self._retry_policy
 ):
 with attempt:
-self._stub.ReleaseExecute(request)
+self._stub.ReleaseExecute(request, 
metadata=self._metadata)
 except Exception as e:
 warnings.warn(f"ReleaseExecute failed with exception: {e}.")
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-44551][SQL] Fix behavior of null IN (empty list) in expression execution

2023-08-09 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 7520fc160aa [SPARK-44551][SQL] Fix behavior of null IN (empty list) in 
expression execution
7520fc160aa is described below

commit 7520fc160aa75360e29d5527e2a59c5605f144d4
Author: Jack Chen 
AuthorDate: Wed Aug 9 15:10:51 2023 +0800

[SPARK-44551][SQL] Fix behavior of null IN (empty list) in expression 
execution

### What changes were proposed in this pull request?
`null IN (empty list)` incorrectly evaluates to null, when it should 
evaluate to false. (The reason it should be false is because a IN (b1, b2) is 
defined as a = b1 OR a = b2, and an empty IN list is treated as an empty OR 
which is false. This is specified by ANSI SQL.)
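
A plain-Python sketch of the corrected three-valued evaluation, with `None`
standing in for SQL NULL; it mirrors the fixed `In.eval` in the diff below but
is not the implementation itself.

```
from typing import Any, Optional, Sequence

def in_eval(value: Any, in_list: Sequence[Any],
            legacy_null_in_empty: bool = False) -> Optional[bool]:
    """Three-valued IN: True, False, or None (unknown)."""
    if not in_list and not legacy_null_in_empty:
        return False        # x IN () is an empty OR, hence false, even for NULL x
    if value is None:
        return None         # NULL IN (non-empty list) stays NULL
    has_null = False
    for item in in_list:
        if item is None:
            has_null = True
        elif item == value:
            return True
    return None if has_null else False

assert in_eval(None, []) is False                               # corrected behavior
assert in_eval(None, [], legacy_null_in_empty=True) is None     # legacy behavior
assert in_eval(1, [None, 2]) is None
assert in_eval(2, [None, 2]) is True
```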

Many places in Spark execution (In, InSet, InSubquery) and optimization 
(OptimizeIn, NullPropagation) implemented this wrong behavior. This is a 
longstanding correctness issue which has existed since null support for IN 
expressions was first added to Spark.

This PR fixes Spark execution (In, InSet, InSubquery). See previous PR 
https://github.com/apache/spark/pull/42007 for optimization fixes.

The new behavior is guarded by a flag that allows reverting to the legacy 
behavior if needed. The flag currently disables the new behavior until all of 
the fix PRs are complete.

See [this 
doc](https://docs.google.com/document/d/1k8AY8oyT-GI04SnP7eXttPDnDj-Ek-c3luF2zL6DPNU/edit)
 for more information.

### Why are the changes needed?
Fix wrong SQL semantics

### Does this PR introduce _any_ user-facing change?
Not yet, but will fix wrong SQL semantics when enabled

### How was this patch tested?
Unit tests PredicateSuite and tests added in previous PR 
https://github.com/apache/spark/pull/42007

Closes #42163 from jchen5/null-in-empty-exec.

Authored-by: Jack Chen 
Signed-off-by: Wenchen Fan 
(cherry picked from commit f9058d69b2af774e78677ac4cad55c7c91eb42ae)
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/expressions/predicates.scala  | 196 -
 .../spark/sql/catalyst/optimizer/expressions.scala |   4 +-
 .../sql/catalyst/expressions/PredicateSuite.scala  |  35 ++--
 .../subquery/in-subquery/in-null-semantics.sql |   3 +-
 .../subquery/in-subquery/in-null-semantics.sql.out |   4 +-
 .../scala/org/apache/spark/sql/EmptyInSuite.scala  |   3 +-
 6 files changed, 144 insertions(+), 101 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index ee2ba7c73d1..31b872e04ce 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -468,6 +468,8 @@ case class In(value: Expression, list: Seq[Expression]) 
extends Predicate {
   override def foldable: Boolean = children.forall(_.foldable)
 
   final override val nodePatterns: Seq[TreePattern] = Seq(IN)
+  private val legacyNullInEmptyBehavior =
+SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR)
 
   override lazy val canonicalized: Expression = {
 val basic = withNewChildren(children.map(_.canonicalized)).asInstanceOf[In]
@@ -481,88 +483,104 @@ case class In(value: Expression, list: Seq[Expression]) 
extends Predicate {
   override def toString: String = s"$value IN ${list.mkString("(", ",", ")")}"
 
   override def eval(input: InternalRow): Any = {
-val evaluatedValue = value.eval(input)
-if (evaluatedValue == null) {
-  null
+if (list.isEmpty && !legacyNullInEmptyBehavior) {
+  // IN (empty list) is always false under current behavior.
+  // Under legacy behavior it's null if the left side is null, otherwise 
false (SPARK-44550).
+  false
 } else {
-  var hasNull = false
-  list.foreach { e =>
-val v = e.eval(input)
-if (v == null) {
-  hasNull = true
-} else if (ordering.equiv(v, evaluatedValue)) {
-  return true
-}
-  }
-  if (hasNull) {
+  val evaluatedValue = value.eval(input)
+  if (evaluatedValue == null) {
 null
   } else {
-false
+var hasNull = false
+list.foreach { e =>
+  val v = e.eval(input)
+  if (v == null) {
+hasNull = true
+  } else if (ordering.equiv(v, evaluatedValue)) {
+return true
+  }
+}
+if (hasNull) {
+  null
+} else {
+  false
+}
   }
 }
   }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-val javaDataType = 

[spark] branch master updated: [SPARK-44551][SQL] Fix behavior of null IN (empty list) in expression execution

2023-08-09 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f9058d69b2a [SPARK-44551][SQL] Fix behavior of null IN (empty list) in 
expression execution
f9058d69b2a is described below

commit f9058d69b2af774e78677ac4cad55c7c91eb42ae
Author: Jack Chen 
AuthorDate: Wed Aug 9 15:10:51 2023 +0800

[SPARK-44551][SQL] Fix behavior of null IN (empty list) in expression 
execution

### What changes were proposed in this pull request?
`null IN (empty list)` incorrectly evaluates to null, when it should 
evaluate to false. (The reason it should be false is because a IN (b1, b2) is 
defined as a = b1 OR a = b2, and an empty IN list is treated as an empty OR 
which is false. This is specified by ANSI SQL.)

Many places in Spark execution (In, InSet, InSubquery) and optimization 
(OptimizeIn, NullPropagation) implemented this wrong behavior. This is a 
longstanding correctness issue which has existed since null support for IN 
expressions was first added to Spark.

This PR fixes Spark execution (In, InSet, InSubquery). See previous PR 
https://github.com/apache/spark/pull/42007 for optimization fixes.

The new behavior is guarded by a flag that allows reverting to the legacy 
behavior if needed. The flag currently disables the new behavior until all of 
the fix PRs are complete.

See [this 
doc](https://docs.google.com/document/d/1k8AY8oyT-GI04SnP7eXttPDnDj-Ek-c3luF2zL6DPNU/edit)
 for more information.

### Why are the changes needed?
Fix wrong SQL semantics

### Does this PR introduce _any_ user-facing change?
Not yet, but will fix wrong SQL semantics when enabled

### How was this patch tested?
Unit tests PredicateSuite and tests added in previous PR 
https://github.com/apache/spark/pull/42007

Closes #42163 from jchen5/null-in-empty-exec.

Authored-by: Jack Chen 
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/expressions/predicates.scala  | 196 -
 .../spark/sql/catalyst/optimizer/expressions.scala |   4 +-
 .../sql/catalyst/expressions/PredicateSuite.scala  |  35 ++--
 .../subquery/in-subquery/in-null-semantics.sql |   3 +-
 .../subquery/in-subquery/in-null-semantics.sql.out |   4 +-
 .../scala/org/apache/spark/sql/EmptyInSuite.scala  |   3 +-
 6 files changed, 144 insertions(+), 101 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index ee2ba7c73d1..31b872e04ce 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -468,6 +468,8 @@ case class In(value: Expression, list: Seq[Expression]) 
extends Predicate {
   override def foldable: Boolean = children.forall(_.foldable)
 
   final override val nodePatterns: Seq[TreePattern] = Seq(IN)
+  private val legacyNullInEmptyBehavior =
+SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR)
 
   override lazy val canonicalized: Expression = {
 val basic = withNewChildren(children.map(_.canonicalized)).asInstanceOf[In]
@@ -481,88 +483,104 @@ case class In(value: Expression, list: Seq[Expression]) 
extends Predicate {
   override def toString: String = s"$value IN ${list.mkString("(", ",", ")")}"
 
   override def eval(input: InternalRow): Any = {
-val evaluatedValue = value.eval(input)
-if (evaluatedValue == null) {
-  null
+if (list.isEmpty && !legacyNullInEmptyBehavior) {
+  // IN (empty list) is always false under current behavior.
+  // Under legacy behavior it's null if the left side is null, otherwise 
false (SPARK-44550).
+  false
 } else {
-  var hasNull = false
-  list.foreach { e =>
-val v = e.eval(input)
-if (v == null) {
-  hasNull = true
-} else if (ordering.equiv(v, evaluatedValue)) {
-  return true
-}
-  }
-  if (hasNull) {
+  val evaluatedValue = value.eval(input)
+  if (evaluatedValue == null) {
 null
   } else {
-false
+var hasNull = false
+list.foreach { e =>
+  val v = e.eval(input)
+  if (v == null) {
+hasNull = true
+  } else if (ordering.equiv(v, evaluatedValue)) {
+return true
+  }
+}
+if (hasNull) {
+  null
+} else {
+  false
+}
   }
 }
   }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-val javaDataType = CodeGenerator.javaType(value.dataType)
-val valueGen = value.genCode(ctx)
-val listGen = list.map(_.genCode(ctx))
-//