[spark] branch master updated (590b77f7628 -> b4b91212b1d)

2023-08-07 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 590b77f7628 [SPARK-44005][PYTHON] Improve error messages for regular 
Python UDTFs that return non-tuple values
 add b4b91212b1d [SPARK-44703][CORE] Log eventLog rewrite duration when 
compact old event log files

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala  | 3 +++
 1 file changed, 3 insertions(+)





[spark] branch branch-3.5 updated: [SPARK-44005][PYTHON] Improve error messages for regular Python UDTFs that return non-tuple values

2023-08-07 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new b7a9b85716c [SPARK-44005][PYTHON] Improve error messages for regular 
Python UDTFs that return non-tuple values
b7a9b85716c is described below

commit b7a9b85716c7fb0aedb78bb8a04b00ba6c132d28
Author: allisonwang-db 
AuthorDate: Tue Aug 8 11:09:58 2023 +0800

[SPARK-44005][PYTHON] Improve error messages for regular Python UDTFs that 
return non-tuple values

### What changes were proposed in this pull request?

This PR improves error messages for regular Python UDTFs when the result
rows are not one of tuple, list, or dict. Note that such non-tuple return values
are supported when Arrow optimization is enabled.

### Why are the changes needed?

To make Python UDTFs more user friendly.

### Does this PR introduce _any_ user-facing change?

Yes.
```
class TestUDTF:
def eval(self, a: int):
yield a
```
Before this PR, the example above fails with the error
`Unexpected tuple 1 with StructType`.

After this PR, it fails with a more user-friendly error:
`[UDTF_INVALID_OUTPUT_ROW_TYPE] The type of an individual output row in the 
UDTF is invalid. Each row should be a tuple, list, or dict, but got 'int'. 
Please make sure that the output rows are of the correct type.`
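
For comparison, here is a minimal sketch of a UDTF that satisfies the row-type
requirement. It assumes the `udtf` decorator from `pyspark.sql.functions` and an
active Spark 3.5+ session; the class name is illustrative only.

```python
from pyspark.sql.functions import lit, udtf

@udtf(returnType="a: int")
class EchoUDTF:
    def eval(self, a: int):
        # Each output row must be a tuple, list, or dict;
        # wrap the scalar in a 1-tuple instead of yielding it bare.
        yield (a,)

EchoUDTF(lit(1)).show()  # one row: a=1
```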

### How was this patch tested?

Existing UTs.

Closes #42353 from allisonwang-db/spark-44005-non-tuple-return-val.

Authored-by: allisonwang-db 
Signed-off-by: Ruifeng Zheng 
(cherry picked from commit 590b77f76284ad03ad8b3b6d30b23983c66513fc)
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/errors/error_classes.py |  5 +
 python/pyspark/sql/tests/test_udtf.py  | 26 +++---
 python/pyspark/worker.py   | 12 +---
 3 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py
index 937a8758404..279812ebae1 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -733,6 +733,11 @@ ERROR_CLASSES_JSON = """
       "User defined table function encountered an error in the '<method_name>' method: <error>"
     ]
   },
+  "UDTF_INVALID_OUTPUT_ROW_TYPE" : {
+    "message" : [
+      "The type of an individual output row in the UDTF is invalid. Each row should be a tuple, list, or dict, but got '<type>'. Please make sure that the output rows are of the correct type."
+    ]
+  },
   "UDTF_RETURN_NOT_ITERABLE" : {
     "message" : [
       "The return value of the UDTF is invalid. It should be an iterable (e.g., generator or list), but got '<type>'. Please make sure that the UDTF returns one of these types."
diff --git a/python/pyspark/sql/tests/test_udtf.py b/python/pyspark/sql/tests/test_udtf.py
index 0540ecddde7..d152228fc52 100644
--- a/python/pyspark/sql/tests/test_udtf.py
+++ b/python/pyspark/sql/tests/test_udtf.py
@@ -139,24 +139,21 @@ class BaseUDTFTestsMixin:
         self.assertEqual(rows, [Row(a=1, b=2), Row(a=2, b=3)])
 
     def test_udtf_eval_returning_non_tuple(self):
+        @udtf(returnType="a: int")
         class TestUDTF:
             def eval(self, a: int):
                 yield a
 
-        func = udtf(TestUDTF, returnType="a: int")
-        # TODO(SPARK-44005): improve this error message
-        with self.assertRaisesRegex(PythonException, "Unexpected tuple 1 with StructType"):
-            func(lit(1)).collect()
+        with self.assertRaisesRegex(PythonException, "UDTF_INVALID_OUTPUT_ROW_TYPE"):
+            TestUDTF(lit(1)).collect()
 
-    def test_udtf_eval_returning_non_generator(self):
+        @udtf(returnType="a: int")
         class TestUDTF:
             def eval(self, a: int):
                 return (a,)
 
-        func = udtf(TestUDTF, returnType="a: int")
-        # TODO(SPARK-44005): improve this error message
-        with self.assertRaisesRegex(PythonException, "Unexpected tuple 1 with StructType"):
-            func(lit(1)).collect()
+        with self.assertRaisesRegex(PythonException, "UDTF_INVALID_OUTPUT_ROW_TYPE"):
+            TestUDTF(lit(1)).collect()
 
     def test_udtf_with_invalid_return_value(self):
         @udtf(returnType="x: int")
@@ -1159,21 +1156,20 @@ class UDTFArrowTestsMixin(BaseUDTFTestsMixin):
         self.spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", old_value)
 
     def test_udtf_eval_returning_non_tuple(self):
+        @udtf(returnType="a: int")
         class TestUDTF:
             def eval(self, a: int):
                 yield a
 
-        func = udtf(TestUDTF, returnType="a: int")
         # When arrow is enabled, it can handle non-tuple return value.
-        self.assertEqual(func(lit(1)).collect(), [Row(a=1)])
+        assertDataFrameEqual(TestUDTF(lit(1))

[spark] branch master updated: [SPARK-44005][PYTHON] Improve error messages for regular Python UDTFs that return non-tuple values

2023-08-07 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 590b77f7628 [SPARK-44005][PYTHON] Improve error messages for regular 
Python UDTFs that return non-tuple values
590b77f7628 is described below

commit 590b77f76284ad03ad8b3b6d30b23983c66513fc
Author: allisonwang-db 
AuthorDate: Tue Aug 8 11:09:58 2023 +0800

[SPARK-44005][PYTHON] Improve error messages for regular Python UDTFs that 
return non-tuple values

### What changes were proposed in this pull request?

This PR improves error messages for regular Python UDTFs when the result
rows are not one of tuple, list, or dict. Note that such non-tuple return values
are supported when Arrow optimization is enabled.

### Why are the changes needed?

To make Python UDTFs more user friendly.

### Does this PR introduce _any_ user-facing change?

Yes.
```
class TestUDTF:
def eval(self, a: int):
yield a
```
Before this PR, the example above fails with the error
`Unexpected tuple 1 with StructType`.

After this PR, it fails with a more user-friendly error:
`[UDTF_INVALID_OUTPUT_ROW_TYPE] The type of an individual output row in the 
UDTF is invalid. Each row should be a tuple, list, or dict, but got 'int'. 
Please make sure that the output rows are of the correct type.`
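
For context, when Arrow optimization is enabled the same non-tuple output is
accepted and coerced, as the updated test shows. A rough sketch, assuming
`assertDataFrameEqual` from `pyspark.testing` (available since Spark 3.5) and a
local session; the class name is illustrative only:

```python
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit, udtf
from pyspark.testing import assertDataFrameEqual

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", "true")

@udtf(returnType="a: int")
class ScalarUDTF:
    def eval(self, a: int):
        yield a  # bare int row: accepted only under Arrow optimization

assertDataFrameEqual(ScalarUDTF(lit(1)), [Row(a=1)])
```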

### How was this patch tested?

Existing UTs.

Closes #42353 from allisonwang-db/spark-44005-non-tuple-return-val.

Authored-by: allisonwang-db 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/errors/error_classes.py |  5 +
 python/pyspark/sql/tests/test_udtf.py  | 26 +++---
 python/pyspark/worker.py   | 12 +---
 3 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py
index 24885e94d32..bc32afeb87a 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -743,6 +743,11 @@ ERROR_CLASSES_JSON = """
       "User defined table function encountered an error in the '<method_name>' method: <error>"
     ]
   },
+  "UDTF_INVALID_OUTPUT_ROW_TYPE" : {
+    "message" : [
+      "The type of an individual output row in the UDTF is invalid. Each row should be a tuple, list, or dict, but got '<type>'. Please make sure that the output rows are of the correct type."
+    ]
+  },
   "UDTF_RETURN_NOT_ITERABLE" : {
     "message" : [
       "The return value of the UDTF is invalid. It should be an iterable (e.g., generator or list), but got '<type>'. Please make sure that the UDTF returns one of these types."
diff --git a/python/pyspark/sql/tests/test_udtf.py b/python/pyspark/sql/tests/test_udtf.py
index b2f473996bc..300067716e9 100644
--- a/python/pyspark/sql/tests/test_udtf.py
+++ b/python/pyspark/sql/tests/test_udtf.py
@@ -163,24 +163,21 @@ class BaseUDTFTestsMixin:
         self.assertEqual(rows, [Row(a=1, b=2), Row(a=2, b=3)])
 
     def test_udtf_eval_returning_non_tuple(self):
+        @udtf(returnType="a: int")
         class TestUDTF:
             def eval(self, a: int):
                 yield a
 
-        func = udtf(TestUDTF, returnType="a: int")
-        # TODO(SPARK-44005): improve this error message
-        with self.assertRaisesRegex(PythonException, "Unexpected tuple 1 with StructType"):
-            func(lit(1)).collect()
+        with self.assertRaisesRegex(PythonException, "UDTF_INVALID_OUTPUT_ROW_TYPE"):
+            TestUDTF(lit(1)).collect()
 
-    def test_udtf_eval_returning_non_generator(self):
+        @udtf(returnType="a: int")
         class TestUDTF:
             def eval(self, a: int):
                 return (a,)
 
-        func = udtf(TestUDTF, returnType="a: int")
-        # TODO(SPARK-44005): improve this error message
-        with self.assertRaisesRegex(PythonException, "Unexpected tuple 1 with StructType"):
-            func(lit(1)).collect()
+        with self.assertRaisesRegex(PythonException, "UDTF_INVALID_OUTPUT_ROW_TYPE"):
+            TestUDTF(lit(1)).collect()
 
     def test_udtf_with_invalid_return_value(self):
         @udtf(returnType="x: int")
@@ -1852,21 +1849,20 @@ class UDTFArrowTestsMixin(BaseUDTFTestsMixin):
         self.spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", old_value)
 
     def test_udtf_eval_returning_non_tuple(self):
+        @udtf(returnType="a: int")
         class TestUDTF:
             def eval(self, a: int):
                 yield a
 
-        func = udtf(TestUDTF, returnType="a: int")
         # When arrow is enabled, it can handle non-tuple return value.
-        self.assertEqual(func(lit(1)).collect(), [Row(a=1)])
+        assertDataFrameEqual(TestUDTF(lit(1)), [Row(a=1)])
 
-    def test_udtf_eval_returning_non_generator(self):
+        @udtf(returnType="a: int")

[spark] branch branch-3.5 updated: [SPARK-44689][CONNECT] Make the exception handling of function `SparkConnectPlanner#unpackScalarScalaUDF` more universal

2023-08-07 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new cd4ae6e7452 [SPARK-44689][CONNECT] Make the exception handling of 
function `SparkConnectPlanner#unpackScalarScalaUDF` more universal
cd4ae6e7452 is described below

commit cd4ae6e7452170422bdf14ab5e1957e61503904f
Author: yangjie01 
AuthorDate: Tue Aug 8 11:06:21 2023 +0800

[SPARK-44689][CONNECT] Make the exception handling of function 
`SparkConnectPlanner#unpackScalarScalaUDF` more universal

### What changes were proposed in this pull request?
This PR changes the exception handling in the `unpackScalarScalaUDF`
function in `SparkConnectPlanner`: instead of determining the exception type based on
a fixed nesting level, it uses Guava `Throwables` to get the root cause and
then determines the type of that root cause. This makes the handling robust to
differences between Java versions.
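
The underlying idea, sketched here in Python purely as an illustration (the
actual change is in Scala and uses Guava's `Throwables`): walk the exception's
cause chain down to its root and branch on the root's type, rather than relying
on a fixed nesting depth that differs between Java versions.

```python
def root_cause(exc: BaseException) -> BaseException:
    # Follow the explicit cause chain down to the deepest exception.
    while exc.__cause__ is not None:
        exc = exc.__cause__
    return exc

try:
    try:
        raise KeyError("inner")
    except KeyError as inner:
        raise RuntimeError("wrapper") from inner
except RuntimeError as outer:
    assert isinstance(root_cause(outer), KeyError)
```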

### Why are the changes needed?
The following failure occurred when testing `UDFClassLoadingE2ESuite` in
the Java 17 daily test:

https://github.com/apache/spark/actions/runs/5766913899/job/15635782831

```
[info] UDFClassLoadingE2ESuite:
[info] - update class loader after stubbing: new session *** FAILED *** 
(101 milliseconds)
[info]   "Exception in SerializedLambda.readResolve" did not contain 
"java.lang.NoSuchMethodException: 
org.apache.spark.sql.connect.client.StubClassDummyUdf" 
(UDFClassLoadingE2ESuite.scala:57)
...
[info] - update class loader after stubbing: same session *** FAILED *** 
(52 milliseconds)
[info]   "Exception in SerializedLambda.readResolve" did not contain 
"java.lang.NoSuchMethodException: 
org.apache.spark.sql.connect.client.StubClassDummyUdf" 
(UDFClassLoadingE2ESuite.scala:73)
...
```

After analysis, it was found that there are differences in the exception 
stack generated on the server side between Java 8 and Java 17:

- Java 8

```
java.io.IOException: unexpected exception type
  at 
java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
  at 
java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
  at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
  at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
  at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
  at 
org.apache.spark.util.SparkSerDeUtils.deserialize(SparkSerDeUtils.scala:50)
  at 
org.apache.spark.util.SparkSerDeUtils.deserialize$(SparkSerDeUtils.scala:41)
  at org.apache.spark.util.Utils$.deserialize(Utils.scala:95)
  at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.unpackScalarScalaUDF(SparkConnectPlanner.scala:1516)
  at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.org$apache$spark$sql$connect$planner$SparkConnectPlanner$$unpackUdf(SparkConnectPlanner.scala:1507)
  at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformScalarScalaFunction(SparkConnectPlanner.scala:1544)
  at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterScalarScalaUDF(SparkConnectPlanner.scala:2565)
  at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterUserDefinedFunction(SparkConnectPlanner.scala:2492)
  at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2363)
  at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
  at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
  at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
  at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:184)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:184)
  at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
  at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:171)
  at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:179)
  at 
org.apache.spark

[spark] branch master updated: [SPARK-44689][CONNECT] Make the exception handling of function `SparkConnectPlanner#unpackScalarScalaUDF` more universal

2023-08-07 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 25053d98186 [SPARK-44689][CONNECT] Make the exception handling of 
function `SparkConnectPlanner#unpackScalarScalaUDF` more universal
25053d98186 is described below

commit 25053d98186489d9f2061c9b815a5a33f7e309c4
Author: yangjie01 
AuthorDate: Tue Aug 8 11:06:21 2023 +0800

[SPARK-44689][CONNECT] Make the exception handling of function 
`SparkConnectPlanner#unpackScalarScalaUDF` more universal

### What changes were proposed in this pull request?
This PR changes the exception handling in the `unpackScalarScalaUDF`
function in `SparkConnectPlanner`: instead of determining the exception type based on
a fixed nesting level, it uses Guava `Throwables` to get the root cause and
then determines the type of that root cause. This makes the handling robust to
differences between Java versions.

### Why are the changes needed?
The following failure occurred when testing `UDFClassLoadingE2ESuite` in
the Java 17 daily test:

https://github.com/apache/spark/actions/runs/5766913899/job/15635782831

```
[info] UDFClassLoadingE2ESuite:
[info] - update class loader after stubbing: new session *** FAILED *** 
(101 milliseconds)
[info]   "Exception in SerializedLambda.readResolve" did not contain 
"java.lang.NoSuchMethodException: 
org.apache.spark.sql.connect.client.StubClassDummyUdf" 
(UDFClassLoadingE2ESuite.scala:57)
...
[info] - update class loader after stubbing: same session *** FAILED *** 
(52 milliseconds)
[info]   "Exception in SerializedLambda.readResolve" did not contain 
"java.lang.NoSuchMethodException: 
org.apache.spark.sql.connect.client.StubClassDummyUdf" 
(UDFClassLoadingE2ESuite.scala:73)
...
```

After analysis, it was found that there are differences in the exception 
stack generated on the server side between Java 8 and Java 17:

- Java 8

```
java.io.IOException: unexpected exception type
  at 
java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
  at 
java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
  at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
  at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
  at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
  at 
org.apache.spark.util.SparkSerDeUtils.deserialize(SparkSerDeUtils.scala:50)
  at 
org.apache.spark.util.SparkSerDeUtils.deserialize$(SparkSerDeUtils.scala:41)
  at org.apache.spark.util.Utils$.deserialize(Utils.scala:95)
  at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.unpackScalarScalaUDF(SparkConnectPlanner.scala:1516)
  at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.org$apache$spark$sql$connect$planner$SparkConnectPlanner$$unpackUdf(SparkConnectPlanner.scala:1507)
  at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformScalarScalaFunction(SparkConnectPlanner.scala:1544)
  at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterScalarScalaUDF(SparkConnectPlanner.scala:2565)
  at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterUserDefinedFunction(SparkConnectPlanner.scala:2492)
  at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2363)
  at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
  at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
  at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
  at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:184)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
  at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:184)
  at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
  at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:171)
  at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:179)
  at 
org.apache.spark.sql.con

[spark] branch master updated (aa1261dc129 -> 6dadd188f36)

2023-08-07 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from aa1261dc129 [SPARK-44641][SQL] Incorrect result in certain scenarios 
when SPJ is not triggered
 add 6dadd188f36 [SPARK-44554][INFRA] Make Python linter related checks 
pass of branch-3.3/3.4 daily testing

No new revisions were added by this update.

Summary of changes:
 .github/workflows/build_and_test.yml | 17 +
 1 file changed, 17 insertions(+)





[spark] branch branch-3.4 updated: [SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not triggered

2023-08-07 Thread sunchao
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 5c7d1e2f8a2 [SPARK-44641][SQL] Incorrect result in certain scenarios 
when SPJ is not triggered
5c7d1e2f8a2 is described below

commit 5c7d1e2f8a2110e33c1d1c8f458d14a02c2056b7
Author: Chao Sun 
AuthorDate: Mon Aug 7 19:16:38 2023 -0700

[SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not 
triggered

This PR makes sure we use unique partition values when calculating the
final partitions in `BatchScanExec`, so that no duplicated partitions are
generated.

When `spark.sql.sources.v2.bucketing.pushPartValues.enabled` and 
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` are 
enabled, and SPJ is not triggered, currently Spark will generate 
incorrect/duplicated results.

This is because with both configs enabled, Spark will delay the partition 
grouping until the time it calculates the final partitions used by the input 
RDD. To calculate the partitions, it uses partition values from the 
`KeyGroupedPartitioning` to find out the right ordering for the partitions. 
However, since grouping is not done when the partition values are computed,
there could be duplicated partition values. This means the result could contain
duplicated partitions too.

No, this is a bug fix.

Added a new test case for this scenario.

Closes #42324 from sunchao/SPARK-44641.

Authored-by: Chao Sun 
Signed-off-by: Chao Sun 
(cherry picked from commit aa1261dc129618d27a1bdc743a5fdd54219f7c01)
Signed-off-by: Chao Sun 
---
 .../sql/catalyst/plans/physical/partitioning.scala |  9 +++-
 .../execution/datasources/v2/BatchScanExec.scala   |  9 +++-
 .../connector/KeyGroupedPartitioningSuite.scala| 56 ++
 3 files changed, 72 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 6512344169b..d2f9e9b5d5b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -313,7 +313,7 @@ case class HashPartitioning(expressions: Seq[Expression], 
numPartitions: Int)
  * by `expressions`. `partitionValuesOpt`, if defined, should contain value of 
partition key(s) in
  * ascending order, after evaluated by the transforms in `expressions`, for 
each input partition.
  * In addition, its length must be the same as the number of input partitions 
(and thus is a 1-1
- * mapping), and each row in `partitionValuesOpt` must be unique.
+ * mapping). The `partitionValues` may contain duplicated partition values.
  *
  * For example, if `expressions` is `[years(ts_col)]`, then a valid value of 
`partitionValuesOpt` is
  * `[0, 1, 2]`, which represents 3 input partitions with distinct partition 
values. All rows
@@ -355,6 +355,13 @@ case class KeyGroupedPartitioning(
 
   override def createShuffleSpec(distribution: ClusteredDistribution): 
ShuffleSpec =
 KeyGroupedShuffleSpec(this, distribution)
+
+  lazy val uniquePartitionValues: Seq[InternalRow] = {
+partitionValues
+.map(InternalRowComparableWrapper(_, expressions))
+.distinct
+.map(_.row)
+  }
 }
 
 object KeyGroupedPartitioning {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index d43331d57c4..02821d10d50 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -191,10 +191,17 @@ case class BatchScanExec(
   Seq.fill(numSplits)(Seq.empty))
   }
 } else {
+  // either `commonPartitionValues` is not defined, or it is 
defined but
+  // `applyPartialClustering` is false.
   val partitionMapping = groupedPartitions.map { case (row, parts) 
=>
 InternalRowComparableWrapper(row, p.expressions) -> parts
   }.toMap
-  finalPartitions = p.partitionValues.map { partValue =>
+
+  // In case `commonPartitionValues` is not defined (e.g., SPJ is 
not used), there
+  // could exist duplicated partition values, as partition 
grouping is not done
+  // at the beginning and postponed to this method. It is 
important to use unique
+  // partition values here so that grouped partitions won't get 
duplicated.
+

[spark] branch branch-3.5 updated: [SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not triggered

2023-08-07 Thread sunchao
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 06871f2ae4c [SPARK-44641][SQL] Incorrect result in certain scenarios 
when SPJ is not triggered
06871f2ae4c is described below

commit 06871f2ae4c5f2b5a2208a6ead83e7802d2e0c16
Author: Chao Sun 
AuthorDate: Mon Aug 7 19:16:38 2023 -0700

[SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not 
triggered

This PR makes sure we use unique partition values when calculating the
final partitions in `BatchScanExec`, so that no duplicated partitions are
generated.

When `spark.sql.sources.v2.bucketing.pushPartValues.enabled` and 
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` are 
enabled, and SPJ is not triggered, currently Spark will generate 
incorrect/duplicated results.

This is because with both configs enabled, Spark will delay the partition 
grouping until the time it calculates the final partitions used by the input 
RDD. To calculate the partitions, it uses partition values from the 
`KeyGroupedPartitioning` to find out the right ordering for the partitions. 
However, since grouping is not done when the partition values are computed,
there could be duplicated partition values. This means the result could contain
duplicated partitions too.

No, this is a bug fix.

Added a new test case for this scenario.

Closes #42324 from sunchao/SPARK-44641.

Authored-by: Chao Sun 
Signed-off-by: Chao Sun 
(cherry picked from commit aa1261dc129618d27a1bdc743a5fdd54219f7c01)
Signed-off-by: Chao Sun 
---
 .../sql/catalyst/plans/physical/partitioning.scala |  9 +++-
 .../execution/datasources/v2/BatchScanExec.scala   |  9 +++-
 .../connector/KeyGroupedPartitioningSuite.scala| 56 ++
 3 files changed, 72 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 6512344169b..d2f9e9b5d5b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -313,7 +313,7 @@ case class HashPartitioning(expressions: Seq[Expression], 
numPartitions: Int)
  * by `expressions`. `partitionValuesOpt`, if defined, should contain value of 
partition key(s) in
  * ascending order, after evaluated by the transforms in `expressions`, for 
each input partition.
  * In addition, its length must be the same as the number of input partitions 
(and thus is a 1-1
- * mapping), and each row in `partitionValuesOpt` must be unique.
+ * mapping). The `partitionValues` may contain duplicated partition values.
  *
  * For example, if `expressions` is `[years(ts_col)]`, then a valid value of 
`partitionValuesOpt` is
  * `[0, 1, 2]`, which represents 3 input partitions with distinct partition 
values. All rows
@@ -355,6 +355,13 @@ case class KeyGroupedPartitioning(
 
   override def createShuffleSpec(distribution: ClusteredDistribution): 
ShuffleSpec =
 KeyGroupedShuffleSpec(this, distribution)
+
+  lazy val uniquePartitionValues: Seq[InternalRow] = {
+partitionValues
+.map(InternalRowComparableWrapper(_, expressions))
+.distinct
+.map(_.row)
+  }
 }
 
 object KeyGroupedPartitioning {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 4b538197392..eba3c71f871 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -190,10 +190,17 @@ case class BatchScanExec(
 Seq.fill(numSplits)(Seq.empty))
   }
 } else {
+  // either `commonPartitionValues` is not defined, or it is 
defined but
+  // `applyPartialClustering` is false.
   val partitionMapping = groupedPartitions.map { case (row, parts) 
=>
 InternalRowComparableWrapper(row, p.expressions) -> parts
   }.toMap
-  finalPartitions = p.partitionValues.map { partValue =>
+
+  // In case `commonPartitionValues` is not defined (e.g., SPJ is 
not used), there
+  // could exist duplicated partition values, as partition 
grouping is not done
+  // at the beginning and postponed to this method. It is 
important to use unique
+  // partition values here so that grouped partitions won't get 
duplicated.
+  

[spark] branch master updated: [SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not triggered

2023-08-07 Thread sunchao
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new aa1261dc129 [SPARK-44641][SQL] Incorrect result in certain scenarios 
when SPJ is not triggered
aa1261dc129 is described below

commit aa1261dc129618d27a1bdc743a5fdd54219f7c01
Author: Chao Sun 
AuthorDate: Mon Aug 7 19:16:38 2023 -0700

[SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not 
triggered

### What changes were proposed in this pull request?

This PR makes sure we use unique partition values when calculating the
final partitions in `BatchScanExec`, so that no duplicated partitions are
generated.

### Why are the changes needed?

When `spark.sql.sources.v2.bucketing.pushPartValues.enabled` and 
`spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` are 
enabled, and SPJ is not triggered, currently Spark will generate 
incorrect/duplicated results.

This is because with both configs enabled, Spark will delay the partition 
grouping until the time it calculates the final partitions used by the input 
RDD. To calculate the partitions, it uses partition values from the 
`KeyGroupedPartitioning` to find out the right ordering for the partitions. 
However, since grouping is not done when the partition values are computed,
there could be duplicated partition values. This means the result could contain
duplicated partitions too.
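
For reference, the two configurations named above can be enabled as follows.
This is a minimal PySpark sketch; the affected scenario additionally needs a V2
data source that reports a `KeyGroupedPartitioning`, which is omitted here.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set(
    "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true"
)
# With both flags on but storage-partitioned join (SPJ) not triggered, affected
# versions could return duplicated rows; this fix de-duplicates the partition
# values before building the final input partitions.
```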

### Does this PR introduce _any_ user-facing change?

No, this is a bug fix.

### How was this patch tested?

Added a new test case for this scenario.

Closes #42324 from sunchao/SPARK-44641.

Authored-by: Chao Sun 
Signed-off-by: Chao Sun 
---
 .../sql/catalyst/plans/physical/partitioning.scala |  9 +++-
 .../execution/datasources/v2/BatchScanExec.scala   |  9 +++-
 .../connector/KeyGroupedPartitioningSuite.scala| 56 ++
 3 files changed, 72 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index bd8ba54ddd7..456005768bd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -313,7 +313,7 @@ case class HashPartitioning(expressions: Seq[Expression], 
numPartitions: Int)
  * by `expressions`. `partitionValues`, if defined, should contain value of 
partition key(s) in
  * ascending order, after evaluated by the transforms in `expressions`, for 
each input partition.
  * In addition, its length must be the same as the number of input partitions 
(and thus is a 1-1
- * mapping), and each row in `partitionValues` must be unique.
+ * mapping). The `partitionValues` may contain duplicated partition values.
  *
  * For example, if `expressions` is `[years(ts_col)]`, then a valid value of 
`partitionValues` is
  * `[0, 1, 2]`, which represents 3 input partitions with distinct partition 
values. All rows
@@ -355,6 +355,13 @@ case class KeyGroupedPartitioning(
 
   override def createShuffleSpec(distribution: ClusteredDistribution): 
ShuffleSpec =
 KeyGroupedShuffleSpec(this, distribution)
+
+  lazy val uniquePartitionValues: Seq[InternalRow] = {
+partitionValues
+.map(InternalRowComparableWrapper(_, expressions))
+.distinct
+.map(_.row)
+  }
 }
 
 object KeyGroupedPartitioning {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 4b538197392..eba3c71f871 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -190,10 +190,17 @@ case class BatchScanExec(
 Seq.fill(numSplits)(Seq.empty))
   }
 } else {
+  // either `commonPartitionValues` is not defined, or it is 
defined but
+  // `applyPartialClustering` is false.
   val partitionMapping = groupedPartitions.map { case (row, parts) 
=>
 InternalRowComparableWrapper(row, p.expressions) -> parts
   }.toMap
-  finalPartitions = p.partitionValues.map { partValue =>
+
+  // In case `commonPartitionValues` is not defined (e.g., SPJ is 
not used), there
+  // could exist duplicated partition values, as partition 
grouping is not done
+  // at the beginning and postponed to this method. It is 
important to use unique
+  

[spark] branch master updated: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

2023-08-07 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7493c5764f9 [SPARK-43429][CONNECT] Add Default & Active SparkSession 
for Scala Client
7493c5764f9 is described below

commit 7493c5764f9644878babacccd4f688fe13ef84aa
Author: Herman van Hovell 
AuthorDate: Tue Aug 8 04:15:07 2023 +0200

[SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

### What changes were proposed in this pull request?
This adds the `default` and `active` session variables to `SparkSession`:
- The `default` session is a global value. It is typically the first session
created through `getOrCreate`. It can be changed through `set` or `clear`. If
the session being closed is the `default` session, we clear the `default`
session.
- The `active` session is a thread-local value. It is typically the first
session created in the thread, or it inherits its value from its parent thread.
It can be changed through `set` or `clear`; note that these methods operate
thread-locally, so they won't change the parent or children. If the session
being closed is the `active` session for the current thread, then we clear the
active value (only for the current thread!).

### Why are the changes needed?
To increase compatibility with the existing SparkSession API in `sql/core`.

### Does this PR introduce _any_ user-facing change?
Yes. It adds a couple methods that were missing from the Scala Client.

### How was this patch tested?
Added tests to `SparkSessionSuite`.

Closes #42367 from hvanhovell/SPARK-43429.

Authored-by: Herman van Hovell 
Signed-off-by: Herman van Hovell 
---
 .../scala/org/apache/spark/sql/SparkSession.scala  | 100 --
 .../org/apache/spark/sql/SparkSessionSuite.scala   | 144 +++--
 .../CheckConnectJvmClientCompatibility.scala   |   2 -
 3 files changed, 225 insertions(+), 21 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 355d7edadc7..7367ed153f7 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
 import java.io.Closeable
 import java.net.URI
 import java.util.concurrent.TimeUnit._
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
 
 import scala.collection.JavaConverters._
 import scala.reflect.runtime.universe.TypeTag
@@ -730,6 +730,23 @@ object SparkSession extends Logging {
   override def load(c: Configuration): SparkSession = create(c)
 })
 
+  /** The active SparkSession for the current thread. */
+  private val activeThreadSession = new InheritableThreadLocal[SparkSession]
+
+  /** Reference to the root SparkSession. */
+  private val defaultSession = new AtomicReference[SparkSession]
+
+  /**
+   * Set the (global) default [[SparkSession]], and (thread-local) active 
[[SparkSession]] when
+   * they are not set yet.
+   */
+  private def setDefaultAndActiveSession(session: SparkSession): Unit = {
+defaultSession.compareAndSet(null, session)
+if (getActiveSession.isEmpty) {
+  setActiveSession(session)
+}
+  }
+
   /**
* Create a new [[SparkSession]] based on the connect client 
[[Configuration]].
*/
@@ -742,8 +759,17 @@ object SparkSession extends Logging {
*/
   private[sql] def onSessionClose(session: SparkSession): Unit = {
 sessions.invalidate(session.client.configuration)
+defaultSession.compareAndSet(session, null)
+if (getActiveSession.contains(session)) {
+  clearActiveSession()
+}
   }
 
+  /**
+   * Creates a [[SparkSession.Builder]] for constructing a [[SparkSession]].
+   *
+   * @since 3.4.0
+   */
   def builder(): Builder = new Builder()
 
   private[sql] lazy val cleaner = {
@@ -799,10 +825,15 @@ object SparkSession extends Logging {
  *
  * This will always return a newly created session.
  *
+ * This method will update the default and/or active session if they are 
not set.
+ *
  * @since 3.5.0
  */
 def create(): SparkSession = {
-  
tryCreateSessionFromClient().getOrElse(SparkSession.this.create(builder.configuration))
+  val session = tryCreateSessionFromClient()
+.getOrElse(SparkSession.this.create(builder.configuration))
+  setDefaultAndActiveSession(session)
+  session
 }
 
 /**
@@ -811,30 +842,79 @@ object SparkSession extends Logging {
  * If a session exist with the same configuration that is returned instead 
of creating a new
  * sessio

[spark] branch branch-3.5 updated: [SPARK-44683][SS] Logging level isn't passed to RocksDB state store provider correctly

2023-08-07 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 19c8001dacb [SPARK-44683][SS] Logging level isn't passed to RocksDB 
state store provider correctly
19c8001dacb is described below

commit 19c8001dacb2933ef0a052babdd976fa886b8b4d
Author: Siying Dong 
AuthorDate: Tue Aug 8 11:11:56 2023 +0900

[SPARK-44683][SS] Logging level isn't passed to RocksDB state store 
provider correctly

### What changes were proposed in this pull request?
The logging level is now passed into RocksDB in a way that actually takes effect.

### Why are the changes needed?
We pass log4j's log level to RocksDB so that RocksDB debug logs can go to
log4j. However, the level is set after the logger is created, and the way it is
set isn't effective. This has two impacts: (1) setting DEBUG level doesn't make
RocksDB generate DEBUG-level logs; (2) setting WARN or ERROR level does prevent
INFO-level logging, but RocksDB still makes JNI calls to Scala, which is
unnecessary overhead.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manually change the log level and observe the log lines in unit tests.

Closes #42354 from siying/rocks_log_level.

Authored-by: Siying Dong 
Signed-off-by: Jungtaek Lim 
(cherry picked from commit 630b1777904f15c7ac05c3cd61c0006cd692bc93)
Signed-off-by: Jungtaek Lim 
---
 .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 386df61a9e0..2398b778072 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -613,8 +613,11 @@ class RocksDB(
 if (log.isWarnEnabled) dbLogLevel = InfoLogLevel.WARN_LEVEL
 if (log.isInfoEnabled) dbLogLevel = InfoLogLevel.INFO_LEVEL
 if (log.isDebugEnabled) dbLogLevel = InfoLogLevel.DEBUG_LEVEL
-dbOptions.setLogger(dbLogger)
+dbLogger.setInfoLogLevel(dbLogLevel)
+// The log level set in dbLogger is effective and the one to dbOptions 
isn't applied to
+// customized logger. We still set it as it might show up in RocksDB 
config file or logging.
 dbOptions.setInfoLogLevel(dbLogLevel)
+dbOptions.setLogger(dbLogger)
 logInfo(s"Set RocksDB native logging level to $dbLogLevel")
 dbLogger
   }





[spark] branch master updated: [SPARK-44683][SS] Logging level isn't passed to RocksDB state store provider correctly

2023-08-07 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 630b1777904 [SPARK-44683][SS] Logging level isn't passed to RocksDB 
state store provider correctly
630b1777904 is described below

commit 630b1777904f15c7ac05c3cd61c0006cd692bc93
Author: Siying Dong 
AuthorDate: Tue Aug 8 11:11:56 2023 +0900

[SPARK-44683][SS] Logging level isn't passed to RocksDB state store 
provider correctly

### What changes were proposed in this pull request?
The logging level is now passed into RocksDB in a way that actually takes effect.

### Why are the changes needed?
We pass log4j's log level to RocksDB so that RocksDB debug logs can go to
log4j. However, the level is set after the logger is created, and the way it is
set isn't effective. This has two impacts: (1) setting DEBUG level doesn't make
RocksDB generate DEBUG-level logs; (2) setting WARN or ERROR level does prevent
INFO-level logging, but RocksDB still makes JNI calls to Scala, which is
unnecessary overhead.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manually change the log level and observe the log lines in unit tests.

Closes #42354 from siying/rocks_log_level.

Authored-by: Siying Dong 
Signed-off-by: Jungtaek Lim 
---
 .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index d4366fe732b..a2868df9411 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -611,8 +611,11 @@ class RocksDB(
 if (log.isWarnEnabled) dbLogLevel = InfoLogLevel.WARN_LEVEL
 if (log.isInfoEnabled) dbLogLevel = InfoLogLevel.INFO_LEVEL
 if (log.isDebugEnabled) dbLogLevel = InfoLogLevel.DEBUG_LEVEL
-dbOptions.setLogger(dbLogger)
+dbLogger.setInfoLogLevel(dbLogLevel)
+// The log level set in dbLogger is effective and the one to dbOptions 
isn't applied to
+// customized logger. We still set it as it might show up in RocksDB 
config file or logging.
 dbOptions.setInfoLogLevel(dbLogLevel)
+dbOptions.setLogger(dbLogger)
 logInfo(s"Set RocksDB native logging level to $dbLogLevel")
 dbLogger
   }





[spark] branch branch-3.5 updated: [SPARK-44694][PYTHON][CONNECT] Refactor active sessions and expose them as an API

2023-08-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 21e7fe81a66 [SPARK-44694][PYTHON][CONNECT] Refactor active sessions 
and expose them as an API
21e7fe81a66 is described below

commit 21e7fe81a662eacebcbc1971a3586a6d470376f3
Author: Hyukjin Kwon 
AuthorDate: Tue Aug 8 11:03:05 2023 +0900

[SPARK-44694][PYTHON][CONNECT] Refactor active sessions and expose them as 
an API

This PR proposes to (mostly) refactor all the internal workarounds to get 
the active session correctly.

There are few things to note:

- _PySpark without Spark Connect does not support the hierarchy of active
sessions in the first place_. With pinned thread mode (enabled by default),
PySpark does map each Python thread to a JVM thread, but the thread creation
happens within the gateway server, which does not respect the thread hierarchy.
Therefore, this PR follows exactly the same behaviour.
  - A new thread will not have an active session by default.
  - Other behaviours are the same as PySpark without Connect; see also
https://github.com/apache/spark/pull/42367
- Since I am here, I piggyback a few documentation changes. We missed
documenting `SparkSession.readStream`, `SparkSession.streams`,
`SparkSession.udtf`, `SparkSession.conf` and `SparkSession.version` in Spark
Connect.
- The changes here are mostly refactoring that reuses existing unittests 
while I expose two methods:
  - `SparkSession.getActiveSession` (only for Spark Connect)
  - `SparkSession.active` (for both in PySpark)

For Spark Connect users to be able to play with active and default sessions 
in Python.

Yes, it adds new APIs:
  - `SparkSession.getActiveSession` (only for Spark Connect)
  - `SparkSession.active` (for both in PySpark)

Existing unittests should cover all.

Closes #42371 from HyukjinKwon/SPARK-44694.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 9368a0f0c1001fb6fd64799a2e744874b6cd27e4)
Signed-off-by: Hyukjin Kwon 
---
 .../source/reference/pyspark.sql/spark_session.rst |   1 +
 python/pyspark/errors/error_classes.py |   5 +
 python/pyspark/ml/connect/io_utils.py  |   8 +-
 python/pyspark/ml/connect/tuning.py|  11 ++-
 python/pyspark/ml/torch/distributor.py |   3 +-
 python/pyspark/ml/util.py  |  13 ---
 python/pyspark/pandas/utils.py |   7 +-
 python/pyspark/sql/connect/session.py  | 107 ++---
 python/pyspark/sql/connect/udf.py  |  25 +++--
 python/pyspark/sql/connect/udtf.py |  27 +++---
 python/pyspark/sql/session.py  |  65 +++--
 .../sql/tests/connect/test_connect_basic.py|   4 +-
 python/pyspark/sql/utils.py|  18 
 13 files changed, 197 insertions(+), 97 deletions(-)

diff --git a/python/docs/source/reference/pyspark.sql/spark_session.rst 
b/python/docs/source/reference/pyspark.sql/spark_session.rst
index a5f8bc47d44..74315a0aacc 100644
--- a/python/docs/source/reference/pyspark.sql/spark_session.rst
+++ b/python/docs/source/reference/pyspark.sql/spark_session.rst
@@ -28,6 +28,7 @@ See also :class:`SparkSession`.
 .. autosummary::
 :toctree: api/
 
+SparkSession.active
 SparkSession.builder.appName
 SparkSession.builder.config
 SparkSession.builder.enableHiveSupport
diff --git a/python/pyspark/errors/error_classes.py 
b/python/pyspark/errors/error_classes.py
index 971dc59bbb2..937a8758404 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -607,6 +607,11 @@ ERROR_CLASSES_JSON = """
   "Argument `` should be a WindowSpec, got ."
 ]
   },
+  "NO_ACTIVE_OR_DEFAULT_SESSION" : {
+"message" : [
+  "No active or default Spark session found. Please create a new Spark 
session before running the code."
+]
+  },
   "NO_ACTIVE_SESSION" : {
 "message" : [
   "No active Spark session found. Please create a new Spark session before 
running the code."
diff --git a/python/pyspark/ml/connect/io_utils.py 
b/python/pyspark/ml/connect/io_utils.py
index 9a963086aaf..a09a244862c 100644
--- a/python/pyspark/ml/connect/io_utils.py
+++ b/python/pyspark/ml/connect/io_utils.py
@@ -23,7 +23,7 @@ import time
 from urllib.parse import urlparse
 from typing import Any, Dict, List
 from pyspark.ml.base import Params
-from pyspark.ml.util import _get_active_session
+from pyspark.sql import SparkSession
 from pyspark.sql.utils import is_remote
 
 
@@ -34,7 +34,7 @@ _META_DATA_FILE_NAME = "metadata.json"
 
 
 def _copy_file_from_local_to_fs(local_path: str, dest_path: str) -> None:
-session = _get_active_session(is_remote())
+sessio

[spark] branch master updated: [SPARK-44694][PYTHON][CONNECT] Refactor active sessions and expose them as an API

2023-08-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9368a0f0c10 [SPARK-44694][PYTHON][CONNECT] Refactor active sessions 
and expose them as an API
9368a0f0c10 is described below

commit 9368a0f0c1001fb6fd64799a2e744874b6cd27e4
Author: Hyukjin Kwon 
AuthorDate: Tue Aug 8 11:03:05 2023 +0900

[SPARK-44694][PYTHON][CONNECT] Refactor active sessions and expose them as 
an API

### What changes were proposed in this pull request?

This PR proposes to (mostly) refactor all the internal workarounds to get 
the active session correctly.

There are few things to note:

- _PySpark without Spark Connect does not support the hierarchy of active
sessions in the first place_. With pinned thread mode (enabled by default),
PySpark does map each Python thread to a JVM thread, but the thread creation
happens within the gateway server, which does not respect the thread hierarchy.
Therefore, this PR follows exactly the same behaviour.
  - A new thread will not have an active session by default.
  - Other behaviours are the same as PySpark without Connect; see also
https://github.com/apache/spark/pull/42367
- Since I am here, I piggyback a few documentation changes. We missed
documenting `SparkSession.readStream`, `SparkSession.streams`,
`SparkSession.udtf`, `SparkSession.conf` and `SparkSession.version` in Spark
Connect.
- The changes here are mostly refactoring that reuses existing unittests 
while I expose two methods:
  - `SparkSession.getActiveSession` (only for Spark Connect)
  - `SparkSession.active` (for both in PySpark)

### Why are the changes needed?

For Spark Connect users to be able to play with active and default sessions 
in Python.

### Does this PR introduce _any_ user-facing change?

Yes, it adds new APIs (see the sketch below):
  - `SparkSession.getActiveSession` (only for Spark Connect)
  - `SparkSession.active` (for both in PySpark)
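
A minimal PySpark sketch of the two methods listed above, with behavior as
described in this PR; the error-class name comes from the diff below, and the
snippet assumes a local session created in the current thread.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Thread-local active session, or None if the current thread has not set one.
assert SparkSession.getActiveSession() is spark

# Active session if set, otherwise the default session; raises an error with
# class NO_ACTIVE_OR_DEFAULT_SESSION when neither exists.
assert SparkSession.active() is spark
```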

### How was this patch tested?

Existing unittests should cover all.

Closes #42371 from HyukjinKwon/SPARK-44694.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 .../source/reference/pyspark.sql/spark_session.rst |   1 +
 python/pyspark/errors/error_classes.py |   5 +
 python/pyspark/ml/connect/io_utils.py  |   8 +-
 python/pyspark/ml/connect/tuning.py|  11 ++-
 python/pyspark/ml/torch/distributor.py |   3 +-
 python/pyspark/ml/util.py  |  13 ---
 python/pyspark/pandas/utils.py |   7 +-
 python/pyspark/sql/connect/session.py  | 107 ++---
 python/pyspark/sql/connect/udf.py  |  25 +++--
 python/pyspark/sql/connect/udtf.py |  27 +++---
 python/pyspark/sql/session.py  |  65 +++--
 .../sql/tests/connect/test_connect_basic.py|   4 +-
 python/pyspark/sql/utils.py|  18 
 13 files changed, 197 insertions(+), 97 deletions(-)

diff --git a/python/docs/source/reference/pyspark.sql/spark_session.rst 
b/python/docs/source/reference/pyspark.sql/spark_session.rst
index c16ca4f162f..f25dbab5f6b 100644
--- a/python/docs/source/reference/pyspark.sql/spark_session.rst
+++ b/python/docs/source/reference/pyspark.sql/spark_session.rst
@@ -28,6 +28,7 @@ See also :class:`SparkSession`.
 .. autosummary::
 :toctree: api/
 
+SparkSession.active
 SparkSession.builder.appName
 SparkSession.builder.config
 SparkSession.builder.enableHiveSupport
diff --git a/python/pyspark/errors/error_classes.py 
b/python/pyspark/errors/error_classes.py
index a534bc6deb4..24885e94d32 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -617,6 +617,11 @@ ERROR_CLASSES_JSON = """
   "Argument `` should be a WindowSpec, got ."
 ]
   },
+  "NO_ACTIVE_OR_DEFAULT_SESSION" : {
+"message" : [
+  "No active or default Spark session found. Please create a new Spark 
session before running the code."
+]
+  },
   "NO_ACTIVE_SESSION" : {
 "message" : [
   "No active Spark session found. Please create a new Spark session before 
running the code."
diff --git a/python/pyspark/ml/connect/io_utils.py 
b/python/pyspark/ml/connect/io_utils.py
index 9a963086aaf..a09a244862c 100644
--- a/python/pyspark/ml/connect/io_utils.py
+++ b/python/pyspark/ml/connect/io_utils.py
@@ -23,7 +23,7 @@ import time
 from urllib.parse import urlparse
 from typing import Any, Dict, List
 from pyspark.ml.base import Params
-from pyspark.ml.util import _get_active_session
+from pyspark.sql import SparkSession
 from pyspark.sql.utils import is_remote
 
 
@@ -34,7 +34,7 @@ _META_DATA_FILE_NAME = "metadata.json"
 
 
 def _copy_file_from_local_to_fs(local_path

[spark] branch branch-3.5 updated: [MINOR][UI] Increasing the number of significant digits for Fraction Cached of RDD

2023-08-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new df02aa29ddc [MINOR][UI] Increasing the number of significant digits 
for Fraction Cached of RDD
df02aa29ddc is described below

commit df02aa29ddc9b03e4386128f26bfc98b869485e5
Author: Kent Yao 
AuthorDate: Tue Aug 8 08:10:09 2023 +0900

[MINOR][UI] Increasing the number of significant digits for Fraction Cached 
of RDD

### What changes were proposed in this pull request?

This PR is a small improvement that increases the number of significant
digits for the Fraction Cached value of RDDs shown on the Storage tab.

### Why are the changes needed?

improves accuracy and precision


![image](https://github.com/apache/spark/assets/8326978/7106352c-b806-4953-8938-c4cba8ea1191)

### Does this PR introduce _any_ user-facing change?

Yes. The Fraction Cached value on the Storage page now shows two decimal places
instead of none (illustrated below).
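
The formatting change itself is in the Scala `StoragePage`, but its effect is
easy to illustrate with the same format string (taken from the diff below) in a
short Python snippet:

```python
# Old format "%.0f%%" would render 56 cached of 1000 partitions as "6%";
# the new "%.2f%%" keeps two decimal places.
print("%.2f%%" % (56 * 100.0 / 1000))  # 5.60%
```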

### How was this patch tested?

locally verified

Closes #42373 from yaooqinn/uiminor.

Authored-by: Kent Yao 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit f47a2560e6e39ba8eac51a76290614b2fba4d65a)
Signed-off-by: Hyukjin Kwon 
---
 .../scala/org/apache/spark/ui/storage/StoragePage.scala   |  2 +-
 .../org/apache/spark/ui/storage/StoragePageSuite.scala| 15 ---
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala 
b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index c1708c320c5..72662267365 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -98,7 +98,7 @@ private[ui] class StoragePage(parent: SparkUITab, store: 
AppStatusStore) extends
   {rdd.storageLevel}
   
   {rdd.numCachedPartitions.toString}
-  {"%.0f%%".format(rdd.numCachedPartitions * 100.0 / 
rdd.numPartitions)}
+  {"%.2f%%".format(rdd.numCachedPartitions * 100.0 / 
rdd.numPartitions)}
   {Utils.bytesToString(rdd.memoryUsed)}
   {Utils.bytesToString(rdd.diskUsed)}
 
diff --git 
a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
index 718c6856cb3..d1e25bf8a23 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
@@ -48,8 +48,8 @@ class StoragePageSuite extends SparkFunSuite {
 
 val rdd2 = new RDDStorageInfo(2,
   "rdd2",
-  10,
-  5,
+  1000,
+  56,
   StorageLevel.DISK_ONLY.description,
   0L,
   200L,
@@ -58,8 +58,8 @@ class StoragePageSuite extends SparkFunSuite {
 
 val rdd3 = new RDDStorageInfo(3,
   "rdd3",
-  10,
-  10,
+  1000,
+  103,
   StorageLevel.MEMORY_AND_DISK_SER.description,
   400L,
   500L,
@@ -94,19 +94,20 @@ class StoragePageSuite extends SparkFunSuite {
 
 assert((xmlNodes \\ "tr").size === 3)
 assert(((xmlNodes \\ "tr")(0) \\ "td").map(_.text.trim) ===
-  Seq("1", "rdd1", "Memory Deserialized 1x Replicated", "10", "100%", 
"100.0 B", "0.0 B"))
+  Seq("1", "rdd1", "Memory Deserialized 1x Replicated", "10", "100.00%", 
"100.0 B", "0.0 B"))
 // Check the url
 assert(((xmlNodes \\ "tr")(0) \\ "td" \ 
"a")(0).attribute("href").map(_.text) ===
   Some("http://localhost:4040/storage/rdd/?id=1";))
 
 assert(((xmlNodes \\ "tr")(1) \\ "td").map(_.text.trim) ===
-  Seq("2", "rdd2", "Disk Serialized 1x Replicated", "5", "50%", "0.0 B", 
"200.0 B"))
+  Seq("2", "rdd2", "Disk Serialized 1x Replicated", "56", "5.60%", "0.0 
B", "200.0 B"))
 // Check the url
 assert(((xmlNodes \\ "tr")(1) \\ "td" \ 
"a")(0).attribute("href").map(_.text) ===
   Some("http://localhost:4040/storage/rdd/?id=2";))
 
 assert(((xmlNodes \\ "tr")(2) \\ "td").map(_.text.trim) ===
-  Seq("3", "rdd3", "Disk Memory Serialized 1x Replicated", "10", "100%", 
"400.0 B", "500.0 B"))
+  Seq("3", "rdd3", "Disk Memory Serialized 1x Replicated", "103", 
"10.30%", "400.0 B",
+"500.0 B"))
 // Check the url
 assert(((xmlNodes \\ "tr")(2) \\ "td" \ 
"a")(0).attribute("href").map(_.text) ===
   Some("http://localhost:4040/storage/rdd/?id=3";))


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [MINOR][UI] Increasing the number of significant digits for Fraction Cached of RDD

2023-08-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f47a2560e6e [MINOR][UI] Increasing the number of significant digits 
for Fraction Cached of RDD
f47a2560e6e is described below

commit f47a2560e6e39ba8eac51a76290614b2fba4d65a
Author: Kent Yao 
AuthorDate: Tue Aug 8 08:10:09 2023 +0900

[MINOR][UI] Increasing the number of significant digits for Fraction Cached 
of RDD

### What changes were proposed in this pull request?

This PR is a minor UI improvement that increases the number of significant 
digits shown for the Fraction Cached of an RDD on the Storage tab.

### Why are the changes needed?

It improves the accuracy and precision of the displayed percentage.


![image](https://github.com/apache/spark/assets/8326978/7106352c-b806-4953-8938-c4cba8ea1191)

### Does this PR introduce _any_ user-facing change?

Yes, the Fraction Cached value on the Storage page now shows two decimal 
places instead of none.

### How was this patch tested?

locally verified

Closes #42373 from yaooqinn/uiminor.

Authored-by: Kent Yao 
Signed-off-by: Hyukjin Kwon 
---
 .../scala/org/apache/spark/ui/storage/StoragePage.scala   |  2 +-
 .../org/apache/spark/ui/storage/StoragePageSuite.scala| 15 ---
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala 
b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index c1708c320c5..72662267365 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -98,7 +98,7 @@ private[ui] class StoragePage(parent: SparkUITab, store: 
AppStatusStore) extends
   {rdd.storageLevel}
   
   {rdd.numCachedPartitions.toString}
-  {"%.0f%%".format(rdd.numCachedPartitions * 100.0 / 
rdd.numPartitions)}
+  {"%.2f%%".format(rdd.numCachedPartitions * 100.0 / 
rdd.numPartitions)}
   {Utils.bytesToString(rdd.memoryUsed)}
   {Utils.bytesToString(rdd.diskUsed)}
 
diff --git 
a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
index 718c6856cb3..d1e25bf8a23 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
@@ -48,8 +48,8 @@ class StoragePageSuite extends SparkFunSuite {
 
 val rdd2 = new RDDStorageInfo(2,
   "rdd2",
-  10,
-  5,
+  1000,
+  56,
   StorageLevel.DISK_ONLY.description,
   0L,
   200L,
@@ -58,8 +58,8 @@ class StoragePageSuite extends SparkFunSuite {
 
 val rdd3 = new RDDStorageInfo(3,
   "rdd3",
-  10,
-  10,
+  1000,
+  103,
   StorageLevel.MEMORY_AND_DISK_SER.description,
   400L,
   500L,
@@ -94,19 +94,20 @@ class StoragePageSuite extends SparkFunSuite {
 
 assert((xmlNodes \\ "tr").size === 3)
 assert(((xmlNodes \\ "tr")(0) \\ "td").map(_.text.trim) ===
-  Seq("1", "rdd1", "Memory Deserialized 1x Replicated", "10", "100%", 
"100.0 B", "0.0 B"))
+  Seq("1", "rdd1", "Memory Deserialized 1x Replicated", "10", "100.00%", 
"100.0 B", "0.0 B"))
 // Check the url
 assert(((xmlNodes \\ "tr")(0) \\ "td" \ 
"a")(0).attribute("href").map(_.text) ===
   Some("http://localhost:4040/storage/rdd/?id=1";))
 
 assert(((xmlNodes \\ "tr")(1) \\ "td").map(_.text.trim) ===
-  Seq("2", "rdd2", "Disk Serialized 1x Replicated", "5", "50%", "0.0 B", 
"200.0 B"))
+  Seq("2", "rdd2", "Disk Serialized 1x Replicated", "56", "5.60%", "0.0 
B", "200.0 B"))
 // Check the url
 assert(((xmlNodes \\ "tr")(1) \\ "td" \ 
"a")(0).attribute("href").map(_.text) ===
   Some("http://localhost:4040/storage/rdd/?id=2";))
 
 assert(((xmlNodes \\ "tr")(2) \\ "td").map(_.text.trim) ===
-  Seq("3", "rdd3", "Disk Memory Serialized 1x Replicated", "10", "100%", 
"400.0 B", "500.0 B"))
+  Seq("3", "rdd3", "Disk Memory Serialized 1x Replicated", "103", 
"10.30%", "400.0 B",
+"500.0 B"))
 // Check the url
 assert(((xmlNodes \\ "tr")(2) \\ "td" \ 
"a")(0).attribute("href").map(_.text) ===
   Some("http://localhost:4040/storage/rdd/?id=3";))


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-44132][SQL] Materialize `Stream` of join column names to avoid codegen failure

2023-08-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 4f869ce6f67 [SPARK-44132][SQL] Materialize `Stream` of join column 
names to avoid codegen failure
4f869ce6f67 is described below

commit 4f869ce6f67c4bb2444c351632adaf9c78c5ec1b
Author: Steven Aerts 
AuthorDate: Tue Aug 8 08:09:05 2023 +0900

[SPARK-44132][SQL] Materialize `Stream` of join column names to avoid 
codegen failure

### What changes were proposed in this pull request?

Materialize passed join columns as an `IndexedSeq` before passing it to the 
lower layers.

### Why are the changes needed?

When nesting multiple full outer joins using column names that are a 
`Stream`, the code generator emits faulty code, resulting in an NPE or a bad 
`UnsafeRow` access at runtime. Without this fix the query crashes; see the 
two added test cases, which exercise the NPE and the bad `UnsafeRow` access.

### Does this PR introduce _any_ user-facing change?
No, only bug fix.

### How was this patch tested?
A reproduction scenario was created and added to the code base.
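
For reference, a minimal Scala sketch of the reproduction (mirroring the 
added test, and assuming a local `SparkSession` is in scope as `spark`):

```
import spark.implicits._

val dsA = Seq((1, "a")).toDF("id", "c1")
val dsB = Seq((2, "b")).toDF("id", "c2")
val dsC = Seq((3, "c")).toDF("id", "c3")
// Passing the join columns as a lazy Stream used to make whole-stage codegen
// emit faulty code (an NPE or a bad UnsafeRow access) once the full outer
// joins were nested; the fix materializes them via usingColumns.toIndexedSeq.
val joined = dsA.join(dsB, Stream("id"), "full_outer")
  .join(dsC, Stream("id"), "full_outer")
joined.show()
```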

Closes #41712 from steven-aerts/SPARK-44132-fix.

Authored-by: Steven Aerts 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 8911578020f8a2428b12dd72cb0ed4b7d747d835)
Signed-off-by: Hyukjin Kwon 
---
 .../main/scala/org/apache/spark/sql/Dataset.scala|  2 +-
 .../sql/execution/joins/JoinCodegenSupport.scala |  2 +-
 .../test/scala/org/apache/spark/sql/JoinSuite.scala  | 20 
 3 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 61c83829d20..eda017937d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1092,7 +1092,7 @@ class Dataset[T] private[sql](
   Join(
 joined.left,
 joined.right,
-UsingJoin(JoinType(joinType), usingColumns),
+UsingJoin(JoinType(joinType), usingColumns.toIndexedSeq),
 None,
 JoinHint.NONE)
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
index a7d1edefcd6..6496f9a0006 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
@@ -79,7 +79,7 @@ trait JoinCodegenSupport extends CodegenSupport with 
BaseJoinExec {
   setDefaultValue: Boolean): Seq[ExprCode] = {
 ctx.currentVars = null
 ctx.INPUT_ROW = row
-plan.output.zipWithIndex.map { case (a, i) =>
+plan.output.toIndexedSeq.zipWithIndex.map { case (a, i) =>
   val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx)
   if (setDefaultValue) {
 // the variables are needed even there is no matched rows
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 7f358723eeb..14f1fb27906 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1709,4 +1709,24 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
   checkAnswer(sql(query), expected)
 }
   }
+
+  test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with NPE") {
+val dsA = Seq((1, "a")).toDF("id", "c1")
+val dsB = Seq((2, "b")).toDF("id", "c2")
+val dsC = Seq((3, "c")).toDF("id", "c3")
+val joined = dsA.join(dsB, Stream("id"), "full_outer").join(dsC, 
Stream("id"), "full_outer")
+
+val expected = Seq(Row(1, "a", null, null), Row(2, null, "b", null), 
Row(3, null, null, "c"))
+
+checkAnswer(joined, expected)
+  }
+
+  test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with 
invalid access") {
+val ds = Seq((1, "a")).toDF("id", "c1")
+val joined = ds.join(ds, Stream("id"), "full_outer").join(ds, 
Stream("id"), "full_outer")
+
+val expected = Seq(Row(1, "a", "a", "a"))
+
+checkAnswer(joined, expected)
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-44132][SQL] Materialize `Stream` of join column names to avoid codegen failure

2023-08-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8911578020f [SPARK-44132][SQL] Materialize `Stream` of join column 
names to avoid codegen failure
8911578020f is described below

commit 8911578020f8a2428b12dd72cb0ed4b7d747d835
Author: Steven Aerts 
AuthorDate: Tue Aug 8 08:09:05 2023 +0900

[SPARK-44132][SQL] Materialize `Stream` of join column names to avoid 
codegen failure

### What changes were proposed in this pull request?

Materialize passed join columns as an `IndexedSeq` before passing it to the 
lower layers.

### Why are the changes needed?

When nesting multiple full outer joins using column names that are a 
`Stream`, the code generator emits faulty code, resulting in an NPE or a bad 
`UnsafeRow` access at runtime. Without this fix the query crashes; see the 
two added test cases, which exercise the NPE and the bad `UnsafeRow` access.

### Does this PR introduce _any_ user-facing change?
No, only bug fix.

### How was this patch tested?
A reproduction scenario was created and added to the code base.

Closes #41712 from steven-aerts/SPARK-44132-fix.

Authored-by: Steven Aerts 
Signed-off-by: Hyukjin Kwon 
---
 .../main/scala/org/apache/spark/sql/Dataset.scala|  2 +-
 .../sql/execution/joins/JoinCodegenSupport.scala |  2 +-
 .../test/scala/org/apache/spark/sql/JoinSuite.scala  | 20 
 3 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 61c83829d20..eda017937d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1092,7 +1092,7 @@ class Dataset[T] private[sql](
   Join(
 joined.left,
 joined.right,
-UsingJoin(JoinType(joinType), usingColumns),
+UsingJoin(JoinType(joinType), usingColumns.toIndexedSeq),
 None,
 JoinHint.NONE)
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
index a7d1edefcd6..6496f9a0006 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala
@@ -79,7 +79,7 @@ trait JoinCodegenSupport extends CodegenSupport with 
BaseJoinExec {
   setDefaultValue: Boolean): Seq[ExprCode] = {
 ctx.currentVars = null
 ctx.INPUT_ROW = row
-plan.output.zipWithIndex.map { case (a, i) =>
+plan.output.toIndexedSeq.zipWithIndex.map { case (a, i) =>
   val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx)
   if (setDefaultValue) {
 // the variables are needed even there is no matched rows
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 7f358723eeb..14f1fb27906 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1709,4 +1709,24 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
   checkAnswer(sql(query), expected)
 }
   }
+
+  test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with NPE") {
+val dsA = Seq((1, "a")).toDF("id", "c1")
+val dsB = Seq((2, "b")).toDF("id", "c2")
+val dsC = Seq((3, "c")).toDF("id", "c3")
+val joined = dsA.join(dsB, Stream("id"), "full_outer").join(dsC, 
Stream("id"), "full_outer")
+
+val expected = Seq(Row(1, "a", null, null), Row(2, null, "b", null), 
Row(3, null, null, "c"))
+
+checkAnswer(joined, expected)
+  }
+
+  test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with 
invalid access") {
+val ds = Seq((1, "a")).toDF("id", "c1")
+val joined = ds.join(ds, Stream("id"), "full_outer").join(ds, 
Stream("id"), "full_outer")
+
+val expected = Seq(Row(1, "a", "a", "a"))
+
+checkAnswer(joined, expected)
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api

2023-08-07 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new d8f02274c38 [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api
d8f02274c38 is described below

commit d8f02274c38c027e2d56f5158ce63f6e74255d2d
Author: Herman van Hovell 
AuthorDate: Tue Aug 8 00:42:13 2023 +0200

[SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api

This PR moves `Triggers.scala` and `Trigger.scala` from `sql/core` to 
`sql/api`, and it removes the duplicates from the connect scala client.

Not really needed, just some deduplication.

No.

Existing tests.
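
The public API is untouched by the move; a small usage sketch (assuming `df` 
is a streaming DataFrame) that now resolves to the single shared class in 
sql/api for both the classic and Connect Scala clients:

```
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .start()
```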

Closes #42368 from hvanhovell/SPARK-44692.

Authored-by: Herman van Hovell 
Signed-off-by: Herman van Hovell 
(cherry picked from commit 4eea89d339649152a1afcd8b7a32020454e71d42)
Signed-off-by: Herman van Hovell 
---
 .../org/apache/spark/sql/streaming/Trigger.java| 180 -
 dev/checkstyle-suppressions.xml|   4 +-
 project/MimaExcludes.scala |   4 +-
 .../org/apache/spark/sql/streaming/Trigger.java|   0
 .../spark/sql/execution/streaming/Triggers.scala   |   6 +-
 .../spark/sql/execution/streaming/Triggers.scala   | 113 -
 6 files changed, 6 insertions(+), 301 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java
 
b/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java
deleted file mode 100644
index 27ffe67d990..000
--- 
a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming;
-
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.Duration;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$;
-import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
-import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
-import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
-
-/**
- * Policy used to indicate how often results should be produced by a 
[[StreamingQuery]].
- *
- * @since 3.5.0
- */
-@Evolving
-public class Trigger {
-  // This is a copy of the same class in sql/core/.../streaming/Trigger.java
-
-  /**
-   * A trigger policy that runs a query periodically based on an interval in 
processing time.
-   * If `interval` is 0, the query will run as fast as possible.
-   *
-   * @since 3.5.0
-   */
-  public static Trigger ProcessingTime(long intervalMs) {
-  return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * (Java-friendly)
-   * A trigger policy that runs a query periodically based on an interval in 
processing time.
-   * If `interval` is 0, the query will run as fast as possible.
-   *
-   * {{{
-   *import java.util.concurrent.TimeUnit
-   *df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
-   * }}}
-   *
-   * @since 3.5.0
-   */
-  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
-  return ProcessingTimeTrigger.create(interval, timeUnit);
-  }
-
-  /**
-   * (Scala-friendly)
-   * A trigger policy that runs a query periodically based on an interval in 
processing time.
-   * If `duration` is 0, the query will run as fast as possible.
-   *
-   * {{{
-   *import scala.concurrent.duration._
-   *df.writeStream.trigger(Trigger.ProcessingTime(10.seconds))
-   * }}}
-   * @since 3.5.0
-   */
-  public static Trigger ProcessingTime(Duration interval) {
-  return ProcessingTimeTrigger.apply(interval);
-  }
-
-  /**
-   * A trigger policy that runs a query periodically based on an interval in 
processing time.
-   * If `interval` is effectively 0, the query will run as fast as possible.
-   *
-   * {{{
-   *df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
-   * }}}
-   * @since 3.5.0

[spark] branch master updated: [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api

2023-08-07 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4eea89d3396 [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api
4eea89d3396 is described below

commit 4eea89d339649152a1afcd8b7a32020454e71d42
Author: Herman van Hovell 
AuthorDate: Tue Aug 8 00:42:13 2023 +0200

[SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api

### What changes were proposed in this pull request?
This PR moves `Triggers.scala` and `Trigger.scala` from `sql/core` to 
`sql/api`, and it removes the duplicates from the connect scala client.

### Why are the changes needed?
Not really needed, just some deduplication.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #42368 from hvanhovell/SPARK-44692.

Authored-by: Herman van Hovell 
Signed-off-by: Herman van Hovell 
---
 .../org/apache/spark/sql/streaming/Trigger.java| 180 -
 dev/checkstyle-suppressions.xml|   4 +-
 project/MimaExcludes.scala |   4 +-
 .../org/apache/spark/sql/streaming/Trigger.java|   0
 .../spark/sql/execution/streaming/Triggers.scala   |   6 +-
 .../spark/sql/execution/streaming/Triggers.scala   | 113 -
 6 files changed, 6 insertions(+), 301 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java
 
b/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java
deleted file mode 100644
index 27ffe67d990..000
--- 
a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming;
-
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.Duration;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$;
-import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
-import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
-import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
-
-/**
- * Policy used to indicate how often results should be produced by a 
[[StreamingQuery]].
- *
- * @since 3.5.0
- */
-@Evolving
-public class Trigger {
-  // This is a copy of the same class in sql/core/.../streaming/Trigger.java
-
-  /**
-   * A trigger policy that runs a query periodically based on an interval in 
processing time.
-   * If `interval` is 0, the query will run as fast as possible.
-   *
-   * @since 3.5.0
-   */
-  public static Trigger ProcessingTime(long intervalMs) {
-  return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * (Java-friendly)
-   * A trigger policy that runs a query periodically based on an interval in 
processing time.
-   * If `interval` is 0, the query will run as fast as possible.
-   *
-   * {{{
-   *import java.util.concurrent.TimeUnit
-   *df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
-   * }}}
-   *
-   * @since 3.5.0
-   */
-  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
-  return ProcessingTimeTrigger.create(interval, timeUnit);
-  }
-
-  /**
-   * (Scala-friendly)
-   * A trigger policy that runs a query periodically based on an interval in 
processing time.
-   * If `duration` is 0, the query will run as fast as possible.
-   *
-   * {{{
-   *import scala.concurrent.duration._
-   *df.writeStream.trigger(Trigger.ProcessingTime(10.seconds))
-   * }}}
-   * @since 3.5.0
-   */
-  public static Trigger ProcessingTime(Duration interval) {
-  return ProcessingTimeTrigger.apply(interval);
-  }
-
-  /**
-   * A trigger policy that runs a query periodically based on an interval in 
processing time.
-   * If `interval` is effectively 0, the query will run as fast as possible.
-   *
-   * {{{
-   *df.writeStream.trigger(T

[spark] branch branch-3.5 updated: [SPARK-44575][SQL][CONNECT] Implement basic error translation

2023-08-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 3b24f65fd5a [SPARK-44575][SQL][CONNECT] Implement basic error 
translation
3b24f65fd5a is described below

commit 3b24f65fd5a97cb58d87ec0d1d798df59d946fbd
Author: Yihong He 
AuthorDate: Tue Aug 8 06:33:48 2023 +0900

[SPARK-44575][SQL][CONNECT] Implement basic error translation

### What changes were proposed in this pull request?

- Implement basic error translation for the Spark Connect Scala client.

### Why are the changes needed?

- Better compatibility with the existing control flow

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`build/sbt "connect-client-jvm/testOnly *Suite"`
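
A sketch of what the translation means for client code (assuming a 
Connect-backed session named `spark` and a table that does not exist): 
server-side analysis failures now surface as `AnalysisException` rather than 
a bare `SparkException`.

```
import org.apache.spark.sql.AnalysisException

try {
  spark.sql("SELECT * FROM nonexistent_table").collect()
} catch {
  case e: AnalysisException =>
    println(s"Translated server-side error: ${e.getMessage}")
}
```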

Closes #42266 from heyihong/SPARK-44575.

Authored-by: Yihong He 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 42e5daddf3ba16ff7d08e82e51cd8924cc56e180)
Signed-off-by: Hyukjin Kwon 
---
 .../connect/client/GrpcExceptionConverter.scala| 54 ++
 .../scala/org/apache/spark/sql/CatalogSuite.scala  |  6 +--
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  | 12 +++--
 .../spark/sql/DataFrameNaFunctionSuite.scala   |  3 +-
 .../sql/KeyValueGroupedDatasetE2ETestSuite.scala   |  3 +-
 5 files changed, 60 insertions(+), 18 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 7ff3421a5a0..64d1e5c488a 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -16,18 +16,26 @@
  */
 package org.apache.spark.sql.connect.client
 
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+
+import com.google.rpc.ErrorInfo
 import io.grpc.StatusRuntimeException
 import io.grpc.protobuf.StatusProto
 
-import org.apache.spark.{SparkException, SparkThrowable}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.util.JsonUtils
 
-private[client] object GrpcExceptionConverter {
+private[client] object GrpcExceptionConverter extends JsonUtils {
   def convert[T](f: => T): T = {
 try {
   f
 } catch {
   case e: StatusRuntimeException =>
-throw toSparkThrowable(e)
+throw toThrowable(e)
 }
   }
 
@@ -53,11 +61,41 @@ private[client] object GrpcExceptionConverter {
 }
   }
 
-  private def toSparkThrowable(ex: StatusRuntimeException): SparkThrowable 
with Throwable = {
-val status = StatusProto.fromThrowable(ex)
-// TODO: Add finer grained error conversion
-new SparkException(status.getMessage, ex.getCause)
+  private def errorConstructor[T <: Throwable: ClassTag](
+  throwableCtr: (String, Throwable) => T): (String, (String, Throwable) => 
Throwable) = {
+val className = implicitly[reflect.ClassTag[T]].runtimeClass.getName
+(className, throwableCtr)
   }
-}
 
+  private val errorFactory = Map(
+errorConstructor((message, _) => new ParseException(None, message, 
Origin(), Origin())),
+errorConstructor((message, cause) => new AnalysisException(message, cause 
= Option(cause
+
+  private def errorInfoToThrowable(info: ErrorInfo, message: String): 
Option[Throwable] = {
+val classes =
+  mapper.readValue(info.getMetadataOrDefault("classes", "[]"), 
classOf[Array[String]])
 
+classes
+  .find(errorFactory.contains)
+  .map { cls =>
+val constructor = errorFactory.get(cls).get
+constructor(message, null)
+  }
+  }
+
+  private def toThrowable(ex: StatusRuntimeException): Throwable = {
+val status = StatusProto.fromThrowable(ex)
+
+val fallbackEx = new SparkException(status.getMessage, ex.getCause)
+
+val errorInfoOpt = status.getDetailsList.asScala
+  .find(_.is(classOf[ErrorInfo]))
+
+if (errorInfoOpt.isEmpty) {
+  return fallbackEx
+}
+
+errorInfoToThrowable(errorInfoOpt.get.unpack(classOf[ErrorInfo]), 
status.getMessage)
+  .getOrElse(fallbackEx)
+  }
+}
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala
index 00a6bcc9b5c..fa97498f7e7 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/

[spark] branch master updated: [SPARK-44575][SQL][CONNECT] Implement basic error translation

2023-08-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 42e5daddf3b [SPARK-44575][SQL][CONNECT] Implement basic error 
translation
42e5daddf3b is described below

commit 42e5daddf3ba16ff7d08e82e51cd8924cc56e180
Author: Yihong He 
AuthorDate: Tue Aug 8 06:33:48 2023 +0900

[SPARK-44575][SQL][CONNECT] Implement basic error translation

### What changes were proposed in this pull request?

- Implement basic error translation for the Spark Connect Scala client.

### Why are the changes needed?

- Better compatibility with the existing control flow

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`build/sbt "connect-client-jvm/testOnly *Suite"`

Closes #42266 from heyihong/SPARK-44575.

Authored-by: Yihong He 
Signed-off-by: Hyukjin Kwon 
---
 .../connect/client/GrpcExceptionConverter.scala| 54 ++
 .../scala/org/apache/spark/sql/CatalogSuite.scala  |  6 +--
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  | 12 +++--
 .../spark/sql/DataFrameNaFunctionSuite.scala   |  3 +-
 .../sql/KeyValueGroupedDatasetE2ETestSuite.scala   |  3 +-
 5 files changed, 60 insertions(+), 18 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 7ff3421a5a0..64d1e5c488a 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -16,18 +16,26 @@
  */
 package org.apache.spark.sql.connect.client
 
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+
+import com.google.rpc.ErrorInfo
 import io.grpc.StatusRuntimeException
 import io.grpc.protobuf.StatusProto
 
-import org.apache.spark.{SparkException, SparkThrowable}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.util.JsonUtils
 
-private[client] object GrpcExceptionConverter {
+private[client] object GrpcExceptionConverter extends JsonUtils {
   def convert[T](f: => T): T = {
 try {
   f
 } catch {
   case e: StatusRuntimeException =>
-throw toSparkThrowable(e)
+throw toThrowable(e)
 }
   }
 
@@ -53,11 +61,41 @@ private[client] object GrpcExceptionConverter {
 }
   }
 
-  private def toSparkThrowable(ex: StatusRuntimeException): SparkThrowable 
with Throwable = {
-val status = StatusProto.fromThrowable(ex)
-// TODO: Add finer grained error conversion
-new SparkException(status.getMessage, ex.getCause)
+  private def errorConstructor[T <: Throwable: ClassTag](
+  throwableCtr: (String, Throwable) => T): (String, (String, Throwable) => 
Throwable) = {
+val className = implicitly[reflect.ClassTag[T]].runtimeClass.getName
+(className, throwableCtr)
   }
-}
 
+  private val errorFactory = Map(
+errorConstructor((message, _) => new ParseException(None, message, 
Origin(), Origin())),
+errorConstructor((message, cause) => new AnalysisException(message, cause 
= Option(cause
+
+  private def errorInfoToThrowable(info: ErrorInfo, message: String): 
Option[Throwable] = {
+val classes =
+  mapper.readValue(info.getMetadataOrDefault("classes", "[]"), 
classOf[Array[String]])
 
+classes
+  .find(errorFactory.contains)
+  .map { cls =>
+val constructor = errorFactory.get(cls).get
+constructor(message, null)
+  }
+  }
+
+  private def toThrowable(ex: StatusRuntimeException): Throwable = {
+val status = StatusProto.fromThrowable(ex)
+
+val fallbackEx = new SparkException(status.getMessage, ex.getCause)
+
+val errorInfoOpt = status.getDetailsList.asScala
+  .find(_.is(classOf[ErrorInfo]))
+
+if (errorInfoOpt.isEmpty) {
+  return fallbackEx
+}
+
+errorInfoToThrowable(errorInfoOpt.get.unpack(classOf[ErrorInfo]), 
status.getMessage)
+  .getOrElse(fallbackEx)
+  }
+}
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala
index 00a6bcc9b5c..fa97498f7e7 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala
@@ -46,7 +46,7 @@ class CatalogSuite extends RemoteSparkSession with SQLHelper

[spark] branch master updated: [SPARK-43606][PS] Remove `Int64Index` & `Float64Index`

2023-08-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d5d3f393f16 [SPARK-43606][PS] Remove `Int64Index` & `Float64Index`
d5d3f393f16 is described below

commit d5d3f393f16d1c17f88857b81e9bd7573d594d87
Author: itholic 
AuthorDate: Tue Aug 8 06:31:52 2023 +0900

[SPARK-43606][PS] Remove `Int64Index` & `Float64Index`

### What changes were proposed in this pull request?

This PR proposes to remove `Int64Index` & `Float64Index` from pandas API on 
Spark.

### Why are the changes needed?

To match the behavior with pandas 2 and above.

### Does this PR introduce _any_ user-facing change?

Yes, the `Int64Index` & `Float64Index` will be removed.

### How was this patch tested?

Enabling the existing doctests & UTs.

Closes #42267 from itholic/SPARK-43245.

Authored-by: itholic 
Signed-off-by: Hyukjin Kwon 
---
 dev/sparktestsupport/modules.py|   1 -
 .../source/migration_guide/pyspark_upgrade.rst |   1 +
 .../source/reference/pyspark.pandas/indexing.rst   |  10 -
 python/pyspark/pandas/__init__.py  |   3 -
 python/pyspark/pandas/base.py  |  28 +--
 python/pyspark/pandas/frame.py |   8 +-
 python/pyspark/pandas/indexes/__init__.py  |   1 -
 python/pyspark/pandas/indexes/base.py  | 159 +++-
 python/pyspark/pandas/indexes/category.py  |   4 +-
 python/pyspark/pandas/indexes/datetimes.py |  12 +-
 python/pyspark/pandas/indexes/numeric.py   | 210 -
 python/pyspark/pandas/series.py|   4 +-
 python/pyspark/pandas/spark/accessors.py   |  16 +-
 python/pyspark/pandas/tests/indexes/test_base.py   |  45 +
 python/pyspark/pandas/tests/series/test_compute.py |   4 +-
 python/pyspark/pandas/tests/series/test_series.py  |   3 +-
 python/pyspark/pandas/usage_logging/__init__.py|   3 -
 17 files changed, 124 insertions(+), 388 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index b2f978c47ea..c5be1957a7d 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -668,7 +668,6 @@ pyspark_pandas = Module(
 "pyspark.pandas.indexes.datetimes",
 "pyspark.pandas.indexes.timedelta",
 "pyspark.pandas.indexes.multi",
-"pyspark.pandas.indexes.numeric",
 "pyspark.pandas.spark.accessors",
 "pyspark.pandas.spark.utils",
 "pyspark.pandas.typedef.typehints",
diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst 
b/python/docs/source/migration_guide/pyspark_upgrade.rst
index 9bd879fb1a1..7a691ee2645 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -22,6 +22,7 @@ Upgrading PySpark
 Upgrading from PySpark 3.5 to 4.0
 -
 
+* In Spark 4.0, ``Int64Index`` and ``Float64Index`` have been removed from 
pandas API on Spark, ``Index`` should be used directly.
 * In Spark 4.0, ``DataFrame.iteritems`` has been removed from pandas API on 
Spark, use ``DataFrame.items`` instead.
 * In Spark 4.0, ``Series.iteritems`` has been removed from pandas API on 
Spark, use ``Series.items`` instead.
 * In Spark 4.0, ``DataFrame.append`` has been removed from pandas API on 
Spark, use ``ps.concat`` instead.
diff --git a/python/docs/source/reference/pyspark.pandas/indexing.rst 
b/python/docs/source/reference/pyspark.pandas/indexing.rst
index 15539fa2266..70d463c052a 100644
--- a/python/docs/source/reference/pyspark.pandas/indexing.rst
+++ b/python/docs/source/reference/pyspark.pandas/indexing.rst
@@ -166,16 +166,6 @@ Selecting
Index.asof
Index.isin
 
-.. _api.numeric:
-
-Numeric Index
--
-.. autosummary::
-   :toctree: api/
-
-   Int64Index
-   Float64Index
-
 .. _api.categorical:
 
 CategoricalIndex
diff --git a/python/pyspark/pandas/__init__.py 
b/python/pyspark/pandas/__init__.py
index 980aeab2bee..d8ce385639c 100644
--- a/python/pyspark/pandas/__init__.py
+++ b/python/pyspark/pandas/__init__.py
@@ -61,7 +61,6 @@ from pyspark.pandas.indexes.base import Index
 from pyspark.pandas.indexes.category import CategoricalIndex
 from pyspark.pandas.indexes.datetimes import DatetimeIndex
 from pyspark.pandas.indexes.multi import MultiIndex
-from pyspark.pandas.indexes.numeric import Float64Index, Int64Index
 from pyspark.pandas.indexes.timedelta import TimedeltaIndex
 from pyspark.pandas.series import Series
 from pyspark.pandas.groupby import NamedAgg
@@ -77,8 +76,6 @@ __all__ = [  # noqa: F405
 "Series",
 "Index",
 "MultiIndex",
-"Int64Index",
-"Float64Index",
 "CategoricalIndex",
 "DatetimeIndex",
 "Tim

[spark] branch master updated: [SPARK-44707][K8S] Use INFO log in `ExecutorPodsWatcher.onClose` if `SparkContext` is stopped

2023-08-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 726ccb532a1 [SPARK-44707][K8S] Use INFO log in 
`ExecutorPodsWatcher.onClose` if `SparkContext` is stopped
726ccb532a1 is described below

commit 726ccb532a1b5dbe9c55e68a71d3125570c6738d
Author: Dongjoon Hyun 
AuthorDate: Mon Aug 7 13:56:24 2023 -0700

[SPARK-44707][K8S] Use INFO log in `ExecutorPodsWatcher.onClose` if 
`SparkContext` is stopped

### What changes were proposed in this pull request?

This PR is a minor logging change that uses an `INFO`-level log instead of a 
`WARN`-level log in `ExecutorPodsWatcher.onClose` when the `SparkContext` is 
already stopped. Since Spark can distinguish this expected shutdown from the 
error cases, it should avoid the warning.

### Why are the changes needed?

Previously, a `WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has 
been closed` message was logged even during a normal shutdown:
```
23/08/07 18:10:14 INFO SparkContext: SparkContext is stopping with exitCode 
0.
23/08/07 18:10:14 WARN TaskSetManager: Lost task 2594.0 in stage 0.0 (TID 
2594) ([2620:149:100d:1813::3f86] executor 1615): TaskKilled (another attempt 
succeeded)
23/08/07 18:10:14 INFO TaskSetManager: task 2594.0 in stage 0.0 (TID 2594) 
failed, but the task will not be re-executed (either because the task failed 
with a shuffle data fetch failure, so the previous stage needs to be re-run, or 
because a different copy of the task has already succeeded).
23/08/07 18:10:14 INFO SparkUI: Stopped Spark web UI at http://xxx:4040
23/08/07 18:10:14 INFO KubernetesClusterSchedulerBackend: Shutting down all 
executors
23/08/07 18:10:14 INFO 
KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each 
executor to shut down
23/08/07 18:10:14 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client 
has been closed.
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #42381 from dongjoon-hyun/SPARK-44707.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .../cluster/k8s/ExecutorPodsWatchSnapshotSource.scala  | 14 +++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
index 4809222650d..6953ed789f7 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
@@ -86,12 +86,20 @@ class ExecutorPodsWatchSnapshotSource(
 }
 
 override def onClose(e: WatcherException): Unit = {
-  logWarning("Kubernetes client has been closed (this is expected if the 
application is" +
-" shutting down.)", e)
+  if (SparkContext.getActive.map(_.isStopped).getOrElse(true)) {
+logInfo("Kubernetes client has been closed.")
+  } else {
+logWarning("Kubernetes client has been closed (this is expected if the 
application is" +
+  " shutting down.)", e)
+  }
 }
 
 override def onClose(): Unit = {
-  logWarning("Kubernetes client has been closed.")
+  if (SparkContext.getActive.map(_.isStopped).getOrElse(true)) {
+logInfo("Kubernetes client has been closed.")
+  } else {
+logWarning("Kubernetes client has been closed.")
+  }
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-44561][PYTHON] Fix AssertionError when converting UDTF output to a complex type

2023-08-07 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new ea6b41cb398 [SPARK-44561][PYTHON] Fix AssertionError when converting 
UDTF output to a complex type
ea6b41cb398 is described below

commit ea6b41cb3989996e45102b1930b1498324761093
Author: Takuya UESHIN 
AuthorDate: Mon Aug 7 11:48:24 2023 -0700

[SPARK-44561][PYTHON] Fix AssertionError when converting UDTF output to a 
complex type

### What changes were proposed in this pull request?

Fixes an AssertionError when converting UDTF output to a complex type by 
ignoring the assertions in `_create_converter_from_pandas` and letting Arrow 
raise the error instead.

### Why are the changes needed?

There is an assertion in `_create_converter_from_pandas`, but it should not 
be applied in the Python UDTF case.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added/modified the related tests.

Closes #42310 from ueshin/issues/SPARK-44561/udtf_complex_types.

Authored-by: Takuya UESHIN 
Signed-off-by: Takuya UESHIN 
(cherry picked from commit f1a161cb39504bd625ea7fa50d2cc72a1a2a59e9)
Signed-off-by: Takuya UESHIN 
---
 python/pyspark/sql/pandas/serializers.py   |   5 +-
 python/pyspark/sql/pandas/types.py | 108 ++---
 .../pyspark/sql/tests/connect/test_parity_udtf.py  |   3 +
 python/pyspark/sql/tests/test_udtf.py  | 247 +++--
 4 files changed, 314 insertions(+), 49 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py 
b/python/pyspark/sql/pandas/serializers.py
index f3037c8b39c..d1a3babb1fd 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -571,7 +571,10 @@ class 
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
 dt = spark_type or from_arrow_type(arrow_type, 
prefer_timestamp_ntz=True)
 # TODO(SPARK-43579): cache the converter for reuse
 conv = _create_converter_from_pandas(
-dt, timezone=self._timezone, 
error_on_duplicated_field_names=False
+dt,
+timezone=self._timezone,
+error_on_duplicated_field_names=False,
+ignore_unexpected_complex_type_values=True,
 )
 series = conv(series)
 
diff --git a/python/pyspark/sql/pandas/types.py 
b/python/pyspark/sql/pandas/types.py
index 53362047604..b02a003e632 100644
--- a/python/pyspark/sql/pandas/types.py
+++ b/python/pyspark/sql/pandas/types.py
@@ -21,7 +21,7 @@ pandas instances during the type conversion.
 """
 import datetime
 import itertools
-from typing import Any, Callable, List, Optional, Union, TYPE_CHECKING
+from typing import Any, Callable, Iterable, List, Optional, Union, 
TYPE_CHECKING
 
 from pyspark.sql.types import (
 cast,
@@ -750,6 +750,7 @@ def _create_converter_from_pandas(
 *,
 timezone: Optional[str],
 error_on_duplicated_field_names: bool = True,
+ignore_unexpected_complex_type_values: bool = False,
 ) -> Callable[["pd.Series"], "pd.Series"]:
 """
 Create a converter of pandas Series to create Spark DataFrame with Arrow 
optimization.
@@ -763,6 +764,17 @@ def _create_converter_from_pandas(
 error_on_duplicated_field_names : bool, optional
 Whether raise an exception when there are duplicated field names.
 (default ``True``)
+ignore_unexpected_complex_type_values : bool, optional
+Whether ignore the case where unexpected values are given for complex 
types.
+If ``False``, each complex type expects:
+
+* array type: :class:`Iterable`
+* map type: :class:`dict`
+* struct type: :class:`dict` or :class:`tuple`
+
+and raise an AssertionError when the given value is not the expected 
type.
+If ``True``, just ignore and return the give value.
+(default ``False``)
 
 Returns
 ---
@@ -781,15 +793,26 @@ def _create_converter_from_pandas(
 def _converter(dt: DataType) -> Optional[Callable[[Any], Any]]:
 
 if isinstance(dt, ArrayType):
-_element_conv = _converter(dt.elementType)
-if _element_conv is None:
-return None
+_element_conv = _converter(dt.elementType) or (lambda x: x)
 
-def convert_array(value: Any) -> Any:
-if value is None:
-return None
-else:
-return [_element_conv(v) for v in value]  # type: 
ignore[misc]
+if ignore_unexpected_complex_type_values:
+
+def convert_array(value: Any) -> Any:
+if value is None:
+return None
+elif isinstance(value, Itera

[spark] branch master updated: [SPARK-44561][PYTHON] Fix AssertionError when converting UDTF output to a complex type

2023-08-07 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f1a161cb395 [SPARK-44561][PYTHON] Fix AssertionError when converting 
UDTF output to a complex type
f1a161cb395 is described below

commit f1a161cb39504bd625ea7fa50d2cc72a1a2a59e9
Author: Takuya UESHIN 
AuthorDate: Mon Aug 7 11:48:24 2023 -0700

[SPARK-44561][PYTHON] Fix AssertionError when converting UDTF output to a 
complex type

### What changes were proposed in this pull request?

Fixes an AssertionError when converting UDTF output to a complex type by 
ignoring the assertions in `_create_converter_from_pandas` and letting Arrow 
raise the error instead.

### Why are the changes needed?

There is an assertion in `_create_converter_from_pandas`, but it should not 
be applied in the Python UDTF case.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added/modified the related tests.

Closes #42310 from ueshin/issues/SPARK-44561/udtf_complex_types.

Authored-by: Takuya UESHIN 
Signed-off-by: Takuya UESHIN 
---
 python/pyspark/sql/pandas/serializers.py   |   5 +-
 python/pyspark/sql/pandas/types.py | 108 ++---
 .../pyspark/sql/tests/connect/test_parity_udtf.py  |   3 +
 python/pyspark/sql/tests/test_udtf.py  | 247 +++--
 4 files changed, 314 insertions(+), 49 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py 
b/python/pyspark/sql/pandas/serializers.py
index f3037c8b39c..d1a3babb1fd 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -571,7 +571,10 @@ class 
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
 dt = spark_type or from_arrow_type(arrow_type, 
prefer_timestamp_ntz=True)
 # TODO(SPARK-43579): cache the converter for reuse
 conv = _create_converter_from_pandas(
-dt, timezone=self._timezone, 
error_on_duplicated_field_names=False
+dt,
+timezone=self._timezone,
+error_on_duplicated_field_names=False,
+ignore_unexpected_complex_type_values=True,
 )
 series = conv(series)
 
diff --git a/python/pyspark/sql/pandas/types.py 
b/python/pyspark/sql/pandas/types.py
index 53362047604..b02a003e632 100644
--- a/python/pyspark/sql/pandas/types.py
+++ b/python/pyspark/sql/pandas/types.py
@@ -21,7 +21,7 @@ pandas instances during the type conversion.
 """
 import datetime
 import itertools
-from typing import Any, Callable, List, Optional, Union, TYPE_CHECKING
+from typing import Any, Callable, Iterable, List, Optional, Union, 
TYPE_CHECKING
 
 from pyspark.sql.types import (
 cast,
@@ -750,6 +750,7 @@ def _create_converter_from_pandas(
 *,
 timezone: Optional[str],
 error_on_duplicated_field_names: bool = True,
+ignore_unexpected_complex_type_values: bool = False,
 ) -> Callable[["pd.Series"], "pd.Series"]:
 """
 Create a converter of pandas Series to create Spark DataFrame with Arrow 
optimization.
@@ -763,6 +764,17 @@ def _create_converter_from_pandas(
 error_on_duplicated_field_names : bool, optional
 Whether raise an exception when there are duplicated field names.
 (default ``True``)
+ignore_unexpected_complex_type_values : bool, optional
+Whether ignore the case where unexpected values are given for complex 
types.
+If ``False``, each complex type expects:
+
+* array type: :class:`Iterable`
+* map type: :class:`dict`
+* struct type: :class:`dict` or :class:`tuple`
+
+and raise an AssertionError when the given value is not the expected 
type.
+If ``True``, just ignore and return the give value.
+(default ``False``)
 
 Returns
 ---
@@ -781,15 +793,26 @@ def _create_converter_from_pandas(
 def _converter(dt: DataType) -> Optional[Callable[[Any], Any]]:
 
 if isinstance(dt, ArrayType):
-_element_conv = _converter(dt.elementType)
-if _element_conv is None:
-return None
+_element_conv = _converter(dt.elementType) or (lambda x: x)
 
-def convert_array(value: Any) -> Any:
-if value is None:
-return None
-else:
-return [_element_conv(v) for v in value]  # type: 
ignore[misc]
+if ignore_unexpected_complex_type_values:
+
+def convert_array(value: Any) -> Any:
+if value is None:
+return None
+elif isinstance(value, Iterable):
+return [_element_conv(v) for v in value]
+else:
+   

[spark] branch master updated: [SPARK-38475][CORE] Use error class in org.apache.spark.serializer

2023-08-07 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2a23c7a18a0 [SPARK-38475][CORE] Use error class in 
org.apache.spark.serializer
2a23c7a18a0 is described below

commit 2a23c7a18a0ba75d95ee1d898896a8f0dc2c5531
Author: Bo Zhang 
AuthorDate: Mon Aug 7 22:10:01 2023 +0500

[SPARK-38475][CORE] Use error class in org.apache.spark.serializer

### What changes were proposed in this pull request?
This PR aims to change the exceptions created in the 
org.apache.spark.serializer package to use error classes.

### Why are the changes needed?
This moves the exceptions created in the org.apache.spark.serializer package 
onto the error class framework.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.
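
In a nutshell, call sites now raise `SparkException` through an error class 
declared in `error-classes.json` plus message parameters instead of a 
hand-built string; a minimal sketch using the names from the hunks below 
(`unknownFingerprintError` is a hypothetical helper, not part of the patch):

```
import org.apache.spark.SparkException

// The message template lives in the ERROR_READING_AVRO_UNKNOWN_FINGERPRINT
// entry of common/utils/src/main/resources/error/error-classes.json.
def unknownFingerprintError(fingerprint: Long): SparkException =
  new SparkException(
    errorClass = "ERROR_READING_AVRO_UNKNOWN_FINGERPRINT",
    messageParameters = Map("fingerprint" -> fingerprint.toString),
    cause = null)
```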

Closes #42243 from bozhang2820/spark-38475.

Lead-authored-by: Bo Zhang 
Co-authored-by: Bo Zhang 
Signed-off-by: Max Gekk 
---
 .../src/main/resources/error/error-classes.json| 21 +
 .../spark/serializer/GenericAvroSerializer.scala   |  6 ++---
 .../apache/spark/serializer/KryoSerializer.scala   | 27 --
 docs/sql-error-conditions.md   | 24 +++
 4 files changed, 68 insertions(+), 10 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 680f787429c..0ea1eed35e4 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -831,6 +831,11 @@
   "Not found an encoder of the type  to Spark SQL internal 
representation. Consider to change the input type to one of supported at 
'/sql-ref-datatypes.html'."
 ]
   },
+  "ERROR_READING_AVRO_UNKNOWN_FINGERPRINT" : {
+"message" : [
+  "Error reading avro data -- encountered an unknown fingerprint: 
, not sure what schema to use. This could happen if you registered 
additional schemas after starting your spark context."
+]
+  },
   "EVENT_TIME_IS_NOT_ON_TIMESTAMP_TYPE" : {
 "message" : [
   "The event time  has the invalid type , but 
expected \"TIMESTAMP\"."
@@ -864,6 +869,11 @@
 ],
 "sqlState" : "22018"
   },
+  "FAILED_REGISTER_CLASS_WITH_KRYO" : {
+"message" : [
+  "Failed to register classes with Kryo."
+]
+  },
   "FAILED_RENAME_PATH" : {
 "message" : [
   "Failed to rename  to  as destination already 
exists."
@@ -1564,6 +1574,12 @@
 ],
 "sqlState" : "22032"
   },
+  "INVALID_KRYO_SERIALIZER_BUFFER_SIZE" : {
+"message" : [
+  "The value of the config \"\" must be less than 2048 
MiB, but got  MiB."
+],
+"sqlState" : "F"
+  },
   "INVALID_LAMBDA_FUNCTION_CALL" : {
 "message" : [
   "Invalid lambda function call."
@@ -2006,6 +2022,11 @@
   "The join condition  has the invalid type 
, expected \"BOOLEAN\"."
 ]
   },
+  "KRYO_BUFFER_OVERFLOW" : {
+"message" : [
+  "Kryo serialization failed: . To avoid this, increase 
\"\" value."
+]
+  },
   "LOAD_DATA_PATH_NOT_EXISTS" : {
 "message" : [
   "LOAD DATA input path does not exist: ."
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
index 7d2923fdf37..d09abff2773 100644
--- 
a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -140,9 +140,9 @@ private[serializer] class GenericAvroSerializer[D <: 
GenericContainer]
 case Some(s) => new 
Schema.Parser().setValidateDefaults(false).parse(s)
 case None =>
   throw new SparkException(
-"Error reading attempting to read avro data -- encountered an 
unknown " +
-  s"fingerprint: $fingerprint, not sure what schema to use.  
This could happen " +
-  "if you registered additional schemas after starting your 
spark context.")
+errorClass = "ERROR_READING_AVRO_UNKNOWN_FINGERPRINT",
+messageParameters = Map("fingerprint" -> fingerprint.toString),
+cause = null)
   }
 })
   } else {
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 826d6789f88..f75942cbb87 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -66,15 +66,21 @@ class KryoSerializer(conf: SparkConf)
   private val bufferSizeKb = conf.get(KRYO_SERIALIZER_BUFFER_SIZE)
 
   if (bufferSizeKb >= 
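
(The KryoSerializer hunk is cut off in the archive at this point.) Purely as an illustration of the same error-class pattern, and not the actual patch, the buffer-size check could surface the new INVALID_KRYO_SERIALIZER_BUFFER_SIZE class roughly as sketched below; the helper name and the parameter names are assumptions.

```
import org.apache.spark.SparkException

// Hypothetical sketch: reject Kryo buffer sizes of 2048 MiB or more with an error class
// instead of a hand-written exception message.
def checkKryoBufferSize(bufferSizeKb: Long, confKey: String): Unit = {
  val maxKiB = 2L * 1024 * 1024 // 2048 MiB expressed in KiB
  if (bufferSizeKb >= maxKiB) {
    throw new SparkException(
      errorClass = "INVALID_KRYO_SERIALIZER_BUFFER_SIZE",
      messageParameters = Map(
        "bufferSizeConfKey" -> confKey,
        "bufferSizeConfValue" -> (bufferSizeKb / 1024).toString),
      cause = null)
  }
}
```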

[spark] branch master updated: [SPARK-44697][CORE] Clean up the deprecated usage of `o.a.commons.lang3.RandomUtils`

2023-08-07 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a3a32912be0 [SPARK-44697][CORE] Clean up the deprecated usage of 
`o.a.commons.lang3.RandomUtils`
a3a32912be0 is described below

commit a3a32912be04d3760cb34eb4b79d6d481bbec502
Author: yangjie01 
AuthorDate: Mon Aug 7 21:42:38 2023 +0800

[SPARK-44697][CORE] Clean up the deprecated usage of 
`o.a.commons.lang3.RandomUtils`

### What changes were proposed in this pull request?
In `commons-lang3` 3.13.0, `RandomUtils` has been marked as `Deprecated`; the `commons-lang3` Javadoc suggests using the `commons-rng` API instead.


https://github.com/apache/commons-lang/blob/bcc10b359318397a4d12dbaef22b101725bc6323/src/main/java/org/apache/commons/lang3/RandomUtils.java#L33

```
 * @deprecated Use Apache Commons RNG's optimized <a href="https://commons.apache.org/proper/commons-rng/commons-rng-client-api/apidocs/org/apache/commons/rng/UniformRandomProvider.html">UniformRandomProvider</a>

```

However, since Spark only uses `RandomUtils` in test code, this PR replaces `RandomUtils` with `ThreadLocalRandom` to avoid introducing an additional third-party dependency.

### Why are the changes needed?
Clean up the use of a deprecated API.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions
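
For reference, a minimal Scala sketch mirroring the two replacements visible in the test diffs below (the value names are ours):

```
import java.util.concurrent.ThreadLocalRandom

// Random test payload: RandomUtils.nextBytes(n) becomes an in-place fill of a byte array.
val randomBytes = new Array[Byte](2 * 1024 * 1024) // 2 MiB
ThreadLocalRandom.current().nextBytes(randomBytes)

// Random port: RandomUtils.nextInt(1024, 65536) maps directly to nextInt(origin, bound).
val candidatePort = ThreadLocalRandom.current().nextInt(1024, 65536)
```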

Closes #42370 from LuciferYang/RandomUtils-2-ThreadLocalRandom.

Authored-by: yangjie01 
Signed-off-by: yangjie01 
---
 .../java/org/apache/spark/io/GenericFileInputStreamSuite.java | 8 
 .../org/apache/spark/deploy/master/PersistenceEngineSuite.scala   | 4 ++--
 .../scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala  | 4 ++--
 .../test/scala/org/apache/spark/storage/BlockManagerSuite.scala   | 6 +++---
 4 files changed, 11 insertions(+), 11 deletions(-)

diff --git 
a/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java 
b/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java
index ef7c4cbbb79..4bfb4a2c68c 100644
--- a/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java
+++ b/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java
@@ -17,7 +17,6 @@
 package org.apache.spark.io;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.RandomUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -25,6 +24,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.junit.Assert.assertEquals;
 
@@ -33,7 +33,8 @@ import static org.junit.Assert.assertEquals;
  */
 public abstract class GenericFileInputStreamSuite {
 
-  private byte[] randomBytes;
+  // Create a byte array of size 2 MB with random bytes
+  private byte[] randomBytes = new byte[2 * 1024 * 1024];
 
   protected File inputFile;
 
@@ -41,8 +42,7 @@ public abstract class GenericFileInputStreamSuite {
 
   @Before
   public void setUp() throws IOException {
-// Create a byte array of size 2 MB with random bytes
-randomBytes =  RandomUtils.nextBytes(2 * 1024 * 1024);
+ThreadLocalRandom.current().nextBytes(randomBytes);
 inputFile = File.createTempFile("temp-file", ".tmp");
 FileUtils.writeByteArrayToFile(inputFile, randomBytes);
   }
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 39607621b4c..998ad21a50d 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -19,8 +19,8 @@
 package org.apache.spark.deploy.master
 
 import java.net.ServerSocket
+import java.util.concurrent.ThreadLocalRandom
 
-import org.apache.commons.lang3.RandomUtils
 import org.apache.curator.test.TestingServer
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
@@ -117,7 +117,7 @@ class PersistenceEngineSuite extends SparkFunSuite {
   }
 
   private def findFreePort(conf: SparkConf): Int = {
-val candidatePort = RandomUtils.nextInt(1024, 65536)
+val candidatePort = ThreadLocalRandom.current().nextInt(1024, 65536)
 Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
   val socket = new ServerSocket(trialPort)
   socket.close()
diff --git 
a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala 
b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 905bb811073..3e69f01c09c 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scal

[spark] branch branch-3.4 updated: [SPARK-44634][SQL][3.4] Encoders.bean does no longer support nested beans with type arguments

2023-08-07 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 1f5d78b5952 [SPARK-44634][SQL][3.4] Encoders.bean does no longer 
support nested beans with type arguments
1f5d78b5952 is described below

commit 1f5d78b5952fcc6c7d36d3338a5594070e3a62dd
Author: Giambattista Bloisi 
AuthorDate: Mon Aug 7 15:11:02 2023 +0200

[SPARK-44634][SQL][3.4] Encoders.bean does no longer support nested beans 
with type arguments

### What changes were proposed in this pull request?
This is a port of [42327](https://github.com/apache/spark/pull/42327)

This PR fixes a regression introduced in Spark 3.4.x where Encoders.bean is no longer able to process nested beans having type arguments. For example:

```
class A<T> {
   T value;
   // value getter and setter
}

class B {
   A<String> stringHolder;
   // stringHolder getter and setter
}

Encoders.bean(B.class); // throws "SparkUnsupportedOperationException: 
[ENCODER_NOT_FOUND]..."
```

### Why are the changes needed?
The main match in JavaTypeInference.encoderFor does not handle the ParameterizedType and TypeVariable cases. I think this is a regression introduced after getting rid of the Guava TypeToken usage: [SPARK-42093 SQL Move JavaTypeInference to AgnosticEncoders](https://github.com/apache/spark/commit/18672003513d5a4aa610b6b94dbbc15c33185d3#diff-1191737b908340a2f4c22b71b1c40ebaa0da9d8b40c958089c346a3bda26943b) cc hvanhovell, cloud-fan

In this PR I'm leveraging the commons-lang3 TypeUtils utilities to resolve ParameterizedType type arguments for classes.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests have been extended to check correct encoding of a nested 
bean having type arguments.
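
As a rough, standalone sketch of the approach (not the patched Spark code itself): for a bean property declared as A<String>, commons-lang3 TypeUtils yields a map from A's type variable T to String, so a field declared as `T value` can be resolved to a concrete class before an encoder is chosen. The helper names below are ours.

```
import java.lang.reflect.{ParameterizedType, Type, TypeVariable}
import scala.collection.JavaConverters._
import org.apache.commons.lang3.reflect.{TypeUtils => JavaTypeUtils}

// Collect the type-variable bindings of a parameterized type, e.g. A<String> => (T -> String).
def typeArguments(t: Type): Map[TypeVariable[_], Type] = t match {
  case pt: ParameterizedType => JavaTypeUtils.getTypeArguments(pt).asScala.toMap
  case _ => Map.empty
}

// Resolve a (possibly generic) field type against the bindings gathered from its owner.
def resolve(t: Type, typeVariables: Map[TypeVariable[_], Type]): Type = t match {
  case tv: TypeVariable[_] => typeVariables.getOrElse(tv, tv)
  case other => other
}
```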

Closes #42379 from gbloisi-openaire/spark-44634-branch-3.4.

Authored-by: Giambattista Bloisi 
Signed-off-by: Herman van Hovell 
---
 .../spark/sql/catalyst/JavaTypeInference.scala | 85 +-
 .../spark/sql/catalyst/JavaBeanWithGenerics.java   | 41 +++
 .../sql/catalyst/JavaTypeInferenceSuite.scala  |  4 +
 3 files changed, 65 insertions(+), 65 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 36b98737a20..75aca3ccbdd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -18,12 +18,14 @@ package org.apache.spark.sql.catalyst
 
 import java.beans.{Introspector, PropertyDescriptor}
 import java.lang.reflect.{ParameterizedType, Type, TypeVariable}
-import java.util.{ArrayDeque, List => JList, Map => JMap}
+import java.util.{List => JList, Map => JMap}
 import javax.annotation.Nonnull
 
-import scala.annotation.tailrec
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
+import org.apache.commons.lang3.reflect.{TypeUtils => JavaTypeUtils}
+
 import org.apache.spark.SPARK_DOC_ROOT
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, 
BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, 
BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, 
DayTimeIntervalEncoder, DEFAULT_JAVA_DECIMAL_ENCODER, EncoderField, 
IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaEnumEncoder, 
LocalDateTimeEncoder, MapEncoder, PrimitiveBooleanEncoder, 
PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, P [...]
@@ -58,7 +60,8 @@ object JavaTypeInference {
 encoderFor(beanType, Set.empty).asInstanceOf[AgnosticEncoder[T]]
   }
 
-  private def encoderFor(t: Type, seenTypeSet: Set[Class[_]]): 
AgnosticEncoder[_] = t match {
+  private def encoderFor(t: Type, seenTypeSet: Set[Class[_]],
+typeVariables: Map[TypeVariable[_], Type] = Map.empty): AgnosticEncoder[_] 
= t match {
 
 case c: Class[_] if c == java.lang.Boolean.TYPE => PrimitiveBooleanEncoder
 case c: Class[_] if c == java.lang.Byte.TYPE => PrimitiveByteEncoder
@@ -102,18 +105,24 @@ object JavaTypeInference {
   UDTEncoder(udt, udt.getClass)
 
 case c: Class[_] if c.isArray =>
-  val elementEncoder = encoderFor(c.getComponentType, seenTypeSet)
+  val elementEncoder = encoderFor(c.getComponentType, seenTypeSet, 
typeVariables)
   ArrayEncoder(elementEncoder, elementEncoder.nullable)
 
-case ImplementsList(c, Array(elementCls)) =>
-  val element = encoderFor(elementCls, seenTypeSet)
+case c: Class[_] if classOf[JList[_]].isAssignableFrom(c) =>
+  val

[spark] branch branch-3.5 updated: [SPARK-44686][CONNECT][SQL] Add the ability to create a RowEncoder in Encoders.scala

2023-08-07 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 8f6f301fa77 [SPARK-44686][CONNECT][SQL] Add the ability to create a 
RowEncoder in Encoders.scala
8f6f301fa77 is described below

commit 8f6f301fa778dfd0fd7dec4a29df7106846d3277
Author: Herman van Hovell 
AuthorDate: Mon Aug 7 15:09:58 2023 +0200

[SPARK-44686][CONNECT][SQL] Add the ability to create a RowEncoder in 
Encoders.scala

### What changes were proposed in this pull request?
### Why are the changes needed?
It is currently not possible to create a `RowEncoder` using the public API. The internal APIs for this will change in Spark 3.5, which means that library maintainers have to update their code if they use a RowEncoder. To avoid this happening again, we add this method to the public API.

### Does this PR introduce _any_ user-facing change?
Yes. It adds the `row` method to `Encoders`.

### How was this patch tested?
Added tests to connect and sql.
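
For completeness, a small Scala usage sketch of the new method (our own example, mirroring the Java test below; the local names and master URL are not part of the patch):

```
import org.apache.spark.sql.{Encoders, Row, SparkSession}
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val schema = new StructType().add("a", "int").add("b", "string")
val rowEncoder = Encoders.row(schema)

// Map an untyped range into Rows using the explicitly supplied Row encoder.
val df = spark.range(3).map(i => Row(i.intValue(), "s" + i))(rowEncoder)
df.show()
```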

Closes #42366 from hvanhovell/SPARK-44686.

Lead-authored-by: Herman van Hovell 
Co-authored-by: Hyukjin Kwon 
Signed-off-by: Herman van Hovell 
(cherry picked from commit bf7654998fbbec9d5bdee6f46462cffef495545f)
Signed-off-by: Herman van Hovell 
---
 .../main/scala/org/apache/spark/sql/Encoders.scala | 10 ++-
 .../org/apache/spark/sql/JavaEncoderSuite.java | 31 +++---
 project/MimaExcludes.scala |  2 ++
 .../main/java/org/apache/spark/sql/RowFactory.java |  0
 .../main/scala/org/apache/spark/sql/Encoders.scala |  7 +
 .../org/apache/spark/sql/JavaDatasetSuite.java | 19 +
 6 files changed, 64 insertions(+), 5 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala
index 3f2f7ec96d4..74f01338031 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql
 import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder => 
RowEncoderFactory}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
+import org.apache.spark.sql.types.StructType
 
 /**
  * Methods for creating an [[Encoder]].
@@ -168,6 +169,13 @@ object Encoders {
*/
   def bean[T](beanClass: Class[T]): Encoder[T] = 
JavaTypeInference.encoderFor(beanClass)
 
+  /**
+   * Creates a [[Row]] encoder for schema `schema`.
+   *
+   * @since 3.5.0
+   */
+  def row(schema: StructType): Encoder[Row] = 
RowEncoderFactory.encoderFor(schema)
+
   private def tupleEncoder[T](encoders: Encoder[_]*): Encoder[T] = {
 
ProductEncoder.tuple(encoders.asInstanceOf[Seq[AgnosticEncoder[_]]]).asInstanceOf[Encoder[T]]
   }
diff --git 
a/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java
 
b/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java
index c8210a7a485..6e5fb72d496 100644
--- 
a/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java
+++ 
b/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java
@@ -16,21 +16,26 @@
  */
 package org.apache.spark.sql;
 
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
 import org.junit.*;
 import static org.junit.Assert.*;
 
 import static org.apache.spark.sql.Encoders.*;
 import static org.apache.spark.sql.functions.*;
+import static org.apache.spark.sql.RowFactory.create;
 import org.apache.spark.sql.connect.client.SparkConnectClient;
 import org.apache.spark.sql.connect.client.util.SparkConnectServerUtils;
-
-import java.math.BigDecimal;
-import java.util.Arrays;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.types.StructType;
 
 /**
  * Tests for the encoders class.
  */
-public class JavaEncoderSuite {
+public class JavaEncoderSuite implements Serializable {
   private static SparkSession spark;
 
   @BeforeClass
@@ -91,4 +96,22 @@ public class JavaEncoderSuite {
 dataset(DECIMAL(), bigDec(1000, 2), bigDec(2, 2))
 .select(sum(v)).as(DECIMAL()).head().setScale(2));
   }
+
+  @Test
+  public void testRowEncoder() {
+final StructType schema = new StructType()
+.add("a", "int")
+.add("b", "string");
+final Dataset<Row> df = spark.range(3)
+.map(new MapFunction<Long, Row>() {
+   @Override
+

[spark] branch master updated: [SPARK-44686][CONNECT][SQL] Add the ability to create a RowEncoder in Encoders.scala

2023-08-07 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bf7654998fb [SPARK-44686][CONNECT][SQL] Add the ability to create a 
RowEncoder in Encoders.scala
bf7654998fb is described below

commit bf7654998fbbec9d5bdee6f46462cffef495545f
Author: Herman van Hovell 
AuthorDate: Mon Aug 7 15:09:58 2023 +0200

[SPARK-44686][CONNECT][SQL] Add the ability to create a RowEncoder in 
Encoders.scala

### What changes were proposed in this pull request?
### Why are the changes needed?
It is currently not possible to create a `RowEncoder` using the public API. The internal APIs for this will change in Spark 3.5, which means that library maintainers have to update their code if they use a RowEncoder. To avoid this happening again, we add this method to the public API.

### Does this PR introduce _any_ user-facing change?
Yes. It adds the `row` method to `Encoders`.

### How was this patch tested?
Added tests to connect and sql.
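
A complementary usage sketch (again our own example, not taken from the patch): the new encoder can also be passed explicitly to Dataset.as to treat a DataFrame as a Dataset[Row] with a known schema.

```
import org.apache.spark.sql.{Encoders, Row, SparkSession}
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val schema = new StructType().add("a", "int").add("b", "string")

// Supply the Row encoder explicitly where an implicit Encoder[Row] would otherwise be required.
val ds = spark.sql("SELECT 1 AS a, 'x' AS b").as[Row](Encoders.row(schema))
ds.collect().foreach(println)
```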

Closes #42366 from hvanhovell/SPARK-44686.

Lead-authored-by: Herman van Hovell 
Co-authored-by: Hyukjin Kwon 
Signed-off-by: Herman van Hovell 
---
 .../main/scala/org/apache/spark/sql/Encoders.scala | 10 ++-
 .../org/apache/spark/sql/JavaEncoderSuite.java | 31 +++---
 project/MimaExcludes.scala |  2 ++
 .../main/java/org/apache/spark/sql/RowFactory.java |  0
 .../main/scala/org/apache/spark/sql/Encoders.scala |  7 +
 .../org/apache/spark/sql/JavaDatasetSuite.java | 19 +
 6 files changed, 64 insertions(+), 5 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala
index 3f2f7ec96d4..74f01338031 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql
 import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection}
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder => 
RowEncoderFactory}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
+import org.apache.spark.sql.types.StructType
 
 /**
  * Methods for creating an [[Encoder]].
@@ -168,6 +169,13 @@ object Encoders {
*/
   def bean[T](beanClass: Class[T]): Encoder[T] = 
JavaTypeInference.encoderFor(beanClass)
 
+  /**
+   * Creates a [[Row]] encoder for schema `schema`.
+   *
+   * @since 3.5.0
+   */
+  def row(schema: StructType): Encoder[Row] = 
RowEncoderFactory.encoderFor(schema)
+
   private def tupleEncoder[T](encoders: Encoder[_]*): Encoder[T] = {
 
ProductEncoder.tuple(encoders.asInstanceOf[Seq[AgnosticEncoder[_]]]).asInstanceOf[Encoder[T]]
   }
diff --git 
a/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java
 
b/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java
index c8210a7a485..6e5fb72d496 100644
--- 
a/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java
+++ 
b/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java
@@ -16,21 +16,26 @@
  */
 package org.apache.spark.sql;
 
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+
 import org.junit.*;
 import static org.junit.Assert.*;
 
 import static org.apache.spark.sql.Encoders.*;
 import static org.apache.spark.sql.functions.*;
+import static org.apache.spark.sql.RowFactory.create;
 import org.apache.spark.sql.connect.client.SparkConnectClient;
 import org.apache.spark.sql.connect.client.util.SparkConnectServerUtils;
-
-import java.math.BigDecimal;
-import java.util.Arrays;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.types.StructType;
 
 /**
  * Tests for the encoders class.
  */
-public class JavaEncoderSuite {
+public class JavaEncoderSuite implements Serializable {
   private static SparkSession spark;
 
   @BeforeClass
@@ -91,4 +96,22 @@ public class JavaEncoderSuite {
 dataset(DECIMAL(), bigDec(1000, 2), bigDec(2, 2))
 .select(sum(v)).as(DECIMAL()).head().setScale(2));
   }
+
+  @Test
+  public void testRowEncoder() {
+final StructType schema = new StructType()
+.add("a", "int")
+.add("b", "string");
+final Dataset<Row> df = spark.range(3)
+.map(new MapFunction<Long, Row>() {
+   @Override
+   public Row call(Long i) {
+ return create(i.intValue(), "s" + i);
+   }
+   

[spark] branch master updated: [SPARK-44701][PYTHON][TESTS] Skip `ClassificationTestsOnConnect` when `torch` is not installed

2023-08-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 433fb2af8a3 [SPARK-44701][PYTHON][TESTS] Skip 
`ClassificationTestsOnConnect` when `torch` is not installed
433fb2af8a3 is described below

commit 433fb2af8a3ca239958cb7b006e2924ecfac0d56
Author: Ruifeng Zheng 
AuthorDate: Mon Aug 7 20:46:38 2023 +0900

[SPARK-44701][PYTHON][TESTS] Skip `ClassificationTestsOnConnect` when 
`torch` is not installed

### What changes were proposed in this pull request?
Skip `ClassificationTestsOnConnect` when `torch` is not installed

### Why are the changes needed?
We moved the torch-on-connect tests to the `pyspark_ml_connect` module, so the `pyspark_connect` module won't have `torch`.

This fixes https://github.com/apache/spark/actions/runs/5776211318/job/15655104006 in the 3.5 daily GA:

```
Starting test(python3.9): 
pyspark.ml.tests.connect.test_connect_classification (temp output: 
/__w/spark/spark/python/target/fbb6a495-df65-4334-8c04-4befc9ee81df/python3.9__pyspark.ml.tests.connect.test_connect_classification__jp1htw6f.log)
Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
exec(code, run_globals)
  File 
"/__w/spark/spark/python/pyspark/ml/tests/connect/test_connect_classification.py",
 line 21, in <module>
from pyspark.ml.tests.connect.test_legacy_mode_classification import 
ClassificationTestsMixin
  File 
"/__w/spark/spark/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py",
 line 22, in <module>
from pyspark.ml.connect.classification import (
  File "/__w/spark/spark/python/pyspark/ml/connect/classification.py", line 
46, in <module>
import torch
ModuleNotFoundError: No module named 'torch'
```

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
CI

Closes #42375 from zhengruifeng/torch_skip.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/ml/tests/connect/test_connect_classification.py | 7 +++
 1 file changed, 7 insertions(+)

diff --git a/python/pyspark/ml/tests/connect/test_connect_classification.py 
b/python/pyspark/ml/tests/connect/test_connect_classification.py
index 6ad47322234..f3e621c19f0 100644
--- a/python/pyspark/ml/tests/connect/test_connect_classification.py
+++ b/python/pyspark/ml/tests/connect/test_connect_classification.py
@@ -20,7 +20,14 @@ import unittest
 from pyspark.sql import SparkSession
 from pyspark.ml.tests.connect.test_legacy_mode_classification import 
ClassificationTestsMixin
 
+have_torch = True
+try:
+import torch  # noqa: F401
+except ImportError:
+have_torch = False
 
+
+@unittest.skipIf(not have_torch, "torch is required")
 class ClassificationTestsOnConnect(ClassificationTestsMixin, 
unittest.TestCase):
 def setUp(self) -> None:
 self.spark = (


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-44701][PYTHON][TESTS] Skip `ClassificationTestsOnConnect` when `torch` is not installed

2023-08-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 7ecbdfa2fdb [SPARK-44701][PYTHON][TESTS] Skip 
`ClassificationTestsOnConnect` when `torch` is not installed
7ecbdfa2fdb is described below

commit 7ecbdfa2fdb5d698abd6d09a2d7fd3e81c05d4d1
Author: Ruifeng Zheng 
AuthorDate: Mon Aug 7 20:46:38 2023 +0900

[SPARK-44701][PYTHON][TESTS] Skip `ClassificationTestsOnConnect` when 
`torch` is not installed

### What changes were proposed in this pull request?
Skip `ClassificationTestsOnConnect` when `torch` is not installed

### Why are the changes needed?
We moved the torch-on-connect tests to the `pyspark_ml_connect` module, so the `pyspark_connect` module won't have `torch`.

This fixes https://github.com/apache/spark/actions/runs/5776211318/job/15655104006 in the 3.5 daily GA:

```
Starting test(python3.9): 
pyspark.ml.tests.connect.test_connect_classification (temp output: 
/__w/spark/spark/python/target/fbb6a495-df65-4334-8c04-4befc9ee81df/python3.9__pyspark.ml.tests.connect.test_connect_classification__jp1htw6f.log)
Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
exec(code, run_globals)
  File 
"/__w/spark/spark/python/pyspark/ml/tests/connect/test_connect_classification.py",
 line 21, in <module>
from pyspark.ml.tests.connect.test_legacy_mode_classification import 
ClassificationTestsMixin
  File 
"/__w/spark/spark/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py",
 line 22, in <module>
from pyspark.ml.connect.classification import (
  File "/__w/spark/spark/python/pyspark/ml/connect/classification.py", line 
46, in <module>
import torch
ModuleNotFoundError: No module named 'torch'
```

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
CI

Closes #42375 from zhengruifeng/torch_skip.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 433fb2af8a3ca239958cb7b006e2924ecfac0d56)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/ml/tests/connect/test_connect_classification.py | 7 +++
 1 file changed, 7 insertions(+)

diff --git a/python/pyspark/ml/tests/connect/test_connect_classification.py 
b/python/pyspark/ml/tests/connect/test_connect_classification.py
index 6ad47322234..f3e621c19f0 100644
--- a/python/pyspark/ml/tests/connect/test_connect_classification.py
+++ b/python/pyspark/ml/tests/connect/test_connect_classification.py
@@ -20,7 +20,14 @@ import unittest
 from pyspark.sql import SparkSession
 from pyspark.ml.tests.connect.test_legacy_mode_classification import 
ClassificationTestsMixin
 
+have_torch = True
+try:
+import torch  # noqa: F401
+except ImportError:
+have_torch = False
 
+
+@unittest.skipIf(not have_torch, "torch is required")
 class ClassificationTestsOnConnect(ClassificationTestsMixin, 
unittest.TestCase):
 def setUp(self) -> None:
 self.spark = (


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (1f10cc4a594 -> f139733b92d)

2023-08-07 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 1f10cc4a594 [SPARK-44628][SQL] Clear some unused codes in "***Errors" 
and extract some common logic
 add f139733b92d [SPARK-42321][SQL] Assign name to _LEGACY_ERROR_TEMP_2133

No new revisions were added by this update.

Summary of changes:
 .../utils/src/main/resources/error/error-classes.json | 10 +-
 ...ditions-malformed-record-in-parsing-error-class.md |  4 
 .../spark/sql/catalyst/json/JacksonParser.scala   |  8 
 .../spark/sql/catalyst/util/BadRecordException.scala  |  9 +
 .../spark/sql/catalyst/util/FailureSafeParser.scala   |  3 +++
 .../spark/sql/errors/QueryExecutionErrors.scala   | 19 ---
 .../spark/sql/errors/QueryExecutionErrorsSuite.scala  | 17 +
 7 files changed, 54 insertions(+), 16 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-44628][SQL] Clear some unused codes in "***Errors" and extract some common logic

2023-08-07 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1f10cc4a594 [SPARK-44628][SQL] Clear some unused codes in "***Errors" 
and extract some common logic
1f10cc4a594 is described below

commit 1f10cc4a59457ed0de0fd4dc0a1c61514d77261a
Author: panbingkun 
AuthorDate: Mon Aug 7 12:01:47 2023 +0500

[SPARK-44628][SQL] Clear some unused codes in "***Errors" and extract some 
common logic

### What changes were proposed in this pull request?
This PR aims to remove some unused code in the "***Errors" classes and extract some common logic.

### Why are the changes needed?
Make the code clearer.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

Closes #42238 from panbingkun/clear_error.

Authored-by: panbingkun 
Signed-off-by: Max Gekk 
---
 .../apache/spark/sql/errors/DataTypeErrors.scala   | 18 ++---
 .../apache/spark/sql/errors/QueryErrorsBase.scala  |  6 +-
 .../spark/sql/errors/QueryExecutionErrors.scala| 86 --
 3 files changed, 10 insertions(+), 100 deletions(-)

diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
index 7a34a386cd8..5e52e283338 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
@@ -192,15 +192,7 @@ private[sql] object DataTypeErrors extends 
DataTypeErrorsBase {
   decimalPrecision: Int,
   decimalScale: Int,
   context: SQLQueryContext = null): ArithmeticException = {
-new SparkArithmeticException(
-  errorClass = "NUMERIC_VALUE_OUT_OF_RANGE",
-  messageParameters = Map(
-"value" -> value.toPlainString,
-"precision" -> decimalPrecision.toString,
-"scale" -> decimalScale.toString,
-"config" -> toSQLConf("spark.sql.ansi.enabled")),
-  context = getQueryContext(context),
-  summary = getSummary(context))
+numericValueOutOfRange(value, decimalPrecision, decimalScale, context)
   }
 
   def cannotChangeDecimalPrecisionError(
@@ -208,6 +200,14 @@ private[sql] object DataTypeErrors extends 
DataTypeErrorsBase {
   decimalPrecision: Int,
   decimalScale: Int,
   context: SQLQueryContext = null): ArithmeticException = {
+numericValueOutOfRange(value, decimalPrecision, decimalScale, context)
+  }
+
+  private def numericValueOutOfRange(
+  value: Decimal,
+  decimalPrecision: Int,
+  decimalScale: Int,
+  context: SQLQueryContext): ArithmeticException = {
 new SparkArithmeticException(
   errorClass = "NUMERIC_VALUE_OUT_OF_RANGE",
   messageParameters = Map(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
index db256fbee87..26600117a0c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.errors
 
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
-import org.apache.spark.sql.catalyst.util.{toPrettySQL, QuotingUtils}
+import org.apache.spark.sql.catalyst.util.toPrettySQL
 import org.apache.spark.sql.types.{DataType, DoubleType, FloatType}
 
 /**
@@ -55,10 +55,6 @@ private[sql] trait QueryErrorsBase extends 
DataTypeErrorsBase {
 quoteByDefault(toPrettySQL(e))
   }
 
-  def toSQLSchema(schema: String): String = {
-QuotingUtils.toSQLSchema(schema)
-  }
-
   // Converts an error class parameter to its SQL representation
   def toSQLValue(v: Any, t: DataType): String = Literal.create(v, t) match {
 case Literal(null, _) => "NULL"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 45b5d6b6692..f960a091ec0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -32,7 +32,6 @@ import org.apache.spark._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.ScalaReflection.Schema
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
@@ -183,10 +182,6 @@ private[sql] object QueryExecutionErrors extends