[spark] branch master updated (590b77f7628 -> b4b91212b1d)
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 590b77f7628 [SPARK-44005][PYTHON] Improve error messages for regular Python UDTFs that return non-tuple values add b4b91212b1d [SPARK-44703][CORE] Log eventLog rewrite duration when compact old event log files No new revisions were added by this update. Summary of changes: .../scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala | 3 +++ 1 file changed, 3 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
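[Editor's note] The email above only lists the summary of changes, so as an illustration here is a minimal, generic sketch of the pattern being added — timing a rewrite step and logging its duration. It is not the actual EventLogFileCompactor code; `timed` and `rewrite` are hypothetical stand-ins.
```
// Generic sketch only: time a block of work and log how long it took.
def timed[T](label: String)(body: => T): T = {
  val startNs = System.nanoTime()
  val result = body
  // Spark would use logInfo(...) here; println keeps the sketch self-contained.
  println(f"$label took ${(System.nanoTime() - startNs) / 1e6}%.1f ms")
  result
}

// Hypothetical stand-in for rewriting/compacting old event log files.
def rewrite(): Unit = Thread.sleep(10)

timed("Rewriting event log files")(rewrite())
```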
[spark] branch branch-3.5 updated: [SPARK-44005][PYTHON] Improve error messages for regular Python UDTFs that return non-tuple values
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new b7a9b85716c [SPARK-44005][PYTHON] Improve error messages for regular Python UDTFs that return non-tuple values b7a9b85716c is described below commit b7a9b85716c7fb0aedb78bb8a04b00ba6c132d28 Author: allisonwang-db AuthorDate: Tue Aug 8 11:09:58 2023 +0800 [SPARK-44005][PYTHON] Improve error messages for regular Python UDTFs that return non-tuple values ### What changes were proposed in this pull request? This PR improves error messages for regular Python UDTFs when the result rows are not one of tuple, list and dict. Note this is supported when arrow optimization is enabled. ### Why are the changes needed? To make Python UDTFs more user friendly. ### Does this PR introduce _any_ user-facing change? Yes. ``` class TestUDTF: def eval(self, a: int): yield a ``` Before this PR, this will fail with this error `Unexpected tuple 1 with StructType` After this PR, this will have a more user-friendly error: `[UDTF_INVALID_OUTPUT_ROW_TYPE] The type of an individual output row in the UDTF is invalid. Each row should be a tuple, list, or dict, but got 'int'. Please make sure that the output rows are of the correct type.` ### How was this patch tested? Existing UTs. Closes #42353 from allisonwang-db/spark-44005-non-tuple-return-val. Authored-by: allisonwang-db Signed-off-by: Ruifeng Zheng (cherry picked from commit 590b77f76284ad03ad8b3b6d30b23983c66513fc) Signed-off-by: Ruifeng Zheng --- python/pyspark/errors/error_classes.py | 5 + python/pyspark/sql/tests/test_udtf.py | 26 +++--- python/pyspark/worker.py | 12 +--- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index 937a8758404..279812ebae1 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.py @@ -733,6 +733,11 @@ ERROR_CLASSES_JSON = """ "User defined table function encountered an error in the '' method: " ] }, + "UDTF_INVALID_OUTPUT_ROW_TYPE" : { +"message" : [ +"The type of an individual output row in the UDTF is invalid. Each row should be a tuple, list, or dict, but got ''. Please make sure that the output rows are of the correct type." +] + }, "UDTF_RETURN_NOT_ITERABLE" : { "message" : [ "The return value of the UDTF is invalid. It should be an iterable (e.g., generator or list), but got ''. Please make sure that the UDTF returns one of these types." 
diff --git a/python/pyspark/sql/tests/test_udtf.py b/python/pyspark/sql/tests/test_udtf.py index 0540ecddde7..d152228fc52 100644 --- a/python/pyspark/sql/tests/test_udtf.py +++ b/python/pyspark/sql/tests/test_udtf.py @@ -139,24 +139,21 @@ class BaseUDTFTestsMixin: self.assertEqual(rows, [Row(a=1, b=2), Row(a=2, b=3)]) def test_udtf_eval_returning_non_tuple(self): +@udtf(returnType="a: int") class TestUDTF: def eval(self, a: int): yield a -func = udtf(TestUDTF, returnType="a: int") -# TODO(SPARK-44005): improve this error message -with self.assertRaisesRegex(PythonException, "Unexpected tuple 1 with StructType"): -func(lit(1)).collect() +with self.assertRaisesRegex(PythonException, "UDTF_INVALID_OUTPUT_ROW_TYPE"): +TestUDTF(lit(1)).collect() -def test_udtf_eval_returning_non_generator(self): +@udtf(returnType="a: int") class TestUDTF: def eval(self, a: int): return (a,) -func = udtf(TestUDTF, returnType="a: int") -# TODO(SPARK-44005): improve this error message -with self.assertRaisesRegex(PythonException, "Unexpected tuple 1 with StructType"): -func(lit(1)).collect() +with self.assertRaisesRegex(PythonException, "UDTF_INVALID_OUTPUT_ROW_TYPE"): +TestUDTF(lit(1)).collect() def test_udtf_with_invalid_return_value(self): @udtf(returnType="x: int") @@ -1159,21 +1156,20 @@ class UDTFArrowTestsMixin(BaseUDTFTestsMixin): self.spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", old_value) def test_udtf_eval_returning_non_tuple(self): +@udtf(returnType="a: int") class TestUDTF: def eval(self, a: int): yield a -func = udtf(TestUDTF, returnType="a: int") # When arrow is enabled, it can handle non-tuple return value. -self.assertEqual(func(lit(1)).collect(), [Row(a=1)]) +assertDataFrameEqual(TestUDTF(lit(1))
[spark] branch master updated: [SPARK-44005][PYTHON] Improve error messages for regular Python UDTFs that return non-tuple values
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 590b77f7628 [SPARK-44005][PYTHON] Improve error messages for regular Python UDTFs that return non-tuple values 590b77f7628 is described below commit 590b77f76284ad03ad8b3b6d30b23983c66513fc Author: allisonwang-db AuthorDate: Tue Aug 8 11:09:58 2023 +0800 [SPARK-44005][PYTHON] Improve error messages for regular Python UDTFs that return non-tuple values ### What changes were proposed in this pull request? This PR improves error messages for regular Python UDTFs when the result rows are not one of tuple, list and dict. Note this is supported when arrow optimization is enabled. ### Why are the changes needed? To make Python UDTFs more user friendly. ### Does this PR introduce _any_ user-facing change? Yes. ``` class TestUDTF: def eval(self, a: int): yield a ``` Before this PR, this will fail with this error `Unexpected tuple 1 with StructType` After this PR, this will have a more user-friendly error: `[UDTF_INVALID_OUTPUT_ROW_TYPE] The type of an individual output row in the UDTF is invalid. Each row should be a tuple, list, or dict, but got 'int'. Please make sure that the output rows are of the correct type.` ### How was this patch tested? Existing UTs. Closes #42353 from allisonwang-db/spark-44005-non-tuple-return-val. Authored-by: allisonwang-db Signed-off-by: Ruifeng Zheng --- python/pyspark/errors/error_classes.py | 5 + python/pyspark/sql/tests/test_udtf.py | 26 +++--- python/pyspark/worker.py | 12 +--- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index 24885e94d32..bc32afeb87a 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.py @@ -743,6 +743,11 @@ ERROR_CLASSES_JSON = """ "User defined table function encountered an error in the '' method: " ] }, + "UDTF_INVALID_OUTPUT_ROW_TYPE" : { +"message" : [ +"The type of an individual output row in the UDTF is invalid. Each row should be a tuple, list, or dict, but got ''. Please make sure that the output rows are of the correct type." +] + }, "UDTF_RETURN_NOT_ITERABLE" : { "message" : [ "The return value of the UDTF is invalid. It should be an iterable (e.g., generator or list), but got ''. Please make sure that the UDTF returns one of these types." 
diff --git a/python/pyspark/sql/tests/test_udtf.py b/python/pyspark/sql/tests/test_udtf.py index b2f473996bc..300067716e9 100644 --- a/python/pyspark/sql/tests/test_udtf.py +++ b/python/pyspark/sql/tests/test_udtf.py @@ -163,24 +163,21 @@ class BaseUDTFTestsMixin: self.assertEqual(rows, [Row(a=1, b=2), Row(a=2, b=3)]) def test_udtf_eval_returning_non_tuple(self): +@udtf(returnType="a: int") class TestUDTF: def eval(self, a: int): yield a -func = udtf(TestUDTF, returnType="a: int") -# TODO(SPARK-44005): improve this error message -with self.assertRaisesRegex(PythonException, "Unexpected tuple 1 with StructType"): -func(lit(1)).collect() +with self.assertRaisesRegex(PythonException, "UDTF_INVALID_OUTPUT_ROW_TYPE"): +TestUDTF(lit(1)).collect() -def test_udtf_eval_returning_non_generator(self): +@udtf(returnType="a: int") class TestUDTF: def eval(self, a: int): return (a,) -func = udtf(TestUDTF, returnType="a: int") -# TODO(SPARK-44005): improve this error message -with self.assertRaisesRegex(PythonException, "Unexpected tuple 1 with StructType"): -func(lit(1)).collect() +with self.assertRaisesRegex(PythonException, "UDTF_INVALID_OUTPUT_ROW_TYPE"): +TestUDTF(lit(1)).collect() def test_udtf_with_invalid_return_value(self): @udtf(returnType="x: int") @@ -1852,21 +1849,20 @@ class UDTFArrowTestsMixin(BaseUDTFTestsMixin): self.spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", old_value) def test_udtf_eval_returning_non_tuple(self): +@udtf(returnType="a: int") class TestUDTF: def eval(self, a: int): yield a -func = udtf(TestUDTF, returnType="a: int") # When arrow is enabled, it can handle non-tuple return value. -self.assertEqual(func(lit(1)).collect(), [Row(a=1)]) +assertDataFrameEqual(TestUDTF(lit(1)), [Row(a=1)]) -def test_udtf_eval_returning_non_generator(self): +@udtf(returnType="a: int")
[spark] branch branch-3.5 updated: [SPARK-44689][CONNECT] Make the exception handling of function `SparkConnectPlanner#unpackScalarScalaUDF` more universal
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new cd4ae6e7452 [SPARK-44689][CONNECT] Make the exception handling of function `SparkConnectPlanner#unpackScalarScalaUDF` more universal cd4ae6e7452 is described below commit cd4ae6e7452170422bdf14ab5e1957e61503904f Author: yangjie01 AuthorDate: Tue Aug 8 11:06:21 2023 +0800 [SPARK-44689][CONNECT] Make the exception handling of function `SparkConnectPlanner#unpackScalarScalaUDF` more universal ### What changes were proposed in this pull request? This PR changes the exception handling in the `unpackScalarScalaUDF` function in `SparkConnectPlanner` from determining the exception type based on a fixed nesting level to using Guava `Throwables` to get the root cause and then determining the type of the root cause. This makes the handling compatible across different Java versions. ### Why are the changes needed? The following failure occurred when testing `UDFClassLoadingE2ESuite` in the Java 17 daily test: https://github.com/apache/spark/actions/runs/5766913899/job/15635782831 ``` [info] UDFClassLoadingE2ESuite: [info] - update class loader after stubbing: new session *** FAILED *** (101 milliseconds) [info] "Exception in SerializedLambda.readResolve" did not contain "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf" (UDFClassLoadingE2ESuite.scala:57) ... [info] - update class loader after stubbing: same session *** FAILED *** (52 milliseconds) [info] "Exception in SerializedLambda.readResolve" did not contain "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf" (UDFClassLoadingE2ESuite.scala:73) ... 
``` After analysis, it was found that there are differences in the exception stack generated on the server side between Java 8 and Java 17: - Java 8 ``` java.io.IOException: unexpected exception type at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) at org.apache.spark.util.SparkSerDeUtils.deserialize(SparkSerDeUtils.scala:50) at org.apache.spark.util.SparkSerDeUtils.deserialize$(SparkSerDeUtils.scala:41) at org.apache.spark.util.Utils$.deserialize(Utils.scala:95) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.unpackScalarScalaUDF(SparkConnectPlanner.scala:1516) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.org$apache$spark$sql$connect$planner$SparkConnectPlanner$$unpackUdf(SparkConnectPlanner.scala:1507) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformScalarScalaFunction(SparkConnectPlanner.scala:1544) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterScalarScalaUDF(SparkConnectPlanner.scala:2565) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterUserDefinedFunction(SparkConnectPlanner.scala:2492) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2363) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:184) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:184) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:171) at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:179) at org.apache.spark
[spark] branch master updated: [SPARK-44689][CONNECT] Make the exception handling of function `SparkConnectPlanner#unpackScalarScalaUDF` more universal
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 25053d98186 [SPARK-44689][CONNECT] Make the exception handling of function `SparkConnectPlanner#unpackScalarScalaUDF` more universal 25053d98186 is described below commit 25053d98186489d9f2061c9b815a5a33f7e309c4 Author: yangjie01 AuthorDate: Tue Aug 8 11:06:21 2023 +0800 [SPARK-44689][CONNECT] Make the exception handling of function `SparkConnectPlanner#unpackScalarScalaUDF` more universal ### What changes were proposed in this pull request? This PR changes the exception handling in the `unpackScalarScalaUDF` function in `SparkConnectPlanner` from determining the exception type based on a fixed nesting level to using Guava `Throwables` to get the root cause and then determining the type of the root cause. This makes the handling compatible across different Java versions. ### Why are the changes needed? The following failure occurred when testing `UDFClassLoadingE2ESuite` in the Java 17 daily test: https://github.com/apache/spark/actions/runs/5766913899/job/15635782831 ``` [info] UDFClassLoadingE2ESuite: [info] - update class loader after stubbing: new session *** FAILED *** (101 milliseconds) [info] "Exception in SerializedLambda.readResolve" did not contain "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf" (UDFClassLoadingE2ESuite.scala:57) ... [info] - update class loader after stubbing: same session *** FAILED *** (52 milliseconds) [info] "Exception in SerializedLambda.readResolve" did not contain "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf" (UDFClassLoadingE2ESuite.scala:73) ... 
``` After analysis, it was found that there are differences in the exception stack generated on the server side between Java 8 and Java 17: - Java 8 ``` java.io.IOException: unexpected exception type at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) at org.apache.spark.util.SparkSerDeUtils.deserialize(SparkSerDeUtils.scala:50) at org.apache.spark.util.SparkSerDeUtils.deserialize$(SparkSerDeUtils.scala:41) at org.apache.spark.util.Utils$.deserialize(Utils.scala:95) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.unpackScalarScalaUDF(SparkConnectPlanner.scala:1516) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.org$apache$spark$sql$connect$planner$SparkConnectPlanner$$unpackUdf(SparkConnectPlanner.scala:1507) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformScalarScalaFunction(SparkConnectPlanner.scala:1544) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterScalarScalaUDF(SparkConnectPlanner.scala:2565) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterUserDefinedFunction(SparkConnectPlanner.scala:2492) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2363) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:184) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:184) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:171) at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:179) at org.apache.spark.sql.con
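[Editor's note] The description explains the approach but the email is cut off before the diff, so the following is a minimal sketch of root-cause-based exception handling with Guava `Throwables`. It is not the actual `SparkConnectPlanner#unpackScalarScalaUDF` implementation; `deserialize` is a hypothetical stand-in for Spark's own helper.
```
import com.google.common.base.Throwables

object UnpackSketch {
  // Hypothetical stand-in for the Spark helper that deserializes the UDF payload.
  private def deserialize(bytes: Array[Byte]): AnyRef = ???

  def unpack(bytes: Array[Byte]): AnyRef =
    try deserialize(bytes)
    catch {
      case t: Throwable =>
        // Walk the cause chain to its root instead of calling t.getCause a fixed number
        // of times, so the check does not depend on how deeply a given Java version
        // nests the original exception.
        Throwables.getRootCause(t) match {
          case nsm: NoSuchMethodException =>
            throw new ClassNotFoundException(s"UDF class could not be loaded: ${nsm.getMessage}", nsm)
          case _ =>
            throw t
        }
    }
}
```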
[spark] branch master updated (aa1261dc129 -> 6dadd188f36)
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from aa1261dc129 [SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not triggered add 6dadd188f36 [SPARK-44554][INFRA] Make Python linter related checks pass of branch-3.3/3.4 daily testing No new revisions were added by this update. Summary of changes: .github/workflows/build_and_test.yml | 17 + 1 file changed, 17 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.4 updated: [SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not triggered
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 5c7d1e2f8a2 [SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not triggered 5c7d1e2f8a2 is described below commit 5c7d1e2f8a2110e33c1d1c8f458d14a02c2056b7 Author: Chao Sun AuthorDate: Mon Aug 7 19:16:38 2023 -0700 [SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not triggered This PR makes sure we use unique partition values when calculating the final partitions in `BatchScanExec`, to make sure no duplicated partitions are generated. When `spark.sql.sources.v2.bucketing.pushPartValues.enabled` and `spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` are enabled, and SPJ is not triggered, currently Spark will generate incorrect/duplicated results. This is because with both configs enabled, Spark will delay the partition grouping until the time it calculates the final partitions used by the input RDD. To calculate the partitions, it uses partition values from the `KeyGroupedPartitioning` to find out the right ordering for the partitions. However, since grouping is not done when the partition values is computed, there could be duplicated partition values. This means the result could contain duplicated partitions too. No, this is a bug fix. Added a new test case for this scenario. Closes #42324 from sunchao/SPARK-44641. Authored-by: Chao Sun Signed-off-by: Chao Sun (cherry picked from commit aa1261dc129618d27a1bdc743a5fdd54219f7c01) Signed-off-by: Chao Sun --- .../sql/catalyst/plans/physical/partitioning.scala | 9 +++- .../execution/datasources/v2/BatchScanExec.scala | 9 +++- .../connector/KeyGroupedPartitioningSuite.scala| 56 ++ 3 files changed, 72 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 6512344169b..d2f9e9b5d5b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -313,7 +313,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) * by `expressions`. `partitionValuesOpt`, if defined, should contain value of partition key(s) in * ascending order, after evaluated by the transforms in `expressions`, for each input partition. * In addition, its length must be the same as the number of input partitions (and thus is a 1-1 - * mapping), and each row in `partitionValuesOpt` must be unique. + * mapping). The `partitionValues` may contain duplicated partition values. * * For example, if `expressions` is `[years(ts_col)]`, then a valid value of `partitionValuesOpt` is * `[0, 1, 2]`, which represents 3 input partitions with distinct partition values. 
All rows @@ -355,6 +355,13 @@ case class KeyGroupedPartitioning( override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = KeyGroupedShuffleSpec(this, distribution) + + lazy val uniquePartitionValues: Seq[InternalRow] = { +partitionValues +.map(InternalRowComparableWrapper(_, expressions)) +.distinct +.map(_.row) + } } object KeyGroupedPartitioning { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index d43331d57c4..02821d10d50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -191,10 +191,17 @@ case class BatchScanExec( Seq.fill(numSplits)(Seq.empty)) } } else { + // either `commonPartitionValues` is not defined, or it is defined but + // `applyPartialClustering` is false. val partitionMapping = groupedPartitions.map { case (row, parts) => InternalRowComparableWrapper(row, p.expressions) -> parts }.toMap - finalPartitions = p.partitionValues.map { partValue => + + // In case `commonPartitionValues` is not defined (e.g., SPJ is not used), there + // could exist duplicated partition values, as partition grouping is not done + // at the beginning and postponed to this method. It is important to use unique + // partition values here so that grouped partitions won't get duplicated. +
[spark] branch branch-3.5 updated: [SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not triggered
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 06871f2ae4c [SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not triggered 06871f2ae4c is described below commit 06871f2ae4c5f2b5a2208a6ead83e7802d2e0c16 Author: Chao Sun AuthorDate: Mon Aug 7 19:16:38 2023 -0700 [SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not triggered This PR makes sure we use unique partition values when calculating the final partitions in `BatchScanExec`, to make sure no duplicated partitions are generated. When `spark.sql.sources.v2.bucketing.pushPartValues.enabled` and `spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` are enabled, and SPJ is not triggered, currently Spark will generate incorrect/duplicated results. This is because with both configs enabled, Spark will delay the partition grouping until the time it calculates the final partitions used by the input RDD. To calculate the partitions, it uses partition values from the `KeyGroupedPartitioning` to find out the right ordering for the partitions. However, since grouping is not done when the partition values is computed, there could be duplicated partition values. This means the result could contain duplicated partitions too. No, this is a bug fix. Added a new test case for this scenario. Closes #42324 from sunchao/SPARK-44641. Authored-by: Chao Sun Signed-off-by: Chao Sun (cherry picked from commit aa1261dc129618d27a1bdc743a5fdd54219f7c01) Signed-off-by: Chao Sun --- .../sql/catalyst/plans/physical/partitioning.scala | 9 +++- .../execution/datasources/v2/BatchScanExec.scala | 9 +++- .../connector/KeyGroupedPartitioningSuite.scala| 56 ++ 3 files changed, 72 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 6512344169b..d2f9e9b5d5b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -313,7 +313,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) * by `expressions`. `partitionValuesOpt`, if defined, should contain value of partition key(s) in * ascending order, after evaluated by the transforms in `expressions`, for each input partition. * In addition, its length must be the same as the number of input partitions (and thus is a 1-1 - * mapping), and each row in `partitionValuesOpt` must be unique. + * mapping). The `partitionValues` may contain duplicated partition values. * * For example, if `expressions` is `[years(ts_col)]`, then a valid value of `partitionValuesOpt` is * `[0, 1, 2]`, which represents 3 input partitions with distinct partition values. 
All rows @@ -355,6 +355,13 @@ case class KeyGroupedPartitioning( override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = KeyGroupedShuffleSpec(this, distribution) + + lazy val uniquePartitionValues: Seq[InternalRow] = { +partitionValues +.map(InternalRowComparableWrapper(_, expressions)) +.distinct +.map(_.row) + } } object KeyGroupedPartitioning { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index 4b538197392..eba3c71f871 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -190,10 +190,17 @@ case class BatchScanExec( Seq.fill(numSplits)(Seq.empty)) } } else { + // either `commonPartitionValues` is not defined, or it is defined but + // `applyPartialClustering` is false. val partitionMapping = groupedPartitions.map { case (row, parts) => InternalRowComparableWrapper(row, p.expressions) -> parts }.toMap - finalPartitions = p.partitionValues.map { partValue => + + // In case `commonPartitionValues` is not defined (e.g., SPJ is not used), there + // could exist duplicated partition values, as partition grouping is not done + // at the beginning and postponed to this method. It is important to use unique + // partition values here so that grouped partitions won't get duplicated. +
[spark] branch master updated: [SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not triggered
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new aa1261dc129 [SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not triggered aa1261dc129 is described below commit aa1261dc129618d27a1bdc743a5fdd54219f7c01 Author: Chao Sun AuthorDate: Mon Aug 7 19:16:38 2023 -0700 [SPARK-44641][SQL] Incorrect result in certain scenarios when SPJ is not triggered ### What changes were proposed in this pull request? This PR makes sure we use unique partition values when calculating the final partitions in `BatchScanExec`, to make sure no duplicated partitions are generated. ### Why are the changes needed? When `spark.sql.sources.v2.bucketing.pushPartValues.enabled` and `spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled` are enabled, and SPJ is not triggered, currently Spark will generate incorrect/duplicated results. This is because with both configs enabled, Spark will delay the partition grouping until the time it calculates the final partitions used by the input RDD. To calculate the partitions, it uses partition values from the `KeyGroupedPartitioning` to find out the right ordering for the partitions. However, since grouping is not done when the partition values is computed, there could be duplicated partition values. This means the result could contain duplicated partitions too. ### Does this PR introduce _any_ user-facing change? No, this is a bug fix. ### How was this patch tested? Added a new test case for this scenario. Closes #42324 from sunchao/SPARK-44641. Authored-by: Chao Sun Signed-off-by: Chao Sun --- .../sql/catalyst/plans/physical/partitioning.scala | 9 +++- .../execution/datasources/v2/BatchScanExec.scala | 9 +++- .../connector/KeyGroupedPartitioningSuite.scala| 56 ++ 3 files changed, 72 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index bd8ba54ddd7..456005768bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -313,7 +313,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) * by `expressions`. `partitionValues`, if defined, should contain value of partition key(s) in * ascending order, after evaluated by the transforms in `expressions`, for each input partition. * In addition, its length must be the same as the number of input partitions (and thus is a 1-1 - * mapping), and each row in `partitionValues` must be unique. + * mapping). The `partitionValues` may contain duplicated partition values. * * For example, if `expressions` is `[years(ts_col)]`, then a valid value of `partitionValues` is * `[0, 1, 2]`, which represents 3 input partitions with distinct partition values. 
All rows @@ -355,6 +355,13 @@ case class KeyGroupedPartitioning( override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = KeyGroupedShuffleSpec(this, distribution) + + lazy val uniquePartitionValues: Seq[InternalRow] = { +partitionValues +.map(InternalRowComparableWrapper(_, expressions)) +.distinct +.map(_.row) + } } object KeyGroupedPartitioning { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index 4b538197392..eba3c71f871 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -190,10 +190,17 @@ case class BatchScanExec( Seq.fill(numSplits)(Seq.empty)) } } else { + // either `commonPartitionValues` is not defined, or it is defined but + // `applyPartialClustering` is false. val partitionMapping = groupedPartitions.map { case (row, parts) => InternalRowComparableWrapper(row, p.expressions) -> parts }.toMap - finalPartitions = p.partitionValues.map { partValue => + + // In case `commonPartitionValues` is not defined (e.g., SPJ is not used), there + // could exist duplicated partition values, as partition grouping is not done + // at the beginning and postponed to this method. It is important to use unique +
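[Editor's note] As a standalone analogue of the de-duplication step described above (the diff is truncated before the `finalPartitions` change completes), the sketch below uses a simplified `RowWrapper` in place of `InternalRowComparableWrapper` to show why distinct partition values must be taken before building the final partition list.
```
// Simplified stand-in for InternalRowComparableWrapper: equality is by value.
final case class RowWrapper(values: Seq[Any])

def uniquePartitionValues(partitionValues: Seq[Seq[Any]]): Seq[Seq[Any]] =
  partitionValues
    .map(RowWrapper(_)) // wrap so duplicates compare equal
    .distinct           // drop duplicated partition values
    .map(_.values)      // unwrap back to the original representation

// Two input splits share the partition value Seq(1); without .distinct the scan
// would emit that partition (and its rows) twice.
assert(uniquePartitionValues(Seq(Seq(0), Seq(1), Seq(1))) == Seq(Seq(0), Seq(1)))
```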
[spark] branch master updated: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7493c5764f9 [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client 7493c5764f9 is described below commit 7493c5764f9644878babacccd4f688fe13ef84aa Author: Herman van Hovell AuthorDate: Tue Aug 8 04:15:07 2023 +0200 [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client ### What changes were proposed in this pull request? This adds the `default` and `active` session variables to `SparkSession`: - `default` session is global value. It is typically the first session created through `getOrCreate`. It can be changed through `set` or `clear`. If the session is closed and it is the `default` session we clear the `default` session. - `active` session is a thread local value. It is typically the first session created in this thread or it inherits is value from its parent thread. It can be changed through `set` or `clear`, please note that these methods operate thread locally, so they won't change the parent or children. If the session is closed and it is the `active` session for the current thread then we clear the active value (only for the current thread!). ### Why are the changes needed? To increase compatibility with the existing SparkSession API in `sql/core`. ### Does this PR introduce _any_ user-facing change? Yes. It adds a couple methods that were missing from the Scala Client. ### How was this patch tested? Added tests to `SparkSessionSuite`. Closes #42367 from hvanhovell/SPARK-43429. Authored-by: Herman van Hovell Signed-off-by: Herman van Hovell --- .../scala/org/apache/spark/sql/SparkSession.scala | 100 -- .../org/apache/spark/sql/SparkSessionSuite.scala | 144 +++-- .../CheckConnectJvmClientCompatibility.scala | 2 - 3 files changed, 225 insertions(+), 21 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 355d7edadc7..7367ed153f7 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import java.io.Closeable import java.net.URI import java.util.concurrent.TimeUnit._ -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.{AtomicLong, AtomicReference} import scala.collection.JavaConverters._ import scala.reflect.runtime.universe.TypeTag @@ -730,6 +730,23 @@ object SparkSession extends Logging { override def load(c: Configuration): SparkSession = create(c) }) + /** The active SparkSession for the current thread. */ + private val activeThreadSession = new InheritableThreadLocal[SparkSession] + + /** Reference to the root SparkSession. */ + private val defaultSession = new AtomicReference[SparkSession] + + /** + * Set the (global) default [[SparkSession]], and (thread-local) active [[SparkSession]] when + * they are not set yet. + */ + private def setDefaultAndActiveSession(session: SparkSession): Unit = { +defaultSession.compareAndSet(null, session) +if (getActiveSession.isEmpty) { + setActiveSession(session) +} + } + /** * Create a new [[SparkSession]] based on the connect client [[Configuration]]. 
*/ @@ -742,8 +759,17 @@ object SparkSession extends Logging { */ private[sql] def onSessionClose(session: SparkSession): Unit = { sessions.invalidate(session.client.configuration) +defaultSession.compareAndSet(session, null) +if (getActiveSession.contains(session)) { + clearActiveSession() +} } + /** + * Creates a [[SparkSession.Builder]] for constructing a [[SparkSession]]. + * + * @since 3.4.0 + */ def builder(): Builder = new Builder() private[sql] lazy val cleaner = { @@ -799,10 +825,15 @@ object SparkSession extends Logging { * * This will always return a newly created session. * + * This method will update the default and/or active session if they are not set. + * * @since 3.5.0 */ def create(): SparkSession = { - tryCreateSessionFromClient().getOrElse(SparkSession.this.create(builder.configuration)) + val session = tryCreateSessionFromClient() +.getOrElse(SparkSession.this.create(builder.configuration)) + setDefaultAndActiveSession(session) + session } /** @@ -811,30 +842,79 @@ object SparkSession extends Logging { * If a session exist with the same configuration that is returned instead of creating a new * sessio
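[Editor's note] A usage sketch of the default/active session semantics described above, assuming the Connect Scala client mirrors the method names of the existing `sql/core` companion object (`getOrCreate`, `getActiveSession`, `clearActiveSession`, `close`); builder configuration for connecting to a server is omitted, so treat this as illustrative rather than the library's documented behavior.
```
import org.apache.spark.sql.SparkSession

val first = SparkSession.builder().getOrCreate() // typically becomes the default (and active) session

val worker = new Thread(() => {
  // The active session is thread-local but inherited from the parent thread...
  assert(SparkSession.getActiveSession.contains(first))
  SparkSession.clearActiveSession() // ...and clearing it here affects only this thread
})
worker.start()
worker.join()

assert(SparkSession.getActiveSession.contains(first)) // the parent thread is unaffected
first.close() // closing the default session also clears the default
```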
[spark] branch branch-3.5 updated: [SPARK-44683][SS] Logging level isn't passed to RocksDB state store provider correctly
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 19c8001dacb [SPARK-44683][SS] Logging level isn't passed to RocksDB state store provider correctly 19c8001dacb is described below commit 19c8001dacb2933ef0a052babdd976fa886b8b4d Author: Siying Dong AuthorDate: Tue Aug 8 11:11:56 2023 +0900 [SPARK-44683][SS] Logging level isn't passed to RocksDB state store provider correctly ### What changes were proposed in this pull request? The logging level is passed into RocksDB in a correct way. ### Why are the changes needed? We pass log4j's log level to RocksDB so that RocksDB debug log can go to log4j. However, we pass in log level after we create the logger. However, the way it is set isn't effective. This has two impacts: (1) setting DEBUG level don't make RocksDB generate DEBUG level logs; (2) setting WARN or ERROR level does prevent INFO level logging, but RocksDB still makes JNI calls to Scala, which is an unnecessary overhead. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually change the log level and observe the log lines in unit tests. Closes #42354 from siying/rocks_log_level. Authored-by: Siying Dong Signed-off-by: Jungtaek Lim (cherry picked from commit 630b1777904f15c7ac05c3cd61c0006cd692bc93) Signed-off-by: Jungtaek Lim --- .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 386df61a9e0..2398b778072 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -613,8 +613,11 @@ class RocksDB( if (log.isWarnEnabled) dbLogLevel = InfoLogLevel.WARN_LEVEL if (log.isInfoEnabled) dbLogLevel = InfoLogLevel.INFO_LEVEL if (log.isDebugEnabled) dbLogLevel = InfoLogLevel.DEBUG_LEVEL -dbOptions.setLogger(dbLogger) +dbLogger.setInfoLogLevel(dbLogLevel) +// The log level set in dbLogger is effective and the one to dbOptions isn't applied to +// customized logger. We still set it as it might show up in RocksDB config file or logging. dbOptions.setInfoLogLevel(dbLogLevel) +dbOptions.setLogger(dbLogger) logInfo(s"Set RocksDB native logging level to $dbLogLevel") dbLogger } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-44683][SS] Logging level isn't passed to RocksDB state store provider correctly
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 630b1777904 [SPARK-44683][SS] Logging level isn't passed to RocksDB state store provider correctly 630b1777904 is described below commit 630b1777904f15c7ac05c3cd61c0006cd692bc93 Author: Siying Dong AuthorDate: Tue Aug 8 11:11:56 2023 +0900 [SPARK-44683][SS] Logging level isn't passed to RocksDB state store provider correctly ### What changes were proposed in this pull request? The logging level is passed into RocksDB in a correct way. ### Why are the changes needed? We pass log4j's log level to RocksDB so that RocksDB debug log can go to log4j. However, we pass in log level after we create the logger. However, the way it is set isn't effective. This has two impacts: (1) setting DEBUG level don't make RocksDB generate DEBUG level logs; (2) setting WARN or ERROR level does prevent INFO level logging, but RocksDB still makes JNI calls to Scala, which is an unnecessary overhead. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually change the log level and observe the log lines in unit tests. Closes #42354 from siying/rocks_log_level. Authored-by: Siying Dong Signed-off-by: Jungtaek Lim --- .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index d4366fe732b..a2868df9411 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -611,8 +611,11 @@ class RocksDB( if (log.isWarnEnabled) dbLogLevel = InfoLogLevel.WARN_LEVEL if (log.isInfoEnabled) dbLogLevel = InfoLogLevel.INFO_LEVEL if (log.isDebugEnabled) dbLogLevel = InfoLogLevel.DEBUG_LEVEL -dbOptions.setLogger(dbLogger) +dbLogger.setInfoLogLevel(dbLogLevel) +// The log level set in dbLogger is effective and the one to dbOptions isn't applied to +// customized logger. We still set it as it might show up in RocksDB config file or logging. dbOptions.setInfoLogLevel(dbLogLevel) +dbOptions.setLogger(dbLogger) logInfo(s"Set RocksDB native logging level to $dbLogLevel") dbLogger } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
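[Editor's note] Condensing the diff above into a standalone ordering sketch: the level must be set on the customized logger itself before it is attached to the options, because the level set on the options is not applied to a custom logger. Class and method names are from the RocksDB JNI API used in the patch; the logger body here just prints instead of forwarding to log4j, and the chosen level is for illustration only.
```
import org.rocksdb.{InfoLogLevel, Logger, Options}

val options = new Options()
val dbLogger = new Logger(options) {
  override def log(infoLogLevel: InfoLogLevel, logMsg: String): Unit =
    println(s"[$infoLogLevel] $logMsg") // the real code forwards to log4j
}

dbLogger.setInfoLogLevel(InfoLogLevel.WARN_LEVEL) // effective level for a customized logger
options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL)  // kept mainly so it shows up in RocksDB's config dump
options.setLogger(dbLogger)                       // attach the logger after its level is set
```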
[spark] branch branch-3.5 updated: [SPARK-44694][PYTHON][CONNECT] Refactor active sessions and expose them as an API
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 21e7fe81a66 [SPARK-44694][PYTHON][CONNECT] Refactor active sessions and expose them as an API 21e7fe81a66 is described below commit 21e7fe81a662eacebcbc1971a3586a6d470376f3 Author: Hyukjin Kwon AuthorDate: Tue Aug 8 11:03:05 2023 +0900 [SPARK-44694][PYTHON][CONNECT] Refactor active sessions and expose them as an API This PR proposes to (mostly) refactor all the internal workarounds to get the active session correctly. There are few things to note: - _PySpark without Spark Connect does not already support the hierarchy of active sessions_. With pinned thread mode (enabled by default), PySpark does map each Python thread to JVM thread, but the thread creation happens within gateway server, that does not respect the thread hierarchy. Therefore, this PR follows the exactly same behaviour. - New thread will not have an active thread by default. - Other behaviours are same as PySpark without Connect, see also https://github.com/apache/spark/pull/42367 - Since I am here, I piggiyback few documentation changes. We missed document `SparkSession.readStream`, `SparkSession.streams`, `SparkSession.udtf`, `SparkSession.conf` and `SparkSession.version` in Spark Connect. - The changes here are mostly refactoring that reuses existing unittests while I expose two methods: - `SparkSession.getActiveSession` (only for Spark Connect) - `SparkSession.active` (for both in PySpark) For Spark Connect users to be able to play with active and default sessions in Python. Yes, it adds new API: - `SparkSession.getActiveSession` (only for Spark Connect) - `SparkSession.active` (for both in PySpark) Existing unittests should cover all. Closes #42371 from HyukjinKwon/SPARK-44694. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon (cherry picked from commit 9368a0f0c1001fb6fd64799a2e744874b6cd27e4) Signed-off-by: Hyukjin Kwon --- .../source/reference/pyspark.sql/spark_session.rst | 1 + python/pyspark/errors/error_classes.py | 5 + python/pyspark/ml/connect/io_utils.py | 8 +- python/pyspark/ml/connect/tuning.py| 11 ++- python/pyspark/ml/torch/distributor.py | 3 +- python/pyspark/ml/util.py | 13 --- python/pyspark/pandas/utils.py | 7 +- python/pyspark/sql/connect/session.py | 107 ++--- python/pyspark/sql/connect/udf.py | 25 +++-- python/pyspark/sql/connect/udtf.py | 27 +++--- python/pyspark/sql/session.py | 65 +++-- .../sql/tests/connect/test_connect_basic.py| 4 +- python/pyspark/sql/utils.py| 18 13 files changed, 197 insertions(+), 97 deletions(-) diff --git a/python/docs/source/reference/pyspark.sql/spark_session.rst b/python/docs/source/reference/pyspark.sql/spark_session.rst index a5f8bc47d44..74315a0aacc 100644 --- a/python/docs/source/reference/pyspark.sql/spark_session.rst +++ b/python/docs/source/reference/pyspark.sql/spark_session.rst @@ -28,6 +28,7 @@ See also :class:`SparkSession`. .. autosummary:: :toctree: api/ +SparkSession.active SparkSession.builder.appName SparkSession.builder.config SparkSession.builder.enableHiveSupport diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index 971dc59bbb2..937a8758404 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.py @@ -607,6 +607,11 @@ ERROR_CLASSES_JSON = """ "Argument `` should be a WindowSpec, got ." 
] }, + "NO_ACTIVE_OR_DEFAULT_SESSION" : { +"message" : [ + "No active or default Spark session found. Please create a new Spark session before running the code." +] + }, "NO_ACTIVE_SESSION" : { "message" : [ "No active Spark session found. Please create a new Spark session before running the code." diff --git a/python/pyspark/ml/connect/io_utils.py b/python/pyspark/ml/connect/io_utils.py index 9a963086aaf..a09a244862c 100644 --- a/python/pyspark/ml/connect/io_utils.py +++ b/python/pyspark/ml/connect/io_utils.py @@ -23,7 +23,7 @@ import time from urllib.parse import urlparse from typing import Any, Dict, List from pyspark.ml.base import Params -from pyspark.ml.util import _get_active_session +from pyspark.sql import SparkSession from pyspark.sql.utils import is_remote @@ -34,7 +34,7 @@ _META_DATA_FILE_NAME = "metadata.json" def _copy_file_from_local_to_fs(local_path: str, dest_path: str) -> None: -session = _get_active_session(is_remote()) +sessio
[spark] branch master updated: [SPARK-44694][PYTHON][CONNECT] Refactor active sessions and expose them as an API
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 9368a0f0c10 [SPARK-44694][PYTHON][CONNECT] Refactor active sessions and expose them as an API 9368a0f0c10 is described below commit 9368a0f0c1001fb6fd64799a2e744874b6cd27e4 Author: Hyukjin Kwon AuthorDate: Tue Aug 8 11:03:05 2023 +0900 [SPARK-44694][PYTHON][CONNECT] Refactor active sessions and expose them as an API ### What changes were proposed in this pull request? This PR proposes to (mostly) refactor all the internal workarounds to get the active session correctly. There are few things to note: - _PySpark without Spark Connect does not already support the hierarchy of active sessions_. With pinned thread mode (enabled by default), PySpark does map each Python thread to JVM thread, but the thread creation happens within gateway server, that does not respect the thread hierarchy. Therefore, this PR follows the exactly same behaviour. - New thread will not have an active thread by default. - Other behaviours are same as PySpark without Connect, see also https://github.com/apache/spark/pull/42367 - Since I am here, I piggiyback few documentation changes. We missed document `SparkSession.readStream`, `SparkSession.streams`, `SparkSession.udtf`, `SparkSession.conf` and `SparkSession.version` in Spark Connect. - The changes here are mostly refactoring that reuses existing unittests while I expose two methods: - `SparkSession.getActiveSession` (only for Spark Connect) - `SparkSession.active` (for both in PySpark) ### Why are the changes needed? For Spark Connect users to be able to play with active and default sessions in Python. ### Does this PR introduce _any_ user-facing change? Yes, it adds new API: - `SparkSession.getActiveSession` (only for Spark Connect) - `SparkSession.active` (for both in PySpark) ### How was this patch tested? Existing unittests should cover all. Closes #42371 from HyukjinKwon/SPARK-44694. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .../source/reference/pyspark.sql/spark_session.rst | 1 + python/pyspark/errors/error_classes.py | 5 + python/pyspark/ml/connect/io_utils.py | 8 +- python/pyspark/ml/connect/tuning.py| 11 ++- python/pyspark/ml/torch/distributor.py | 3 +- python/pyspark/ml/util.py | 13 --- python/pyspark/pandas/utils.py | 7 +- python/pyspark/sql/connect/session.py | 107 ++--- python/pyspark/sql/connect/udf.py | 25 +++-- python/pyspark/sql/connect/udtf.py | 27 +++--- python/pyspark/sql/session.py | 65 +++-- .../sql/tests/connect/test_connect_basic.py| 4 +- python/pyspark/sql/utils.py| 18 13 files changed, 197 insertions(+), 97 deletions(-) diff --git a/python/docs/source/reference/pyspark.sql/spark_session.rst b/python/docs/source/reference/pyspark.sql/spark_session.rst index c16ca4f162f..f25dbab5f6b 100644 --- a/python/docs/source/reference/pyspark.sql/spark_session.rst +++ b/python/docs/source/reference/pyspark.sql/spark_session.rst @@ -28,6 +28,7 @@ See also :class:`SparkSession`. .. 
autosummary:: :toctree: api/ +SparkSession.active SparkSession.builder.appName SparkSession.builder.config SparkSession.builder.enableHiveSupport diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index a534bc6deb4..24885e94d32 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.py @@ -617,6 +617,11 @@ ERROR_CLASSES_JSON = """ "Argument `` should be a WindowSpec, got ." ] }, + "NO_ACTIVE_OR_DEFAULT_SESSION" : { +"message" : [ + "No active or default Spark session found. Please create a new Spark session before running the code." +] + }, "NO_ACTIVE_SESSION" : { "message" : [ "No active Spark session found. Please create a new Spark session before running the code." diff --git a/python/pyspark/ml/connect/io_utils.py b/python/pyspark/ml/connect/io_utils.py index 9a963086aaf..a09a244862c 100644 --- a/python/pyspark/ml/connect/io_utils.py +++ b/python/pyspark/ml/connect/io_utils.py @@ -23,7 +23,7 @@ import time from urllib.parse import urlparse from typing import Any, Dict, List from pyspark.ml.base import Params -from pyspark.ml.util import _get_active_session +from pyspark.sql import SparkSession from pyspark.sql.utils import is_remote @@ -34,7 +34,7 @@ _META_DATA_FILE_NAME = "metadata.json" def _copy_file_from_local_to_fs(local_path
[spark] branch branch-3.5 updated: [MINOR][UI] Increasing the number of significant digits for Fraction Cached of RDD
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new df02aa29ddc [MINOR][UI] Increasing the number of significant digits for Fraction Cached of RDD df02aa29ddc is described below commit df02aa29ddc9b03e4386128f26bfc98b869485e5 Author: Kent Yao AuthorDate: Tue Aug 8 08:10:09 2023 +0900 [MINOR][UI] Increasing the number of significant digits for Fraction Cached of RDD ### What changes were proposed in this pull request? This PR is a typo improvement for increasing the number of significant digits for Fraction Cached of RDD that shows on the Storage Tab. ### Why are the changes needed? improves accuracy and precision ![image](https://github.com/apache/spark/assets/8326978/7106352c-b806-4953-8938-c4cba8ea1191) ### Does this PR introduce _any_ user-facing change? Yes, the Fraction Cached on Storage Page increases the fractional length from 0 to 2 ### How was this patch tested? locally verified Closes #42373 from yaooqinn/uiminor. Authored-by: Kent Yao Signed-off-by: Hyukjin Kwon (cherry picked from commit f47a2560e6e39ba8eac51a76290614b2fba4d65a) Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/ui/storage/StoragePage.scala | 2 +- .../org/apache/spark/ui/storage/StoragePageSuite.scala| 15 --- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index c1708c320c5..72662267365 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -98,7 +98,7 @@ private[ui] class StoragePage(parent: SparkUITab, store: AppStatusStore) extends {rdd.storageLevel} {rdd.numCachedPartitions.toString} - {"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)} + {"%.2f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)} {Utils.bytesToString(rdd.memoryUsed)} {Utils.bytesToString(rdd.diskUsed)} diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala index 718c6856cb3..d1e25bf8a23 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala @@ -48,8 +48,8 @@ class StoragePageSuite extends SparkFunSuite { val rdd2 = new RDDStorageInfo(2, "rdd2", - 10, - 5, + 1000, + 56, StorageLevel.DISK_ONLY.description, 0L, 200L, @@ -58,8 +58,8 @@ class StoragePageSuite extends SparkFunSuite { val rdd3 = new RDDStorageInfo(3, "rdd3", - 10, - 10, + 1000, + 103, StorageLevel.MEMORY_AND_DISK_SER.description, 400L, 500L, @@ -94,19 +94,20 @@ class StoragePageSuite extends SparkFunSuite { assert((xmlNodes \\ "tr").size === 3) assert(((xmlNodes \\ "tr")(0) \\ "td").map(_.text.trim) === - Seq("1", "rdd1", "Memory Deserialized 1x Replicated", "10", "100%", "100.0 B", "0.0 B")) + Seq("1", "rdd1", "Memory Deserialized 1x Replicated", "10", "100.00%", "100.0 B", "0.0 B")) // Check the url assert(((xmlNodes \\ "tr")(0) \\ "td" \ "a")(0).attribute("href").map(_.text) === Some("http://localhost:4040/storage/rdd/?id=1";)) assert(((xmlNodes \\ "tr")(1) \\ "td").map(_.text.trim) === - Seq("2", "rdd2", "Disk Serialized 1x Replicated", "5", "50%", "0.0 B", "200.0 B")) + Seq("2", "rdd2", "Disk 
Serialized 1x Replicated", "56", "5.60%", "0.0 B", "200.0 B")) // Check the url assert(((xmlNodes \\ "tr")(1) \\ "td" \ "a")(0).attribute("href").map(_.text) === Some("http://localhost:4040/storage/rdd/?id=2";)) assert(((xmlNodes \\ "tr")(2) \\ "td").map(_.text.trim) === - Seq("3", "rdd3", "Disk Memory Serialized 1x Replicated", "10", "100%", "400.0 B", "500.0 B")) + Seq("3", "rdd3", "Disk Memory Serialized 1x Replicated", "103", "10.30%", "400.0 B", +"500.0 B")) // Check the url assert(((xmlNodes \\ "tr")(2) \\ "td" \ "a")(0).attribute("href").map(_.text) === Some("http://localhost:4040/storage/rdd/?id=3";)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [MINOR][UI] Increasing the number of significant digits for Fraction Cached of RDD
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f47a2560e6e [MINOR][UI] Increasing the number of significant digits for Fraction Cached of RDD f47a2560e6e is described below commit f47a2560e6e39ba8eac51a76290614b2fba4d65a Author: Kent Yao AuthorDate: Tue Aug 8 08:10:09 2023 +0900 [MINOR][UI] Increasing the number of significant digits for Fraction Cached of RDD ### What changes were proposed in this pull request? This PR is a minor improvement that increases the number of significant digits for Fraction Cached of RDD shown on the Storage Tab. ### Why are the changes needed? It improves the accuracy and precision of the displayed fraction. ![image](https://github.com/apache/spark/assets/8326978/7106352c-b806-4953-8938-c4cba8ea1191) ### Does this PR introduce _any_ user-facing change? Yes, the Fraction Cached on the Storage Page increases the fractional length from 0 to 2 ### How was this patch tested? locally verified Closes #42373 from yaooqinn/uiminor. Authored-by: Kent Yao Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/ui/storage/StoragePage.scala | 2 +- .../org/apache/spark/ui/storage/StoragePageSuite.scala| 15 --- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index c1708c320c5..72662267365 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -98,7 +98,7 @@ private[ui] class StoragePage(parent: SparkUITab, store: AppStatusStore) extends {rdd.storageLevel} {rdd.numCachedPartitions.toString} - {"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)} + {"%.2f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)} {Utils.bytesToString(rdd.memoryUsed)} {Utils.bytesToString(rdd.diskUsed)} diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala index 718c6856cb3..d1e25bf8a23 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala @@ -48,8 +48,8 @@ class StoragePageSuite extends SparkFunSuite { val rdd2 = new RDDStorageInfo(2, "rdd2", - 10, - 5, + 1000, + 56, StorageLevel.DISK_ONLY.description, 0L, 200L, @@ -58,8 +58,8 @@ class StoragePageSuite extends SparkFunSuite { val rdd3 = new RDDStorageInfo(3, "rdd3", - 10, - 10, + 1000, + 103, StorageLevel.MEMORY_AND_DISK_SER.description, 400L, 500L, @@ -94,19 +94,20 @@ class StoragePageSuite extends SparkFunSuite { assert((xmlNodes \\ "tr").size === 3) assert(((xmlNodes \\ "tr")(0) \\ "td").map(_.text.trim) === - Seq("1", "rdd1", "Memory Deserialized 1x Replicated", "10", "100%", "100.0 B", "0.0 B")) + Seq("1", "rdd1", "Memory Deserialized 1x Replicated", "10", "100.00%", "100.0 B", "0.0 B")) // Check the url assert(((xmlNodes \\ "tr")(0) \\ "td" \ "a")(0).attribute("href").map(_.text) === Some("http://localhost:4040/storage/rdd/?id=1";)) assert(((xmlNodes \\ "tr")(1) \\ "td").map(_.text.trim) === - Seq("2", "rdd2", "Disk Serialized 1x Replicated", "5", "50%", "0.0 B", "200.0 B")) + Seq("2", "rdd2", "Disk Serialized 1x Replicated", "56", "5.60%", "0.0 B", "200.0 B")) // Check the url assert(((xmlNodes \\
"tr")(1) \\ "td" \ "a")(0).attribute("href").map(_.text) === Some("http://localhost:4040/storage/rdd/?id=2";)) assert(((xmlNodes \\ "tr")(2) \\ "td").map(_.text.trim) === - Seq("3", "rdd3", "Disk Memory Serialized 1x Replicated", "10", "100%", "400.0 B", "500.0 B")) + Seq("3", "rdd3", "Disk Memory Serialized 1x Replicated", "103", "10.30%", "400.0 B", +"500.0 B")) // Check the url assert(((xmlNodes \\ "tr")(2) \\ "td" \ "a")(0).attribute("href").map(_.text) === Some("http://localhost:4040/storage/rdd/?id=3";)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [SPARK-44132][SQL] Materialize `Stream` of join column names to avoid codegen failure
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 4f869ce6f67 [SPARK-44132][SQL] Materialize `Stream` of join column names to avoid codegen failure 4f869ce6f67 is described below commit 4f869ce6f67c4bb2444c351632adaf9c78c5ec1b Author: Steven Aerts AuthorDate: Tue Aug 8 08:09:05 2023 +0900 [SPARK-44132][SQL] Materialize `Stream` of join column names to avoid codegen failure ### What changes were proposed in this pull request? Materialize the passed join columns as an `IndexedSeq` before passing them to the lower layers. ### Why are the changes needed? When nesting multiple full outer joins using column names that are a `Stream`, the code generator produces faulty code, resulting in an NPE or a bad `UnsafeRow` access at runtime; see the two added test cases. ### Does this PR introduce _any_ user-facing change? No, only a bug fix. ### How was this patch tested? A reproduction scenario was created and added to the code base. Closes #41712 from steven-aerts/SPARK-44132-fix. Authored-by: Steven Aerts Signed-off-by: Hyukjin Kwon (cherry picked from commit 8911578020f8a2428b12dd72cb0ed4b7d747d835) Signed-off-by: Hyukjin Kwon --- .../main/scala/org/apache/spark/sql/Dataset.scala| 2 +- .../sql/execution/joins/JoinCodegenSupport.scala | 2 +- .../test/scala/org/apache/spark/sql/JoinSuite.scala | 20 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 61c83829d20..eda017937d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1092,7 +1092,7 @@ class Dataset[T] private[sql]( Join( joined.left, joined.right, -UsingJoin(JoinType(joinType), usingColumns), +UsingJoin(JoinType(joinType), usingColumns.toIndexedSeq), None, JoinHint.NONE) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala index a7d1edefcd6..6496f9a0006 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala @@ -79,7 +79,7 @@ trait JoinCodegenSupport extends CodegenSupport with BaseJoinExec { setDefaultValue: Boolean): Seq[ExprCode] = { ctx.currentVars = null ctx.INPUT_ROW = row -plan.output.zipWithIndex.map { case (a, i) => +plan.output.toIndexedSeq.zipWithIndex.map { case (a, i) => val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx) if (setDefaultValue) { // the variables are needed even there is no matched rows diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 7f358723eeb..14f1fb27906 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1709,4 +1709,24 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan checkAnswer(sql(query), expected) } } + + test("SPARK-44132: FULL OUTER JOIN by streamed column
name fails with NPE") { +val dsA = Seq((1, "a")).toDF("id", "c1") +val dsB = Seq((2, "b")).toDF("id", "c2") +val dsC = Seq((3, "c")).toDF("id", "c3") +val joined = dsA.join(dsB, Stream("id"), "full_outer").join(dsC, Stream("id"), "full_outer") + +val expected = Seq(Row(1, "a", null, null), Row(2, null, "b", null), Row(3, null, null, "c")) + +checkAnswer(joined, expected) + } + + test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with invalid access") { +val ds = Seq((1, "a")).toDF("id", "c1") +val joined = ds.join(ds, Stream("id"), "full_outer").join(ds, Stream("id"), "full_outer") + +val expected = Seq(Row(1, "a", "a", "a")) + +checkAnswer(joined, expected) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-44132][SQL] Materialize `Stream` of join column names to avoid codegen failure
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8911578020f [SPARK-44132][SQL] Materialize `Stream` of join column names to avoid codegen failure 8911578020f is described below commit 8911578020f8a2428b12dd72cb0ed4b7d747d835 Author: Steven Aerts AuthorDate: Tue Aug 8 08:09:05 2023 +0900 [SPARK-44132][SQL] Materialize `Stream` of join column names to avoid codegen failure ### What changes were proposed in this pull request? Materialize the passed join columns as an `IndexedSeq` before passing them to the lower layers. ### Why are the changes needed? When nesting multiple full outer joins using column names that are a `Stream`, the code generator produces faulty code, resulting in an NPE or a bad `UnsafeRow` access at runtime; see the two added test cases. ### Does this PR introduce _any_ user-facing change? No, only a bug fix. ### How was this patch tested? A reproduction scenario was created and added to the code base. Closes #41712 from steven-aerts/SPARK-44132-fix. Authored-by: Steven Aerts Signed-off-by: Hyukjin Kwon --- .../main/scala/org/apache/spark/sql/Dataset.scala| 2 +- .../sql/execution/joins/JoinCodegenSupport.scala | 2 +- .../test/scala/org/apache/spark/sql/JoinSuite.scala | 20 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 61c83829d20..eda017937d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1092,7 +1092,7 @@ class Dataset[T] private[sql]( Join( joined.left, joined.right, -UsingJoin(JoinType(joinType), usingColumns), +UsingJoin(JoinType(joinType), usingColumns.toIndexedSeq), None, JoinHint.NONE) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala index a7d1edefcd6..6496f9a0006 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala @@ -79,7 +79,7 @@ trait JoinCodegenSupport extends CodegenSupport with BaseJoinExec { setDefaultValue: Boolean): Seq[ExprCode] = { ctx.currentVars = null ctx.INPUT_ROW = row -plan.output.zipWithIndex.map { case (a, i) => +plan.output.toIndexedSeq.zipWithIndex.map { case (a, i) => val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx) if (setDefaultValue) { // the variables are needed even there is no matched rows diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 7f358723eeb..14f1fb27906 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1709,4 +1709,24 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan checkAnswer(sql(query), expected) } } + + test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with NPE") { +val dsA = Seq((1, "a")).toDF("id", "c1") +val dsB = Seq((2, "b")).toDF("id", "c2")
+val dsC = Seq((3, "c")).toDF("id", "c3") +val joined = dsA.join(dsB, Stream("id"), "full_outer").join(dsC, Stream("id"), "full_outer") + +val expected = Seq(Row(1, "a", null, null), Row(2, null, "b", null), Row(3, null, null, "c")) + +checkAnswer(joined, expected) + } + + test("SPARK-44132: FULL OUTER JOIN by streamed column name fails with invalid access") { +val ds = Seq((1, "a")).toDF("id", "c1") +val joined = ds.join(ds, Stream("id"), "full_outer").join(ds, Stream("id"), "full_outer") + +val expected = Seq(Row(1, "a", "a", "a")) + +checkAnswer(joined, expected) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
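For the SPARK-44132 change above, a hedged user-level sketch of the reproduction shape from the added tests. On affected builds, materializing the lazy `Stream` before calling `join` (the same `toIndexedSeq` the patch applies internally) may avoid the faulty generated code; the patch also hardens codegen itself, so this is a workaround sketch, not the fix.

```scala
import org.apache.spark.sql.SparkSession

object StreamJoinColumns {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SPARK-44132").getOrCreate()
    import spark.implicits._

    val dsA = Seq((1, "a")).toDF("id", "c1")
    val dsB = Seq((2, "b")).toDF("id", "c2")
    val dsC = Seq((3, "c")).toDF("id", "c3")

    // A lazily evaluated Stream of join column names, as in the new tests.
    val cols = Stream("id")

    // Force the sequence up front before nesting the full outer joins.
    val joined = dsA
      .join(dsB, cols.toIndexedSeq, "full_outer")
      .join(dsC, cols.toIndexedSeq, "full_outer")

    joined.show() // one row per id, with nulls where a side has no match
    spark.stop()
  }
}
```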
[spark] branch branch-3.5 updated: [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new d8f02274c38 [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api d8f02274c38 is described below commit d8f02274c38c027e2d56f5158ce63f6e74255d2d Author: Herman van Hovell AuthorDate: Tue Aug 8 00:42:13 2023 +0200 [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api This PR moves `Triggers.scala` and `Trigger.scala` from `sql/core` to `sql/api`, and it removes the duplicates from the connect scala client. Not really needed, just some deduplication. No. Existing tests. Closes #42368 from hvanhovell/SPARK-44692. Authored-by: Herman van Hovell Signed-off-by: Herman van Hovell (cherry picked from commit 4eea89d339649152a1afcd8b7a32020454e71d42) Signed-off-by: Herman van Hovell --- .../org/apache/spark/sql/streaming/Trigger.java| 180 - dev/checkstyle-suppressions.xml| 4 +- project/MimaExcludes.scala | 4 +- .../org/apache/spark/sql/streaming/Trigger.java| 0 .../spark/sql/execution/streaming/Triggers.scala | 6 +- .../spark/sql/execution/streaming/Triggers.scala | 113 - 6 files changed, 6 insertions(+), 301 deletions(-) diff --git a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java deleted file mode 100644 index 27ffe67d990..000 --- a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming; - -import java.util.concurrent.TimeUnit; - -import scala.concurrent.duration.Duration; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$; -import org.apache.spark.sql.execution.streaming.ContinuousTrigger; -import org.apache.spark.sql.execution.streaming.OneTimeTrigger$; -import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger; - -/** - * Policy used to indicate how often results should be produced by a [[StreamingQuery]]. - * - * @since 3.5.0 - */ -@Evolving -public class Trigger { - // This is a copy of the same class in sql/core/.../streaming/Trigger.java - - /** - * A trigger policy that runs a query periodically based on an interval in processing time. - * If `interval` is 0, the query will run as fast as possible. 
- * - * @since 3.5.0 - */ - public static Trigger ProcessingTime(long intervalMs) { - return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS); - } - - /** - * (Java-friendly) - * A trigger policy that runs a query periodically based on an interval in processing time. - * If `interval` is 0, the query will run as fast as possible. - * - * {{{ - *import java.util.concurrent.TimeUnit - *df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) - * }}} - * - * @since 3.5.0 - */ - public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) { - return ProcessingTimeTrigger.create(interval, timeUnit); - } - - /** - * (Scala-friendly) - * A trigger policy that runs a query periodically based on an interval in processing time. - * If `duration` is 0, the query will run as fast as possible. - * - * {{{ - *import scala.concurrent.duration._ - *df.writeStream.trigger(Trigger.ProcessingTime(10.seconds)) - * }}} - * @since 3.5.0 - */ - public static Trigger ProcessingTime(Duration interval) { - return ProcessingTimeTrigger.apply(interval); - } - - /** - * A trigger policy that runs a query periodically based on an interval in processing time. - * If `interval` is effectively 0, the query will run as fast as possible. - * - * {{{ - *df.writeStream.trigger(Trigger.ProcessingTime("10 seconds")) - * }}} - * @since 3.5.0
[spark] branch master updated: [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4eea89d3396 [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api 4eea89d3396 is described below commit 4eea89d339649152a1afcd8b7a32020454e71d42 Author: Herman van Hovell AuthorDate: Tue Aug 8 00:42:13 2023 +0200 [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api ### What changes were proposed in this pull request? This PR moves `Triggers.scala` and `Trigger.scala` from `sql/core` to `sql/api`, and it removes the duplicates from the connect scala client. ### Why are the changes needed? Not really needed, just some deduplication. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #42368 from hvanhovell/SPARK-44692. Authored-by: Herman van Hovell Signed-off-by: Herman van Hovell --- .../org/apache/spark/sql/streaming/Trigger.java| 180 - dev/checkstyle-suppressions.xml| 4 +- project/MimaExcludes.scala | 4 +- .../org/apache/spark/sql/streaming/Trigger.java| 0 .../spark/sql/execution/streaming/Triggers.scala | 6 +- .../spark/sql/execution/streaming/Triggers.scala | 113 - 6 files changed, 6 insertions(+), 301 deletions(-) diff --git a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java deleted file mode 100644 index 27ffe67d990..000 --- a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming; - -import java.util.concurrent.TimeUnit; - -import scala.concurrent.duration.Duration; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$; -import org.apache.spark.sql.execution.streaming.ContinuousTrigger; -import org.apache.spark.sql.execution.streaming.OneTimeTrigger$; -import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger; - -/** - * Policy used to indicate how often results should be produced by a [[StreamingQuery]]. - * - * @since 3.5.0 - */ -@Evolving -public class Trigger { - // This is a copy of the same class in sql/core/.../streaming/Trigger.java - - /** - * A trigger policy that runs a query periodically based on an interval in processing time. - * If `interval` is 0, the query will run as fast as possible. 
- * - * @since 3.5.0 - */ - public static Trigger ProcessingTime(long intervalMs) { - return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS); - } - - /** - * (Java-friendly) - * A trigger policy that runs a query periodically based on an interval in processing time. - * If `interval` is 0, the query will run as fast as possible. - * - * {{{ - *import java.util.concurrent.TimeUnit - *df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) - * }}} - * - * @since 3.5.0 - */ - public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) { - return ProcessingTimeTrigger.create(interval, timeUnit); - } - - /** - * (Scala-friendly) - * A trigger policy that runs a query periodically based on an interval in processing time. - * If `duration` is 0, the query will run as fast as possible. - * - * {{{ - *import scala.concurrent.duration._ - *df.writeStream.trigger(Trigger.ProcessingTime(10.seconds)) - * }}} - * @since 3.5.0 - */ - public static Trigger ProcessingTime(Duration interval) { - return ProcessingTimeTrigger.apply(interval); - } - - /** - * A trigger policy that runs a query periodically based on an interval in processing time. - * If `interval` is effectively 0, the query will run as fast as possible. - * - * {{{ - *df.writeStream.trigger(T
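The relocation above is a source-compatible refactor; for context, a short hedged sketch of how the `Trigger` policies defined in the now-deduplicated class are used from a streaming query. The rate source and the timeout are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("trigger-usage").getOrCreate()

    // The rate source emits a trickle of rows, enough to exercise the trigger.
    val stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

    val query = stream.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds")) // micro-batch every 10 seconds
      .start()

    // Other policies from the same class: Trigger.Once(), Trigger.AvailableNow(),
    // Trigger.Continuous("1 second").
    query.awaitTermination(30000L) // let the sketch run for ~30 seconds
    spark.stop()
  }
}
```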
[spark] branch branch-3.5 updated: [SPARK-44575][SQL][CONNECT] Implement basic error translation
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 3b24f65fd5a [SPARK-44575][SQL][CONNECT] Implement basic error translation 3b24f65fd5a is described below commit 3b24f65fd5a97cb58d87ec0d1d798df59d946fbd Author: Yihong He AuthorDate: Tue Aug 8 06:33:48 2023 +0900 [SPARK-44575][SQL][CONNECT] Implement basic error translation ### What changes were proposed in this pull request? - Implement basic error translation for spark connect scala client. ### Why are the changes needed? - Better compatibility with the existing control flow ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `build/sbt "connect-client-jvm/testOnly *Suite"` Closes #42266 from heyihong/SPARK-44575. Authored-by: Yihong He Signed-off-by: Hyukjin Kwon (cherry picked from commit 42e5daddf3ba16ff7d08e82e51cd8924cc56e180) Signed-off-by: Hyukjin Kwon --- .../connect/client/GrpcExceptionConverter.scala| 54 ++ .../scala/org/apache/spark/sql/CatalogSuite.scala | 6 +-- .../org/apache/spark/sql/ClientE2ETestSuite.scala | 12 +++-- .../spark/sql/DataFrameNaFunctionSuite.scala | 3 +- .../sql/KeyValueGroupedDatasetE2ETestSuite.scala | 3 +- 5 files changed, 60 insertions(+), 18 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala index 7ff3421a5a0..64d1e5c488a 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala @@ -16,18 +16,26 @@ */ package org.apache.spark.sql.connect.client +import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag + +import com.google.rpc.ErrorInfo import io.grpc.StatusRuntimeException import io.grpc.protobuf.StatusProto -import org.apache.spark.{SparkException, SparkThrowable} +import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.trees.Origin +import org.apache.spark.util.JsonUtils -private[client] object GrpcExceptionConverter { +private[client] object GrpcExceptionConverter extends JsonUtils { def convert[T](f: => T): T = { try { f } catch { case e: StatusRuntimeException => -throw toSparkThrowable(e) +throw toThrowable(e) } } @@ -53,11 +61,41 @@ private[client] object GrpcExceptionConverter { } } - private def toSparkThrowable(ex: StatusRuntimeException): SparkThrowable with Throwable = { -val status = StatusProto.fromThrowable(ex) -// TODO: Add finer grained error conversion -new SparkException(status.getMessage, ex.getCause) + private def errorConstructor[T <: Throwable: ClassTag]( + throwableCtr: (String, Throwable) => T): (String, (String, Throwable) => Throwable) = { +val className = implicitly[reflect.ClassTag[T]].runtimeClass.getName +(className, throwableCtr) } -} + private val errorFactory = Map( +errorConstructor((message, _) => new ParseException(None, message, Origin(), Origin())), +errorConstructor((message, cause) => new AnalysisException(message, cause = Option(cause + + private def errorInfoToThrowable(info: ErrorInfo, message: String): Option[Throwable] 
= { +val classes = + mapper.readValue(info.getMetadataOrDefault("classes", "[]"), classOf[Array[String]]) +classes + .find(errorFactory.contains) + .map { cls => +val constructor = errorFactory.get(cls).get +constructor(message, null) + } + } + + private def toThrowable(ex: StatusRuntimeException): Throwable = { +val status = StatusProto.fromThrowable(ex) + +val fallbackEx = new SparkException(status.getMessage, ex.getCause) + +val errorInfoOpt = status.getDetailsList.asScala + .find(_.is(classOf[ErrorInfo])) + +if (errorInfoOpt.isEmpty) { + return fallbackEx +} + +errorInfoToThrowable(errorInfoOpt.get.unpack(classOf[ErrorInfo]), status.getMessage) + .getOrElse(fallbackEx) + } +} diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala index 00a6bcc9b5c..fa97498f7e7 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/
[spark] branch master updated: [SPARK-44575][SQL][CONNECT] Implement basic error translation
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 42e5daddf3b [SPARK-44575][SQL][CONNECT] Implement basic error translation 42e5daddf3b is described below commit 42e5daddf3ba16ff7d08e82e51cd8924cc56e180 Author: Yihong He AuthorDate: Tue Aug 8 06:33:48 2023 +0900 [SPARK-44575][SQL][CONNECT] Implement basic error translation ### What changes were proposed in this pull request? - Implement basic error translation for spark connect scala client. ### Why are the changes needed? - Better compatibility with the existing control flow ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `build/sbt "connect-client-jvm/testOnly *Suite"` Closes #42266 from heyihong/SPARK-44575. Authored-by: Yihong He Signed-off-by: Hyukjin Kwon --- .../connect/client/GrpcExceptionConverter.scala| 54 ++ .../scala/org/apache/spark/sql/CatalogSuite.scala | 6 +-- .../org/apache/spark/sql/ClientE2ETestSuite.scala | 12 +++-- .../spark/sql/DataFrameNaFunctionSuite.scala | 3 +- .../sql/KeyValueGroupedDatasetE2ETestSuite.scala | 3 +- 5 files changed, 60 insertions(+), 18 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala index 7ff3421a5a0..64d1e5c488a 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala @@ -16,18 +16,26 @@ */ package org.apache.spark.sql.connect.client +import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag + +import com.google.rpc.ErrorInfo import io.grpc.StatusRuntimeException import io.grpc.protobuf.StatusProto -import org.apache.spark.{SparkException, SparkThrowable} +import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.trees.Origin +import org.apache.spark.util.JsonUtils -private[client] object GrpcExceptionConverter { +private[client] object GrpcExceptionConverter extends JsonUtils { def convert[T](f: => T): T = { try { f } catch { case e: StatusRuntimeException => -throw toSparkThrowable(e) +throw toThrowable(e) } } @@ -53,11 +61,41 @@ private[client] object GrpcExceptionConverter { } } - private def toSparkThrowable(ex: StatusRuntimeException): SparkThrowable with Throwable = { -val status = StatusProto.fromThrowable(ex) -// TODO: Add finer grained error conversion -new SparkException(status.getMessage, ex.getCause) + private def errorConstructor[T <: Throwable: ClassTag]( + throwableCtr: (String, Throwable) => T): (String, (String, Throwable) => Throwable) = { +val className = implicitly[reflect.ClassTag[T]].runtimeClass.getName +(className, throwableCtr) } -} + private val errorFactory = Map( +errorConstructor((message, _) => new ParseException(None, message, Origin(), Origin())), +errorConstructor((message, cause) => new AnalysisException(message, cause = Option(cause + + private def errorInfoToThrowable(info: ErrorInfo, message: String): Option[Throwable] = { +val classes = + mapper.readValue(info.getMetadataOrDefault("classes", "[]"), 
classOf[Array[String]]) +classes + .find(errorFactory.contains) + .map { cls => +val constructor = errorFactory.get(cls).get +constructor(message, null) + } + } + + private def toThrowable(ex: StatusRuntimeException): Throwable = { +val status = StatusProto.fromThrowable(ex) + +val fallbackEx = new SparkException(status.getMessage, ex.getCause) + +val errorInfoOpt = status.getDetailsList.asScala + .find(_.is(classOf[ErrorInfo])) + +if (errorInfoOpt.isEmpty) { + return fallbackEx +} + +errorInfoToThrowable(errorInfoOpt.get.unpack(classOf[ErrorInfo]), status.getMessage) + .getOrElse(fallbackEx) + } +} diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala index 00a6bcc9b5c..fa97498f7e7 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala @@ -46,7 +46,7 @@ class CatalogSuite extends RemoteSparkSession with SQLHelper
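A hedged sketch of what the converter above enables in a Spark Connect Scala client: server-side analysis failures can be matched as `AnalysisException` (and parse failures as `ParseException`) again instead of a bare `SparkException`. The remote address and table name are placeholders.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object ConnectErrorTranslation {
  def main(args: Array[String]): Unit = {
    // Placeholder endpoint; any reachable Spark Connect server works.
    val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()

    try {
      // Analysis fails on the server; with the converter the client can
      // branch on AnalysisException rather than a generic SparkException.
      spark.sql("SELECT * FROM table_that_does_not_exist").collect()
    } catch {
      case e: AnalysisException => println(s"analysis error: ${e.getMessage}")
    }

    spark.close()
  }
}
```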
[spark] branch master updated: [SPARK-43606][PS] Remove `Int64Index` & `Float64Index`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d5d3f393f16 [SPARK-43606][PS] Remove `Int64Index` & `Float64Index` d5d3f393f16 is described below commit d5d3f393f16d1c17f88857b81e9bd7573d594d87 Author: itholic AuthorDate: Tue Aug 8 06:31:52 2023 +0900 [SPARK-43606][PS] Remove `Int64Index` & `Float64Index` ### What changes were proposed in this pull request? This PR proposes to remove `Int64Index` & `Float64Index` from pandas API on Spark. ### Why are the changes needed? To match the behavior with pandas 2 and above. ### Does this PR introduce _any_ user-facing change? Yes, the `Int64Index` & `Float64Index` will be removed. ### How was this patch tested? Enabling the existing doctests & UTs. Closes #42267 from itholic/SPARK-43245. Authored-by: itholic Signed-off-by: Hyukjin Kwon --- dev/sparktestsupport/modules.py| 1 - .../source/migration_guide/pyspark_upgrade.rst | 1 + .../source/reference/pyspark.pandas/indexing.rst | 10 - python/pyspark/pandas/__init__.py | 3 - python/pyspark/pandas/base.py | 28 +-- python/pyspark/pandas/frame.py | 8 +- python/pyspark/pandas/indexes/__init__.py | 1 - python/pyspark/pandas/indexes/base.py | 159 +++- python/pyspark/pandas/indexes/category.py | 4 +- python/pyspark/pandas/indexes/datetimes.py | 12 +- python/pyspark/pandas/indexes/numeric.py | 210 - python/pyspark/pandas/series.py| 4 +- python/pyspark/pandas/spark/accessors.py | 16 +- python/pyspark/pandas/tests/indexes/test_base.py | 45 + python/pyspark/pandas/tests/series/test_compute.py | 4 +- python/pyspark/pandas/tests/series/test_series.py | 3 +- python/pyspark/pandas/usage_logging/__init__.py| 3 - 17 files changed, 124 insertions(+), 388 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index b2f978c47ea..c5be1957a7d 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -668,7 +668,6 @@ pyspark_pandas = Module( "pyspark.pandas.indexes.datetimes", "pyspark.pandas.indexes.timedelta", "pyspark.pandas.indexes.multi", -"pyspark.pandas.indexes.numeric", "pyspark.pandas.spark.accessors", "pyspark.pandas.spark.utils", "pyspark.pandas.typedef.typehints", diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst b/python/docs/source/migration_guide/pyspark_upgrade.rst index 9bd879fb1a1..7a691ee2645 100644 --- a/python/docs/source/migration_guide/pyspark_upgrade.rst +++ b/python/docs/source/migration_guide/pyspark_upgrade.rst @@ -22,6 +22,7 @@ Upgrading PySpark Upgrading from PySpark 3.5 to 4.0 - +* In Spark 4.0, ``Int64Index`` and ``Float64Index`` have been removed from pandas API on Spark, ``Index`` should be used directly. * In Spark 4.0, ``DataFrame.iteritems`` has been removed from pandas API on Spark, use ``DataFrame.items`` instead. * In Spark 4.0, ``Series.iteritems`` has been removed from pandas API on Spark, use ``Series.items`` instead. * In Spark 4.0, ``DataFrame.append`` has been removed from pandas API on Spark, use ``ps.concat`` instead. diff --git a/python/docs/source/reference/pyspark.pandas/indexing.rst b/python/docs/source/reference/pyspark.pandas/indexing.rst index 15539fa2266..70d463c052a 100644 --- a/python/docs/source/reference/pyspark.pandas/indexing.rst +++ b/python/docs/source/reference/pyspark.pandas/indexing.rst @@ -166,16 +166,6 @@ Selecting Index.asof Index.isin -.. _api.numeric: - -Numeric Index -- -.. 
autosummary:: - :toctree: api/ - - Int64Index - Float64Index - .. _api.categorical: CategoricalIndex diff --git a/python/pyspark/pandas/__init__.py b/python/pyspark/pandas/__init__.py index 980aeab2bee..d8ce385639c 100644 --- a/python/pyspark/pandas/__init__.py +++ b/python/pyspark/pandas/__init__.py @@ -61,7 +61,6 @@ from pyspark.pandas.indexes.base import Index from pyspark.pandas.indexes.category import CategoricalIndex from pyspark.pandas.indexes.datetimes import DatetimeIndex from pyspark.pandas.indexes.multi import MultiIndex -from pyspark.pandas.indexes.numeric import Float64Index, Int64Index from pyspark.pandas.indexes.timedelta import TimedeltaIndex from pyspark.pandas.series import Series from pyspark.pandas.groupby import NamedAgg @@ -77,8 +76,6 @@ __all__ = [ # noqa: F405 "Series", "Index", "MultiIndex", -"Int64Index", -"Float64Index", "CategoricalIndex", "DatetimeIndex", "Tim
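A hedged migration sketch for user code that still constructs the removed index classes; a plain `ps.Index` now carries the numeric dtype on its own.

```python
import pyspark.pandas as ps

# Before Spark 4.0, typed index classes could be built directly:
#   ps.Int64Index([1, 2, 3])
#   ps.Float64Index([1.0, 2.5])
# Those classes are removed; Index infers the dtype instead.
int_idx = ps.Index([1, 2, 3])     # int64 index
float_idx = ps.Index([1.0, 2.5])  # float64 index

print(int_idx.dtype)    # int64
print(float_idx.dtype)  # float64
```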
[spark] branch master updated: [SPARK-44707][K8S] Use INFO log in `ExecutorPodsWatcher.onClose` if `SparkContext` is stopped
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 726ccb532a1 [SPARK-44707][K8S] Use INFO log in `ExecutorPodsWatcher.onClose` if `SparkContext` is stopped 726ccb532a1 is described below commit 726ccb532a1b5dbe9c55e68a71d3125570c6738d Author: Dongjoon Hyun AuthorDate: Mon Aug 7 13:56:24 2023 -0700 [SPARK-44707][K8S] Use INFO log in `ExecutorPodsWatcher.onClose` if `SparkContext` is stopped ### What changes were proposed in this pull request? This PR is a minor log change which aims to use `INFO`-level log instead of `WARN`-level in `ExecutorPodsWatcher.onClose` if `SparkContext` is stopped. Since Spark can distinguish the expected behavior from the error cases, Spark had better avoid WARNING. ### Why are the changes needed? Previously, we have `WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed` message. ``` 23/08/07 18:10:14 INFO SparkContext: SparkContext is stopping with exitCode 0. 23/08/07 18:10:14 WARN TaskSetManager: Lost task 2594.0 in stage 0.0 (TID 2594) ([2620:149:100d:1813::3f86] executor 1615): TaskKilled (another attempt succeeded) 23/08/07 18:10:14 INFO TaskSetManager: task 2594.0 in stage 0.0 (TID 2594) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded). 23/08/07 18:10:14 INFO SparkUI: Stopped Spark web UI at http://xxx:4040 23/08/07 18:10:14 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 23/08/07 18:10:14 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 23/08/07 18:10:14 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. Closes #42381 from dongjoon-hyun/SPARK-44707. 
Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .../cluster/k8s/ExecutorPodsWatchSnapshotSource.scala | 14 +++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala index 4809222650d..6953ed789f7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala @@ -86,12 +86,20 @@ class ExecutorPodsWatchSnapshotSource( } override def onClose(e: WatcherException): Unit = { - logWarning("Kubernetes client has been closed (this is expected if the application is" + -" shutting down.)", e) + if (SparkContext.getActive.map(_.isStopped).getOrElse(true)) { +logInfo("Kubernetes client has been closed.") + } else { +logWarning("Kubernetes client has been closed (this is expected if the application is" + + " shutting down.)", e) + } } override def onClose(): Unit = { - logWarning("Kubernetes client has been closed.") + if (SparkContext.getActive.map(_.isStopped).getOrElse(true)) { +logInfo("Kubernetes client has been closed.") + } else { +logWarning("Kubernetes client has been closed.") + } } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [SPARK-44561][PYTHON] Fix AssertionError when converting UDTF output to a complex type
This is an automated email from the ASF dual-hosted git repository. ueshin pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new ea6b41cb398 [SPARK-44561][PYTHON] Fix AssertionError when converting UDTF output to a complex type ea6b41cb398 is described below commit ea6b41cb3989996e45102b1930b1498324761093 Author: Takuya UESHIN AuthorDate: Mon Aug 7 11:48:24 2023 -0700 [SPARK-44561][PYTHON] Fix AssertionError when converting UDTF output to a complex type ### What changes were proposed in this pull request? Fixes AssertionError when converting UDTF output to a complex type by ignore assertions in `_create_converter_from_pandas` to make Arrow raise an error. ### Why are the changes needed? There is an assertion in `_create_converter_from_pandas`, but it should not be applied for Python UDTF case. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added/modified the related tests. Closes #42310 from ueshin/issues/SPARK-44561/udtf_complex_types. Authored-by: Takuya UESHIN Signed-off-by: Takuya UESHIN (cherry picked from commit f1a161cb39504bd625ea7fa50d2cc72a1a2a59e9) Signed-off-by: Takuya UESHIN --- python/pyspark/sql/pandas/serializers.py | 5 +- python/pyspark/sql/pandas/types.py | 108 ++--- .../pyspark/sql/tests/connect/test_parity_udtf.py | 3 + python/pyspark/sql/tests/test_udtf.py | 247 +++-- 4 files changed, 314 insertions(+), 49 deletions(-) diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py index f3037c8b39c..d1a3babb1fd 100644 --- a/python/pyspark/sql/pandas/serializers.py +++ b/python/pyspark/sql/pandas/serializers.py @@ -571,7 +571,10 @@ class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer): dt = spark_type or from_arrow_type(arrow_type, prefer_timestamp_ntz=True) # TODO(SPARK-43579): cache the converter for reuse conv = _create_converter_from_pandas( -dt, timezone=self._timezone, error_on_duplicated_field_names=False +dt, +timezone=self._timezone, +error_on_duplicated_field_names=False, +ignore_unexpected_complex_type_values=True, ) series = conv(series) diff --git a/python/pyspark/sql/pandas/types.py b/python/pyspark/sql/pandas/types.py index 53362047604..b02a003e632 100644 --- a/python/pyspark/sql/pandas/types.py +++ b/python/pyspark/sql/pandas/types.py @@ -21,7 +21,7 @@ pandas instances during the type conversion. """ import datetime import itertools -from typing import Any, Callable, List, Optional, Union, TYPE_CHECKING +from typing import Any, Callable, Iterable, List, Optional, Union, TYPE_CHECKING from pyspark.sql.types import ( cast, @@ -750,6 +750,7 @@ def _create_converter_from_pandas( *, timezone: Optional[str], error_on_duplicated_field_names: bool = True, +ignore_unexpected_complex_type_values: bool = False, ) -> Callable[["pd.Series"], "pd.Series"]: """ Create a converter of pandas Series to create Spark DataFrame with Arrow optimization. @@ -763,6 +764,17 @@ def _create_converter_from_pandas( error_on_duplicated_field_names : bool, optional Whether raise an exception when there are duplicated field names. (default ``True``) +ignore_unexpected_complex_type_values : bool, optional +Whether ignore the case where unexpected values are given for complex types. 
+If ``False``, each complex type expects: + +* array type: :class:`Iterable` +* map type: :class:`dict` +* struct type: :class:`dict` or :class:`tuple` + +and raise an AssertionError when the given value is not the expected type. +If ``True``, just ignore and return the give value. +(default ``False``) Returns --- @@ -781,15 +793,26 @@ def _create_converter_from_pandas( def _converter(dt: DataType) -> Optional[Callable[[Any], Any]]: if isinstance(dt, ArrayType): -_element_conv = _converter(dt.elementType) -if _element_conv is None: -return None +_element_conv = _converter(dt.elementType) or (lambda x: x) -def convert_array(value: Any) -> Any: -if value is None: -return None -else: -return [_element_conv(v) for v in value] # type: ignore[misc] +if ignore_unexpected_complex_type_values: + +def convert_array(value: Any) -> Any: +if value is None: +return None +elif isinstance(value, Itera
[spark] branch master updated: [SPARK-44561][PYTHON] Fix AssertionError when converting UDTF output to a complex type
This is an automated email from the ASF dual-hosted git repository. ueshin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f1a161cb395 [SPARK-44561][PYTHON] Fix AssertionError when converting UDTF output to a complex type f1a161cb395 is described below commit f1a161cb39504bd625ea7fa50d2cc72a1a2a59e9 Author: Takuya UESHIN AuthorDate: Mon Aug 7 11:48:24 2023 -0700 [SPARK-44561][PYTHON] Fix AssertionError when converting UDTF output to a complex type ### What changes were proposed in this pull request? Fixes AssertionError when converting UDTF output to a complex type by ignore assertions in `_create_converter_from_pandas` to make Arrow raise an error. ### Why are the changes needed? There is an assertion in `_create_converter_from_pandas`, but it should not be applied for Python UDTF case. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added/modified the related tests. Closes #42310 from ueshin/issues/SPARK-44561/udtf_complex_types. Authored-by: Takuya UESHIN Signed-off-by: Takuya UESHIN --- python/pyspark/sql/pandas/serializers.py | 5 +- python/pyspark/sql/pandas/types.py | 108 ++--- .../pyspark/sql/tests/connect/test_parity_udtf.py | 3 + python/pyspark/sql/tests/test_udtf.py | 247 +++-- 4 files changed, 314 insertions(+), 49 deletions(-) diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py index f3037c8b39c..d1a3babb1fd 100644 --- a/python/pyspark/sql/pandas/serializers.py +++ b/python/pyspark/sql/pandas/serializers.py @@ -571,7 +571,10 @@ class ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer): dt = spark_type or from_arrow_type(arrow_type, prefer_timestamp_ntz=True) # TODO(SPARK-43579): cache the converter for reuse conv = _create_converter_from_pandas( -dt, timezone=self._timezone, error_on_duplicated_field_names=False +dt, +timezone=self._timezone, +error_on_duplicated_field_names=False, +ignore_unexpected_complex_type_values=True, ) series = conv(series) diff --git a/python/pyspark/sql/pandas/types.py b/python/pyspark/sql/pandas/types.py index 53362047604..b02a003e632 100644 --- a/python/pyspark/sql/pandas/types.py +++ b/python/pyspark/sql/pandas/types.py @@ -21,7 +21,7 @@ pandas instances during the type conversion. """ import datetime import itertools -from typing import Any, Callable, List, Optional, Union, TYPE_CHECKING +from typing import Any, Callable, Iterable, List, Optional, Union, TYPE_CHECKING from pyspark.sql.types import ( cast, @@ -750,6 +750,7 @@ def _create_converter_from_pandas( *, timezone: Optional[str], error_on_duplicated_field_names: bool = True, +ignore_unexpected_complex_type_values: bool = False, ) -> Callable[["pd.Series"], "pd.Series"]: """ Create a converter of pandas Series to create Spark DataFrame with Arrow optimization. @@ -763,6 +764,17 @@ def _create_converter_from_pandas( error_on_duplicated_field_names : bool, optional Whether raise an exception when there are duplicated field names. (default ``True``) +ignore_unexpected_complex_type_values : bool, optional +Whether ignore the case where unexpected values are given for complex types. +If ``False``, each complex type expects: + +* array type: :class:`Iterable` +* map type: :class:`dict` +* struct type: :class:`dict` or :class:`tuple` + +and raise an AssertionError when the given value is not the expected type. +If ``True``, just ignore and return the give value. 
+(default ``False``) Returns --- @@ -781,15 +793,26 @@ def _create_converter_from_pandas( def _converter(dt: DataType) -> Optional[Callable[[Any], Any]]: if isinstance(dt, ArrayType): -_element_conv = _converter(dt.elementType) -if _element_conv is None: -return None +_element_conv = _converter(dt.elementType) or (lambda x: x) -def convert_array(value: Any) -> Any: -if value is None: -return None -else: -return [_element_conv(v) for v in value] # type: ignore[misc] +if ignore_unexpected_complex_type_values: + +def convert_array(value: Any) -> Any: +if value is None: +return None +elif isinstance(value, Iterable): +return [_element_conv(v) for v in value] +else: +
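A hedged illustration of the case the fix above targets: a Python UDTF whose output schema mixes array, map and struct columns, where the pandas-to-Arrow conversion previously tripped an internal assertion instead of letting Arrow validate the values. The class name and schema are made up for the example.

```python
from pyspark.sql.functions import lit, udtf

# Rows are yielded as (list, dict, tuple), matching the array, map and
# struct columns declared in the return type.
@udtf(returnType="arr: array<int>, kv: map<string,int>, st: struct<a:int,b:string>")
class ComplexOutputUDTF:
    def eval(self, x: int):
        yield [x, x + 1], {"count": x}, (x, str(x))

# Usage against an active SparkSession, on the Arrow-optimized path:
#   spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", "true")
#   ComplexOutputUDTF(lit(1)).show(truncate=False)
```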
[spark] branch master updated: [SPARK-38475][CORE] Use error class in org.apache.spark.serializer
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2a23c7a18a0 [SPARK-38475][CORE] Use error class in org.apache.spark.serializer 2a23c7a18a0 is described below commit 2a23c7a18a0ba75d95ee1d898896a8f0dc2c5531 Author: Bo Zhang AuthorDate: Mon Aug 7 22:10:01 2023 +0500 [SPARK-38475][CORE] Use error class in org.apache.spark.serializer ### What changes were proposed in this pull request? This PR aims to change exceptions created in package org.apache.spark.serializer to use error class. ### Why are the changes needed? This is to move exceptions created in package org.apache.spark.serializer to error class. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #42243 from bozhang2820/spark-38475. Lead-authored-by: Bo Zhang Co-authored-by: Bo Zhang Signed-off-by: Max Gekk --- .../src/main/resources/error/error-classes.json| 21 + .../spark/serializer/GenericAvroSerializer.scala | 6 ++--- .../apache/spark/serializer/KryoSerializer.scala | 27 -- docs/sql-error-conditions.md | 24 +++ 4 files changed, 68 insertions(+), 10 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 680f787429c..0ea1eed35e4 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -831,6 +831,11 @@ "Not found an encoder of the type to Spark SQL internal representation. Consider to change the input type to one of supported at '/sql-ref-datatypes.html'." ] }, + "ERROR_READING_AVRO_UNKNOWN_FINGERPRINT" : { +"message" : [ + "Error reading avro data -- encountered an unknown fingerprint: , not sure what schema to use. This could happen if you registered additional schemas after starting your spark context." +] + }, "EVENT_TIME_IS_NOT_ON_TIMESTAMP_TYPE" : { "message" : [ "The event time has the invalid type , but expected \"TIMESTAMP\"." @@ -864,6 +869,11 @@ ], "sqlState" : "22018" }, + "FAILED_REGISTER_CLASS_WITH_KRYO" : { +"message" : [ + "Failed to register classes with Kryo." +] + }, "FAILED_RENAME_PATH" : { "message" : [ "Failed to rename to as destination already exists." @@ -1564,6 +1574,12 @@ ], "sqlState" : "22032" }, + "INVALID_KRYO_SERIALIZER_BUFFER_SIZE" : { +"message" : [ + "The value of the config \"\" must be less than 2048 MiB, but got MiB." +], +"sqlState" : "F" + }, "INVALID_LAMBDA_FUNCTION_CALL" : { "message" : [ "Invalid lambda function call." @@ -2006,6 +2022,11 @@ "The join condition has the invalid type , expected \"BOOLEAN\"." ] }, + "KRYO_BUFFER_OVERFLOW" : { +"message" : [ + "Kryo serialization failed: . To avoid this, increase \"\" value." +] + }, "LOAD_DATA_PATH_NOT_EXISTS" : { "message" : [ "LOAD DATA input path does not exist: ." 
diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 7d2923fdf37..d09abff2773 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -140,9 +140,9 @@ private[serializer] class GenericAvroSerializer[D <: GenericContainer] case Some(s) => new Schema.Parser().setValidateDefaults(false).parse(s) case None => throw new SparkException( -"Error reading attempting to read avro data -- encountered an unknown " + - s"fingerprint: $fingerprint, not sure what schema to use. This could happen " + - "if you registered additional schemas after starting your spark context.") +errorClass = "ERROR_READING_AVRO_UNKNOWN_FINGERPRINT", +messageParameters = Map("fingerprint" -> fingerprint.toString), +cause = null) } }) } else { diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 826d6789f88..f75942cbb87 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -66,15 +66,21 @@ class KryoSerializer(conf: SparkConf) private val bufferSizeKb = conf.get(KRYO_SERIALIZER_BUFFER_SIZE) if (bufferSizeKb >=
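For the migration above, a small sketch of the error-class construction pattern the serializer code now follows; the error class is one of the entries added to error-classes.json, and the fingerprint value is illustrative.

```scala
import org.apache.spark.SparkException

object ErrorClassSketch {
  def main(args: Array[String]): Unit = {
    // Named parameters fill the registered message template, replacing the
    // hand-built English string used before the change.
    val e = new SparkException(
      errorClass = "ERROR_READING_AVRO_UNKNOWN_FINGERPRINT",
      messageParameters = Map("fingerprint" -> "1234567890"),
      cause = null)

    println(e.getErrorClass) // ERROR_READING_AVRO_UNKNOWN_FINGERPRINT
    println(e.getMessage)    // rendered message including the fingerprint
  }
}
```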
[spark] branch master updated: [SPARK-44697][CORE] Clean up the deprecated usage of `o.a.commons.lang3.RandomUtils`
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a3a32912be0 [SPARK-44697][CORE] Clean up the deprecated usage of `o.a.commons.lang3.RandomUtils` a3a32912be0 is described below commit a3a32912be04d3760cb34eb4b79d6d481bbec502 Author: yangjie01 AuthorDate: Mon Aug 7 21:42:38 2023 +0800 [SPARK-44697][CORE] Clean up the deprecated usage of `o.a.commons.lang3.RandomUtils` ### What changes were proposed in this pull request? In `commons-lang3` 3.13.0, `RandomUtils` has been marked as `Deprecated`, the Java doc of `commons-lang3` suggests to instead use the api of `commons-rng`. https://github.com/apache/commons-lang/blob/bcc10b359318397a4d12dbaef22b101725bc6323/src/main/java/org/apache/commons/lang3/RandomUtils.java#L33 ``` * deprecated Use Apache Commons RNG's optimized https://commons.apache.org/proper/commons-rng/commons-rng-client-api/apidocs/org/apache/commons/rng/UniformRandomProvider.html";>UniformRandomProvider ``` However, as Spark only uses `RandomUtils` in test code, so this pr attempts to replace `RandomUtils` with `ThreadLocalRandom` to avoid introducing additional third-party dependencies. ### Why are the changes needed? Clean up the use of Deprecated api. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions Closes #42370 from LuciferYang/RandomUtils-2-ThreadLocalRandom. Authored-by: yangjie01 Signed-off-by: yangjie01 --- .../java/org/apache/spark/io/GenericFileInputStreamSuite.java | 8 .../org/apache/spark/deploy/master/PersistenceEngineSuite.scala | 4 ++-- .../scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala | 4 ++-- .../test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 6 +++--- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java b/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java index ef7c4cbbb79..4bfb4a2c68c 100644 --- a/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java +++ b/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java @@ -17,7 +17,6 @@ package org.apache.spark.io; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.RandomUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -25,6 +24,7 @@ import org.junit.Test; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.ThreadLocalRandom; import static org.junit.Assert.assertEquals; @@ -33,7 +33,8 @@ import static org.junit.Assert.assertEquals; */ public abstract class GenericFileInputStreamSuite { - private byte[] randomBytes; + // Create a byte array of size 2 MB with random bytes + private byte[] randomBytes = new byte[2 * 1024 * 1024]; protected File inputFile; @@ -41,8 +42,7 @@ public abstract class GenericFileInputStreamSuite { @Before public void setUp() throws IOException { -// Create a byte array of size 2 MB with random bytes -randomBytes = RandomUtils.nextBytes(2 * 1024 * 1024); +ThreadLocalRandom.current().nextBytes(randomBytes); inputFile = File.createTempFile("temp-file", ".tmp"); FileUtils.writeByteArrayToFile(inputFile, randomBytes); } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala index 39607621b4c..998ad21a50d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala @@ -19,8 +19,8 @@ package org.apache.spark.deploy.master import java.net.ServerSocket +import java.util.concurrent.ThreadLocalRandom -import org.apache.commons.lang3.RandomUtils import org.apache.curator.test.TestingServer import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} @@ -117,7 +117,7 @@ class PersistenceEngineSuite extends SparkFunSuite { } private def findFreePort(conf: SparkConf): Int = { -val candidatePort = RandomUtils.nextInt(1024, 65536) +val candidatePort = ThreadLocalRandom.current().nextInt(1024, 65536) Utils.startServiceOnPort(candidatePort, (trialPort: Int) => { val socket = new ServerSocket(trialPort) socket.close() diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index 905bb811073..3e69f01c09c 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scal
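For readers tracking the migration pattern above: the commit swaps the deprecated `RandomUtils` calls for their JDK `ThreadLocalRandom` equivalents. A minimal, self-contained Scala sketch of the same substitution (the object name is mine, not part of the patch):

```scala
import java.util.concurrent.ThreadLocalRandom

// Sketch of the RandomUtils -> ThreadLocalRandom substitution applied in the test suites.
object RandomUtilsMigrationSketch {
  def main(args: Array[String]): Unit = {
    // Before: RandomUtils.nextInt(1024, 65536) -- upper bound exclusive.
    // After: same semantics, but without the commons-lang3 dependency.
    val candidatePort = ThreadLocalRandom.current().nextInt(1024, 65536)

    // Before: randomBytes = RandomUtils.nextBytes(2 * 1024 * 1024)
    // After: allocate the array once and fill it in place.
    val randomBytes = new Array[Byte](2 * 1024 * 1024)
    ThreadLocalRandom.current().nextBytes(randomBytes)

    println(s"candidate port: $candidatePort, random bytes: ${randomBytes.length}")
  }
}
```

Note that `ThreadLocalRandom` is not seedable, which is acceptable here since these suites only need arbitrary ports and bytes rather than reproducible sequences.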
[spark] branch branch-3.4 updated: [SPARK-44634][SQL][3.4] Encoders.bean does no longer support nested beans with type arguments
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 1f5d78b5952 [SPARK-44634][SQL][3.4] Encoders.bean does no longer support nested beans with type arguments 1f5d78b5952 is described below commit 1f5d78b5952fcc6c7d36d3338a5594070e3a62dd Author: Giambattista Bloisi AuthorDate: Mon Aug 7 15:11:02 2023 +0200 [SPARK-44634][SQL][3.4] Encoders.bean does no longer support nested beans with type arguments ### What changes were proposed in this pull request? This is a port of [42327](https://github.com/apache/spark/pull/42327) This PR fixes a regression introduced in Spark 3.4.x where Encoders.bean is no longer able to process nested beans having type arguments. For example: ``` class A<T> { T value; // value getter and setter } class B { A<String> stringHolder; // stringHolder getter and setter } Encoders.bean(B.class); // throws "SparkUnsupportedOperationException: [ENCODER_NOT_FOUND]..." ``` ### Why are the changes needed? The main match in JavaTypeInference.encoderFor does not handle the ParameterizedType and TypeVariable cases. I think this is a regression introduced when the usage of Guava's TypeToken was removed: [SPARK-42093 SQL Move JavaTypeInference to AgnosticEncoders](https://github.com/apache/spark/commit/18672003513d5a4aa610b6b94dbbc15c33185d3#diff-1191737b908340a2f4c22b71b1c40ebaa0da9d8b40c958089c346a3bda26943b) hvanhovell cloud-fan In this PR I'm leveraging the commons-lang3 TypeUtils utilities to resolve ParameterizedType type arguments for classes. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests have been extended to check correct encoding of a nested bean having type arguments. Closes #42379 from gbloisi-openaire/spark-44634-branch-3.4. 
Authored-by: Giambattista Bloisi Signed-off-by: Herman van Hovell --- .../spark/sql/catalyst/JavaTypeInference.scala | 85 +- .../spark/sql/catalyst/JavaBeanWithGenerics.java | 41 +++ .../sql/catalyst/JavaTypeInferenceSuite.scala | 4 + 3 files changed, 65 insertions(+), 65 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index 36b98737a20..75aca3ccbdd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -18,12 +18,14 @@ package org.apache.spark.sql.catalyst import java.beans.{Introspector, PropertyDescriptor} import java.lang.reflect.{ParameterizedType, Type, TypeVariable} -import java.util.{ArrayDeque, List => JList, Map => JMap} +import java.util.{List => JList, Map => JMap} import javax.annotation.Nonnull -import scala.annotation.tailrec +import scala.collection.JavaConverters._ import scala.reflect.ClassTag +import org.apache.commons.lang3.reflect.{TypeUtils => JavaTypeUtils} + import org.apache.spark.SPARK_DOC_ROOT import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, DayTimeIntervalEncoder, DEFAULT_JAVA_DECIMAL_ENCODER, EncoderField, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaEnumEncoder, LocalDateTimeEncoder, MapEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, P [...] @@ -58,7 +60,8 @@ object JavaTypeInference { encoderFor(beanType, Set.empty).asInstanceOf[AgnosticEncoder[T]] } - private def encoderFor(t: Type, seenTypeSet: Set[Class[_]]): AgnosticEncoder[_] = t match { + private def encoderFor(t: Type, seenTypeSet: Set[Class[_]], +typeVariables: Map[TypeVariable[_], Type] = Map.empty): AgnosticEncoder[_] = t match { case c: Class[_] if c == java.lang.Boolean.TYPE => PrimitiveBooleanEncoder case c: Class[_] if c == java.lang.Byte.TYPE => PrimitiveByteEncoder @@ -102,18 +105,24 @@ object JavaTypeInference { UDTEncoder(udt, udt.getClass) case c: Class[_] if c.isArray => - val elementEncoder = encoderFor(c.getComponentType, seenTypeSet) + val elementEncoder = encoderFor(c.getComponentType, seenTypeSet, typeVariables) ArrayEncoder(elementEncoder, elementEncoder.nullable) -case ImplementsList(c, Array(elementCls)) => - val element = encoderFor(elementCls, seenTypeSet) +case c: Class[_] if classOf[JList[_]].isAssignableFrom(c) => + val
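The core of the fix is threading a `typeVariables` map through `encoderFor` so that a `TypeVariable` (such as `T` in the example above) can be resolved to its concrete argument. A rough, self-contained sketch of how commons-lang3 `TypeUtils.getTypeArguments` can supply such a map; the Scala classes below are hypothetical stand-ins for the Java beans in the commit message, not code from the patch:

```scala
import java.lang.reflect.ParameterizedType

import scala.collection.JavaConverters._

import org.apache.commons.lang3.reflect.{TypeUtils => JavaTypeUtils}

// Hypothetical stand-ins for the beans above: class A<T> { T value; } / class B { A<String> stringHolder; }
class A[T] { var value: T = _ }
class B { var stringHolder: A[String] = _ }

object TypeArgumentResolutionSketch {
  def main(args: Array[String]): Unit = {
    // The declared generic type of B.stringHolder is the parameterized type A<String>.
    val fieldType = classOf[B]
      .getDeclaredField("stringHolder")
      .getGenericType
      .asInstanceOf[ParameterizedType]

    // TypeUtils maps each type variable of A (here T) to its actual argument (String);
    // the patched encoderFor carries a map like this so fields typed as T can still be encoded.
    val typeVariables = JavaTypeUtils.getTypeArguments(fieldType).asScala
    typeVariables.foreach { case (tv, tpe) => println(s"${tv.getName} -> ${tpe.getTypeName}") }
  }
}
```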
[spark] branch branch-3.5 updated: [SPARK-44686][CONNECT][SQL] Add the ability to create a RowEncoder in Encoders.scala
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 8f6f301fa77 [SPARK-44686][CONNECT][SQL] Add the ability to create a RowEncoder in Encoders.scala 8f6f301fa77 is described below commit 8f6f301fa778dfd0fd7dec4a29df7106846d3277 Author: Herman van Hovell AuthorDate: Mon Aug 7 15:09:58 2023 +0200 [SPARK-44686][CONNECT][SQL] Add the ability to create a RowEncoder in Encoders.scala ### What changes were proposed in this pull request? ### Why are the changes needed? It is currently not possible to create a `RowEncoder` using the public API. The internal APIs for this will change in Spark 3.5, which means that library maintainers have to update their code if they use a RowEncoder. To avoid this happening again, we add this method to the public API. ### Does this PR introduce _any_ user-facing change? Yes. It adds the `row` method to `Encoders`. ### How was this patch tested? Added tests to connect and sql. Closes #42366 from hvanhovell/SPARK-44686. Lead-authored-by: Herman van Hovell Co-authored-by: Hyukjin Kwon Signed-off-by: Herman van Hovell (cherry picked from commit bf7654998fbbec9d5bdee6f46462cffef495545f) Signed-off-by: Herman van Hovell --- .../main/scala/org/apache/spark/sql/Encoders.scala | 10 ++- .../org/apache/spark/sql/JavaEncoderSuite.java | 31 +++--- project/MimaExcludes.scala | 2 ++ .../main/java/org/apache/spark/sql/RowFactory.java | 0 .../main/scala/org/apache/spark/sql/Encoders.scala | 7 + .../org/apache/spark/sql/JavaDatasetSuite.java | 19 + 6 files changed, 64 insertions(+), 5 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala index 3f2f7ec96d4..74f01338031 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql import scala.reflect.runtime.universe.TypeTag import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection} -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder +import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder => RowEncoderFactory} import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._ +import org.apache.spark.sql.types.StructType /** * Methods for creating an [[Encoder]]. @@ -168,6 +169,13 @@ object Encoders { */ def bean[T](beanClass: Class[T]): Encoder[T] = JavaTypeInference.encoderFor(beanClass) + /** + * Creates a [[Row]] encoder for schema `schema`. 
+ * + * @since 3.5.0 + */ + def row(schema: StructType): Encoder[Row] = RowEncoderFactory.encoderFor(schema) + private def tupleEncoder[T](encoders: Encoder[_]*): Encoder[T] = { ProductEncoder.tuple(encoders.asInstanceOf[Seq[AgnosticEncoder[_]]]).asInstanceOf[Encoder[T]] } diff --git a/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java b/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java index c8210a7a485..6e5fb72d496 100644 --- a/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java +++ b/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java @@ -16,21 +16,26 @@ */ package org.apache.spark.sql; +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; + import org.junit.*; import static org.junit.Assert.*; import static org.apache.spark.sql.Encoders.*; import static org.apache.spark.sql.functions.*; +import static org.apache.spark.sql.RowFactory.create; import org.apache.spark.sql.connect.client.SparkConnectClient; import org.apache.spark.sql.connect.client.util.SparkConnectServerUtils; - -import java.math.BigDecimal; -import java.util.Arrays; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.types.StructType; /** * Tests for the encoders class. */ -public class JavaEncoderSuite { +public class JavaEncoderSuite implements Serializable { private static SparkSession spark; @BeforeClass @@ -91,4 +96,22 @@ public class JavaEncoderSuite { dataset(DECIMAL(), bigDec(1000, 2), bigDec(2, 2)) .select(sum(v)).as(DECIMAL()).head().setScale(2)); } + + @Test + public void testRowEncoder() { +final StructType schema = new StructType() +.add("a", "int") +.add("b", "string"); +final Dataset df = spark.range(3) +.map(new MapFunction() { + @Override +
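The same commit also lands on master (see the next message), so one usage sketch covers both. A hedged Scala equivalent of the Java test added in `JavaEncoderSuite`, showing how the new public `Encoders.row` lets a typed `map` emit generic `Row`s without touching catalyst internals; the session setup and object name are mine, and it assumes Spark 3.5.0+ where the method exists:

```scala
import org.apache.spark.sql.{Encoders, Row, SparkSession}
import org.apache.spark.sql.types.StructType

// Minimal sketch of the new public Encoders.row API; mirrors the Java test in this commit.
object RowEncoderUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("row-encoder-sketch").getOrCreate()

    val schema = new StructType()
      .add("a", "int")
      .add("b", "string")

    // Encoders.row(schema) returns a public Encoder[Row], so typed operations can
    // produce untyped rows without reaching into the internal RowEncoder.
    val rowEncoder = Encoders.row(schema)
    val df = spark.range(3).map(i => Row(i.intValue(), "s" + i))(rowEncoder)

    df.show()
    spark.stop()
  }
}
```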
[spark] branch master updated: [SPARK-44686][CONNECT][SQL] Add the ability to create a RowEncoder in Encoders.scala
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new bf7654998fb [SPARK-44686][CONNECT][SQL] Add the ability to create a RowEncoder in Encoders.scala bf7654998fb is described below commit bf7654998fbbec9d5bdee6f46462cffef495545f Author: Herman van Hovell AuthorDate: Mon Aug 7 15:09:58 2023 +0200 [SPARK-44686][CONNECT][SQL] Add the ability to create a RowEncoder in Encoders.scala ### What changes were proposed in this pull request? ### Why are the changes needed? It is currently not possible to create a `RowEncoder` using the public API. The internal APIs for this will change in Spark 3.5, which means that library maintainers have to update their code if they use a RowEncoder. To avoid this happening again, we add this method to the public API. ### Does this PR introduce _any_ user-facing change? Yes. It adds the `row` method to `Encoders`. ### How was this patch tested? Added tests to connect and sql. Closes #42366 from hvanhovell/SPARK-44686. Lead-authored-by: Herman van Hovell Co-authored-by: Hyukjin Kwon Signed-off-by: Herman van Hovell --- .../main/scala/org/apache/spark/sql/Encoders.scala | 10 ++- .../org/apache/spark/sql/JavaEncoderSuite.java | 31 +++--- project/MimaExcludes.scala | 2 ++ .../main/java/org/apache/spark/sql/RowFactory.java | 0 .../main/scala/org/apache/spark/sql/Encoders.scala | 7 + .../org/apache/spark/sql/JavaDatasetSuite.java | 19 + 6 files changed, 64 insertions(+), 5 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala index 3f2f7ec96d4..74f01338031 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Encoders.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql import scala.reflect.runtime.universe.TypeTag import org.apache.spark.sql.catalyst.{JavaTypeInference, ScalaReflection} -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder +import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, RowEncoder => RowEncoderFactory} import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._ +import org.apache.spark.sql.types.StructType /** * Methods for creating an [[Encoder]]. @@ -168,6 +169,13 @@ object Encoders { */ def bean[T](beanClass: Class[T]): Encoder[T] = JavaTypeInference.encoderFor(beanClass) + /** + * Creates a [[Row]] encoder for schema `schema`. 
+ * + * @since 3.5.0 + */ + def row(schema: StructType): Encoder[Row] = RowEncoderFactory.encoderFor(schema) + private def tupleEncoder[T](encoders: Encoder[_]*): Encoder[T] = { ProductEncoder.tuple(encoders.asInstanceOf[Seq[AgnosticEncoder[_]]]).asInstanceOf[Encoder[T]] } diff --git a/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java b/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java index c8210a7a485..6e5fb72d496 100644 --- a/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java +++ b/connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java @@ -16,21 +16,26 @@ */ package org.apache.spark.sql; +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; + import org.junit.*; import static org.junit.Assert.*; import static org.apache.spark.sql.Encoders.*; import static org.apache.spark.sql.functions.*; +import static org.apache.spark.sql.RowFactory.create; import org.apache.spark.sql.connect.client.SparkConnectClient; import org.apache.spark.sql.connect.client.util.SparkConnectServerUtils; - -import java.math.BigDecimal; -import java.util.Arrays; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.types.StructType; /** * Tests for the encoders class. */ -public class JavaEncoderSuite { +public class JavaEncoderSuite implements Serializable { private static SparkSession spark; @BeforeClass @@ -91,4 +96,22 @@ public class JavaEncoderSuite { dataset(DECIMAL(), bigDec(1000, 2), bigDec(2, 2)) .select(sum(v)).as(DECIMAL()).head().setScale(2)); } + + @Test + public void testRowEncoder() { +final StructType schema = new StructType() +.add("a", "int") +.add("b", "string"); +final Dataset df = spark.range(3) +.map(new MapFunction() { + @Override + public Row call(Long i) { + return create(i.intValue(), "s" + i); + } +
[spark] branch master updated: [SPARK-44701][PYTHON][TESTS] Skip `ClassificationTestsOnConnect` when `torch` is not installed
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 433fb2af8a3 [SPARK-44701][PYTHON][TESTS] Skip `ClassificationTestsOnConnect` when `torch` is not installed 433fb2af8a3 is described below commit 433fb2af8a3ca239958cb7b006e2924ecfac0d56 Author: Ruifeng Zheng AuthorDate: Mon Aug 7 20:46:38 2023 +0900 [SPARK-44701][PYTHON][TESTS] Skip `ClassificationTestsOnConnect` when `torch` is not installed ### What changes were proposed in this pull request? Skip `ClassificationTestsOnConnect` when `torch` is not installed ### Why are the changes needed? we moved torch on connect tests to `pyspark_ml_connect`, so module `pyspark_connect` won't have `torch` to fix https://github.com/apache/spark/actions/runs/5776211318/job/15655104006 in 3.5 daily GA: ``` Starting test(python3.9): pyspark.ml.tests.connect.test_connect_classification (temp output: /__w/spark/spark/python/target/fbb6a495-df65-4334-8c04-4befc9ee81df/python3.9__pyspark.ml.tests.connect.test_connect_classification__jp1htw6f.log) Traceback (most recent call last): File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main return _run_code(code, main_globals, None, File "/usr/lib/python3.9/runpy.py", line 87, in _run_code exec(code, run_globals) File "/__w/spark/spark/python/pyspark/ml/tests/connect/test_connect_classification.py", line 21, in from pyspark.ml.tests.connect.test_legacy_mode_classification import ClassificationTestsMixin File "/__w/spark/spark/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py", line 22, in from pyspark.ml.connect.classification import ( File "/__w/spark/spark/python/pyspark/ml/connect/classification.py", line 46, in import torch ModuleNotFoundError: No module named 'torch' ``` ### Does this PR introduce _any_ user-facing change? no, test-only ### How was this patch tested? CI Closes #42375 from zhengruifeng/torch_skip. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- python/pyspark/ml/tests/connect/test_connect_classification.py | 7 +++ 1 file changed, 7 insertions(+) diff --git a/python/pyspark/ml/tests/connect/test_connect_classification.py b/python/pyspark/ml/tests/connect/test_connect_classification.py index 6ad47322234..f3e621c19f0 100644 --- a/python/pyspark/ml/tests/connect/test_connect_classification.py +++ b/python/pyspark/ml/tests/connect/test_connect_classification.py @@ -20,7 +20,14 @@ import unittest from pyspark.sql import SparkSession from pyspark.ml.tests.connect.test_legacy_mode_classification import ClassificationTestsMixin +have_torch = True +try: +import torch # noqa: F401 +except ImportError: +have_torch = False + +@unittest.skipIf(not have_torch, "torch is required") class ClassificationTestsOnConnect(ClassificationTestsMixin, unittest.TestCase): def setUp(self) -> None: self.spark = ( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [SPARK-44701][PYTHON][TESTS] Skip `ClassificationTestsOnConnect` when `torch` is not installed
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 7ecbdfa2fdb [SPARK-44701][PYTHON][TESTS] Skip `ClassificationTestsOnConnect` when `torch` is not installed 7ecbdfa2fdb is described below commit 7ecbdfa2fdb5d698abd6d09a2d7fd3e81c05d4d1 Author: Ruifeng Zheng AuthorDate: Mon Aug 7 20:46:38 2023 +0900 [SPARK-44701][PYTHON][TESTS] Skip `ClassificationTestsOnConnect` when `torch` is not installed ### What changes were proposed in this pull request? Skip `ClassificationTestsOnConnect` when `torch` is not installed ### Why are the changes needed? we moved torch on connect tests to `pyspark_ml_connect`, so module `pyspark_connect` won't have `torch` to fix https://github.com/apache/spark/actions/runs/5776211318/job/15655104006 in 3.5 daily GA: ``` Starting test(python3.9): pyspark.ml.tests.connect.test_connect_classification (temp output: /__w/spark/spark/python/target/fbb6a495-df65-4334-8c04-4befc9ee81df/python3.9__pyspark.ml.tests.connect.test_connect_classification__jp1htw6f.log) Traceback (most recent call last): File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main return _run_code(code, main_globals, None, File "/usr/lib/python3.9/runpy.py", line 87, in _run_code exec(code, run_globals) File "/__w/spark/spark/python/pyspark/ml/tests/connect/test_connect_classification.py", line 21, in from pyspark.ml.tests.connect.test_legacy_mode_classification import ClassificationTestsMixin File "/__w/spark/spark/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py", line 22, in from pyspark.ml.connect.classification import ( File "/__w/spark/spark/python/pyspark/ml/connect/classification.py", line 46, in import torch ModuleNotFoundError: No module named 'torch' ``` ### Does this PR introduce _any_ user-facing change? no, test-only ### How was this patch tested? CI Closes #42375 from zhengruifeng/torch_skip. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon (cherry picked from commit 433fb2af8a3ca239958cb7b006e2924ecfac0d56) Signed-off-by: Hyukjin Kwon --- python/pyspark/ml/tests/connect/test_connect_classification.py | 7 +++ 1 file changed, 7 insertions(+) diff --git a/python/pyspark/ml/tests/connect/test_connect_classification.py b/python/pyspark/ml/tests/connect/test_connect_classification.py index 6ad47322234..f3e621c19f0 100644 --- a/python/pyspark/ml/tests/connect/test_connect_classification.py +++ b/python/pyspark/ml/tests/connect/test_connect_classification.py @@ -20,7 +20,14 @@ import unittest from pyspark.sql import SparkSession from pyspark.ml.tests.connect.test_legacy_mode_classification import ClassificationTestsMixin +have_torch = True +try: +import torch # noqa: F401 +except ImportError: +have_torch = False + +@unittest.skipIf(not have_torch, "torch is required") class ClassificationTestsOnConnect(ClassificationTestsMixin, unittest.TestCase): def setUp(self) -> None: self.spark = ( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (1f10cc4a594 -> f139733b92d)
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 1f10cc4a594 [SPARK-44628][SQL] Clear some unused codes in "***Errors" and extract some common logic add f139733b92d [SPARK-42321][SQL] Assign name to _LEGACY_ERROR_TEMP_2133 No new revisions were added by this update. Summary of changes: .../utils/src/main/resources/error/error-classes.json | 10 +- ...ditions-malformed-record-in-parsing-error-class.md | 4 .../spark/sql/catalyst/json/JacksonParser.scala | 8 .../spark/sql/catalyst/util/BadRecordException.scala | 9 + .../spark/sql/catalyst/util/FailureSafeParser.scala | 3 +++ .../spark/sql/errors/QueryExecutionErrors.scala | 19 --- .../spark/sql/errors/QueryExecutionErrorsSuite.scala | 17 + 7 files changed, 54 insertions(+), 16 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
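Judging from the files touched above (JacksonParser, FailureSafeParser, and the new error-conditions-malformed-record-in-parsing doc page), this change gives the formerly unnamed `_LEGACY_ERROR_TEMP_2133` a proper error class in the malformed-record-in-parsing family. A hedged Scala sketch of the code path that surfaces such an error, using FAILFAST JSON parsing; the session setup, object name, and exact error-class mapping are my assumptions, not part of the commit:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: trigger the malformed-record-in-parsing path via FAILFAST JSON reading.
object MalformedRecordSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("malformed-record-sketch").getOrCreate()
    import spark.implicits._

    // One well-formed record and one malformed (truncated) record.
    val ds = Seq("""{"a": 1}""", """{"a":""").toDS()

    // In FAILFAST mode the malformed record raises an error from the
    // malformed-record-in-parsing error class documented in the new md page above.
    val parsed = spark.read.option("mode", "FAILFAST").schema("a INT").json(ds)
    try parsed.collect()
    catch { case e: Exception => println(e.getMessage) }

    spark.stop()
  }
}
```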
[spark] branch master updated: [SPARK-44628][SQL] Clear some unused codes in "***Errors" and extract some common logic
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 1f10cc4a594 [SPARK-44628][SQL] Clear some unused codes in "***Errors" and extract some common logic 1f10cc4a594 is described below commit 1f10cc4a59457ed0de0fd4dc0a1c61514d77261a Author: panbingkun AuthorDate: Mon Aug 7 12:01:47 2023 +0500 [SPARK-44628][SQL] Clear some unused codes in "***Errors" and extract some common logic ### What changes were proposed in this pull request? This PR aims to clear some unused code in "***Errors" and extract some common logic. ### Why are the changes needed? Make the code cleaner. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #42238 from panbingkun/clear_error. Authored-by: panbingkun Signed-off-by: Max Gekk --- .../apache/spark/sql/errors/DataTypeErrors.scala | 18 ++--- .../apache/spark/sql/errors/QueryErrorsBase.scala | 6 +- .../spark/sql/errors/QueryExecutionErrors.scala| 86 -- 3 files changed, 10 insertions(+), 100 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala index 7a34a386cd8..5e52e283338 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala @@ -192,15 +192,7 @@ private[sql] object DataTypeErrors extends DataTypeErrorsBase { decimalPrecision: Int, decimalScale: Int, context: SQLQueryContext = null): ArithmeticException = { -new SparkArithmeticException( - errorClass = "NUMERIC_VALUE_OUT_OF_RANGE", - messageParameters = Map( -"value" -> value.toPlainString, -"precision" -> decimalPrecision.toString, -"scale" -> decimalScale.toString, -"config" -> toSQLConf("spark.sql.ansi.enabled")), - context = getQueryContext(context), - summary = getSummary(context)) +numericValueOutOfRange(value, decimalPrecision, decimalScale, context) } def cannotChangeDecimalPrecisionError( @@ -208,6 +200,14 @@ private[sql] object DataTypeErrors extends DataTypeErrorsBase { decimalPrecision: Int, decimalScale: Int, context: SQLQueryContext = null): ArithmeticException = { +numericValueOutOfRange(value, decimalPrecision, decimalScale, context) + } + + private def numericValueOutOfRange( + value: Decimal, + decimalPrecision: Int, + decimalScale: Int, + context: SQLQueryContext): ArithmeticException = { new SparkArithmeticException( errorClass = "NUMERIC_VALUE_OUT_OF_RANGE", messageParameters = Map( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala index db256fbee87..26600117a0c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.errors import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} -import org.apache.spark.sql.catalyst.util.{toPrettySQL, QuotingUtils} +import org.apache.spark.sql.catalyst.util.toPrettySQL import org.apache.spark.sql.types.{DataType, DoubleType, FloatType} /** @@ -55,10 +55,6 @@ private[sql] trait QueryErrorsBase extends DataTypeErrorsBase { quoteByDefault(toPrettySQL(e)) } - def toSQLSchema(schema: String): String = { 
-QuotingUtils.toSQLSchema(schema) - } - // Converts an error class parameter to its SQL representation def toSQLValue(v: Any, t: DataType): String = Literal.create(v, t) match { case Literal(null, _) => "NULL" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 45b5d6b6692..f960a091ec0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -32,7 +32,6 @@ import org.apache.spark._ import org.apache.spark.launcher.SparkLauncher import org.apache.spark.memory.SparkOutOfMemoryError import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.ScalaReflection.Schema import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable} @@ -183,10 +182,6 @@ private[sql] object QueryExecutionErrors extends