[spark] branch branch-3.5 updated: [SPARK-43968][PYTHON][3.5] Improve error messages for Python UDTFs with wrong number of outputs
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 7059b69e67d  [SPARK-43968][PYTHON][3.5] Improve error messages for Python UDTFs with wrong number of outputs
7059b69e67d is described below

commit 7059b69e67db8126dafc3d4b1f3b39e947c4c3ca
Author: allisonwang-db
AuthorDate: Fri Jul 28 15:30:17 2023 +0900

    [SPARK-43968][PYTHON][3.5] Improve error messages for Python UDTFs with wrong number of outputs

    ### What changes were proposed in this pull request?

    This PR cherry-picks 7194ce9263fe1683c039a1aaf9462657b1672a99. It improves the error messages for Python UDTFs when the number of output columns does not match the number specified in the return type of the UDTF.

    ### Why are the changes needed?

    To make Python UDTFs more user-friendly.

    ### Does this PR introduce _any_ user-facing change?

    Yes. This PR improves the error messages. Before this change, the error thrown by Spark was a Java `IllegalStateException`:

    ```
    java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema
    ```

    After this PR, a clearer error message with an error class is thrown:

    ```
    [UDTF_RETURN_SCHEMA_MISMATCH] The number of columns in the result does not match the specified schema
    ```

    ### How was this patch tested?

    Existing tests and new unit tests.

    Closes #42192 from allisonwang-db/spark-43968-3.5.
Authored-by: allisonwang-db Signed-off-by: Hyukjin Kwon --- python/pyspark/errors/error_classes.py | 5 + python/pyspark/sql/connect/udtf.py | 4 +- .../pyspark/sql/tests/connect/test_parity_udtf.py | 50 python/pyspark/sql/tests/test_udtf.py | 133 +++-- python/pyspark/sql/udtf.py | 9 +- python/pyspark/worker.py | 22 +++- 6 files changed, 99 insertions(+), 124 deletions(-) diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index b1bf6b47af9..f6411fac1da 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.py @@ -320,6 +320,11 @@ ERROR_CLASSES_JSON = """ "The eval type for the UDTF '' is invalid. It must be one of ." ] }, + "INVALID_UDTF_HANDLER_TYPE" : { +"message" : [ + "The UDTF is invalid. The function handler must be a class, but got ''. Please provide a class as the function handler." +] + }, "INVALID_UDTF_NO_EVAL" : { "message" : [ "The UDTF '' is invalid. It does not implement the required 'eval' method. Please implement the 'eval' method in '' and try again." 
diff --git a/python/pyspark/sql/connect/udtf.py b/python/pyspark/sql/connect/udtf.py index 1fe8e1024ee..3747e37459e 100644 --- a/python/pyspark/sql/connect/udtf.py +++ b/python/pyspark/sql/connect/udtf.py @@ -124,6 +124,8 @@ class UserDefinedTableFunction: evalType: int = PythonEvalType.SQL_TABLE_UDF, deterministic: bool = True, ) -> None: +_validate_udtf_handler(func) + self.func = func self.returnType: DataType = ( UnparsedDataType(returnType) if isinstance(returnType, str) else returnType @@ -132,8 +134,6 @@ class UserDefinedTableFunction: self.evalType = evalType self.deterministic = deterministic -_validate_udtf_handler(func) - def _build_common_inline_user_defined_table_function( self, *cols: "ColumnOrName" ) -> CommonInlineUserDefinedTableFunction: diff --git a/python/pyspark/sql/tests/connect/test_parity_udtf.py b/python/pyspark/sql/tests/connect/test_parity_udtf.py index e18e116e003..355f5288d2c 100644 --- a/python/pyspark/sql/tests/connect/test_parity_udtf.py +++ b/python/pyspark/sql/tests/connect/test_parity_udtf.py @@ -54,56 +54,6 @@ class UDTFParityTests(BaseUDTFTestsMixin, ReusedConnectTestCase): ): TestUDTF(lit(1)).collect() -def test_udtf_with_wrong_num_output(self): -err_msg = ( -"java.lang.IllegalStateException: Input row doesn't have expected number of " -+ "values required by the schema." -) - -@udtf(returnType="a: int, b: int") -class TestUDTF: -def eval(self, a: int): -yield a, - -with self.assertRaisesRegex(SparkConnectGrpcException, err_msg): -TestUDTF(lit(1)).collect() - -@udtf(returnType="a: int") -class TestUDTF: -def eval(self, a: int): -yield a, a + 1 - -with self.assertRaisesRegex(SparkConnectGrpcException, err_msg): -TestUDTF(lit(1)).collect() - -def test_udtf_terminate_with_wrong_num_output(self
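The core of the improvement is a per-row check in the Python worker: the number of values a UDTF yields must equal the number of fields in the declared return schema. A minimal standalone sketch of that validation follows — the function and exception names here are illustrative, not the actual `pyspark.worker` internals:

```python
class UDTFSchemaMismatchError(Exception):
    """Raised when a UDTF output row does not match the declared schema width."""


def check_output_row(row, schema_fields):
    # Compare the yielded tuple's length against the number of columns
    # declared in the UDTF return type, and fail with a descriptive error.
    if len(row) != len(schema_fields):
        raise UDTFSchemaMismatchError(
            "[UDTF_RETURN_SCHEMA_MISMATCH] The number of columns in the "
            f"result ({len(row)}) does not match the specified schema "
            f"({len(schema_fields)} columns: {', '.join(schema_fields)})."
        )
    return row


# A toy UDTF body: declared as "a: int, b: int" but yielding one column.
def eval_udtf(a):
    yield (a,)  # wrong width if two columns are declared


def run(udtf, arg, schema_fields):
    return [check_output_row(r, schema_fields) for r in udtf(arg)]
```

With a two-column schema, `run(eval_udtf, 1, ["a", "b"])` raises the descriptive Python-side error instead of letting the mismatch surface later as an opaque `java.lang.IllegalStateException`.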
[spark] branch master updated: [SPARK-44548][PYTHON] Add support for pandas-on-Spark DataFrame assertDataFrameEqual
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 7c1ad5bb60c  [SPARK-44548][PYTHON] Add support for pandas-on-Spark DataFrame assertDataFrameEqual
7c1ad5bb60c is described below

commit 7c1ad5bb60c88c1c659c131e5119aab1f8212af5
Author: Amanda Liu
AuthorDate: Fri Jul 28 14:41:43 2023 +0900

    [SPARK-44548][PYTHON] Add support for pandas-on-Spark DataFrame assertDataFrameEqual

    ### What changes were proposed in this pull request?

    This PR adds support for pandas-on-Spark DataFrames in the testing util `assertDataFrameEqual`.

    ### Why are the changes needed?

    The change allows users to call the same PySpark API for both Spark and pandas-on-Spark DataFrames.

    ### Does this PR introduce _any_ user-facing change?

    Yes, the PR affects the user-facing util `assertDataFrameEqual`.

    ### How was this patch tested?

    Added tests to `python/pyspark/sql/tests/test_utils.py` and `python/pyspark/sql/tests/connect/test_utils.py`, plus existing pandas util tests.

    Closes #42158 from asl3/pandas-or-pyspark-df.
Authored-by: Amanda Liu Signed-off-by: Hyukjin Kwon --- dev/sparktestsupport/modules.py | 1 + python/docs/source/reference/pyspark.testing.rst | 1 + python/pyspark/errors/error_classes.py | 42 ++ python/pyspark/pandas/tests/test_utils.py| 171 +++- python/pyspark/sql/tests/test_utils.py | 60 ++- python/pyspark/testing/__init__.py | 4 +- python/pyspark/testing/pandasutils.py| 506 +-- python/pyspark/testing/utils.py | 82 +++- 8 files changed, 689 insertions(+), 178 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 3cfd82c3d31..79c3f8f26b1 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -514,6 +514,7 @@ pyspark_testing = Module( python_test_goals=[ # doctests "pyspark.testing.utils", +"pyspark.testing.pandasutils", ], ) diff --git a/python/docs/source/reference/pyspark.testing.rst b/python/docs/source/reference/pyspark.testing.rst index 7a6b6cc0d70..96b0c72a7bb 100644 --- a/python/docs/source/reference/pyspark.testing.rst +++ b/python/docs/source/reference/pyspark.testing.rst @@ -26,4 +26,5 @@ Testing :toctree: api/ assertDataFrameEqual +assertPandasOnSparkEqual assertSchemaEqual diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index f4b643f1d32..5ecba294d0c 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.py @@ -164,6 +164,42 @@ ERROR_CLASSES_JSON = """ "Remote client cannot create a SparkContext. Create SparkSession instead." 
] }, + "DIFFERENT_PANDAS_DATAFRAME" : { +"message" : [ + "DataFrames are not almost equal:", + "Left: ", + "", + "Right: ", + "" +] + }, + "DIFFERENT_PANDAS_INDEX" : { +"message" : [ + "Indices are not almost equal:", + "Left: ", + "", + "Right: ", + "" +] + }, + "DIFFERENT_PANDAS_MULTIINDEX" : { +"message" : [ + "MultiIndices are not almost equal:", + "Left: ", + "", + "Right: ", + "" +] + }, + "DIFFERENT_PANDAS_SERIES" : { +"message" : [ + "Series are not almost equal:", + "Left: ", + "", + "Right: ", + "" +] + }, "DIFFERENT_ROWS" : { "message" : [ "" @@ -233,6 +269,12 @@ ERROR_CLASSES_JSON = """ "NumPy array input should be of dimensions." ] }, + "INVALID_PANDAS_ON_SPARK_COMPARISON" : { +"message" : [ + "Expected two pandas-on-Spark DataFrames", + "but got actual: and expected: " +] + }, "INVALID_PANDAS_UDF" : { "message" : [ "Invalid function: " diff --git a/python/pyspark/pandas/tests/test_utils.py b/python/pyspark/pandas/tests/test_utils.py index 35ebcf17a0f..de7b0449dae 100644 --- a/python/pyspark/pandas/tests/test_utils.py +++ b/python/pyspark/pandas/tests/test_utils.py @@ -16,6 +16,7 @@ # import pandas as pd +from typing import Union from pyspark.pandas.indexes.base import Index from pyspark.pandas.utils import ( @@ -25,8 +26,14 @@ from pyspark.pandas.utils import ( validate_index_loc, validate_mode, ) -from pyspark.testing.pandasutils import PandasOnSparkTestCase +from pyspark.testing.pandasutils import ( +PandasOnSparkTestCase, +assertPandasOnSparkEqual, +_assert_pandas_equal, +_assert_pandas_almost_equal, +) from pyspark.testing.sqlutils import SQLTestUtils +from pyspark.errors import PySparkAssertionError some_global_variable = 0 @@ -105,6 +112,168 @@ class UtilsTestsMixin: with self.assertRaisesRe
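The essence of extending `assertDataFrameEqual` to pandas-on-Spark is a type-based dispatch: inspect both inputs and route them either to the Spark-DataFrame comparison path or to the pandas comparison helpers, raising the `INVALID_PANDAS_ON_SPARK_COMPARISON` error class when the two sides are of mixed kinds. A hedged, dependency-free sketch of that dispatch (function names are illustrative; the real util lives in `pyspark.testing.utils`):

```python
def _module_of(obj):
    # Dispatch on the defining module of the object's type, so this sketch
    # does not need pyspark or pandas installed.
    return type(obj).__module__


def assert_dataframe_equal(actual, expected):
    """Illustrative dispatcher: choose the comparison backend by input type."""
    a_mod, e_mod = _module_of(actual), _module_of(expected)
    if a_mod.startswith("pyspark.pandas"):
        if not e_mod.startswith("pyspark.pandas"):
            raise AssertionError(
                "[INVALID_PANDAS_ON_SPARK_COMPARISON] Expected two "
                f"pandas-on-Spark DataFrames but got actual: {a_mod} "
                f"and expected: {e_mod}"
            )
        return "pandas-on-spark-path"  # would call the pandas helpers
    return "spark-path"  # would call the Spark row comparison
```

The design point is that users call one API and the util decides how to compare, rather than exposing separate entry points per DataFrame flavor.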
[spark] branch branch-3.5 updated: [SPARK-44361][SQL][FOLLOWUP] Use PartitionEvaluator API in MapInBatchExec
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 75679f72173 [SPARK-44361][SQL][FOLLOWUP] Use PartitionEvaluator API in MapInBatchExec 75679f72173 is described below commit 75679f72173a507faf94353d37a6d223ff23c9b4 Author: Vinod KC AuthorDate: Fri Jul 28 12:49:13 2023 +0800 [SPARK-44361][SQL][FOLLOWUP] Use PartitionEvaluator API in MapInBatchExec ### What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/42024, to set the partition index correctly even if it's not used for now. ### Why are the changes needed? future-proof ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing tests Closes #42189 from vinodkc/br_SPARK-44361_Followup. Authored-by: Vinod KC Signed-off-by: Wenchen Fan (cherry picked from commit 3cf88cb6c42e802fc4828c397df61623663be9b0) Signed-off-by: Wenchen Fan --- .../org/apache/spark/sql/execution/python/MapInBatchExec.scala| 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala index 4a47c2089d6..8db389f0266 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala @@ -65,8 +65,8 @@ trait MapInBatchExec extends UnaryExecNode with PythonSQLMetrics { if (conf.usePartitionEvaluator) { rddBarrier.mapPartitionsWithEvaluator(evaluatorFactory) } else { -rddBarrier.mapPartitions { iter => - evaluatorFactory.createEvaluator().eval(0, iter) +rddBarrier.mapPartitionsWithIndex { (index, iter) => + evaluatorFactory.createEvaluator().eval(index, iter) } } } else { @@ -74,8 
+74,8 @@ trait MapInBatchExec extends UnaryExecNode with PythonSQLMetrics {
     if (conf.usePartitionEvaluator) {
       inputRdd.mapPartitionsWithEvaluator(evaluatorFactory)
     } else {
-      inputRdd.mapPartitionsInternal { iter =>
-        evaluatorFactory.createEvaluator().eval(0, iter)
+      inputRdd.mapPartitionsWithIndexInternal { (index, iter) =>
+        evaluatorFactory.createEvaluator().eval(index, iter)
       }
     }
   }

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
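The fix above replaces `mapPartitions`, which hard-coded partition index `0` into the evaluator, with `mapPartitionsWithIndex`, which threads the real index through. The pattern can be sketched independently of Spark — the evaluator protocol below is a deliberate simplification of Spark's `PartitionEvaluator` API:

```python
class Evaluator:
    def eval(self, index, it):
        # Tag each element with the partition index it was processed in,
        # so a wrong (hard-coded) index would be observable downstream.
        return [(index, x) for x in it]


class EvaluatorFactory:
    """Simplified stand-in for Spark's PartitionEvaluatorFactory."""

    def create_evaluator(self):
        return Evaluator()


def map_partitions_with_index(partitions, factory):
    # The corrected behavior: create one evaluator per partition and pass
    # that partition's real index to eval(), instead of always passing 0.
    return [
        factory.create_evaluator().eval(i, part)
        for i, part in enumerate(partitions)
    ]
```

Even though nothing reads the index today, passing the correct value now means any future evaluator that does depend on it ("future-proof", per the PR description) works without another API change.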
[spark] branch master updated: [SPARK-44361][SQL][FOLLOWUP] Use PartitionEvaluator API in MapInBatchExec
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3cf88cb6c42 [SPARK-44361][SQL][FOLLOWUP] Use PartitionEvaluator API in MapInBatchExec 3cf88cb6c42 is described below commit 3cf88cb6c42e802fc4828c397df61623663be9b0 Author: Vinod KC AuthorDate: Fri Jul 28 12:49:13 2023 +0800 [SPARK-44361][SQL][FOLLOWUP] Use PartitionEvaluator API in MapInBatchExec ### What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/42024, to set the partition index correctly even if it's not used for now. ### Why are the changes needed? future-proof ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing tests Closes #42189 from vinodkc/br_SPARK-44361_Followup. Authored-by: Vinod KC Signed-off-by: Wenchen Fan --- .../org/apache/spark/sql/execution/python/MapInBatchExec.scala| 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala index 4a47c2089d6..8db389f0266 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala @@ -65,8 +65,8 @@ trait MapInBatchExec extends UnaryExecNode with PythonSQLMetrics { if (conf.usePartitionEvaluator) { rddBarrier.mapPartitionsWithEvaluator(evaluatorFactory) } else { -rddBarrier.mapPartitions { iter => - evaluatorFactory.createEvaluator().eval(0, iter) +rddBarrier.mapPartitionsWithIndex { (index, iter) => + evaluatorFactory.createEvaluator().eval(index, iter) } } } else { @@ -74,8 +74,8 @@ trait MapInBatchExec extends UnaryExecNode with PythonSQLMetrics { if 
(conf.usePartitionEvaluator) {
       inputRdd.mapPartitionsWithEvaluator(evaluatorFactory)
     } else {
-      inputRdd.mapPartitionsInternal { iter =>
-        evaluatorFactory.createEvaluator().eval(0, iter)
+      inputRdd.mapPartitionsWithIndexInternal { (index, iter) =>
+        evaluatorFactory.createEvaluator().eval(index, iter)
       }
     }
   }
[spark] branch branch-3.5 updated: [SPARK-42098][SQL] Fix ResolveInlineTables can not handle with RuntimeReplaceable expression
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 81d56603b30  [SPARK-42098][SQL] Fix ResolveInlineTables can not handle with RuntimeReplaceable expression
81d56603b30 is described below

commit 81d56603b30375790ca0203ed877d5bcf0924c77
Author: Jia Fan
AuthorDate: Fri Jul 28 12:44:02 2023 +0800

    [SPARK-42098][SQL] Fix ResolveInlineTables can not handle with RuntimeReplaceable expression

    ### What changes were proposed in this pull request?

    Fix `ResolveInlineTables` so that it can handle `RuntimeReplaceable` expressions, e.g.:

    ```sql
    select * from values (try_add(5, 0))
    ```

    Previously, this threw:

    ```
    [INVALID_INLINE_TABLE.CANNOT_EVALUATE_EXPRESSION_IN_INLINE_TABLE] Invalid inline table. Cannot evaluate the expression "try_add(5, 0)" in inline table definition.; line 1 pos 22
    ```

    ### Why are the changes needed?

    Fix the bug that ResolveInlineTables could not handle RuntimeReplaceable expressions.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Added a new test.

    Closes #42110 from Hisoka-X/SPARK-42098_inline_table_replaceable.
Authored-by: Jia Fan Signed-off-by: Wenchen Fan (cherry picked from commit f235c9f622ddeb4c6a5cce8903130709dcd2217c) Signed-off-by: Wenchen Fan --- .../catalyst/analysis/ResolveInlineTables.scala| 6 -- .../analyzer-results/inline-table.sql.out | 21 +++ .../resources/sql-tests/inputs/inline-table.sql| 5 + .../sql-tests/results/inline-table.sql.out | 24 ++ 4 files changed, 54 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala index cf706171cd9..c203f9fa39d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala @@ -21,6 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.AliasHelper +import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.AlwaysProcess @@ -36,8 +37,9 @@ object ResolveInlineTables extends Rule[LogicalPlan] with CastSupport with Alias AlwaysProcess.fn, ruleId) { case table: UnresolvedInlineTable if table.expressionsResolved => validateInputDimension(table) - validateInputEvaluable(table) - convert(table) + val newTable = ReplaceExpressions(table).asInstanceOf[UnresolvedInlineTable] + validateInputEvaluable(newTable) + convert(newTable) } /** diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out index f3e6eb4d8dc..2a17f092a06 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out +++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out @@ -220,3 +220,24 @@ select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991- Project [a#x, b#x] +- SubqueryAlias data +- LocalRelation [a#x, b#x] + + +-- !query +select * from values (try_add(5, 0)) +-- !query analysis +Project [col1#x] ++- LocalRelation [col1#x] + + +-- !query +select * from values (try_divide(5, 0)) +-- !query analysis +Project [col1#x] ++- LocalRelation [col1#x] + + +-- !query +select * from values (10 + try_divide(5, 0)) +-- !query analysis +Project [col1#x] ++- LocalRelation [col1#x] diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql index fd8bb2d837d..6867248f576 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -55,3 +55,8 @@ select * from values ("one", count(1)), ("two", 2) as data(a, b); -- string to timestamp select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) as data(a, b); + +-- ReplaceExpressions as row +select * from values (try_add(5, 0)); +select * from values (try_divide(5, 0)); +select * from values (10 + try_divide(5, 0)); diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/i
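The fix runs `ReplaceExpressions` on the inline table before validating that its expressions are evaluable, so a wrapper like `try_add(5, 0)` is first rewritten into its evaluable form. The reordering can be sketched as a toy expression tree in Python — `Replaceable` here is an illustrative stand-in for Spark's `RuntimeReplaceable`, not its actual interface:

```python
class Literal:
    def __init__(self, value):
        self.value = value

    def eval(self):
        return self.value


class Replaceable:
    """Stand-in for RuntimeReplaceable: not directly evaluable,
    but it carries an equivalent replacement expression."""

    def __init__(self, replacement):
        self.replacement = replacement


def replace_expressions(expr):
    # Mirror of the ReplaceExpressions rule: swap in the replacement.
    return expr.replacement if isinstance(expr, Replaceable) else expr


def resolve_inline_table(rows):
    # Corrected order: replace first, then validate and evaluate.
    # Validating before replacing is what produced
    # INVALID_INLINE_TABLE.CANNOT_EVALUATE_EXPRESSION_IN_INLINE_TABLE.
    resolved = []
    for row in rows:
        row = [replace_expressions(e) for e in row]
        for e in row:
            if not hasattr(e, "eval"):
                raise ValueError(
                    "[INVALID_INLINE_TABLE.CANNOT_EVALUATE_EXPRESSION_IN_INLINE_TABLE]"
                )
        resolved.append([e.eval() for e in row])
    return resolved
```

With the replacement step first, `values (try_add(5, 0))` resolves to an ordinary `LocalRelation` instead of failing validation.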
[spark] branch master updated: [SPARK-42098][SQL] Fix ResolveInlineTables can not handle with RuntimeReplaceable expression
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f235c9f622d  [SPARK-42098][SQL] Fix ResolveInlineTables can not handle with RuntimeReplaceable expression
f235c9f622d is described below

commit f235c9f622ddeb4c6a5cce8903130709dcd2217c
Author: Jia Fan
AuthorDate: Fri Jul 28 12:44:02 2023 +0800

    [SPARK-42098][SQL] Fix ResolveInlineTables can not handle with RuntimeReplaceable expression

    ### What changes were proposed in this pull request?

    Fix `ResolveInlineTables` so that it can handle `RuntimeReplaceable` expressions, e.g.:

    ```sql
    select * from values (try_add(5, 0))
    ```

    Previously, this threw:

    ```
    [INVALID_INLINE_TABLE.CANNOT_EVALUATE_EXPRESSION_IN_INLINE_TABLE] Invalid inline table. Cannot evaluate the expression "try_add(5, 0)" in inline table definition.; line 1 pos 22
    ```

    ### Why are the changes needed?

    Fix the bug that ResolveInlineTables could not handle RuntimeReplaceable expressions.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Added a new test.

    Closes #42110 from Hisoka-X/SPARK-42098_inline_table_replaceable.
Authored-by: Jia Fan Signed-off-by: Wenchen Fan --- .../catalyst/analysis/ResolveInlineTables.scala| 6 -- .../analyzer-results/inline-table.sql.out | 21 +++ .../resources/sql-tests/inputs/inline-table.sql| 5 + .../sql-tests/results/inline-table.sql.out | 24 ++ 4 files changed, 54 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala index cf706171cd9..c203f9fa39d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala @@ -21,6 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.AliasHelper +import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.AlwaysProcess @@ -36,8 +37,9 @@ object ResolveInlineTables extends Rule[LogicalPlan] with CastSupport with Alias AlwaysProcess.fn, ruleId) { case table: UnresolvedInlineTable if table.expressionsResolved => validateInputDimension(table) - validateInputEvaluable(table) - convert(table) + val newTable = ReplaceExpressions(table).asInstanceOf[UnresolvedInlineTable] + validateInputEvaluable(newTable) + convert(newTable) } /** diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out index f3e6eb4d8dc..2a17f092a06 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out @@ -220,3 +220,24 @@ select * from values 
(timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991- Project [a#x, b#x] +- SubqueryAlias data +- LocalRelation [a#x, b#x] + + +-- !query +select * from values (try_add(5, 0)) +-- !query analysis +Project [col1#x] ++- LocalRelation [col1#x] + + +-- !query +select * from values (try_divide(5, 0)) +-- !query analysis +Project [col1#x] ++- LocalRelation [col1#x] + + +-- !query +select * from values (10 + try_divide(5, 0)) +-- !query analysis +Project [col1#x] ++- LocalRelation [col1#x] diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql index fd8bb2d837d..6867248f576 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -55,3 +55,8 @@ select * from values ("one", count(1)), ("two", 2) as data(a, b); -- string to timestamp select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-12-06 01:00:00.0'), timestamp('1991-12-06 12:00:00.0'))) as data(a, b); + +-- ReplaceExpressions as row +select * from values (try_add(5, 0)); +select * from values (try_divide(5, 0)); +select * from values (10 + try_divide(5, 0)); diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index e735dd0dd5a..709d7ab73f6 100644 --- a/sql/core/src/test/resources/sql-tests/results/inl
[spark] branch branch-3.5 updated: [SPARK-44287][SQL][FOLLOWUP] Set partition index correctly
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 4f7187885a6 [SPARK-44287][SQL][FOLLOWUP] Set partition index correctly 4f7187885a6 is described below commit 4f7187885a6c6a0c944af97a493a42dabca3cc1b Author: Wenchen Fan AuthorDate: Fri Jul 28 11:36:59 2023 +0800 [SPARK-44287][SQL][FOLLOWUP] Set partition index correctly ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/41839, to set the partition index correctly even if it's not used for now. It also contains a few code cleanup. ### Why are the changes needed? future-proof ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes #42185 from cloud-fan/follow. Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan (cherry picked from commit bf1bbc514ebf25fd8041f566dc9d13593c307931) Signed-off-by: Wenchen Fan --- .../org/apache/spark/sql/execution/Columnar.scala | 41 ++ .../spark/sql/SparkSessionExtensionSuite.scala | 64 ++ .../spark/sql/execution/SparkPlanSuite.scala | 26 - 3 files changed, 57 insertions(+), 74 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index a2029816c23..fc879f7e98f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -85,20 +85,16 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w ) override def doExecute(): RDD[InternalRow] = { -val numOutputRows = longMetric("numOutputRows") -val numInputBatches = longMetric("numInputBatches") -val evaluatorFactory = - new ColumnarToRowEvaluatorFactory( -child.output, -numOutputRows, 
-numInputBatches) - +val evaluatorFactory = new ColumnarToRowEvaluatorFactory( + child.output, + longMetric("numOutputRows"), + longMetric("numInputBatches")) if (conf.usePartitionEvaluator) { child.executeColumnar().mapPartitionsWithEvaluator(evaluatorFactory) } else { - child.executeColumnar().mapPartitionsInternal { batches => + child.executeColumnar().mapPartitionsWithIndexInternal { (index, batches) => val evaluator = evaluatorFactory.createEvaluator() -evaluator.eval(0, batches) +evaluator.eval(index, batches) } } } @@ -454,25 +450,20 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition { ) override def doExecuteColumnar(): RDD[ColumnarBatch] = { -val numInputRows = longMetric("numInputRows") -val numOutputBatches = longMetric("numOutputBatches") -// Instead of creating a new config we are reusing columnBatchSize. In the future if we do -// combine with some of the Arrow conversion tools we will need to unify some of the configs. -val numRows = conf.columnBatchSize -val evaluatorFactory = - new RowToColumnarEvaluatorFactory( -conf.offHeapColumnVectorEnabled, -numRows, -schema, -numInputRows, -numOutputBatches) - +val evaluatorFactory = new RowToColumnarEvaluatorFactory( + conf.offHeapColumnVectorEnabled, + // Instead of creating a new config we are reusing columnBatchSize. In the future if we do + // combine with some of the Arrow conversion tools we will need to unify some of the configs. 
+ conf.columnBatchSize, + schema, + longMetric("numInputRows"), + longMetric("numOutputBatches")) if (conf.usePartitionEvaluator) { child.execute().mapPartitionsWithEvaluator(evaluatorFactory) } else { - child.execute().mapPartitionsInternal { rowIterator => + child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) => val evaluator = evaluatorFactory.createEvaluator() -evaluator.eval(0, rowIterator) +evaluator.eval(index, rowIterator) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 043a3b1a7e5..d4a871c00a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala @@ -279,40 +279,36 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper { } withSession(extensions) { session => session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE) -
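Besides switching `ColumnarToRowExec` and `RowToColumnarExec` to `mapPartitionsWithIndexInternal`, the cleanup inlines the metric lookups into the factory constructor: the factory is built once on the driver with its metrics, then creates one evaluator per partition. A toy Python analogue of that factory pattern (the class names mimic, but are not, the Spark internals):

```python
class Metric:
    """Toy stand-in for a Spark SQL long metric."""

    def __init__(self):
        self.value = 0

    def add(self, n):
        self.value += n


class ColumnarToRowFactory:
    """Toy analogue of ColumnarToRowEvaluatorFactory: constructed once with
    its metrics, then asked for a fresh evaluator per partition."""

    def __init__(self, num_output_rows, num_input_batches):
        self.num_output_rows = num_output_rows
        self.num_input_batches = num_input_batches

    def create_evaluator(self):
        def evaluate(index, batches):
            # Flatten columnar batches into rows, updating the metrics and
            # tagging each row with the (now correctly threaded) partition index.
            rows = []
            for batch in batches:
                self.num_input_batches.add(1)
                for row in batch:
                    self.num_output_rows.add(1)
                    rows.append((index, row))
            return rows

        return evaluate
```

The same shape applies to `RowToColumnarExec`, where the factory additionally captures `conf.columnBatchSize` instead of a dedicated config.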
[spark] branch master updated: [SPARK-44287][SQL][FOLLOWUP] Set partition index correctly
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new bf1bbc514eb [SPARK-44287][SQL][FOLLOWUP] Set partition index correctly bf1bbc514eb is described below commit bf1bbc514ebf25fd8041f566dc9d13593c307931 Author: Wenchen Fan AuthorDate: Fri Jul 28 11:36:59 2023 +0800 [SPARK-44287][SQL][FOLLOWUP] Set partition index correctly ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/41839, to set the partition index correctly even if it's not used for now. It also contains a few code cleanup. ### Why are the changes needed? future-proof ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes #42185 from cloud-fan/follow. Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../org/apache/spark/sql/execution/Columnar.scala | 41 ++ .../spark/sql/SparkSessionExtensionSuite.scala | 64 ++ .../spark/sql/execution/SparkPlanSuite.scala | 26 - 3 files changed, 57 insertions(+), 74 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index a2029816c23..fc879f7e98f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -85,20 +85,16 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w ) override def doExecute(): RDD[InternalRow] = { -val numOutputRows = longMetric("numOutputRows") -val numInputBatches = longMetric("numInputBatches") -val evaluatorFactory = - new ColumnarToRowEvaluatorFactory( -child.output, -numOutputRows, -numInputBatches) - +val evaluatorFactory = new ColumnarToRowEvaluatorFactory( + child.output, + 
longMetric("numOutputRows"), + longMetric("numInputBatches")) if (conf.usePartitionEvaluator) { child.executeColumnar().mapPartitionsWithEvaluator(evaluatorFactory) } else { - child.executeColumnar().mapPartitionsInternal { batches => + child.executeColumnar().mapPartitionsWithIndexInternal { (index, batches) => val evaluator = evaluatorFactory.createEvaluator() -evaluator.eval(0, batches) +evaluator.eval(index, batches) } } } @@ -454,25 +450,20 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition { ) override def doExecuteColumnar(): RDD[ColumnarBatch] = { -val numInputRows = longMetric("numInputRows") -val numOutputBatches = longMetric("numOutputBatches") -// Instead of creating a new config we are reusing columnBatchSize. In the future if we do -// combine with some of the Arrow conversion tools we will need to unify some of the configs. -val numRows = conf.columnBatchSize -val evaluatorFactory = - new RowToColumnarEvaluatorFactory( -conf.offHeapColumnVectorEnabled, -numRows, -schema, -numInputRows, -numOutputBatches) - +val evaluatorFactory = new RowToColumnarEvaluatorFactory( + conf.offHeapColumnVectorEnabled, + // Instead of creating a new config we are reusing columnBatchSize. In the future if we do + // combine with some of the Arrow conversion tools we will need to unify some of the configs. 
+ conf.columnBatchSize, + schema, + longMetric("numInputRows"), + longMetric("numOutputBatches")) if (conf.usePartitionEvaluator) { child.execute().mapPartitionsWithEvaluator(evaluatorFactory) } else { - child.execute().mapPartitionsInternal { rowIterator => + child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) => val evaluator = evaluatorFactory.createEvaluator() -evaluator.eval(0, rowIterator) +evaluator.eval(index, rowIterator) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 043a3b1a7e5..d4a871c00a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala @@ -279,40 +279,36 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper { } withSession(extensions) { session => session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE) - Seq(true, false).foreach { enableEvaluator => -withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> en
[spark] branch branch-3.5 updated: [SPARK-44558][CONNECT][PYTHON] Export Spark Log Level
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 4f90c3232ca [SPARK-44558][CONNECT][PYTHON] Export Spark Log Level 4f90c3232ca is described below commit 4f90c3232cabd8bef05f46fc863d7a8f1f968ee4 Author: Alice Sayutina AuthorDate: Fri Jul 28 11:34:16 2023 +0900 [SPARK-44558][CONNECT][PYTHON] Export Spark Log Level ### What changes were proposed in this pull request? Export Spark Connect Log Level in pyspark. ### Why are the changes needed? This is convenient for software dependent on spark-connect so it can be possible to enable debug logging just in one place. ### Does this PR introduce _any_ user-facing change? New api is suggested. ### How was this patch tested? Checked it works from shell Closes #42175 from cdkrot/spark_connect_debug_logs. Lead-authored-by: Alice Sayutina Co-authored-by: Alice Sayutina Signed-off-by: Hyukjin Kwon (cherry picked from commit 9c7f613f0f10f4ee90bb29920157a32017696a43) Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/client/core.py | 17 - 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index 482482123c0..53d3d10d647 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -17,6 +17,7 @@ __all__ = [ "ChannelBuilder", "SparkConnectClient", +"getLogLevel", ] from pyspark.sql.connect.utils import check_dependencies @@ -104,7 +105,7 @@ def _configure_logging() -> logging.Logger: # Check the environment variables for log levels: if "SPARK_CONNECT_LOG_LEVEL" in os.environ: -logger.setLevel(os.getenv("SPARK_CONNECT_LOG_LEVEL", "error").upper()) +logger.setLevel(os.environ["SPARK_CONNECT_LOG_LEVEL"].upper()) else: logger.disabled = True return logger @@ -114,6 +115,20 @@ def 
_configure_logging() -> logging.Logger: logger = _configure_logging() +def getLogLevel() -> Optional[int]: +""" +This returns this log level as integer, or none (if no logging is enabled). + +Spark Connect logging can be configured with environment variable 'SPARK_CONNECT_LOG_LEVEL' + +.. versionadded:: 3.5.0 +""" + +if not logger.disabled: +return logger.level +return None + + class ChannelBuilder: """ This is a helper class that is used to create a GRPC channel based on the given - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
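The behavior added here — a module-level logger driven by the `SPARK_CONNECT_LOG_LEVEL` environment variable, plus a `getLogLevel()` accessor that returns the numeric level or `None` when logging is disabled — can be sketched outside of Spark with the standard `logging` module. A minimal sketch mirroring the diff above; the function and logger names are illustrative, not Spark's actual module layout:

```python
import logging
from typing import Dict, Optional


def configure_logging(env: Dict[str, str]) -> logging.Logger:
    # Mirrors the commit's logic: the logger is enabled only when the
    # environment variable is present; otherwise it is disabled outright.
    logger = logging.getLogger("spark-connect-sketch")
    if "SPARK_CONNECT_LOG_LEVEL" in env:
        # logging.Logger.setLevel accepts level names such as "DEBUG".
        logger.setLevel(env["SPARK_CONNECT_LOG_LEVEL"].upper())
    else:
        logger.disabled = True
    return logger


def get_log_level(logger: logging.Logger) -> Optional[int]:
    # Analogous to the new getLogLevel(): numeric level, or None when
    # no logging is enabled.
    if not logger.disabled:
        return logger.level
    return None
```

With `{"SPARK_CONNECT_LOG_LEVEL": "debug"}` this yields `logging.DEBUG`; with an empty environment the accessor returns `None`, which is the signal dependent software can use to decide whether debug logging is on.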
[spark] branch master updated: [SPARK-44558][CONNECT][PYTHON] Export Spark Log Level
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 9c7f613f0f1 [SPARK-44558][CONNECT][PYTHON] Export Spark Log Level 9c7f613f0f1 is described below commit 9c7f613f0f10f4ee90bb29920157a32017696a43 Author: Alice Sayutina AuthorDate: Fri Jul 28 11:34:16 2023 +0900 [SPARK-44558][CONNECT][PYTHON] Export Spark Log Level ### What changes were proposed in this pull request? Export Spark Connect Log Level in pyspark. ### Why are the changes needed? This is convenient for software dependent on spark-connect so it can be possible to enable debug logging just in one place. ### Does this PR introduce _any_ user-facing change? New api is suggested. ### How was this patch tested? Checked it works from shell Closes #42175 from cdkrot/spark_connect_debug_logs. Lead-authored-by: Alice Sayutina Co-authored-by: Alice Sayutina Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/client/core.py | 17 - 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index 473a00f4001..0288bbc6508 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -17,6 +17,7 @@ __all__ = [ "ChannelBuilder", "SparkConnectClient", +"getLogLevel", ] from pyspark.sql.connect.utils import check_dependencies @@ -104,7 +105,7 @@ def _configure_logging() -> logging.Logger: # Check the environment variables for log levels: if "SPARK_CONNECT_LOG_LEVEL" in os.environ: -logger.setLevel(os.getenv("SPARK_CONNECT_LOG_LEVEL", "error").upper()) +logger.setLevel(os.environ["SPARK_CONNECT_LOG_LEVEL"].upper()) else: logger.disabled = True return logger @@ -114,6 +115,20 @@ def _configure_logging() -> logging.Logger: logger = _configure_logging() +def getLogLevel() -> Optional[int]: +""" +This 
returns this log level as integer, or none (if no logging is enabled). + +Spark Connect logging can be configured with environment variable 'SPARK_CONNECT_LOG_LEVEL' + +.. versionadded:: 3.5.0 +""" + +if not logger.disabled: +return logger.level +return None + + class ChannelBuilder: """ This is a helper class that is used to create a GRPC channel based on the given
[spark] branch branch-3.5 updated: [SPARK-44264][PYTHON] E2E Testing for Deepspeed
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 3c6971149dc [SPARK-44264][PYTHON] E2E Testing for Deepspeed 3c6971149dc is described below commit 3c6971149dcad55de6aa5eeeff29cb54a07acb8f Author: Mathew Jacob AuthorDate: Fri Jul 28 09:14:45 2023 +0900 [SPARK-44264][PYTHON] E2E Testing for Deepspeed ### What Changes Were Proposed This PR adds some end to end tests for the DeepspeedTorchDistributor. Due to the lack of support currently available to deepspeed, we use proxy functions that are very simple to test that the command works. For the future, once Deepspeed supports more CPU, these end to end tests should instead migrate to more actual Deepspeed workloads, such as those described in the PR comments. ### Why Do We Need These Changes Previously, we only had unit tests for helper functions. These test actual workloads that a user may use DeepspeedTorchDistributor from end to end. ### Any User Facing Changes No, these are end to end tests. ### How Was This Tested Running the tests and seeing if they pass. Closes #42118 from mathewjacob1002/gpu_e2e_tests. 
Authored-by: Mathew Jacob Signed-off-by: Hyukjin Kwon (cherry picked from commit 9ae33b82d78d65f58acb253c0b710b5807f9912d) Signed-off-by: Hyukjin Kwon --- dev/requirements.txt | 3 + dev/tox.ini| 1 + python/mypy.ini| 3 + .../deepspeed/tests/test_deepspeed_distributor.py | 134 - 4 files changed, 139 insertions(+), 2 deletions(-) diff --git a/dev/requirements.txt b/dev/requirements.txt index f5fe5fa071f..38a9b244710 100644 --- a/dev/requirements.txt +++ b/dev/requirements.txt @@ -65,3 +65,6 @@ torch torchvision torcheval +# DeepspeedTorchDistributor dependencies +deepspeed + diff --git a/dev/tox.ini b/dev/tox.ini index e2a77786ed4..438f82fec1e 100644 --- a/dev/tox.ini +++ b/dev/tox.ini @@ -46,6 +46,7 @@ exclude = */target/*, docs/.local_ruby_bundle/, *python/pyspark/cloudpickle/*.py, +*python/pyspark/ml/deepspeed/tests/*.py *python/docs/build/*, *python/docs/source/conf.py, *python/.eggs/*, diff --git a/python/mypy.ini b/python/mypy.ini index a845cd88bd8..4d1fc3ceb66 100644 --- a/python/mypy.ini +++ b/python/mypy.ini @@ -88,6 +88,9 @@ ignore_errors = True [mypy-pyspark.ml.torch.tests.*] ignore_errors = True +[mypy-pyspark.ml.deepspeed.tests.*] +ignore_errors = True + [mypy-pyspark.mllib.tests.*] ignore_errors = True diff --git a/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py b/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py index 4c4606699a3..590e541c384 100644 --- a/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py +++ b/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py @@ -1,4 +1,4 @@ -# +# mypy: ignore-errors # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -14,12 +14,28 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +from contextlib import contextmanager import os +import shutil import sys -from typing import Any, Tuple, Dict +import textwrap +from typing import Any, Callable, Dict, Tuple import unittest +from pyspark import SparkConf, SparkContext from pyspark.ml.deepspeed.deepspeed_distributor import DeepspeedTorchDistributor +from pyspark.sql import SparkSession +from pyspark.ml.torch.tests.test_distributor import ( +get_local_mode_conf, +set_up_test_dirs, +get_distributed_mode_conf, +) + +have_deepspeed = True +try: +import deepspeed # noqa: F401 +except ImportError: +have_deepspeed = False class DeepspeedTorchDistributorUnitTests(unittest.TestCase): @@ -164,6 +180,120 @@ class DeepspeedTorchDistributorUnitTests(unittest.TestCase): self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args) +def _create_basic_function() -> Callable: +# TODO: swap out with better test function +# once Deepspeed better supports CPU +def pythagoras(leg1: float, leg2: float) -> float: +import deepspeed + +print(deepspeed.__version__) +return (leg1 * leg1 + leg2 * leg2) ** 0.5 + +return pythagoras + + +@contextmanager +def _create_pytorch_training_test_file(): +# Note: when Deepspeed CPU support becomes better, +# switch in more realistic training files using Deepspeed +# optimizations + constructs +str_to_write = textwrap.dedent( +""" +import sys +
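The test file above guards its end-to-end cases on whether `deepspeed` imports (`have_deepspeed = True` / `except ImportError`), a common optional-dependency pattern. A minimal, self-contained sketch of the same pattern using `unittest.skipUnless`; the simple right-triangle check stands in for a real Deepspeed workload, just as the commit's `pythagoras` proxy function does:

```python
import importlib.util
import unittest

# Same guard style as the diff above; "deepspeed" is an optional dependency
# that may be absent on CPU-only CI machines, so tests that need it are
# skipped rather than failed.
have_deepspeed = importlib.util.find_spec("deepspeed") is not None


class OptionalDepTests(unittest.TestCase):
    @unittest.skipUnless(have_deepspeed, "deepspeed is not installed")
    def test_needs_deepspeed(self) -> None:
        import deepspeed  # noqa: F401
        # A real suite would exercise a Deepspeed workload here.

    def test_always_runs(self) -> None:
        # Pure-Python stand-in for a training job, mirroring the commit's
        # pythagoras() proxy function.
        self.assertEqual((3 * 3 + 4 * 4) ** 0.5, 5.0)
```

Skipped tests count as successes, so the suite passes on machines with or without the dependency installed.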
[spark] branch master updated: [SPARK-44264][PYTHON] E2E Testing for Deepspeed
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 9ae33b82d78 [SPARK-44264][PYTHON] E2E Testing for Deepspeed 9ae33b82d78 is described below commit 9ae33b82d78d65f58acb253c0b710b5807f9912d Author: Mathew Jacob AuthorDate: Fri Jul 28 09:14:45 2023 +0900 [SPARK-44264][PYTHON] E2E Testing for Deepspeed ### What Changes Were Proposed This PR adds some end to end tests for the DeepspeedTorchDistributor. Due to the lack of support currently available to deepspeed, we use proxy functions that are very simple to test that the command works. For the future, once Deepspeed supports more CPU, these end to end tests should instead migrate to more actual Deepspeed workloads, such as those described in the PR comments. ### Why Do We Need These Changes Previously, we only had unit tests for helper functions. These test actual workloads that a user may use DeepspeedTorchDistributor from end to end. ### Any User Facing Changes No, these are end to end tests. ### How Was This Tested Running the tests and seeing if they pass. Closes #42118 from mathewjacob1002/gpu_e2e_tests. 
Authored-by: Mathew Jacob Signed-off-by: Hyukjin Kwon --- dev/requirements.txt | 3 + dev/tox.ini| 1 + python/mypy.ini| 3 + .../deepspeed/tests/test_deepspeed_distributor.py | 134 - 4 files changed, 139 insertions(+), 2 deletions(-) diff --git a/dev/requirements.txt b/dev/requirements.txt index f5fe5fa071f..38a9b244710 100644 --- a/dev/requirements.txt +++ b/dev/requirements.txt @@ -65,3 +65,6 @@ torch torchvision torcheval +# DeepspeedTorchDistributor dependencies +deepspeed + diff --git a/dev/tox.ini b/dev/tox.ini index e2a77786ed4..438f82fec1e 100644 --- a/dev/tox.ini +++ b/dev/tox.ini @@ -46,6 +46,7 @@ exclude = */target/*, docs/.local_ruby_bundle/, *python/pyspark/cloudpickle/*.py, +*python/pyspark/ml/deepspeed/tests/*.py *python/docs/build/*, *python/docs/source/conf.py, *python/.eggs/*, diff --git a/python/mypy.ini b/python/mypy.ini index a845cd88bd8..4d1fc3ceb66 100644 --- a/python/mypy.ini +++ b/python/mypy.ini @@ -88,6 +88,9 @@ ignore_errors = True [mypy-pyspark.ml.torch.tests.*] ignore_errors = True +[mypy-pyspark.ml.deepspeed.tests.*] +ignore_errors = True + [mypy-pyspark.mllib.tests.*] ignore_errors = True diff --git a/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py b/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py index 4c4606699a3..590e541c384 100644 --- a/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py +++ b/python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py @@ -1,4 +1,4 @@ -# +# mypy: ignore-errors # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -14,12 +14,28 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +from contextlib import contextmanager import os +import shutil import sys -from typing import Any, Tuple, Dict +import textwrap +from typing import Any, Callable, Dict, Tuple import unittest +from pyspark import SparkConf, SparkContext from pyspark.ml.deepspeed.deepspeed_distributor import DeepspeedTorchDistributor +from pyspark.sql import SparkSession +from pyspark.ml.torch.tests.test_distributor import ( +get_local_mode_conf, +set_up_test_dirs, +get_distributed_mode_conf, +) + +have_deepspeed = True +try: +import deepspeed # noqa: F401 +except ImportError: +have_deepspeed = False class DeepspeedTorchDistributorUnitTests(unittest.TestCase): @@ -164,6 +180,120 @@ class DeepspeedTorchDistributorUnitTests(unittest.TestCase): self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args) +def _create_basic_function() -> Callable: +# TODO: swap out with better test function +# once Deepspeed better supports CPU +def pythagoras(leg1: float, leg2: float) -> float: +import deepspeed + +print(deepspeed.__version__) +return (leg1 * leg1 + leg2 * leg2) ** 0.5 + +return pythagoras + + +@contextmanager +def _create_pytorch_training_test_file(): +# Note: when Deepspeed CPU support becomes better, +# switch in more realistic training files using Deepspeed +# optimizations + constructs +str_to_write = textwrap.dedent( +""" +import sys +def pythagorean_thm(x : int, y: int): # type: ignore +import deepspeed # type: ignore +
[spark] branch master updated: [SPARK-44198][CORE] Support propagation of the log level to the executors
This is an automated email from the ASF dual-hosted git repository. attilapiros pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5fc90fbd4e3 [SPARK-44198][CORE] Support propagation of the log level to the executors 5fc90fbd4e3 is described below commit 5fc90fbd4e3235fbcf038f4725037321b8234d94 Author: Vinod KC AuthorDate: Thu Jul 27 16:39:33 2023 -0700 [SPARK-44198][CORE] Support propagation of the log level to the executors ### What changes were proposed in this pull request? Currently, the **sc.setLogLevel()** method only sets the log level on the Spark driver, failing to reflect the desired log level on the executors. With _--conf **spark.log.level**_ or **sc.setLogLevel()**, spark allows tuning the log level in the driver process, but it is not reflecting the log level on executors. ### Why are the changes needed? This inconsistency can lead to difficulties in debugging and monitoring Spark applications, as log messages from the executors may not align with the expected log level set on the user code. This PR aims to propagate the log level changes to executors when sc.setLogLevel() is called or send the current log level when a new executor is getting registered ### Does this PR introduce _any_ user-facing change? No, but with this PR, both driver and executor will show same log level ### How was this patch tested? Tested manually to verify the same log levels on both driver and executor Closes #41746 from vinodkc/br_support_setloglevel_executors. 
Authored-by: Vinod KC Signed-off-by: attilapiros --- .../main/scala/org/apache/spark/SparkContext.scala | 11 -- .../executor/CoarseGrainedExecutorBackend.scala| 4 .../org/apache/spark/internal/config/package.scala | 8 +++ .../apache/spark/scheduler/SchedulerBackend.scala | 1 + .../cluster/CoarseGrainedClusterMessage.scala | 7 +- .../cluster/CoarseGrainedSchedulerBackend.scala| 20 - .../main/scala/org/apache/spark/util/Utils.scala | 25 +++--- 7 files changed, 69 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 26fdb86d299..f48cb32b319 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -40,7 +40,6 @@ import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, Doub import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} -import org.apache.logging.log4j.Level import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast @@ -396,7 +395,10 @@ class SparkContext(config: SparkConf) extends Logging { require(SparkContext.VALID_LOG_LEVELS.contains(upperCased), s"Supplied level $logLevel did not match one of:" + s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}") -Utils.setLogLevel(Level.toLevel(upperCased)) +Utils.setLogLevelIfNeeded(upperCased) +if (conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL) && _schedulerBackend != null) { + _schedulerBackend.updateExecutorsLogLevel(upperCased) +} } try { @@ -585,6 +587,11 @@ class SparkContext(config: SparkConf) extends Logging { _dagScheduler = new DAGScheduler(this) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) +if 
(_conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL)) { + _conf.get(SPARK_LOG_LEVEL) +.foreach(logLevel => _schedulerBackend.updateExecutorsLogLevel(logLevel)) +} + val _executorMetricsSource = if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) { Some(new ExecutorMetricsSource) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index ab238626efe..da009f5addb 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -177,6 +177,8 @@ private[spark] class CoarseGrainedExecutorBackend( case NonFatal(e) => exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) } +case UpdateExecutorLogLevel(newLogLevel) => + Utils.setLogLevelIfNeeded(newLogLevel) case LaunchTask(data) => if (executor == null) { @@ -473,6 +475,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } driv
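The propagation scheme above — `sc.setLogLevel()` validating the level, applying it on the driver, and the scheduler backend forwarding an `UpdateExecutorLogLevel` message to each executor, with newly registered executors immediately receiving the current level — can be modeled in plain Python. This is a toy sketch of the message flow only; the class and method names are illustrative, not Spark's API:

```python
from typing import Callable, List

VALID_LOG_LEVELS = {"ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"}


class MiniDriver:
    """Toy model of driver-to-executor log-level propagation."""

    def __init__(self) -> None:
        # Each "executor" is just a callback that applies a level locally.
        self.executors: List[Callable[[str], None]] = []
        self.level = "INFO"

    def register_executor(self, apply_level: Callable[[str], None]) -> None:
        # A newly registered executor receives the driver's current level,
        # analogous to sending it at executor registration time.
        self.executors.append(apply_level)
        apply_level(self.level)

    def set_log_level(self, log_level: str) -> None:
        upper = log_level.upper()
        if upper not in VALID_LOG_LEVELS:
            raise ValueError(
                f"Supplied level {log_level} did not match one of: "
                f"{','.join(sorted(VALID_LOG_LEVELS))}"
            )
        self.level = upper
        # Analogous to broadcasting UpdateExecutorLogLevel to all executors.
        for apply_level in self.executors:
            apply_level(upper)
```

The point of the design is that the level is pushed both on change and on registration, so driver and executors stay aligned regardless of startup order.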
[spark] branch branch-3.5 updated: [SPARK-44425][CONNECT] Validate that user provided sessionId is an UUID
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new cbcd85c5e57 [SPARK-44425][CONNECT] Validate that user provided sessionId is an UUID cbcd85c5e57 is described below commit cbcd85c5e57695f3992eaf694d61be86f84449c3 Author: Juliusz Sompolski AuthorDate: Fri Jul 28 07:55:02 2023 +0900 [SPARK-44425][CONNECT] Validate that user provided sessionId is an UUID We want to validate that user provided sessionId is an UUID. Existing Spark Connect python and scala clients already do that, we would like to depend on it being in this format moving forward, just like we already validate that operationId is an UUID. Validate what's already assumed. No. Existing CI. Closes #42150 from juliuszsompolski/SPARK-44425. Authored-by: Juliusz Sompolski Signed-off-by: Hyukjin Kwon (cherry picked from commit a3bd477a6d8c317ee1e9a6aae6ebd2ef4fc67cce) Signed-off-by: Hyukjin Kwon --- .../src/main/protobuf/spark/connect/base.proto | 6 ++ .../sql/connect/service/SparkConnectService.scala | 12 ++- .../connect/artifact/ArtifactManagerSuite.scala| 15 +- .../connect/planner/SparkConnectServiceSuite.scala | 23 +- .../connect/service/AddArtifactsHandlerSuite.scala | 13 +++- python/pyspark/sql/connect/proto/base_pb2.pyi | 6 ++ 6 files changed, 55 insertions(+), 20 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto index d935ae65328..21fd167f6b5 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto @@ -63,6 +63,7 @@ message AnalyzePlanRequest { // The session_id specifies a spark session for a user id (which is specified // by user_context.user_id).
The session_id is set by the client to be able to // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; // (Required) User context @@ -273,6 +274,7 @@ message ExecutePlanRequest { // The session_id specifies a spark session for a user id (which is specified // by user_context.user_id). The session_id is set by the client to be able to // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; // (Required) User context @@ -407,6 +409,7 @@ message ConfigRequest { // The session_id specifies a spark session for a user id (which is specified // by user_context.user_id). The session_id is set by the client to be able to // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; // (Required) User context @@ -492,6 +495,7 @@ message AddArtifactsRequest { // The session_id specifies a spark session for a user id (which is specified // by user_context.user_id). The session_id is set by the client to be able to // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; // User context @@ -581,6 +585,7 @@ message ArtifactStatusesRequest { // The session_id specifies a spark session for a user id (which is specified // by user_context.user_id). The session_id is set by the client to be able to // collate streaming responses from different queries within the dedicated session. 
+ // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; // User context @@ -617,6 +622,7 @@ message InterruptRequest { // The session_id specifies a spark session for a user id (which is specified // by user_context.user_id). The session_id is set by the client to be able to // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; // (Required) User context diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index ad40c94d549..c8fbfca6f70 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/Spark
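The format the proto comments require (`00112233-4455-6677-8899-aabbccddeeff`) can be checked with Python's standard `uuid` module. A sketch of such a validation, assuming only the canonical dashed 8-4-4-4-12 spelling is accepted; Spark's actual server-side check may differ:

```python
import uuid


def require_uuid_format(session_id: str) -> str:
    """Reject session ids that are not UUID strings (illustrative helper)."""
    try:
        parsed = uuid.UUID(session_id)
    except ValueError:
        raise ValueError(f"Invalid sessionId '{session_id}': must be a UUID")
    # uuid.UUID also accepts URN spellings and hex without dashes, so pin
    # the canonical form shown in the proto comments above.
    if str(parsed) != session_id.lower():
        raise ValueError(f"Invalid sessionId '{session_id}': must be a UUID")
    return session_id
```

Validating the format server-side, as this commit does, lets later code assume the id is safe to use verbatim in logs and lookups without re-checking.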
[spark] branch master updated: [SPARK-44425][CONNECT] Validate that user provided sessionId is an UUID
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a3bd477a6d8 [SPARK-44425][CONNECT] Validate that user provided sessionId is an UUID a3bd477a6d8 is described below commit a3bd477a6d8c317ee1e9a6aae6ebd2ef4fc67cce Author: Juliusz Sompolski AuthorDate: Fri Jul 28 07:55:02 2023 +0900 [SPARK-44425][CONNECT] Validate that user provided sessionId is an UUID ### What changes were proposed in this pull request? We want to validate that user provided sessionId is an UUID. Existing Spark Connect python and scala clients already do that, we would like to depend on it being in this format moving forward, just like we already validate that operationId is an UUID. ### Why are the changes needed? Validate what's already assumed. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing CI. Closes #42150 from juliuszsompolski/SPARK-44425.
Authored-by: Juliusz Sompolski Signed-off-by: Hyukjin Kwon --- .../src/main/protobuf/spark/connect/base.proto | 6 ++ .../sql/connect/service/SparkConnectService.scala | 12 ++- .../connect/artifact/ArtifactManagerSuite.scala| 15 +- .../connect/planner/SparkConnectServiceSuite.scala | 23 +- .../connect/service/AddArtifactsHandlerSuite.scala | 13 +++- python/pyspark/sql/connect/proto/base_pb2.pyi | 6 ++ 6 files changed, 55 insertions(+), 20 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto index d935ae65328..21fd167f6b5 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto @@ -63,6 +63,7 @@ message AnalyzePlanRequest { // The session_id specifies a spark session for a user id (which is specified // by user_context.user_id). The session_id is set by the client to be able to // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; // (Required) User context @@ -273,6 +274,7 @@ message ExecutePlanRequest { // The session_id specifies a spark session for a user id (which is specified // by user_context.user_id). The session_id is set by the client to be able to // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; // (Required) User context @@ -407,6 +409,7 @@ message ConfigRequest { // The session_id specifies a spark session for a user id (which is specified // by user_context.user_id). The session_id is set by the client to be able to // collate streaming responses from different queries within the dedicated session. 
+ // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; // (Required) User context @@ -492,6 +495,7 @@ message AddArtifactsRequest { // The session_id specifies a spark session for a user id (which is specified // by user_context.user_id). The session_id is set by the client to be able to // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; // User context @@ -581,6 +585,7 @@ message ArtifactStatusesRequest { // The session_id specifies a spark session for a user id (which is specified // by user_context.user_id). The session_id is set by the client to be able to // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; // User context @@ -617,6 +622,7 @@ message InterruptRequest { // The session_id specifies a spark session for a user id (which is specified // by user_context.user_id). The session_id is set by the client to be able to // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` string session_id = 1; // (Required) User context diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index 6b7007130be..87e4f21732f 10064
[spark] branch branch-3.5 updated: [SPARK-44560][PYTHON][CONNECT] Improve tests and documentation for Arrow Python UDF
This is an automated email from the ASF dual-hosted git repository. xinrong pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 36b93d07eb9 [SPARK-44560][PYTHON][CONNECT] Improve tests and documentation for Arrow Python UDF 36b93d07eb9 is described below commit 36b93d07eb961905647c42fac80e22efdfb15f4f Author: Xinrong Meng AuthorDate: Thu Jul 27 13:45:05 2023 -0700 [SPARK-44560][PYTHON][CONNECT] Improve tests and documentation for Arrow Python UDF ### What changes were proposed in this pull request? - Test on complex return type - Remove complex return type constraints for Arrow Python UDF on Spark Connect - Update documentation of the related Spark conf The change targets both Spark 3.5 and 4.0. ### Why are the changes needed? Testability and parity with vanilla PySpark. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests. Closes #42178 from xinrong-meng/conf. 
Authored-by: Xinrong Meng
Signed-off-by: Xinrong Meng
(cherry picked from commit 5f6537409383e2dbdd699108f708567c37db8151)
Signed-off-by: Xinrong Meng
---
 python/pyspark/sql/connect/udf.py                     | 10 ++
 python/pyspark/sql/tests/test_arrow_python_udf.py     |  5 -
 python/pyspark/sql/tests/test_udf.py                  | 16
 .../scala/org/apache/spark/sql/internal/SQLConf.scala |  3 +--
 4 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/sql/connect/udf.py b/python/pyspark/sql/connect/udf.py
index 0a5d06618b3..2d7e423d3d5 100644
--- a/python/pyspark/sql/connect/udf.py
+++ b/python/pyspark/sql/connect/udf.py
@@ -35,7 +35,7 @@ from pyspark.sql.connect.expressions import (
 )
 from pyspark.sql.connect.column import Column
 from pyspark.sql.connect.types import UnparsedDataType
-from pyspark.sql.types import ArrayType, DataType, MapType, StringType, StructType
+from pyspark.sql.types import DataType, StringType
 from pyspark.sql.udf import UDFRegistration as PySparkUDFRegistration
 from pyspark.errors import PySparkTypeError
@@ -70,18 +70,12 @@ def _create_py_udf(
         is_arrow_enabled = useArrow
 
     regular_udf = _create_udf(f, returnType, PythonEvalType.SQL_BATCHED_UDF)
-    return_type = regular_udf.returnType
     try:
         is_func_with_args = len(getfullargspec(f).args) > 0
     except TypeError:
         is_func_with_args = False
-    is_output_atomic_type = (
-        not isinstance(return_type, StructType)
-        and not isinstance(return_type, MapType)
-        and not isinstance(return_type, ArrayType)
-    )
     if is_arrow_enabled:
-        if is_output_atomic_type and is_func_with_args:
+        if is_func_with_args:
             return _create_arrow_py_udf(regular_udf)
         else:
             warnings.warn(
diff --git a/python/pyspark/sql/tests/test_arrow_python_udf.py b/python/pyspark/sql/tests/test_arrow_python_udf.py
index 264ea0b901f..f48f07666e1 100644
--- a/python/pyspark/sql/tests/test_arrow_python_udf.py
+++ b/python/pyspark/sql/tests/test_arrow_python_udf.py
@@ -47,11 +47,6 @@ class PythonUDFArrowTestsMixin(BaseUDFTestsMixin):
     def test_register_java_udaf(self):
         super(PythonUDFArrowTests, self).test_register_java_udaf()
 
-    # TODO(SPARK-43903): Standardize ArrayType conversion for Python UDF
-    @unittest.skip("Inconsistent ArrayType conversion with/without Arrow.")
-    def test_nested_array(self):
-        super(PythonUDFArrowTests, self).test_nested_array()
-
     def test_complex_input_types(self):
         row = (
             self.spark.range(1)
diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index 8ffcb5e05a2..239ff27813b 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -882,6 +882,22 @@ class BaseUDFTestsMixin(object):
         row = df.select(f("nested_array")).first()
         self.assertEquals(row[0], [[1, 2], [3, 4], [4, 5]])
 
+    def test_complex_return_types(self):
+        row = (
+            self.spark.range(1)
+            .selectExpr("array(1, 2, 3) as array", "map('a', 'b') as map", "struct(1, 2) as struct")
+            .select(
+                udf(lambda x: x, "array<int>")("array"),
+                udf(lambda x: x, "map<string,string>")("map"),
+                udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
+            )
+            .first()
+        )
+
+        self.assertEquals(row[0], [1, 2, 3])
+        self.assertEquals(row[1], {"a": "b"})
+        self.assertEquals(row[2], Row(col1=1, col2=2))
+
 
 class UDFTests(BaseUDFTestsMixin, ReusedSQLTestCase):
     @classmethod
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scal
[spark] branch master updated: [SPARK-44560][PYTHON][CONNECT] Improve tests and documentation for Arrow Python UDF
This is an automated email from the ASF dual-hosted git repository.

xinrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5f653740938 [SPARK-44560][PYTHON][CONNECT] Improve tests and documentation for Arrow Python UDF
5f653740938 is described below

commit 5f6537409383e2dbdd699108f708567c37db8151
Author: Xinrong Meng
AuthorDate: Thu Jul 27 13:45:05 2023 -0700

    [SPARK-44560][PYTHON][CONNECT] Improve tests and documentation for Arrow Python UDF

    ### What changes were proposed in this pull request?

    - Test on complex return types
    - Remove complex return type constraints for Arrow Python UDF on Spark Connect
    - Update documentation of the related Spark conf

    The change targets both Spark 3.5 and 4.0.

    ### Why are the changes needed?

    Testability and parity with vanilla PySpark.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Unit tests.

    Closes #42178 from xinrong-meng/conf.
Authored-by: Xinrong Meng
Signed-off-by: Xinrong Meng
---
 python/pyspark/sql/connect/udf.py                     | 10 ++
 python/pyspark/sql/tests/test_arrow_python_udf.py     |  5 -
 python/pyspark/sql/tests/test_udf.py                  | 16
 .../scala/org/apache/spark/sql/internal/SQLConf.scala |  3 +--
 4 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/sql/connect/udf.py b/python/pyspark/sql/connect/udf.py
index 0a5d06618b3..2d7e423d3d5 100644
--- a/python/pyspark/sql/connect/udf.py
+++ b/python/pyspark/sql/connect/udf.py
@@ -35,7 +35,7 @@ from pyspark.sql.connect.expressions import (
 )
 from pyspark.sql.connect.column import Column
 from pyspark.sql.connect.types import UnparsedDataType
-from pyspark.sql.types import ArrayType, DataType, MapType, StringType, StructType
+from pyspark.sql.types import DataType, StringType
 from pyspark.sql.udf import UDFRegistration as PySparkUDFRegistration
 from pyspark.errors import PySparkTypeError
@@ -70,18 +70,12 @@ def _create_py_udf(
         is_arrow_enabled = useArrow
 
     regular_udf = _create_udf(f, returnType, PythonEvalType.SQL_BATCHED_UDF)
-    return_type = regular_udf.returnType
     try:
         is_func_with_args = len(getfullargspec(f).args) > 0
     except TypeError:
         is_func_with_args = False
-    is_output_atomic_type = (
-        not isinstance(return_type, StructType)
-        and not isinstance(return_type, MapType)
-        and not isinstance(return_type, ArrayType)
-    )
     if is_arrow_enabled:
-        if is_output_atomic_type and is_func_with_args:
+        if is_func_with_args:
             return _create_arrow_py_udf(regular_udf)
         else:
             warnings.warn(
diff --git a/python/pyspark/sql/tests/test_arrow_python_udf.py b/python/pyspark/sql/tests/test_arrow_python_udf.py
index 264ea0b901f..f48f07666e1 100644
--- a/python/pyspark/sql/tests/test_arrow_python_udf.py
+++ b/python/pyspark/sql/tests/test_arrow_python_udf.py
@@ -47,11 +47,6 @@ class PythonUDFArrowTestsMixin(BaseUDFTestsMixin):
     def test_register_java_udaf(self):
         super(PythonUDFArrowTests, self).test_register_java_udaf()
 
-    # TODO(SPARK-43903): Standardize ArrayType conversion for Python UDF
-    @unittest.skip("Inconsistent ArrayType conversion with/without Arrow.")
-    def test_nested_array(self):
-        super(PythonUDFArrowTests, self).test_nested_array()
-
     def test_complex_input_types(self):
         row = (
             self.spark.range(1)
diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index 8ffcb5e05a2..239ff27813b 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -882,6 +882,22 @@ class BaseUDFTestsMixin(object):
         row = df.select(f("nested_array")).first()
         self.assertEquals(row[0], [[1, 2], [3, 4], [4, 5]])
 
+    def test_complex_return_types(self):
+        row = (
+            self.spark.range(1)
+            .selectExpr("array(1, 2, 3) as array", "map('a', 'b') as map", "struct(1, 2) as struct")
+            .select(
+                udf(lambda x: x, "array<int>")("array"),
+                udf(lambda x: x, "map<string,string>")("map"),
+                udf(lambda x: x, "struct<col1:int,col2:int>")("struct"),
+            )
+            .first()
+        )
+
+        self.assertEquals(row[0], [1, 2, 3])
+        self.assertEquals(row[1], {"a": "b"})
+        self.assertEquals(row[2], Row(col1=1, col2=2))
+
 
 class UDFTests(BaseUDFTestsMixin, ReusedSQLTestCase):
     @classmethod
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index af2ec777d6b..2674ae7f4a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.sc
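The essence of the change above is that `_create_py_udf` on Spark Connect no longer requires an atomic (non-Array/Map/Struct) return type before routing a UDF to the Arrow path. The following standalone sketch mirrors the flag names from the diff in plain Python purely for illustration; it is not the actual pyspark module:

```python
# Plain-Python sketch of the UDF-kind selection in _create_py_udf,
# before and after this patch. Illustrative only.

def pick_udf_kind_before(is_arrow_enabled, is_func_with_args, is_output_atomic_type):
    # Pre-patch: Arrow optimization also required an atomic return type.
    if is_arrow_enabled and is_output_atomic_type and is_func_with_args:
        return "arrow"
    return "regular"

def pick_udf_kind_after(is_arrow_enabled, is_func_with_args):
    # Post-patch: the atomic-return-type constraint is dropped.
    if is_arrow_enabled and is_func_with_args:
        return "arrow"
    return "regular"

# A UDF returning an ArrayType result (non-atomic) now takes the Arrow path:
print(pick_udf_kind_before(True, True, False))  # regular
print(pick_udf_kind_after(True, True))          # arrow
```

This is why the new `test_complex_return_types` test can assert that array, map, and struct results round-trip correctly under Arrow.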
[spark] branch master updated: [SPARK-43968][PYTHON] Improve error messages for Python UDTFs with wrong number of outputs
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 7194ce9263f [SPARK-43968][PYTHON] Improve error messages for Python UDTFs with wrong number of outputs
7194ce9263f is described below

commit 7194ce9263fe1683c039a1aaf9462657b1672a99
Author: allisonwang-db
AuthorDate: Thu Jul 27 13:18:39 2023 -0700

    [SPARK-43968][PYTHON] Improve error messages for Python UDTFs with wrong number of outputs

    ### What changes were proposed in this pull request?

    This PR improves the error messages for Python UDTFs when the number of outputs mismatches the number of outputs specified in the return type of the UDTFs.

    ### Why are the changes needed?

    To make Python UDTFs more user-friendly.

    ### Does this PR introduce _any_ user-facing change?

    Yes. This PR improves the error messages. Before this change, the error thrown by Spark will be a Java IllegalStateException:
    ```
    java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema
    ```
    After this PR, it will throw a clearer error message with an error class:
    ```
    [UDTF_RETURN_SCHEMA_MISMATCH] The number of columns in the result does not match the specified schema
    ```

    ### How was this patch tested?

    Existing tests and new unit tests.

    Closes #42157 from allisonwang-db/spark-43968-py-udtf-checks.
Authored-by: allisonwang-db
Signed-off-by: Takuya UESHIN
---
 python/pyspark/errors/error_classes.py             |   5 +
 python/pyspark/sql/connect/udtf.py                 |   4 +-
 .../pyspark/sql/tests/connect/test_parity_udtf.py  |  50 ---
 python/pyspark/sql/tests/test_udtf.py              | 133 +++--
 python/pyspark/sql/udtf.py                         |   9 +-
 python/pyspark/worker.py                           |  22 +++-
 6 files changed, 99 insertions(+), 124 deletions(-)

diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py
index e0d1c30b604..f4b643f1d32 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -283,6 +283,11 @@ ERROR_CLASSES_JSON = """
       "The eval type for the UDTF '<name>' is invalid. It must be one of <eval_types>."
     ]
   },
+  "INVALID_UDTF_HANDLER_TYPE" : {
+    "message" : [
+      "The UDTF is invalid. The function handler must be a class, but got '<type>'. Please provide a class as the function handler."
+    ]
+  },
   "INVALID_UDTF_NO_EVAL" : {
     "message" : [
       "The UDTF '<name>' is invalid. It does not implement the required 'eval' method. Please implement the 'eval' method in '<name>' and try again."
diff --git a/python/pyspark/sql/connect/udtf.py b/python/pyspark/sql/connect/udtf.py
index 74c55cc42cd..919994401c8 100644
--- a/python/pyspark/sql/connect/udtf.py
+++ b/python/pyspark/sql/connect/udtf.py
@@ -124,6 +124,8 @@ class UserDefinedTableFunction:
         evalType: int = PythonEvalType.SQL_TABLE_UDF,
         deterministic: bool = True,
     ) -> None:
+        _validate_udtf_handler(func, returnType)
+
         self.func = func
         self.returnType: Optional[DataType] = (
             None
@@ -136,8 +138,6 @@ class UserDefinedTableFunction:
         self.evalType = evalType
         self.deterministic = deterministic
 
-        _validate_udtf_handler(func, returnType)
-
     def _build_common_inline_user_defined_table_function(
         self, *cols: "ColumnOrName"
     ) -> CommonInlineUserDefinedTableFunction:
diff --git a/python/pyspark/sql/tests/connect/test_parity_udtf.py b/python/pyspark/sql/tests/connect/test_parity_udtf.py
index 1aff1bd0686..748b611e667 100644
--- a/python/pyspark/sql/tests/connect/test_parity_udtf.py
+++ b/python/pyspark/sql/tests/connect/test_parity_udtf.py
@@ -56,56 +56,6 @@ class UDTFParityTests(BaseUDTFTestsMixin, ReusedConnectTestCase):
         ):
             TestUDTF(lit(1)).collect()
 
-    def test_udtf_with_wrong_num_output(self):
-        err_msg = (
-            "java.lang.IllegalStateException: Input row doesn't have expected number of "
-            + "values required by the schema."
-        )
-
-        @udtf(returnType="a: int, b: int")
-        class TestUDTF:
-            def eval(self, a: int):
-                yield a,
-
-        with self.assertRaisesRegex(SparkConnectGrpcException, err_msg):
-            TestUDTF(lit(1)).collect()
-
-        @udtf(returnType="a: int")
-        class TestUDTF:
-            def eval(self, a: int):
-                yield a, a + 1
-
-        with self.assertRaisesRegex(SparkConnectGrpcException, err_msg):
-            TestUDTF(lit(1)).collect()
-
-    def test_udtf_terminate_with_wrong_num_output(self):
-        err_msg = (
-            "java.lang.IllegalStateException: Input row doesn't have expected n
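The idea behind the patch is to validate each row a UDTF yields on the Python side, against the declared return schema, so the user sees a descriptive error class instead of an opaque Java `IllegalStateException`. The sketch below recreates that check in standalone Python; the function and message shapes mirror the PR's error class but are illustrative, not `pyspark/worker.py` verbatim:

```python
# Illustrative sketch of a worker-side output-width check for a Python UDTF.
# Not the actual pyspark implementation.

def verify_udtf_result(rows, schema_field_names):
    """Yield rows from a UDTF's eval, failing fast on a column-count mismatch."""
    expected = len(schema_field_names)
    for row in rows:
        if row is not None and len(row) != expected:
            raise ValueError(
                "[UDTF_RETURN_SCHEMA_MISMATCH] The number of columns in the "
                f"result does not match the specified schema: expected "
                f"{expected}, got {len(row)}"
            )
        yield row

# A UDTF declared as "a: int, b: int" that yields one-element tuples fails fast:
def eval_gen(a):
    yield (a,)

try:
    list(verify_udtf_result(eval_gen(1), ["a", "b"]))
except ValueError as e:
    print(e)
```

Because the check runs in the Python worker, both classic PySpark and Spark Connect surface the same `UDTF_RETURN_SCHEMA_MISMATCH` message, which is why the Connect-specific parity tests asserting on the Java exception text could be deleted above.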
[spark] branch master updated: [MINOR][DOCS] fix: some minor typos
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 921fb289f00 [MINOR][DOCS] fix: some minor typos
921fb289f00 is described below

commit 921fb289f003317d89120faa6937e4abd359195c
Author: Eric Blanco
AuthorDate: Thu Jul 27 08:53:54 2023 -0500

    [MINOR][DOCS] fix: some minor typos

    ### What changes were proposed in this pull request?

    Change `the the` to `the`

    ### Why are the changes needed?

    To fix the typo

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Closes #42188 from ejblanco/docs/spark-typos.

    Authored-by: Eric Blanco
    Signed-off-by: Sean Owen
---
 .../spark/sql/connect/service/SparkConnectStreamingQueryCache.scala     | 2 +-
 .../org/apache/spark/ui/static/vis-timeline-graph2d.min.js.map          | 2 +-
 dev/connect-jvm-client-mima-check                                       | 2 +-
 .../main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala    | 2 +-
 .../scala/org/apache/spark/sql/catalyst/expressions/WindowTime.scala    | 2 +-
 sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
 6 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
index 133686df018..87004242da9 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamingQueryCache.scala
@@ -84,7 +84,7 @@ private[connect] class SparkConnectStreamingQueryCache(
   /**
    * Returns [[StreamingQuery]] if it is cached and session matches the cached query. It ensures
-   * the the session associated with it matches the session passed into the call. If the query is
+   * the session associated with it matches the session passed into the call. If the query is
    * inactive (i.e. it has a cache expiry time set), this access extends its expiry time. So if a
    * client keeps accessing a query, it stays in the cache.
    */
diff --git a/core/src/main/resources/org/apache/spark/ui/static/vis-timeline-graph2d.min.js.map b/core/src/main/resources/org/apache/spark/ui/static/vis-timeline-graph2d.min.js.map
index 95fdc523cf4..250b375e545 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/vis-timeline-graph2d.min.js.map
+++ b/core/src/main/resources/org/apache/spark/ui/static/vis-timeline-graph2d.min.js.map
@@ -1 +1 @@
[one-line minified source-map diff truncated in the original message]
\ No newline at end of file
diff --git a/dev/connect-jvm-client-mima-check b/dev/connect-jvm-client-mima-check
index ac4b95935b9..6a29cbf08ce 100755
--- a/dev/connect-jvm-client-mima-check
+++ b/dev/connect-jvm-client-mima-check
@@ -52,7 +52,7 @@ echo "finish connect-client-jvm module mima check ..."
 
 RESULT_SIZE=$(wc -l .connect-mima-check-result | awk '{print $1}')
 
-# The the file has no content if check passed.
+# The file has no content if check passed.
 if [[ $RESULT_SIZE -eq "0" ]]; then
   ERRORS=""
 else
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3ece74a4d18..92e550ea941 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2623,7 +2623,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         withOrigin(t.origin)(t.copy(hasTried =
[spark] branch branch-3.5 updated: [SPARK-44505][SQL] Provide override for columnar support in Scan for DSv2
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 48494e1fa49 [SPARK-44505][SQL] Provide override for columnar support in Scan for DSv2
48494e1fa49 is described below

commit 48494e1fa49e93239a3fb240e7fdbce015f5cc0c
Author: Martin Grund
AuthorDate: Thu Jul 27 20:53:43 2023 +0800

    [SPARK-44505][SQL] Provide override for columnar support in Scan for DSv2

    ### What changes were proposed in this pull request?

    Previously, when a new DSv2 data source is implemented, planning will always call `BatchScanExec:supportsColumnar`, which in turn iterates over all input partitions to check whether they support columnar or not. When the `planInputPartitions` method is expensive, this can be problematic.

    This patch adds an option to the Scan interface that allows specifying a default value. For backward compatibility, the default value provided by the Scan interface is partition defined, but a Scan can change it accordingly.

    To fully support the changes of this PR, the following additional changes had to be done:

    * `DataSourceV2ScanExecBase::outputPartitioning` removed the case for single partitions.
    * `lazy val DataSourceV2ScanExecBase::groupedPartitions` added a special check for empty key group partitioning so that the simple case does not trigger a materialization of the input partitions during planning.

    Additionally:
    * Fixes similar issues as https://github.com/apache/spark/pull/40004

    ### Why are the changes needed?

    Avoid costly operations during explain operations.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Added new UT.

    Closes #42099 from grundprinzip/SPARK-44505.
Authored-by: Martin Grund
Signed-off-by: Wenchen Fan
(cherry picked from commit 01191c83f8c77f5dcc85b9017551023d81ed0d45)
Signed-off-by: Wenchen Fan
---
 .../org/apache/spark/sql/connector/read/Scan.java  | 24
 .../datasources/v2/DataSourceV2ScanExecBase.scala  | 44 +-
 .../spark/sql/connector/DataSourceV2Suite.scala    | 37 ++
 .../connector/KeyGroupedPartitioningSuite.scala    |  5 ++-
 .../command/AlignAssignmentsSuiteBase.scala        |  4 +-
 5 files changed, 93 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
index 8f79c656210..969a47be707 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
@@ -125,4 +125,28 @@ public interface Scan {
   default CustomTaskMetric[] reportDriverMetrics() {
     return new CustomTaskMetric[]{};
   }
+
+  /**
+   * This enum defines how the columnar support for the partitions of the data source
+   * should be determined. The default value is `PARTITION_DEFINED` which indicates that each
+   * partition can determine if it should be columnar or not. SUPPORTED and UNSUPPORTED provide
+   * default shortcuts to indicate support for columnar data or not.
+   *
+   * @since 3.5.0
+   */
+  enum ColumnarSupportMode {
+    PARTITION_DEFINED,
+    SUPPORTED,
+    UNSUPPORTED
+  }
+
+  /**
+   * Subclasses can implement this method to indicate if the support for columnar data should
+   * be determined by each partition or is set as a default for the whole scan.
+   *
+   * @since 3.5.0
+   */
+  default ColumnarSupportMode columnarSupportMode() {
+    return ColumnarSupportMode.PARTITION_DEFINED;
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index e539b1c4ee3..f688d3514d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning
 import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowComparableWrapper}
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, PartitionReaderFactory, Scan}
 import org.apache.spark.sql.execution.{ExplainUtils, Lea
[spark] branch master updated: [SPARK-44505][SQL] Provide override for columnar support in Scan for DSv2
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 01191c83f8c [SPARK-44505][SQL] Provide override for columnar support in Scan for DSv2
01191c83f8c is described below

commit 01191c83f8c77f5dcc85b9017551023d81ed0d45
Author: Martin Grund
AuthorDate: Thu Jul 27 20:53:43 2023 +0800

    [SPARK-44505][SQL] Provide override for columnar support in Scan for DSv2

    ### What changes were proposed in this pull request?

    Previously, when a new DSv2 data source is implemented, planning will always call `BatchScanExec:supportsColumnar`, which in turn iterates over all input partitions to check whether they support columnar or not. When the `planInputPartitions` method is expensive, this can be problematic.

    This patch adds an option to the Scan interface that allows specifying a default value. For backward compatibility, the default value provided by the Scan interface is partition defined, but a Scan can change it accordingly.

    To fully support the changes of this PR, the following additional changes had to be done:

    * `DataSourceV2ScanExecBase::outputPartitioning` removed the case for single partitions.
    * `lazy val DataSourceV2ScanExecBase::groupedPartitions` added a special check for empty key group partitioning so that the simple case does not trigger a materialization of the input partitions during planning.

    Additionally:
    * Fixes similar issues as https://github.com/apache/spark/pull/40004

    ### Why are the changes needed?

    Avoid costly operations during explain operations.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Added new UT.

    Closes #42099 from grundprinzip/SPARK-44505.
Authored-by: Martin Grund Signed-off-by: Wenchen Fan --- .../org/apache/spark/sql/connector/read/Scan.java | 24 .../datasources/v2/DataSourceV2ScanExecBase.scala | 44 +- .../spark/sql/connector/DataSourceV2Suite.scala| 37 ++ .../connector/KeyGroupedPartitioningSuite.scala| 5 ++- .../command/AlignAssignmentsSuiteBase.scala| 4 +- 5 files changed, 93 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java index 8f79c656210..969a47be707 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java @@ -125,4 +125,28 @@ public interface Scan { default CustomTaskMetric[] reportDriverMetrics() { return new CustomTaskMetric[]{}; } + + /** + * This enum defines how the columnar support for the partitions of the data source + * should be determined. The default value is `PARTITION_DEFINED` which indicates that each + * partition can determine if it should be columnar or not. SUPPORTED and UNSUPPORTED provide + * default shortcuts to indicate support for columnar data or not. + * + * @since 3.5.0 + */ + enum ColumnarSupportMode { +PARTITION_DEFINED, +SUPPORTED, +UNSUPPORTED + } + + /** + * Subclasses can implement this method to indicate if the support for columnar data should + * be determined by each partition or is set as a default for the whole scan. 
+ * + * @since 3.5.0 + */ + default ColumnarSupportMode columnarSupportMode() { +return ColumnarSupportMode.PARTITION_DEFINED; + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index e539b1c4ee3..f688d3514d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering, SortOrder} import org.apache.spark.sql.catalyst.plans.physical -import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, SinglePartition} +import org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowComparableWrapper} import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, PartitionReaderFactory, Scan} import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SQLExecution} @@ -91,22 +91,25 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { } ove
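The planner-side effect of the new `columnarSupportMode()` hook is a short-circuit: when a Scan declares a scan-wide answer, `supportsColumnar` no longer needs to materialize input partitions. The real API is the Java `Scan` interface above; the following is a plain-Python sketch of that decision, with names mirrored for illustration only:

```python
# Plain-Python sketch of the decision added in SPARK-44505. Illustrative only;
# the real code lives in Scan.java / DataSourceV2ScanExecBase.scala.
from enum import Enum

class ColumnarSupportMode(Enum):
    PARTITION_DEFINED = 1  # legacy behavior: ask every input partition
    SUPPORTED = 2          # scan-wide default: columnar, skip partition planning
    UNSUPPORTED = 3        # scan-wide default: row-based, skip partition planning

def supports_columnar(mode, plan_input_partitions):
    if mode is ColumnarSupportMode.SUPPORTED:
        return True
    if mode is ColumnarSupportMode.UNSUPPORTED:
        return False
    # PARTITION_DEFINED: only now pay for the (possibly expensive) planning call.
    return all(p["columnar"] for p in plan_input_partitions())

def expensive_planning():
    raise AssertionError("should not be called for scan-wide modes")

# With a scan-wide answer, partition planning is never triggered:
print(supports_columnar(ColumnarSupportMode.UNSUPPORTED, expensive_planning))  # False
```

This is why an `EXPLAIN` over a scan with an expensive `planInputPartitions` becomes cheap once the source overrides the default.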
[spark] branch master updated: [SPARK-44454][SQL][HIVE] HiveShim getTablesByType support fallback
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 50b36d13132 [SPARK-44454][SQL][HIVE] HiveShim getTablesByType support fallback
50b36d13132 is described below

commit 50b36d131326d1dc2503f20a35891e7f1b4b0194
Author: sychen
AuthorDate: Thu Jul 27 18:23:31 2023 +0800

    [SPARK-44454][SQL][HIVE] HiveShim getTablesByType support fallback

    ### What changes were proposed in this pull request?

    When a `Shim_v2_3#getTablesByType` call finds that there is no `get_tables_by_type` method, throw `SparkUnsupportedOperationException`; `HiveClientImpl#listTablesByType` will then make a fallback call.

    ### Why are the changes needed?

    When we use a high version of the Hive client to communicate with a low version of the Hive metastore, we may encounter `Invalid method name: 'get_tables_by_type'`.

    ```java
    23/07/17 12:45:24,391 [main] DEBUG SparkSqlParser: Parsing command: show views
    23/07/17 12:45:24,489 [main] ERROR log: Got exception: org.apache.thrift.TApplicationException Invalid method name: 'get_tables_by_type'
    org.apache.thrift.TApplicationException: Invalid method name: 'get_tables_by_type'
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_tables_by_type(ThriftHiveMetastore.java:1433)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_tables_by_type(ThriftHiveMetastore.java:1418)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTables(HiveMetaStoreClient.java:1411)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:173)
        at com.sun.proxy.$Proxy23.getTables(Unknown Source)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2344)
        at com.sun.proxy.$Proxy23.getTables(Unknown Source)
        at org.apache.hadoop.hive.ql.metadata.Hive.getTablesByType(Hive.java:1427)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.sql.hive.client.Shim_v2_3.getTablesByType(HiveShim.scala:1408)
        at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$listTablesByType$1(HiveClientImpl.scala:789)
        at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:225)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:224)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:274)
        at org.apache.spark.sql.hive.client.HiveClientImpl.listTablesByType(HiveClientImpl.scala:785)
        at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listViews$1(HiveExternalCatalog.scala:895)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:108)
        at org.apache.spark.sql.hive.HiveExternalCatalog.listViews(HiveExternalCatalog.scala:893)
        at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listViews(ExternalCatalogWithListener.scala:158)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listViews(SessionCatalog.scala:1040)
        at org.apache.spark.sql.execution.command.ShowViewsCommand.$anonfun$run$5(views.scala:407)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.execution.command.ShowViewsCommand.run(views.scala:407)
    ```

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Use the built-in Hive 2.3.9 client version to communicate with a Hive metastore version lower than 2.3, and test.

    Closes #42033 from cxzl25/SPARK-44454.

    Lead-authored-by: sychen Co-author
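The fallback described above is a common try-the-new-RPC-then-degrade pattern: attempt `get_tables_by_type`, and if the (older) metastore does not implement it, list all tables and filter by type on the client. The sketch below illustrates the pattern in standalone Python; all class and method names here are invented for illustration and are not the actual `HiveClientImpl` API:

```python
# Sketch of the fallback pattern added for Shim_v2_3#getTablesByType.
# All names are illustrative; this is not Spark's Hive client code.

class UnsupportedOperationError(Exception):
    pass

class OldMetastoreClient:
    """Simulates a pre-2.3 Hive metastore: no get_tables_by_type RPC."""
    _tables = {"v1": "VIRTUAL_VIEW", "t1": "MANAGED_TABLE"}

    def get_tables_by_type(self, db, table_type):
        raise UnsupportedOperationError("Invalid method name: 'get_tables_by_type'")

    def get_all_tables(self, db):
        return list(self._tables)

    def get_table_type(self, db, name):
        return self._tables[name]

def list_tables_by_type(client, db, table_type):
    try:
        # Fast path: a single server-side RPC on metastore >= 2.3.
        return client.get_tables_by_type(db, table_type)
    except UnsupportedOperationError:
        # Fallback: enumerate and filter client-side -- slower, but works
        # against old servers instead of failing `SHOW VIEWS` outright.
        return [t for t in client.get_all_tables(db)
                if client.get_table_type(db, t) == table_type]

print(list_tables_by_type(OldMetastoreClient(), "default", "VIRTUAL_VIEW"))  # ['v1']
```

The key design point is that the shim converts the low-level Thrift "Invalid method name" failure into a typed exception, so the caller can distinguish "server too old, fall back" from a genuine error.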
[spark] branch master updated: [SPARK-43611][PS][CONNECT][TESTS][FOLLOWUPS] Enable more tests
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 9b40c0cdc5c [SPARK-43611][PS][CONNECT][TESTS][FOLLOWUPS] Enable more tests
9b40c0cdc5c is described below

commit 9b40c0cdc5cf8e24e66f7ee8e122702d3f157291
Author: Ruifeng Zheng
AuthorDate: Thu Jul 27 16:30:47 2023 +0800

    [SPARK-43611][PS][CONNECT][TESTS][FOLLOWUPS] Enable more tests

    ### What changes were proposed in this pull request?
    Enable more tests; they were excluded in https://github.com/apache/spark/pull/42086 due to flaky CI issues.

    ### Why are the changes needed?
    For test parity.

    ### Does this PR introduce _any_ user-facing change?
    No, test-only.

    ### How was this patch tested?
    Enabled tests.

    Closes #42182 from zhengruifeng/spark_43611_followup.

Authored-by: Ruifeng Zheng
Signed-off-by: Ruifeng Zheng
---
 .../connect/computation/test_parity_compute.py     | 12 --
 .../connect/computation/test_parity_cumulative.py  | 48 --
 .../diff_frames_ops/test_parity_basic_slow.py      | 18 +---
 .../tests/connect/frame/test_parity_time_series.py |  6 ---
 .../connect/groupby/test_parity_cumulative.py      | 30 +-
 .../tests/connect/groupby/test_parity_groupby.py   | 18 +---
 .../connect/groupby/test_parity_missing_data.py    | 18 +---
 .../tests/connect/indexes/test_parity_base.py      |  6 ---
 .../connect/indexes/test_parity_reset_index.py     |  6 ---
 .../tests/connect/test_parity_default_index.py     |  6 ---
 .../tests/connect/test_parity_generic_functions.py |  6 +-
 ...st_parity_ops_on_diff_frames_groupby_rolling.py | 42 +--
 12 files changed, 6 insertions(+), 210 deletions(-)

diff --git a/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py b/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py
index 88eeb735d46..e2b92190b6e 100644
--- a/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py
+++ b/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py
@@ -27,22 +27,10 @@ class FrameParityComputeTests(FrameComputeMixin, PandasOnSparkTestUtils, ReusedC
     def psdf(self):
         return ps.from_pandas(self.pdf)

-    @unittest.skip(
-        "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client."
-    )
-    def test_diff(self):
-        super().test_diff()
-
     @unittest.skip("Spark Connect does not support RDD but the tests depend on them.")
     def test_mode(self):
         super().test_mode()

-    @unittest.skip(
-        "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client."
-    )
-    def test_pct_change(self):
-        super().test_pct_change()
-
     @unittest.skip("TODO(SPARK-43618): Fix pyspark.sq.column._unary_op to work with Spark Connect.")
     def test_rank(self):
         super().test_rank()

diff --git a/python/pyspark/pandas/tests/connect/computation/test_parity_cumulative.py b/python/pyspark/pandas/tests/connect/computation/test_parity_cumulative.py
index 8015d90aaa5..e14d296749c 100644
--- a/python/pyspark/pandas/tests/connect/computation/test_parity_cumulative.py
+++ b/python/pyspark/pandas/tests/connect/computation/test_parity_cumulative.py
@@ -29,54 +29,6 @@ class FrameParityCumulativeTests(
     def psdf(self):
         return ps.from_pandas(self.pdf)

-    @unittest.skip(
-        "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client."
-    )
-    def test_cummax(self):
-        super().test_cummax()
-
-    @unittest.skip(
-        "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client."
-    )
-    def test_cummax_multiindex_columns(self):
-        super().test_cummax_multiindex_columns()
-
-    @unittest.skip(
-        "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client."
-    )
-    def test_cummin(self):
-        super().test_cummin()
-
-    @unittest.skip(
-        "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client."
-    )
-    def test_cummin_multiindex_columns(self):
-        super().test_cummin_multiindex_columns()
-
-    @unittest.skip(
-        "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client."
-    )
-    def test_cumprod(self):
-        super().test_cumprod()
-
-    @unittest.skip(
-        "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client."
-    )
-    def test_cumprod_multiindex_columns(self):
-        super().test_cumprod_multiindex_columns()
-
-    @unittest.skip(
-        "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client."
-    )
-    def test_cumsum(self):
-        super().test_cumsum()
-
-    @unittest.skip(
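The parity suites in the diff above share their test bodies with the classic pandas-on-Spark tests through mixin classes, and temporarily disable individual tests by overriding them with `@unittest.skip`; re-enabling a test, as this commit does, is just deleting the override. A minimal self-contained sketch of that pattern (the class and test bodies here are illustrative stand-ins, not the real Spark test classes):

```python
import io
import unittest

class FrameComputeMixin:
    """Shared test bodies, reused by both the classic and the Connect parity suite."""
    def test_diff(self):
        self.assertEqual([b - a for a, b in zip([1, 2], [2, 4])], [1, 2])

    def test_mode(self):
        self.assertEqual(max([1, 2, 2], key=[1, 2, 2].count), 2)

class FrameParityComputeTests(FrameComputeMixin, unittest.TestCase):
    # A test that cannot pass under Spark Connect yet is overridden and skipped
    # with a JIRA pointer; once fixed, the whole override is simply deleted.
    @unittest.skip(
        "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client."
    )
    def test_mode(self):
        super().test_mode()

result = unittest.TextTestRunner(stream=io.StringIO(), verbosity=0).run(
    unittest.defaultTestLoader.loadTestsFromTestCase(FrameParityComputeTests)
)
# test_diff is inherited from the mixin and runs; test_mode is skipped.
```

The inherited test runs unchanged while the skipped override shadows the mixin's version, which is why deleting the override is enough to "enable" the test.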
[spark] branch branch-3.4 updated: [SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 7ff70b11248 [SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`
7ff70b11248 is described below

commit 7ff70b11248462352ca23e41ae70cd18dc2db0ba
Author: Ruifeng Zheng
AuthorDate: Thu Jul 27 16:16:03 2023 +0800

    [SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`

    ### What changes were proposed in this pull request?
    Run `run_python_packaging_tests` when there are any changes in PySpark.

    ### Why are the changes needed?
    https://github.com/apache/spark/pull/42146 made CI run `run_python_packaging_tests` only within `pyspark-errors` (see https://github.com/apache/spark/actions/runs/5666118302/job/15359190468 and https://github.com/apache/spark/actions/runs/5668071930/job/15358091003)

    ![image](https://github.com/apache/spark/assets/7322292/aef5cd4c-87ee-4b52-add3-e19ca131cdf1)

    However, I overlooked that `pyspark-errors` may be skipped (when there are no related source changes), so `run_python_packaging_tests` may also be skipped unexpectedly (see https://github.com/apache/spark/actions/runs/5666523657/job/15353485731)

    ![image](https://github.com/apache/spark/assets/7322292/c2517d39-efcf-4a95-8562-1507dad35794)

    This PR makes `run_python_packaging_tests` run even if `pyspark-errors` is skipped.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Updated CI.

    Closes #42173 from zhengruifeng/infra_followup.

Lead-authored-by: Ruifeng Zheng
Co-authored-by: Ruifeng Zheng
Signed-off-by: Ruifeng Zheng
(cherry picked from commit f7947341ab2984113018f7f7014bb8373a3cb3b1)
Signed-off-by: Ruifeng Zheng
---
 dev/sparktestsupport/modules.py | 9 -
 dev/sparktestsupport/utils.py   | 2 +-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index fd18ddd6d13..ac24ea19d0e 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -778,7 +778,14 @@ pyspark_pandas_slow = Module(
 pyspark_errors = Module(
     name="pyspark-errors",
     dependencies=[],
-    source_file_regexes=["python/pyspark/errors"],
+    source_file_regexes=[
+        # SPARK-44544: Force the execution of pyspark_errors when there are any changes
+        # in PySpark, since the Python Packaging Tests is only enabled within this module.
+        # This module is the smallest Python test module, it contains only 1 test file
+        # and normally takes < 2 seconds, so the additional cost is small.
+        "python/",
+        "python/pyspark/errors",
+    ],
     python_test_goals=[
         # unittests
         "pyspark.errors.tests.test_errors",

diff --git a/dev/sparktestsupport/utils.py b/dev/sparktestsupport/utils.py
index 6b190eb5ab2..19e6d8917e6 100755
--- a/dev/sparktestsupport/utils.py
+++ b/dev/sparktestsupport/utils.py
@@ -38,7 +38,7 @@ def determine_modules_for_files(filenames):
     and `README.md` is always ignored too.

     >>> sorted(x.name for x in determine_modules_for_files(["python/pyspark/a.py", "sql/core/foo"]))
-    ['pyspark-core', 'sql']
+    ['pyspark-core', 'pyspark-errors', 'sql']
     >>> [x.name for x in determine_modules_for_files(["file_not_matched_by_any_subproject"])]
     ['root']
     >>> [x.name for x in determine_modules_for_files(["appveyor.yml", "sql/README.md"])]

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
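The mechanics behind this fix: `determine_modules_for_files` selects a module whenever one of its `source_file_regexes` matches a changed file's path, so adding the broad `"python/"` prefix makes any PySpark change pull in `pyspark-errors` (and, with it, the packaging tests that run only in that module). A simplified, self-contained model of that selection — `MODULES` here is a toy stand-in for the real registry in `dev/sparktestsupport`, not Spark's actual data structure:

```python
import re

# Toy module registry; only the regex-matching rule is modeled.
# "python/" is the prefix this commit adds so that ANY change under python/
# selects pyspark-errors.
MODULES = {
    "pyspark-errors": ["python/", "python/pyspark/errors"],
    "sql": ["sql/"],
}

def determine_modules_for_files(filenames):
    """Return the sorted names of modules whose regexes match any changed file."""
    selected = set()
    for name, regexes in MODULES.items():
        if any(re.match(r, f) for r in regexes for f in filenames):
            selected.add(name)
    return sorted(selected)

# Before the fix, pyspark-errors matched only python/pyspark/errors, so a change
# elsewhere under python/ skipped it -- and skipped the packaging tests with it.
print(determine_modules_for_files(["python/pyspark/a.py", "sql/core/foo"]))
# -> ['pyspark-errors', 'sql']
```

This matches the updated doctest in `utils.py`, where a change to `python/pyspark/a.py` now selects `pyspark-errors` in addition to the modules it selected before.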
[spark] branch master updated: [SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f7947341ab2 [SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`
f7947341ab2 is described below

commit f7947341ab2984113018f7f7014bb8373a3cb3b1
Author: Ruifeng Zheng
AuthorDate: Thu Jul 27 16:16:03 2023 +0800

    [SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`

    ### What changes were proposed in this pull request?
    Run `run_python_packaging_tests` when there are any changes in PySpark.

    ### Why are the changes needed?
    https://github.com/apache/spark/pull/42146 made CI run `run_python_packaging_tests` only within `pyspark-errors` (see https://github.com/apache/spark/actions/runs/5666118302/job/15359190468 and https://github.com/apache/spark/actions/runs/5668071930/job/15358091003)

    ![image](https://github.com/apache/spark/assets/7322292/aef5cd4c-87ee-4b52-add3-e19ca131cdf1)

    However, I overlooked that `pyspark-errors` may be skipped (when there are no related source changes), so `run_python_packaging_tests` may also be skipped unexpectedly (see https://github.com/apache/spark/actions/runs/5666523657/job/15353485731)

    ![image](https://github.com/apache/spark/assets/7322292/c2517d39-efcf-4a95-8562-1507dad35794)

    This PR makes `run_python_packaging_tests` run even if `pyspark-errors` is skipped.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Updated CI.

    Closes #42173 from zhengruifeng/infra_followup.

Lead-authored-by: Ruifeng Zheng
Co-authored-by: Ruifeng Zheng
Signed-off-by: Ruifeng Zheng
---
 dev/sparktestsupport/modules.py | 9 -
 dev/sparktestsupport/utils.py   | 2 +-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 9d0ba219e79..3cfd82c3d31 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -1040,7 +1040,14 @@ pyspark_pandas_slow_connect = Module(
 pyspark_errors = Module(
     name="pyspark-errors",
     dependencies=[],
-    source_file_regexes=["python/pyspark/errors"],
+    source_file_regexes=[
+        # SPARK-44544: Force the execution of pyspark_errors when there are any changes
+        # in PySpark, since the Python Packaging Tests is only enabled within this module.
+        # This module is the smallest Python test module, it contains only 1 test file
+        # and normally takes < 2 seconds, so the additional cost is small.
+        "python/",
+        "python/pyspark/errors",
+    ],
     python_test_goals=[
         # unittests
         "pyspark.errors.tests.test_errors",

diff --git a/dev/sparktestsupport/utils.py b/dev/sparktestsupport/utils.py
index 339534bec25..816c982bd60 100755
--- a/dev/sparktestsupport/utils.py
+++ b/dev/sparktestsupport/utils.py
@@ -38,7 +38,7 @@ def determine_modules_for_files(filenames):
     and `README.md` is always ignored too.

     >>> sorted(x.name for x in determine_modules_for_files(["python/pyspark/a.py", "sql/core/foo"]))
-    ['pyspark-core', 'sql']
+    ['pyspark-core', 'pyspark-errors', 'sql']
     >>> [x.name for x in determine_modules_for_files(["file_not_matched_by_any_subproject"])]
     ['root']
     >>> [x.name for x in determine_modules_for_files(["appveyor.yml", "sql/README.md"])]
[spark] branch branch-3.5 updated: [SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 9dd388a1fd7 [SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`
9dd388a1fd7 is described below

commit 9dd388a1fd793d4cdf3cc8586f59220c1d0bcefb
Author: Ruifeng Zheng
AuthorDate: Thu Jul 27 16:16:03 2023 +0800

    [SPARK-44544][INFRA][FOLLOWUP] Force run `run_python_packaging_tests`

    ### What changes were proposed in this pull request?
    Run `run_python_packaging_tests` when there are any changes in PySpark.

    ### Why are the changes needed?
    https://github.com/apache/spark/pull/42146 made CI run `run_python_packaging_tests` only within `pyspark-errors` (see https://github.com/apache/spark/actions/runs/5666118302/job/15359190468 and https://github.com/apache/spark/actions/runs/5668071930/job/15358091003)

    ![image](https://github.com/apache/spark/assets/7322292/aef5cd4c-87ee-4b52-add3-e19ca131cdf1)

    However, I overlooked that `pyspark-errors` may be skipped (when there are no related source changes), so `run_python_packaging_tests` may also be skipped unexpectedly (see https://github.com/apache/spark/actions/runs/5666523657/job/15353485731)

    ![image](https://github.com/apache/spark/assets/7322292/c2517d39-efcf-4a95-8562-1507dad35794)

    This PR makes `run_python_packaging_tests` run even if `pyspark-errors` is skipped.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Updated CI.

    Closes #42173 from zhengruifeng/infra_followup.

Lead-authored-by: Ruifeng Zheng
Co-authored-by: Ruifeng Zheng
Signed-off-by: Ruifeng Zheng
(cherry picked from commit f7947341ab2984113018f7f7014bb8373a3cb3b1)
Signed-off-by: Ruifeng Zheng
---
 dev/sparktestsupport/modules.py | 9 -
 dev/sparktestsupport/utils.py   | 2 +-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 9d0ba219e79..3cfd82c3d31 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -1040,7 +1040,14 @@ pyspark_pandas_slow_connect = Module(
 pyspark_errors = Module(
     name="pyspark-errors",
     dependencies=[],
-    source_file_regexes=["python/pyspark/errors"],
+    source_file_regexes=[
+        # SPARK-44544: Force the execution of pyspark_errors when there are any changes
+        # in PySpark, since the Python Packaging Tests is only enabled within this module.
+        # This module is the smallest Python test module, it contains only 1 test file
+        # and normally takes < 2 seconds, so the additional cost is small.
+        "python/",
+        "python/pyspark/errors",
+    ],
     python_test_goals=[
         # unittests
         "pyspark.errors.tests.test_errors",

diff --git a/dev/sparktestsupport/utils.py b/dev/sparktestsupport/utils.py
index 339534bec25..816c982bd60 100755
--- a/dev/sparktestsupport/utils.py
+++ b/dev/sparktestsupport/utils.py
@@ -38,7 +38,7 @@ def determine_modules_for_files(filenames):
     and `README.md` is always ignored too.

     >>> sorted(x.name for x in determine_modules_for_files(["python/pyspark/a.py", "sql/core/foo"]))
-    ['pyspark-core', 'sql']
+    ['pyspark-core', 'pyspark-errors', 'sql']
     >>> [x.name for x in determine_modules_for_files(["file_not_matched_by_any_subproject"])]
     ['root']
     >>> [x.name for x in determine_modules_for_files(["appveyor.yml", "sql/README.md"])]
[spark] branch master updated: [SPARK-44536][BUILD] Upgrade sbt to 1.9.3
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 918d1581fb0 [SPARK-44536][BUILD] Upgrade sbt to 1.9.3
918d1581fb0 is described below

commit 918d1581fb0f79e0cd9a0bedee60ea6060620a31
Author: panbingkun
AuthorDate: Thu Jul 27 15:55:48 2023 +0800

    [SPARK-44536][BUILD] Upgrade sbt to 1.9.3

    ### What changes were proposed in this pull request?
    This PR upgrades sbt from 1.9.2 to 1.9.3.

    ### Why are the changes needed?
    1. The new version brings some improvements: actionable diagnostics (aka quickfix). Actionable diagnostics is an area in Scala tooling that has been getting attention since Chris Kipp presented it at the March 2023 Tooling Summit. Chris has written the [roadmap](https://contributors.scala-lang.org/t/roadmap-for-actionable-diagnostics/6172/1) and sent https://github.com/sbt/sbt/pull/7242, which kickstarted the effort, and there has since been steady progress in https://github.com/build-server-protocol/build-server-protocol/pull/527, https://github.com/lampepfl/d [...]

       sbt 1.9.3 adds a new interface called AnalysisCallback2 to relay code actions from the compiler(s) to Zinc's Analysis file. Future versions of Scala 2.13.x (and hopefully Scala 3) will release with proper code actions, but as a demo I've implemented a code action for procedure-syntax usages even on current Scala 2.13.11 with the -deprecation flag.

    2. Full release notes: https://github.com/sbt/sbt/releases/tag/v1.9.3

    3. v1.9.2 vs v1.9.3: https://github.com/sbt/sbt/compare/v1.9.2...v1.9.3

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Pass GA.

    Closes #42141 from panbingkun/SPARK-44536.

Authored-by: panbingkun
Signed-off-by: yangjie01
---
 dev/appveyor-install-dependencies.ps1 | 2 +-
 project/build.properties              | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/dev/appveyor-install-dependencies.ps1 b/dev/appveyor-install-dependencies.ps1
index 3737382eb86..db154cd51da 100644
--- a/dev/appveyor-install-dependencies.ps1
+++ b/dev/appveyor-install-dependencies.ps1
@@ -97,7 +97,7 @@ if (!(Test-Path $tools)) {
 # == SBT
 Push-Location $tools
-$sbtVer = "1.9.2"
+$sbtVer = "1.9.3"
 Start-FileDownload "https://github.com/sbt/sbt/releases/download/v$sbtVer/sbt-$sbtVer.zip"; "sbt.zip"
 # extract

diff --git a/project/build.properties b/project/build.properties
index 3eb34b94744..e883bb7bdf3 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 # Please update the version in appveyor-install-dependencies.ps1 together.
-sbt.version=1.9.2
+sbt.version=1.9.3
[spark] branch master updated: [SPARK-44482][CONNECT] Connect server should be able to specify the bind address
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new deaa2f7200b [SPARK-44482][CONNECT] Connect server should be able to specify the bind address
deaa2f7200b is described below

commit deaa2f7200bb79b5340c722bc8707dd45d50a1c2
Author: panbingkun
AuthorDate: Thu Jul 27 16:43:01 2023 +0900

    [SPARK-44482][CONNECT] Connect server should be able to specify the bind address

    ### What changes were proposed in this pull request?
    When a machine has multiple network cards, we may want only users on a certain network segment to be able to connect to the Connect server. I propose that, in addition to the port, the Connect server can also specify the bind address.
    https://github.com/apache/spark/blob/047fad5861daedbdb58111b223e76c05784f4951/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala#L26-L30

    ### Why are the changes needed?
    1. The Connect server should be able to specify the bind address, improving flexibility.
    2. As with other Spark components, a bind address can also be specified, for example:
    https://github.com/apache/spark/blob/047fad5861daedbdb58111b223e76c05784f4951/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java#L147-L149
    https://github.com/apache/spark/blob/047fad5861daedbdb58111b223e76c05784f4951/core/src/main/scala/org/apache/spark/SparkEnv.scala#L177-L196

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    - Manually test.
    - Pass GA.

    Closes #42073 from panbingkun/SPARK-44482.

Authored-by: panbingkun
Signed-off-by: Hyukjin Kwon
---
 .../org/apache/spark/sql/connect/config/Connect.scala    |  6 ++
 .../spark/sql/connect/service/SparkConnectService.scala  | 16 +++-
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 13e4b9f5364..31f119047e4 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -23,6 +23,12 @@ import org.apache.spark.sql.connect.common.config.ConnectCommon
 object Connect {
   import org.apache.spark.sql.internal.SQLConf.buildStaticConf

+  val CONNECT_GRPC_BINDING_ADDRESS =
+    ConfigBuilder("spark.connect.grpc.binding.address")
+      .version("4.0.0")
+      .stringConf
+      .createOptional
+
   val CONNECT_GRPC_BINDING_PORT =
     ConfigBuilder("spark.connect.grpc.binding.port")
       .version("3.4.0")

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index ad40c94d549..6b7007130be 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.connect.service

+import java.net.InetSocketAddress
 import java.util.concurrent.TimeUnit

 import com.google.common.base.Ticker
@@ -32,7 +33,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE}
+import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_ADDRESS, CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE}
 import org.apache.spark.sql.connect.utils.ErrorUtils

 /**
@@ -167,7 +168,7 @@ class SparkConnectService(debug: Boolean)
  * Used to start the overall SparkConnect service and provides global state to manage the
  * different SparkSession from different users connecting to the cluster.
  */
-object SparkConnectService {
+object SparkConnectService extends Logging {

   private val CACHE_SIZE = 100
@@ -245,10 +246,15 @@ object SparkConnectService {
   private def startGRPCService(): Unit = {
     val debugMode = SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
+    val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
     val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
-    val sb = NettyServerBuilder
-      .forPort(port)
-      .maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
+    val sb = bindAddress m
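The hunk above is cut off in the archive, but its intent is to choose between binding to a configured address (when `spark.connect.grpc.binding.address` is set) and binding to the port on all interfaces (when it is not). The same fallback can be sketched in Python; the helper name `grpc_bind_target` is a hypothetical illustration, not Spark API, and the default port 15002 is an assumption based on Spark Connect's documented default:

```python
# Hypothetical sketch of the address/port fallback the diff implements:
# bind to the configured address when present, else to all interfaces.
def grpc_bind_target(conf):
    address = conf.get("spark.connect.grpc.binding.address")       # optional, new in this PR
    port = conf.get("spark.connect.grpc.binding.port", 15002)      # 15002: assumed default
    return f"{address}:{port}" if address else f"[::]:{port}"

print(grpc_bind_target({"spark.connect.grpc.binding.address": "10.0.0.5"}))  # -> 10.0.0.5:15002
print(grpc_bind_target({}))                                                  # -> [::]:15002
```

With the address set, only clients that can reach that interface (e.g. one network segment of a multi-homed machine) can connect, which is the motivation stated in the PR description.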
[spark] branch branch-3.4 updated: [SPARK-44513][BUILD][3.4] Upgrade snappy-java to 1.1.10.3
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 2456455fbba [SPARK-44513][BUILD][3.4] Upgrade snappy-java to 1.1.10.3
2456455fbba is described below

commit 2456455fbbafb97d5fd168222868dfb93ee284f0
Author: panbingkun
AuthorDate: Thu Jul 27 15:01:41 2023 +0800

    [SPARK-44513][BUILD][3.4] Upgrade snappy-java to 1.1.10.3

    ### What changes were proposed in this pull request?
    - This PR is for branch-3.4.
    - It upgrades snappy-java from 1.1.10.2 to 1.1.10.3.

    ### Why are the changes needed?
    1. The newest version includes a bug fix:
       - Fix the GLIBC_2.32 not found issue of libsnappyjava.so in certain Linux distributions on s390x, by kun-lu20 in https://github.com/xerial/snappy-java/pull/481
    2. Release notes: https://github.com/xerial/snappy-java/releases/tag/v1.1.10.3

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Pass GA.

    Closes #42127 from panbingkun/branch-3.4_snappy_1_1_10_3.

Authored-by: panbingkun
Signed-off-by: Yuming Wang
---
 dev/deps/spark-deps-hadoop-2-hive-2.3 | 2 +-
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 +-
 pom.xml                               | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3
index b96ef3c1726..6c22673a7df 100644
--- a/dev/deps/spark-deps-hadoop-2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -248,7 +248,7 @@ scala-xml_2.12/2.1.0//scala-xml_2.12-2.1.0.jar
 shims/0.9.38//shims-0.9.38.jar
 slf4j-api/2.0.6//slf4j-api-2.0.6.jar
 snakeyaml/1.33//snakeyaml-1.33.jar
-snappy-java/1.1.10.2//snappy-java-1.1.10.2.jar
+snappy-java/1.1.10.3//snappy-java-1.1.10.3.jar
 spire-macros_2.12/0.17.0//spire-macros_2.12-0.17.0.jar
 spire-platform_2.12/0.17.0//spire-platform_2.12-0.17.0.jar
 spire-util_2.12/0.17.0//spire-util_2.12-0.17.0.jar

diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 895db6bf0df..68a5cbced62 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -235,7 +235,7 @@ scala-xml_2.12/2.1.0//scala-xml_2.12-2.1.0.jar
 shims/0.9.38//shims-0.9.38.jar
 slf4j-api/2.0.6//slf4j-api-2.0.6.jar
 snakeyaml/1.33//snakeyaml-1.33.jar
-snappy-java/1.1.10.2//snappy-java-1.1.10.2.jar
+snappy-java/1.1.10.3//snappy-java-1.1.10.3.jar
 spire-macros_2.12/0.17.0//spire-macros_2.12-0.17.0.jar
 spire-platform_2.12/0.17.0//spire-platform_2.12-0.17.0.jar
 spire-util_2.12/0.17.0//spire-util_2.12-0.17.0.jar

diff --git a/pom.xml b/pom.xml
index ca1dd7ab605..a681adba193 100644
--- a/pom.xml
+++ b/pom.xml
@@ -185,7 +185,7 @@
     1.9.13
     2.14.2
     2.14.2
-    1.1.10.2
+    1.1.10.3
     3.0.3
     1.15
     1.22