[spark] branch master updated: [SPARK-41891][CONNECT][TESTS] Enable test_add_months_function, test_array_repeat, test_dayofweek, test_first_last_ignorenulls, test_inline, test_window_time, test_reciprocal_trig_functions
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 58d5dc3d573 [SPARK-41891][CONNECT][TESTS] Enable test_add_months_function, test_array_repeat, test_dayofweek, test_first_last_ignorenulls, test_inline, test_window_time, test_reciprocal_trig_functions 58d5dc3d573 is described below commit 58d5dc3d573dfbb6d21ea41d101550146756f45b Author: Sandeep Singh AuthorDate: Thu Jan 5 16:59:19 2023 +0900 [SPARK-41891][CONNECT][TESTS] Enable test_add_months_function, test_array_repeat, test_dayofweek, test_first_last_ignorenulls, test_inline, test_window_time, test_reciprocal_trig_functions ### What changes were proposed in this pull request? Enabling tests in connect/test_parity_functions.py ### Why are the changes needed? Improved coverage ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New Tests Closes #39400 from techaddict/SPARK-41891. Authored-by: Sandeep Singh Signed-off-by: Hyukjin Kwon --- .../sql/tests/connect/test_parity_functions.py | 28 -- 1 file changed, 28 deletions(-) diff --git a/python/pyspark/sql/tests/connect/test_parity_functions.py b/python/pyspark/sql/tests/connect/test_parity_functions.py index 78ccbd49148..3c616b5c864 100644 --- a/python/pyspark/sql/tests/connect/test_parity_functions.py +++ b/python/pyspark/sql/tests/connect/test_parity_functions.py @@ -44,14 +44,6 @@ class FunctionsParityTests(ReusedSQLTestCase, FunctionsTestsMixin): cls.spark = cls._spark.stop() del os.environ["SPARK_REMOTE"] -@unittest.skip("Fails in Spark Connect, should enable.") -def test_add_months_function(self): -super().test_add_months_function() - -@unittest.skip("Fails in Spark Connect, should enable.") -def test_array_repeat(self): -super().test_array_repeat() - @unittest.skip("Fails in Spark Connect, should enable.") def test_assert_true(self): super().test_assert_true() @@ -68,18 +60,10 @@ class FunctionsParityTests(ReusedSQLTestCase, FunctionsTestsMixin): def test_date_sub_function(self): super().test_date_sub_function() -@unittest.skip("Fails in Spark Connect, should enable.") -def test_dayofweek(self): -super().test_dayofweek() - @unittest.skip("Fails in Spark Connect, should enable.") def test_explode(self): super().test_explode() -@unittest.skip("Fails in Spark Connect, should enable.") -def test_first_last_ignorenulls(self): -super().test_first_last_ignorenulls() - @unittest.skip("Fails in Spark Connect, should enable.") def test_function_parity(self): super().test_function_parity() @@ -88,10 +72,6 @@ class FunctionsParityTests(ReusedSQLTestCase, FunctionsTestsMixin): def test_functions_broadcast(self): super().test_functions_broadcast() -@unittest.skip("Fails in Spark Connect, should enable.") -def test_inline(self): -super().test_inline() - @unittest.skip("Fails in Spark Connect, should enable.") def test_input_file_name_reset_for_rdd(self): super().test_input_file_name_reset_for_rdd() @@ -160,18 +140,10 @@ class FunctionsParityTests(ReusedSQLTestCase, FunctionsTestsMixin): def test_window_functions_without_partitionBy(self): super().test_window_functions_without_partitionBy() -@unittest.skip("Fails in Spark Connect, should enable.") -def test_window_time(self): -super().test_window_time() - @unittest.skip("Fails in Spark Connect, should enable.") def test_rand_functions(self): super().test_rand_functions() -@unittest.skip("Fails in Spark Connect, should 
enable.") -def test_reciprocal_trig_functions(self): -super().test_reciprocal_trig_functions() - @unittest.skip("Fails in Spark Connect, should enable.") def test_sampleby(self): super().test_sampleby() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41829][CONNECT][PYTHON] Add the missing ordering parameter in `Sort` and `sortWithinPartitions`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6ee22158f2a [SPARK-41829][CONNECT][PYTHON] Add the missing ordering parameter in `Sort` and `sortWithinPartitions` 6ee22158f2a is described below commit 6ee22158f2a1891d39c4274fb6fe96d6fbb6c1fc Author: Ruifeng Zheng AuthorDate: Thu Jan 5 15:26:17 2023 +0800 [SPARK-41829][CONNECT][PYTHON] Add the missing ordering parameter in `Sort` and `sortWithinPartitions` ### What changes were proposed in this pull request? Add the missing ordering parameter in `Sort` and `sortWithinPartitions` ### Why are the changes needed? API coverage ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? enabled doctests Closes #39398 from zhengruifeng/connect_fix_41829. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/dataframe.py | 59 - python/pyspark/sql/connect/plan.py | 21 +--- 2 files changed, 59 insertions(+), 21 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index a22c2cc6421..13a421ca72a 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -22,6 +22,7 @@ from typing import ( Optional, Tuple, Union, +Sequence, TYPE_CHECKING, overload, Callable, @@ -44,7 +45,13 @@ from pyspark.sql.connect.group import GroupedData from pyspark.sql.connect.readwriter import DataFrameWriter from pyspark.sql.connect.column import Column from pyspark.sql.connect.expressions import UnresolvedRegex -from pyspark.sql.connect.functions import _invoke_function, col, lit, expr as sql_expression +from pyspark.sql.connect.functions import ( +_to_col, +_invoke_function, +col, +lit, +expr as sql_expression, +) from pyspark.sql.dataframe import ( DataFrame as PySparkDataFrame, DataFrameNaFunctions as PySparkDataFrameNaFunctions, @@ -342,18 +349,56 @@ class DataFrame: tail.__doc__ = PySparkDataFrame.tail.__doc__ -def sort(self, *cols: "ColumnOrName") -> "DataFrame": +def _sort_cols( +self, cols: Sequence[Union[str, Column, List[Union[str, Column, kwargs: Dict[str, Any] +) -> List[Column]: +"""Return a JVM Seq of Columns that describes the sort order""" +if cols is None: +raise ValueError("should sort by at least one column") + +_cols: List[Column] = [] +if len(cols) == 1 and isinstance(cols[0], list): +_cols = [_to_col(c) for c in cols[0]] +else: +_cols = [_to_col(cast("ColumnOrName", c)) for c in cols] + +ascending = kwargs.get("ascending", True) +if isinstance(ascending, (bool, int)): +if not ascending: +_cols = [c.desc() for c in _cols] +elif isinstance(ascending, list): +_cols = [c if asc else c.desc() for asc, c in zip(ascending, _cols)] +else: +raise TypeError("ascending can only be boolean or list, but got %s" % type(ascending)) + +return _cols + +def sort( +self, *cols: Union[str, Column, List[Union[str, Column]]], **kwargs: Any +) -> "DataFrame": return DataFrame.withPlan( -plan.Sort(self._plan, columns=list(cols), is_global=True), session=self._session +plan.Sort( +self._plan, +columns=self._sort_cols(cols, kwargs), +is_global=True, +), +session=self._session, ) sort.__doc__ = PySparkDataFrame.sort.__doc__ orderBy = sort -def sortWithinPartitions(self, *cols: "ColumnOrName") -> "DataFrame": +def sortWithinPartitions( +self, *cols: Union[str, Column, List[Union[str, Column]]], **kwargs: Any +) -> "DataFrame": return 
DataFrame.withPlan( -plan.Sort(self._plan, columns=list(cols), is_global=False), session=self._session +plan.Sort( +self._plan, +columns=self._sort_cols(cols, kwargs), +is_global=False, +), +session=self._session, ) sortWithinPartitions.__doc__ = PySparkDataFrame.sortWithinPartitions.__doc__ @@ -1440,10 +1485,6 @@ def _test() -> None: # TODO(SPARK-41827): groupBy requires all cols be Column or str del pyspark.sql.connect.dataframe.DataFrame.groupBy.__doc__ -# TODO(SPARK-41829): Add Dataframe sort ordering -del pyspark.sql.connect.dataframe.DataFrame.sort.__doc__ -del pyspark.sql.connect.dataframe.DataFrame.sortWithinPartitions.__doc__ - # TODO(SPARK-41830): fix sample
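As a rough usage illustration (not code from the patch), the `ascending` keyword that this change threads through to `Sort` behaves like the long-standing PySpark API, assuming a Connect session named `spark`:

    df = spark.createDataFrame([(2, "b"), (1, "a"), (3, "c")], ["id", "name"])

    df.sort("id").show()                                     # ascending by default
    df.sort("id", ascending=False).show()                    # one flag applied to all sort columns
    df.sort(["id", "name"], ascending=[True, False]).show()  # per-column flags
    df.sortWithinPartitions("id", ascending=False).show()    # same semantics, per partition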
[spark] branch master updated: [SPARK-41580][SQL] Assign name to _LEGACY_ERROR_TEMP_2137
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4d3bc8f5b55 [SPARK-41580][SQL] Assign name to _LEGACY_ERROR_TEMP_2137 4d3bc8f5b55 is described below commit 4d3bc8f5b55969f7c954991239ff43f9faba1346 Author: itholic AuthorDate: Thu Jan 5 10:58:14 2023 +0500 [SPARK-41580][SQL] Assign name to _LEGACY_ERROR_TEMP_2137 ### What changes were proposed in this pull request? This PR proposes to assign name to _LEGACY_ERROR_TEMP_2137, "INVALID_JSON_ROOT_FIELD". ### Why are the changes needed? We should assign proper name to _LEGACY_ERROR_TEMP_* ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*` Closes #39305 from itholic/LEGACY_2137. Authored-by: itholic Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 10 +- .../org/apache/spark/sql/errors/QueryExecutionErrors.scala | 2 +- .../spark/sql/execution/datasources/json/JsonSuite.scala | 14 +++--- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 12f4b0f9c37..29cafdcc1b6 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -760,6 +760,11 @@ "The identifier is invalid. Please, consider quoting it with back-quotes as ``." ] }, + "INVALID_JSON_ROOT_FIELD" : { +"message" : [ + "Cannot convert JSON root field to target Spark type." +] + }, "INVALID_JSON_SCHEMA_MAP_TYPE" : { "message" : [ "Input schema can only contain STRING as a key type for a MAP." 
@@ -4110,11 +4115,6 @@ "Failed to parse an empty string for data type " ] }, - "_LEGACY_ERROR_TEMP_2137" : { -"message" : [ - "Root converter returned null" -] - }, "_LEGACY_ERROR_TEMP_2138" : { "message" : [ "Cannot have circular references in bean class, but got the circular reference of class " diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 227e86994f5..0c92d56ed04 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1457,7 +1457,7 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { def rootConverterReturnNullError(): SparkRuntimeException = { new SparkRuntimeException( - errorClass = "_LEGACY_ERROR_TEMP_2137", + errorClass = "INVALID_JSON_ROOT_FIELD", messageParameters = Map.empty) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 0d2c98316e7..a4b7df9af42 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -25,11 +25,12 @@ import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId} import java.util.Locale import com.fasterxml.jackson.core.JsonFactory +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec -import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils} +import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, SparkUpgradeException, TestUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{functions => F, _} import org.apache.spark.sql.catalyst.json._ @@ -3192,10 +3193,17 @@ abstract class JsonSuite } test("SPARK-36379: proceed parsing with root nulls in permissive mode") { -assert(intercept[SparkException] { +val exception = intercept[SparkException] { spark.read.option("mode", "failfast") .schema("a string").json(Seq("""[{"a": "str"}, null]""").toDS).collect() -}.getMessage.contains("Malformed records are detected")) +} +assert(exception.getMessage.contains("Malformed records are detected")) + +checkError( + exception = ExceptionUtils.getRootCause(exception).asInstanceOf[SparkRuntimeException], + errorClass = "INVALID_JSON_ROOT_FIELD", + parameters = Map.empty +) // Permissive modes should proceed parsing malformed records (null). //
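A minimal way to reach the renamed error from PySpark, loosely adapted from the Scala test above (assumes a SparkSession named `spark`); in FAILFAST mode the malformed-record failure carries INVALID_JSON_ROOT_FIELD as its root cause:

    rdd = spark.sparkContext.parallelize(['[{"a": "str"}, null]'])
    spark.read.option("mode", "failfast").schema("a string").json(rdd).collect()
    # SparkException: Malformed records are detected ...
    # root cause: SparkRuntimeException with error class INVALID_JSON_ROOT_FIELD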
[spark] branch master updated: [SPARK-41576][SQL] Assign name to _LEGACY_ERROR_TEMP_2051
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 76d7c857078 [SPARK-41576][SQL] Assign name to _LEGACY_ERROR_TEMP_2051 76d7c857078 is described below commit 76d7c8570788c773720c6e143e496647dfe9ebe0 Author: itholic AuthorDate: Thu Jan 5 10:47:46 2023 +0500 [SPARK-41576][SQL] Assign name to _LEGACY_ERROR_TEMP_2051 ### What changes were proposed in this pull request? This PR proposes to assign name to _LEGACY_ERROR_TEMP_2051, "DATA_SOURCE_NOT_FOUND". ### Why are the changes needed? We should assign proper name to _LEGACY_ERROR_TEMP_* ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*` Closes #39281 from itholic/LEGACY_2051. Authored-by: itholic Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 10 +- .../org/apache/spark/sql/errors/QueryExecutionErrors.scala | 4 ++-- .../apache/spark/sql/execution/datasources/DataSource.scala | 2 +- .../org/apache/spark/sql/execution/command/DDLSuite.scala| 12 .../apache/spark/sql/sources/ResolvedDataSourceSuite.scala | 9 +++-- 5 files changed, 23 insertions(+), 14 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 120925f5254..12f4b0f9c37 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -441,6 +441,11 @@ ], "sqlState" : "42000" }, + "DATA_SOURCE_NOT_FOUND" : { +"message" : [ + "Failed to find the data source: . Please find packages at `https://spark.apache.org/third-party-projects.html`.; +] + }, "DATETIME_OVERFLOW" : { "message" : [ "Datetime operation overflow: ." @@ -3696,11 +3701,6 @@ "Expected exactly one path to be specified, but got: " ] }, - "_LEGACY_ERROR_TEMP_2051" : { -"message" : [ - "Failed to find data source: . Please find packages at https://spark.apache.org/third-party-projects.html; -] - }, "_LEGACY_ERROR_TEMP_2052" : { "message" : [ " was removed in Spark 2.0. 
Please check if your library is compatible with Spark 2.0" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 44a1972272f..227e86994f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -731,10 +731,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { messageParameters = Map("paths" -> allPaths.mkString(", "))) } - def failedToFindDataSourceError( + def dataSourceNotFoundError( provider: String, error: Throwable): SparkClassNotFoundException = { new SparkClassNotFoundException( - errorClass = "_LEGACY_ERROR_TEMP_2051", + errorClass = "DATA_SOURCE_NOT_FOUND", messageParameters = Map("provider" -> provider), cause = error) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index edbdd6bbc67..9bb5191dc01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -643,7 +643,7 @@ object DataSource extends Logging { } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") { throw QueryCompilationErrors.failedToFindKafkaDataSourceError(provider1) } else { - throw QueryExecutionErrors.failedToFindDataSourceError(provider1, error) + throw QueryExecutionErrors.dataSourceNotFoundError(provider1, error) } } } catch { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 6cc37a41210..f5d17b142e2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -24,7 +24,7 @@ import java.util.Locale import org.apache.hadoop.fs.{Path, RawLocalFileSystem} import org.apache.hadoop.fs.permission.{AclEntry, AclStatus} -import org.apache.spark.{SparkException, SparkFiles, SparkRuntimeException} +import
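For illustration, the renamed error class is what a PySpark user now sees when a data source cannot be resolved (the format name below is deliberately bogus):

    try:
        spark.read.format("some.nonexistent.datasource").load()
    except Exception as e:
        # Failed to find the data source: some.nonexistent.datasource.
        # Please find packages at `https://spark.apache.org/third-party-projects.html`.
        print(e)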
[spark] branch master updated (069fa1eebe9 -> ef0784990cc)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

 from 069fa1eebe9 [MINOR][CONNECT] Fix typos in connect/plan.py
 add  ef0784990cc [SPARK-41821][CONNECT][PYTHON] Fix doc test for DataFrame.describe

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/dataframe.py                | 18 ++
 python/pyspark/sql/tests/connect/test_connect_basic.py | 15 +++
 2 files changed, 25 insertions(+), 8 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [MINOR][CONNECT] Fix typos in connect/plan.py
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 069fa1eebe9 [MINOR][CONNECT] Fix typos in connect/plan.py 069fa1eebe9 is described below commit 069fa1eebe97d7ca4dde7cd87ef385d4857ffaad Author: Sandeep Singh AuthorDate: Thu Jan 5 14:34:41 2023 +0900 [MINOR][CONNECT] Fix typos in connect/plan.py ### What changes were proposed in this pull request? Fixing typos in connect/plan.py ### Why are the changes needed? Typos ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? just fixing typos Closes #39397 from techaddict/typo-in-createview. Authored-by: Sandeep Singh Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/plan.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index f63e39c7f3e..a6d1ad4068b 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -1227,7 +1227,7 @@ class CreateView(LogicalPlan): ) -> None: super().__init__(child) self._name = name -self._is_gloal = is_global +self._is_global = is_global self._replace = replace def command(self, session: "SparkConnectClient") -> proto.Command: @@ -1235,7 +1235,7 @@ class CreateView(LogicalPlan): plan = proto.Command() plan.create_dataframe_view.replace = self._replace -plan.create_dataframe_view.is_global = self._is_gloal +plan.create_dataframe_view.is_global = self._is_global plan.create_dataframe_view.name = self._name plan.create_dataframe_view.input.CopyFrom(self._child.plan(session)) return plan @@ -1587,7 +1587,7 @@ class RecoverPartitions(LogicalPlan): # self._table_name = table_name # # def plan(self, session: "SparkConnectClient") -> proto.Relation: -# plan = proto.Relation(catalog=proto.Catalog(is_cached=proto.IsCahed())) +# plan = proto.Relation(catalog=proto.Catalog(is_cached=proto.IsCached())) # plan.catalog.is_cached.table_name = self._table_name # return plan # - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41871][CONNECT] DataFrame hint parameter can be str, float or int
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 501beeb93b2 [SPARK-41871][CONNECT] DataFrame hint parameter can be str, float or int 501beeb93b2 is described below commit 501beeb93b2be42348fb1150204023e13ed5e35f Author: Sandeep Singh AuthorDate: Thu Jan 5 14:33:03 2023 +0900 [SPARK-41871][CONNECT] DataFrame hint parameter can be str, float or int ### What changes were proposed in this pull request? Spark Connect DataFrame hint parameter can be str, list, float, or int. This is done in parity with pyspark DataFrame.hint ### Why are the changes needed? For parity ### Does this PR introduce _any_ user-facing change? yes, allows more types as parameters. ### How was this patch tested? Enabling existing tests Closes #39393 from techaddict/SPARK-41871. Authored-by: Sandeep Singh Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/dataframe.py| 6 -- python/pyspark/sql/connect/plan.py | 3 ++- python/pyspark/sql/tests/connect/test_connect_basic.py | 15 ++- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 2464441bcf2..de50e6f52ca 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -480,9 +480,11 @@ class DataFrame: def hint(self, name: str, *params: Any) -> "DataFrame": for param in params: -if param is not None and not isinstance(param, (int, str)): +# TODO(SPARK-41887): support list type as hint parameter +if param is not None and not isinstance(param, (int, str, float)): raise TypeError( -f"param should be a int or str, but got {type(param).__name__} {param}" +f"param should be a str, float or int, but got {type(param).__name__}" +f" {param}" ) return DataFrame.withPlan( diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 1f4e4192fdf..f63e39c7f3e 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -403,8 +403,9 @@ class Hint(LogicalPlan): self.name = name +# TODO(SPARK-41887): support list type as hint parameter assert isinstance(params, list) and all( -p is None or isinstance(p, (int, str)) for p in params +p is not None and isinstance(p, (int, str, float)) for p in params ) self.params = params diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index fe6c2c65e25..57d2b675065 100644 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -1193,13 +1193,26 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase): self.spark.read.table(self.tbl_name).hint("illegal").toPandas(), ) +# Hint with all supported parameter values +such_a_nice_list = ["itworks1", "itworks2", "itworks3"] +self.assert_eq( +self.connect.read.table(self.tbl_name).hint("my awesome hint", 1.2345, 2).toPandas(), +self.spark.read.table(self.tbl_name).hint("my awesome hint", 1.2345, 2).toPandas(), +) + # Hint with unsupported parameter values with self.assertRaises(SparkConnectException): self.connect.read.table(self.tbl_name).hint("REPARTITION", "id+1").toPandas() # Hint with unsupported parameter types with self.assertRaises(TypeError): -self.connect.read.table(self.tbl_name).hint("REPARTITION", 1.1).toPandas() 
+self.connect.read.table(self.tbl_name).hint("REPARTITION", range(5)).toPandas() + +# Hint with unsupported parameter types +with self.assertRaises(TypeError): +self.connect.read.table(self.tbl_name).hint( +"my awesome hint", 1.2345, 2, such_a_nice_list, range(6) +).toPandas() # Hint with wrong combination with self.assertRaises(SparkConnectException): - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
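A short sketch of what the relaxed type check now accepts and still rejects (the table and hint names are hypothetical; `spark` is a Connect session):

    df = spark.read.table("my_table")

    df.hint("broadcast")                        # no parameters
    df.hint("my awesome hint", 1.2345, 2, "x")  # str, float and int parameters all pass the check
    df.hint("REPARTITION", range(5))            # still a TypeError; list parameters are tracked in SPARK-41887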
[spark] branch master updated: [SPARK-41694][CORE] Isolate RocksDB path for Live UI and automatically cleanup when `SparkContext.stop()`
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new fb5cf5f90c6 [SPARK-41694][CORE] Isolate RocksDB path for Live UI and automatically cleanup when `SparkContext.stop()` fb5cf5f90c6 is described below commit fb5cf5f90c6fe0860c811c0f7e06b9d8255d1772 Author: yangjie01 AuthorDate: Wed Jan 4 20:32:43 2023 -0800 [SPARK-41694][CORE] Isolate RocksDB path for Live UI and automatically cleanup when `SparkContext.stop()` ### What changes were proposed in this pull request? This pr brings two fixes: - Add sub-dir with `spark-ui` prefix under `spark.ui.store.path` for each Spark App to ensure that multiple Spark Apps can run normally use the same Spark Client with same `spark.ui.store.path` configuration - Automatically cleanup Live UI data when `SparkContext.stop()` ### Why are the changes needed? There are 2 issue before this pr: 1. Multiple Spark Apps can't run normally use the same Spark Client with same `spark.ui.store.path` configuration, the following exceptions will occur: ``` org.rocksdb.RocksDBException: While lock file: /${baseDir}/listing.rdb/LOCK: Resource temporarily unavailable ``` At the same time, only one Spark App can run normally use RocksDB as the Live UI store. After this pr, each Spark App uses an independent RocksDB directory when `spark.ui.store.path` is specified as Live UI store. 2. `spark.ui.store.path` directory not clean up when `SparkContext.stop()`: - The disk space occupied by the `spark.ui.store.path` directory will continue to grow. - When submitting new App and reusing the `spark.ui.store.path` directory, we will see the content related to the previous App, which is a bit strange After this pr, `spark.ui.store.path` directory is is automatically cleaned by default when `SparkContext` stop. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add new UTs Closes #39226 from LuciferYang/SPARK-41694. 
Lead-authored-by: yangjie01 Co-authored-by: YangJie Co-authored-by: Gengliang Wang Signed-off-by: Gengliang Wang --- .../org/apache/spark/status/AppStatusStore.scala | 29 +-- .../spark/status/AutoCleanupLiveUIDirSuite.scala | 56 ++ 2 files changed, 81 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 70fcbfd2d51..6db2fa57833 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -18,12 +18,14 @@ package org.apache.spark.status import java.io.File +import java.io.IOException import java.util.{List => JList} import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext} +import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR import org.apache.spark.status.api.v1 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID @@ -36,7 +38,8 @@ import org.apache.spark.util.kvstore.KVStore */ private[spark] class AppStatusStore( val store: KVStore, -val listener: Option[AppStatusListener] = None) { +val listener: Option[AppStatusListener] = None, +val storePath: Option[File] = None) { def applicationInfo(): v1.ApplicationInfo = { try { @@ -733,6 +736,11 @@ private[spark] class AppStatusStore( def close(): Unit = { store.close() +cleanUpStorePath() + } + + private def cleanUpStorePath(): Unit = { +storePath.foreach(Utils.deleteRecursively) } def constructTaskDataList(taskDataWrapperIter: Iterable[TaskDataWrapper]): Seq[v1.TaskData] = { @@ -761,7 +769,7 @@ private[spark] class AppStatusStore( } } -private[spark] object AppStatusStore { +private[spark] object AppStatusStore extends Logging { val CURRENT_VERSION = 2L @@ -771,10 +779,23 @@ private[spark] object AppStatusStore { def createLiveStore( conf: SparkConf, appStatusSource: Option[AppStatusSource] = None): AppStatusStore = { -val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).map(new File(_)) + +def createStorePath(rootDir: String): Option[File] = { + try { +val localDir = Utils.createDirectory(rootDir, "spark-ui") +logInfo(s"Created spark ui store directory at $rootDir") +Some(localDir) + } catch { +case e: IOException => + logError(s"Failed to create spark ui store path in $rootDir.", e) + None + } +} + +val storePath
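As a configuration sketch (the path below is made up), the behaviour described above applies whenever the Live UI store directory is set; each application now writes to its own `spark-ui*` subdirectory under that root, which is removed on stop:

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("live-ui-rocksdb-demo")
        .config("spark.ui.store.path", "/tmp/spark-live-ui")  # shared root directory
        .getOrCreate()
    )
    # Live UI state is kept in RocksDB under /tmp/spark-live-ui/spark-ui*
    spark.stop()  # the per-app subdirectory is cleaned up automatically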
[spark] branch master updated: [SPARK-41825][CONNECT][PYTHON] Enable doctests related to `DataFrame.show`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3c7fb30f61f [SPARK-41825][CONNECT][PYTHON] Enable doctests related to `DataFrame.show` 3c7fb30f61f is described below commit 3c7fb30f61f51532dce8d15600215afd2f2ff019 Author: Ruifeng Zheng AuthorDate: Thu Jan 5 12:19:22 2023 +0900 [SPARK-41825][CONNECT][PYTHON] Enable doctests related to `DataFrame.show` ### What changes were proposed in this pull request? enable a group of doctests ### Why are the changes needed? for test coverage ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? enabled tests Closes #39396 from zhengruifeng/connect_fix_41825. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/dataframe.py | 6 +- python/pyspark/sql/connect/functions.py | 4 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index fdb75d377b7..2464441bcf2 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -1427,11 +1427,7 @@ def _test() -> None: del pyspark.sql.connect.dataframe.DataFrame.explain.__doc__ del pyspark.sql.connect.dataframe.DataFrame.hint.__doc__ -# TODO(SPARK-41825): Dataframe.show formatting int as double -del pyspark.sql.connect.dataframe.DataFrame.fillna.__doc__ -del pyspark.sql.connect.dataframe.DataFrameNaFunctions.replace.__doc__ -del pyspark.sql.connect.dataframe.DataFrameNaFunctions.fill.__doc__ -del pyspark.sql.connect.dataframe.DataFrame.replace.__doc__ +# TODO(SPARK-41886): The doctest output has different order del pyspark.sql.connect.dataframe.DataFrame.intersect.__doc__ # TODO(SPARK-41625): Support Structured Streaming diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py index 965a9a5331e..f2603d477cb 100644 --- a/python/pyspark/sql/connect/functions.py +++ b/python/pyspark/sql/connect/functions.py @@ -2360,10 +2360,6 @@ def _test() -> None: del pyspark.sql.connect.functions.date_trunc.__doc__ del pyspark.sql.connect.functions.from_utc_timestamp.__doc__ -# TODO(SPARK-41825): Dataframe.show formatting int as double -del pyspark.sql.connect.functions.coalesce.__doc__ -del pyspark.sql.connect.functions.sum_distinct.__doc__ - # TODO(SPARK-41834): implement Dataframe.conf del pyspark.sql.connect.functions.from_unixtime.__doc__ del pyspark.sql.connect.functions.timestamp_seconds.__doc__ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41878][CONNECT][TESTS] pyspark.sql.tests.test_dataframe - Add JIRAs or messages for skipped messages
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8cbff3d5b6f [SPARK-41878][CONNECT][TESTS] pyspark.sql.tests.test_dataframe - Add JIRAs or messages for skipped messages 8cbff3d5b6f is described below commit 8cbff3d5b6f6a34e551aa42e965a16c3cb41e4c7 Author: Sandeep Singh AuthorDate: Thu Jan 5 08:53:08 2023 +0900 [SPARK-41878][CONNECT][TESTS] pyspark.sql.tests.test_dataframe - Add JIRAs or messages for skipped messages ### What changes were proposed in this pull request? This PR enables the reused PySpark tests in Spark Connect that pass now. And add JIRAs/ Messages to the skipped ones ### Why are the changes needed? To make sure on the test coverage. ### Does this PR introduce any user-facing change? No, test-only. ### How was this patch tested? Enabling tests Closes #39382 from techaddict/SPARK-41878. Authored-by: Sandeep Singh Signed-off-by: Hyukjin Kwon --- .../sql/tests/connect/test_parity_dataframe.py | 44 ++ 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/tests/connect/test_parity_dataframe.py b/python/pyspark/sql/tests/connect/test_parity_dataframe.py index ea1eb23fd4f..69f445b69ca 100644 --- a/python/pyspark/sql/tests/connect/test_parity_dataframe.py +++ b/python/pyspark/sql/tests/connect/test_parity_dataframe.py @@ -41,154 +41,182 @@ class DataFrameParityTests(DataFrameTestsMixin, ReusedSQLTestCase): cls._spark.stop() del os.environ["SPARK_REMOTE"] +# TODO(SPARK-41612): support Catalog.isCached @unittest.skip("Fails in Spark Connect, should enable.") def test_cache(self): super().test_cache() +# TODO(SPARK-41866): createDataframe support array type @unittest.skip("Fails in Spark Connect, should enable.") def test_create_dataframe_from_array_of_long(self): super().test_create_dataframe_from_array_of_long() +# TODO(SPARK-41868): Support data type Duration(NANOSECOND) @unittest.skip("Fails in Spark Connect, should enable.") def test_create_dataframe_from_pandas_with_day_time_interval(self): super().test_create_dataframe_from_pandas_with_day_time_interval() +# TODO(SPARK-41842): Support data type Timestamp(NANOSECOND, null) @unittest.skip("Fails in Spark Connect, should enable.") def test_create_dataframe_from_pandas_with_dst(self): super().test_create_dataframe_from_pandas_with_dst() +# TODO(SPARK-41842): Support data type Timestamp(NANOSECOND, null) @unittest.skip("Fails in Spark Connect, should enable.") def test_create_dataframe_from_pandas_with_timestamp(self): super().test_create_dataframe_from_pandas_with_timestamp() -@unittest.skip("Fails in Spark Connect, should enable.") -def test_create_dataframe_required_pandas_not_found(self): -super().test_create_dataframe_required_pandas_not_found() - +# TODO(SPARK-41855): createDataFrame doesn't handle None/NaN properly @unittest.skip("Fails in Spark Connect, should enable.") def test_create_nan_decimal_dataframe(self): super().test_create_nan_decimal_dataframe() +# TODO(SPARK-41869): DataFrame dropDuplicates should throw error on non list argument @unittest.skip("Fails in Spark Connect, should enable.") def test_drop_duplicates(self): super().test_drop_duplicates() +# TODO(SPARK-41870): Handle duplicate columns in `createDataFrame` @unittest.skip("Fails in Spark Connect, should enable.") def test_duplicated_column_names(self): super().test_duplicated_column_names() +# TODO(SPARK-41871): DataFrame hint parameter 
can be a float @unittest.skip("Fails in Spark Connect, should enable.") def test_extended_hint_types(self): super().test_extended_hint_types() +# TODO(SPARK-41872): Fix DataFrame createDataframe handling of None @unittest.skip("Fails in Spark Connect, should enable.") def test_fillna(self): super().test_fillna() +# TODO: comparing types, need to expose connect types @unittest.skip("Fails in Spark Connect, should enable.") def test_generic_hints(self): super().test_generic_hints() +# Spark Connect does not support RDD but the tests depend on them. @unittest.skip("Fails in Spark Connect, should enable.") def test_help_command(self): super().test_help_command() +# Spark Connect throws NotImplementedError tests expects IllegalArgumentException @unittest.skip("Fails in Spark Connect, should enable.") def test_invalid_join_method(self): super().test_invalid_join_method() +# TODO(SPARK-41834): Implement SparkSession.conf @unittest.skip("Fails
[spark] branch master updated: [SPARK-41833][SPARK-41881][SPARK-41815][CONNECT][PYTHON] Make `DataFrame.collect` handle None/NaN/Array/Binary properly
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 731b89d5914 [SPARK-41833][SPARK-41881][SPARK-41815][CONNECT][PYTHON] Make `DataFrame.collect` handle None/NaN/Array/Binary porperly 731b89d5914 is described below commit 731b89d59143adb8a4ab3d16dd9f0e08c799abf2 Author: Ruifeng Zheng AuthorDate: Thu Jan 5 08:52:08 2023 +0900 [SPARK-41833][SPARK-41881][SPARK-41815][CONNECT][PYTHON] Make `DataFrame.collect` handle None/NaN/Array/Binary porperly ### What changes were proposed in this pull request? Existing `DataFrame.collect` directly collect coming Arrow batches into a Pandas DataFrame, and then convert each series into a Row, which is problematic since it can not correctly handle None/NaN/Arrays/Binary/etc. This PR refactor `DataFrame.collect` by directly building rows from the raw Arrow Table, in order to support: 1, None/NaN values; 2, ArrayType 3, BinaryType ### Why are the changes needed? To be consistent with PySpark ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? enabled doctests Closes #39386 from zhengruifeng/connect_fix_41833. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/client.py| 54 ++--- python/pyspark/sql/connect/column.py| 2 -- python/pyspark/sql/connect/dataframe.py | 22 +++--- python/pyspark/sql/connect/functions.py | 31 +-- 4 files changed, 55 insertions(+), 54 deletions(-) diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py index e78c4de0f70..832b5648676 100644 --- a/python/pyspark/sql/connect/client.py +++ b/python/pyspark/sql/connect/client.py @@ -21,12 +21,13 @@ import urllib.parse import uuid from typing import Iterable, Optional, Any, Union, List, Tuple, Dict, NoReturn, cast +import pandas as pd +import pyarrow as pa + import google.protobuf.message from grpc_status import rpc_status import grpc -import pandas from google.protobuf import text_format -import pyarrow as pa from google.rpc import error_details_pb2 import pyspark.sql.connect.proto as pb2 @@ -406,11 +407,22 @@ class SparkConnectClient(object): for x in metrics.metrics ] -def to_pandas(self, plan: pb2.Plan) -> "pandas.DataFrame": +def to_table(self, plan: pb2.Plan) -> "pa.Table": +logger.info(f"Executing plan {self._proto_to_string(plan)}") +req = self._execute_plan_request_with_metadata() +req.plan.CopyFrom(plan) +table, _ = self._execute_and_fetch(req) +return table + +def to_pandas(self, plan: pb2.Plan) -> "pd.DataFrame": logger.info(f"Executing plan {self._proto_to_string(plan)}") req = self._execute_plan_request_with_metadata() req.plan.CopyFrom(plan) -return self._execute_and_fetch(req) +table, metrics = self._execute_and_fetch(req) +pdf = table.to_pandas() +if len(metrics) > 0: +pdf.attrs["metrics"] = metrics +return pdf def _proto_schema_to_pyspark_schema(self, schema: pb2.DataType) -> DataType: return types.proto_schema_to_pyspark_data_type(schema) @@ -521,10 +533,6 @@ class SparkConnectClient(object): except grpc.RpcError as rpc_error: self._handle_error(rpc_error) -def _process_batch(self, arrow_batch: pb2.ExecutePlanResponse.ArrowBatch) -> "pandas.DataFrame": -with pa.ipc.open_stream(arrow_batch.data) as rd: -return rd.read_pandas() - def _execute(self, req: pb2.ExecutePlanRequest) -> None: """ Execute the passed request `req` and drop all results. 
@@ -546,12 +554,14 @@ class SparkConnectClient(object): except grpc.RpcError as rpc_error: self._handle_error(rpc_error) -def _execute_and_fetch(self, req: pb2.ExecutePlanRequest) -> "pandas.DataFrame": +def _execute_and_fetch( +self, req: pb2.ExecutePlanRequest +) -> Tuple["pa.Table", List[PlanMetrics]]: logger.info("ExecuteAndFetch") -import pandas as pd m: Optional[pb2.ExecutePlanResponse.Metrics] = None -result_dfs = [] + +batches: List[pa.RecordBatch] = [] try: for b in self._stub.ExecutePlan(req, metadata=self._builder.metadata()): @@ -567,25 +577,21 @@ class SparkConnectClient(object): f"Received arrow batch rows={b.arrow_batch.row_count} " f"size={len(b.arrow_batch.data)}" ) -pb = self._process_batch(b.arrow_batch) -result_dfs.append(pb) + +with
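A standalone sketch (not the Connect client code itself) of why building rows straight from the Arrow table, instead of going through a pandas round-trip, preserves None, arrays and binary values:

    import pyarrow as pa

    table = pa.table({
        "x": [1, None, 3],                    # None must stay None, not become NaN
        "arr": [[1, 2], None, []],            # array-typed column
        "bin": [b"\x01", None, b"\x02\x03"],  # binary column
    })

    # The pandas conversion coerces the nullable int column to float64/NaN:
    print(table.to_pandas()["x"].tolist())    # [1.0, nan, 3.0]

    # Reading the Arrow columns directly keeps the original values:
    cols = {name: table.column(name).to_pylist() for name in table.column_names}
    rows = [tuple(cols[name][i] for name in table.column_names) for i in range(table.num_rows)]
    print(rows[1])                            # (None, None, None)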
[spark] branch master updated: [SPARK-41846][CONNECT][PYTHON] Enable doctests for window functions
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a02bf2bb17e [SPARK-41846][CONNECT][PYTHON] Enable doctests for window functions a02bf2bb17e is described below commit a02bf2bb17e049eaa7aa26561876560c26396bbd Author: Ruifeng Zheng AuthorDate: Thu Jan 5 08:50:09 2023 +0900 [SPARK-41846][CONNECT][PYTHON] Enable doctests for window functions ### What changes were proposed in this pull request? Enable doctests for window functions ### Why are the changes needed? for test coverage ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? enabled tests Closes #39392 from zhengruifeng/connect_fix_41846. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/functions.py | 6 -- 1 file changed, 6 deletions(-) diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py index 05ba52aa377..77c7db2d808 100644 --- a/python/pyspark/sql/connect/functions.py +++ b/python/pyspark/sql/connect/functions.py @@ -2418,12 +2418,6 @@ def _test() -> None: # TODO(SPARK-41845): Fix count bug del pyspark.sql.connect.functions.count.__doc__ -# TODO(SPARK-41846): window functions : unresolved columns -del pyspark.sql.connect.functions.rank.__doc__ -del pyspark.sql.connect.functions.cume_dist.__doc__ -del pyspark.sql.connect.functions.dense_rank.__doc__ -del pyspark.sql.connect.functions.percent_rank.__doc__ - # TODO(SPARK-41847): mapfield,structlist invalid type del pyspark.sql.connect.functions.element_at.__doc__ del pyspark.sql.connect.functions.explode.__doc__ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (915e9c67a95 -> c1acbfca491)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

 from 915e9c67a95 [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper
 add  c1acbfca491 [SPARK-41840][CONNECT][PYTHON] Add the missing alias `groupby`

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/dataframe.py | 2 ++
 python/pyspark/sql/connect/functions.py | 8
 2 files changed, 2 insertions(+), 8 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
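The added alias simply mirrors PySpark, so both spellings now resolve to the same grouping, e.g. (illustrative snippet; `spark` is a Connect session):

    from pyspark.sql import functions as F

    df = spark.createDataFrame([("a", 1), ("a", 2), ("b", 3)], ["k", "v"])
    df.groupBy("k").agg(F.sum("v")).show()  # existing camelCase spelling
    df.groupby("k").agg(F.sum("v")).show()  # newly added lowercase alias, same result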
[spark] branch master updated: [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 915e9c67a95 [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper 915e9c67a95 is described below commit 915e9c67a9581a1f66e70321879092d854c9fb3b Author: yangjie01 AuthorDate: Wed Jan 4 14:03:58 2023 -0800 [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper ### What changes were proposed in this pull request? Add Protobuf serializer for `StreamingQueryProgressWrapper ` ### Why are the changes needed? Support fast and compact serialization/deserialization for `StreamingQueryProgressWrapper ` over RocksDB. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add new UT Closes #39357 from LuciferYang/SPARK-41677. Authored-by: yangjie01 Signed-off-by: Gengliang Wang --- .../apache/spark/status/protobuf/store_types.proto | 51 +++ .../org.apache.spark.status.protobuf.ProtobufSerDe | 1 + .../org/apache/spark/sql/streaming/progress.scala | 8 +- .../ui/StreamingQueryStatusListener.scala | 2 +- .../protobuf/sql/SinkProgressSerializer.scala | 42 + .../protobuf/sql/SourceProgressSerializer.scala| 65 .../sql/StateOperatorProgressSerializer.scala | 75 + .../sql/StreamingQueryProgressSerializer.scala | 89 +++ .../StreamingQueryProgressWrapperSerializer.scala | 40 + .../sql/KVStoreProtobufSerializerSuite.scala | 170 - 10 files changed, 537 insertions(+), 6 deletions(-) diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index 6ba1915dfa1..2a45b5da1d8 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -685,3 +685,54 @@ message ExecutorPeakMetricsDistributions { repeated double quantiles = 1; repeated ExecutorMetrics executor_metrics = 2; } + +message StateOperatorProgress { + string operator_name = 1; + int64 num_rows_total = 2; + int64 num_rows_updated = 3; + int64 all_updates_time_ms = 4; + int64 num_rows_removed = 5; + int64 all_removals_time_ms = 6; + int64 commit_time_ms = 7; + int64 memory_used_bytes = 8; + int64 num_rows_dropped_by_watermark = 9; + int64 num_shuffle_partitions = 10; + int64 num_state_store_instances = 11; + map custom_metrics = 12; +} + +message SourceProgress { + string description = 1; + string start_offset = 2; + string end_offset = 3; + string latest_offset = 4; + int64 num_input_rows = 5; + double input_rows_per_second = 6; + double processed_rows_per_second = 7; + map metrics = 8; +} + +message SinkProgress { + string description = 1; + int64 num_output_rows = 2; + map metrics = 3; +} + +message StreamingQueryProgress { + string id = 1; + string run_id = 2; + string name = 3; + string timestamp = 4; + int64 batch_id = 5; + int64 batch_duration = 6; + map duration_ms = 7; + map event_time = 8; + repeated StateOperatorProgress state_operators = 9; + repeated SourceProgress sources = 10; + SinkProgress sink = 11; + map observed_metrics = 12; +} + +message StreamingQueryProgressWrapper { + StreamingQueryProgress progress = 1; +} diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe index 7beff87d7ec..e907d559349 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe @@ -18,3 +18,4 @@ org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer org.apache.spark.status.protobuf.sql.StreamingQueryDataSerializer +org.apache.spark.status.protobuf.sql.StreamingQueryProgressWrapperSerializer diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 3d206e7780c..1b755ed70c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger. */ @Evolving -class StateOperatorProgress private[sql]( +class StateOperatorProgress
[spark] branch master updated: [SPARK-41768][CORE] Refactor the definition of enum to follow with the code style
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 492356d1646 [SPARK-41768][CORE] Refactor the definition of enum to follow with the code style 492356d1646 is described below commit 492356d1646a5e7571dad7e3107a11f765ee810a Author: panbingkun AuthorDate: Wed Jan 4 13:15:04 2023 -0800 [SPARK-41768][CORE] Refactor the definition of enum to follow with the code style ### What changes were proposed in this pull request? The pr aims to refactor the definition of enum in `UI protobuf serializer` to follow with the code style. ### Why are the changes needed? Following code style: https://developers.google.com/protocol-buffers/docs/style#enums https://user-images.githubusercontent.com/15246973/209946067-4c541101-be0d-49a6-9812-768ba98423a4.png;> ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA Existed UT. Closes #39286 from panbingkun/SPARK-41768. Authored-by: panbingkun Signed-off-by: Gengliang Wang --- .../apache/spark/status/protobuf/store_types.proto | 21 ++- .../status/protobuf/JobDataWrapperSerializer.scala | 12 ++ .../protobuf/JobExecutionStatusSerializer.scala| 43 ++ .../RDDOperationGraphWrapperSerializer.scala | 35 -- .../status/protobuf/StageStatusSerializer.scala| 26 + .../sql/SQLExecutionUIDataSerializer.scala | 7 ++-- 6 files changed, 110 insertions(+), 34 deletions(-) diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index 38b82518ddd..6ba1915dfa1 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -27,10 +27,10 @@ package org.apache.spark.status.protobuf; enum JobExecutionStatus { JOB_EXECUTION_STATUS_UNSPECIFIED = 0; - RUNNING = 1; - SUCCEEDED = 2; - FAILED = 3; - UNKNOWN = 4; + JOB_EXECUTION_STATUS_RUNNING = 1; + JOB_EXECUTION_STATUS_SUCCEEDED = 2; + JOB_EXECUTION_STATUS_FAILED = 3; + JOB_EXECUTION_STATUS_UNKNOWN = 4; } message JobData { @@ -434,13 +434,14 @@ message RDDOperationEdge { int32 to_id = 2; } +enum DeterministicLevel { + DETERMINISTIC_LEVEL_UNSPECIFIED = 0; + DETERMINISTIC_LEVEL_DETERMINATE = 1; + DETERMINISTIC_LEVEL_UNORDERED = 2; + DETERMINISTIC_LEVEL_INDETERMINATE = 3; +} + message RDDOperationNode { - enum DeterministicLevel { -UNSPECIFIED = 0; -DETERMINATE = 1; -UNORDERED = 2; -INDETERMINATE = 3; - } int32 id = 1; string name = 2; bool cached = 3; diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala index 98ac2d643c9..e2e2a1a8d89 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala @@ -17,10 +17,10 @@ package org.apache.spark.status.protobuf -import collection.JavaConverters._ import java.util.Date -import org.apache.spark.JobExecutionStatus +import collection.JavaConverters._ + import org.apache.spark.status.JobDataWrapper import org.apache.spark.status.api.v1.JobData import org.apache.spark.status.protobuf.Utils.getOptional @@ -55,7 +55,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe { val jobDataBuilder = 
StoreTypes.JobData.newBuilder() jobDataBuilder.setJobId(jobData.jobId.toLong) .setName(jobData.name) - .setStatus(serializeJobExecutionStatus(jobData.status)) + .setStatus(JobExecutionStatusSerializer.serialize(jobData.status)) .setNumTasks(jobData.numTasks) .setNumActiveTasks(jobData.numActiveTasks) .setNumCompletedTasks(jobData.numCompletedTasks) @@ -89,7 +89,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe { getOptional(info.hasSubmissionTime, () => new Date(info.getSubmissionTime)) val completionTime = getOptional(info.hasCompletionTime, () => new Date(info.getCompletionTime)) val jobGroup = getOptional(info.hasJobGroup, info.getJobGroup) -val status = JobExecutionStatus.valueOf(info.getStatus.toString) +val status = JobExecutionStatusSerializer.deserialize(info.getStatus) new JobData( jobId = info.getJobId.toInt, @@ -113,8 +113,4 @@ class JobDataWrapperSerializer extends ProtobufSerDe { numFailedStages = info.getNumFailedStages, killedTasksSummary = info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap) } - - private def serializeJobExecutionStatus(j:
[spark] branch master updated: [SPARK-41573][SQL] Assign name to _LEGACY_ERROR_TEMP_2136
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f352f103ed5 [SPARK-41573][SQL] Assign name to _LEGACY_ERROR_TEMP_2136 f352f103ed5 is described below commit f352f103ed512806abb3f642571a0c595b8b0509 Author: itholic AuthorDate: Thu Jan 5 00:21:32 2023 +0500 [SPARK-41573][SQL] Assign name to _LEGACY_ERROR_TEMP_2136 ### What changes were proposed in this pull request? This PR proposes to assign name to _LEGACY_ERROR_TEMP_2136, "CANNOT_PARSE_JSON_FIELD". ### Why are the changes needed? We should assign proper name to _LEGACY_ERROR_TEMP_* ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*` Closes #39284 from itholic/LEGACY_2136. Authored-by: itholic Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json| 10 +- .../spark/sql/catalyst/json/JacksonParser.scala | 2 +- .../spark/sql/errors/QueryExecutionErrors.scala | 8 .../org/apache/spark/sql/JsonFunctionsSuite.scala | 21 ++--- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index a7b120ef427..120925f5254 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -75,6 +75,11 @@ ], "sqlState" : "42000" }, + "CANNOT_PARSE_JSON_FIELD" : { +"message" : [ + "Cannot parse the field name and the value of the JSON token type to target Spark data type " +] + }, "CANNOT_PARSE_PROTOBUF_DESCRIPTOR" : { "message" : [ "Error parsing file descriptor byte[] into Descriptor object" @@ -4105,11 +4110,6 @@ "Failed to parse an empty string for data type " ] }, - "_LEGACY_ERROR_TEMP_2136" : { -"message" : [ - "Failed to parse field name , field value , [] to target spark data type []." -] - }, "_LEGACY_ERROR_TEMP_2137" : { "message" : [ "Root converter returned null" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index ee21a1e2b76..3fe26e87499 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -430,7 +430,7 @@ class JacksonParser( case token => // We cannot parse this token based on the given data type. So, we throw a // RuntimeException and this exception will be caught by `parse` method. 
- throw QueryExecutionErrors.failToParseValueForDataTypeError(parser, token, dataType) + throw QueryExecutionErrors.cannotParseJSONFieldError(parser, token, dataType) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 3e234cfee2c..44a1972272f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1444,15 +1444,15 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { "dataType" -> dataType.catalogString)) } - def failToParseValueForDataTypeError(parser: JsonParser, token: JsonToken, dataType: DataType) + def cannotParseJSONFieldError(parser: JsonParser, jsonType: JsonToken, dataType: DataType) : SparkRuntimeException = { new SparkRuntimeException( - errorClass = "_LEGACY_ERROR_TEMP_2136", + errorClass = "CANNOT_PARSE_JSON_FIELD", messageParameters = Map( "fieldName" -> parser.getCurrentName.toString(), "fieldValue" -> parser.getText.toString(), -"token" -> token.toString(), -"dataType" -> dataType.toString())) +"jsonType" -> jsonType.toString(), +"dataType" -> toSQLType(dataType))) } def rootConverterReturnNullError(): SparkRuntimeException = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 399665c0de6..0f282336d58 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -22,8 +22,9 @@ import java.time.{Duration, LocalDateTime, Period} import java.util.Locale import collection.JavaConverters._ +import
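For illustration (adapted from, not copied out of, the suite above), the error is what a user sees when a JSON token cannot be converted to the requested Spark type; in FAILFAST mode it surfaces directly:

    from pyspark.sql import functions as F

    df = spark.createDataFrame([('{"a": "not-a-number"}',)], ["json"])
    df.select(
        F.from_json("json", "a INT", {"mode": "FAILFAST"}).alias("parsed")
    ).show()
    # fails with error class CANNOT_PARSE_JSON_FIELD: the string token for field `a`
    # cannot be converted to the target type "INT"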
[spark] branch master updated (23e3c9b7c2f -> d0a598922e9)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

 from 23e3c9b7c2f [SPARK-41828][CONNECT][PYTHON] Make `createDataFrame` support empty dataframe
 add  d0a598922e9 [MINOR] Fix a typo "from from" -> "from"

No new revisions were added by this update.

Summary of changes:
 dev/ansible-for-test-node/roles/jenkins-worker/README.md | 2 +-
 docs/running-on-yarn.md                                  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41828][CONNECT][PYTHON] Make `createDataFrame` support empty dataframe
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 23e3c9b7c2f [SPARK-41828][CONNECT][PYTHON] Make `createDataFrame` support empty dataframe 23e3c9b7c2f is described below commit 23e3c9b7c2f08c5350992934cf660de6d2793982 Author: Ruifeng Zheng AuthorDate: Wed Jan 4 17:45:46 2023 +0900 [SPARK-41828][CONNECT][PYTHON] Make `createDataFrame` support empty dataframe ### What changes were proposed in this pull request? Make `createDataFrame` support empty dataframe: ``` In [24]: spark.createDataFrame([], schema="x STRING, y INTEGER") Out[24]: DataFrame[x: string, y: int] ``` ### Why are the changes needed? to be consistent with PySpark ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? added UT and enabled doctests Closes #39379 from zhengruifeng/connect_fix_41828. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- .../main/protobuf/spark/connect/relations.proto| 18 ++-- .../sql/connect/planner/SparkConnectPlanner.scala | 68 - python/pyspark/sql/connect/dataframe.py| 3 - python/pyspark/sql/connect/plan.py | 34 --- python/pyspark/sql/connect/proto/relations_pb2.py | 110 ++--- python/pyspark/sql/connect/proto/relations_pb2.pyi | 41 python/pyspark/sql/connect/session.py | 32 -- .../sql/tests/connect/test_connect_basic.py| 28 ++ 8 files changed, 193 insertions(+), 141 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto index 51981714ded..c0f22dd4576 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto @@ -328,20 +328,16 @@ message Deduplicate { // A relation that does not need to be qualified by name. message LocalRelation { - // Local collection data serialized into Arrow IPC streaming format which contains + // (Optional) Local collection data serialized into Arrow IPC streaming format which contains // the schema of the data. - bytes data = 1; + optional bytes data = 1; - // (Optional) The user provided schema. + // (Optional) The schema of local data. + // It should be either a DDL-formatted type string or a JSON string. // - // The Sever side will update the column names and data types according to this schema. - oneof schema { - -DataType datatype = 2; - -// Server will use Catalyst parser to parse this string to DataType. -string datatype_str = 3; - } + // The server side will update the column names and data types according to this schema. + // If the 'data' is not provided, then this schema will be required. + optional string schema = 2; } // Relation of type [[Sample]] that samples a fraction of the dataset. 
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 754bb7ced9e..b4c882541e0 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -571,47 +571,61 @@ class SparkConnectPlanner(session: SparkSession) { try { parser.parseTableSchema(sqlText) } catch { - case _: ParseException => + case e: ParseException => try { parser.parseDataType(sqlText) } catch { case _: ParseException => -parser.parseDataType(s"struct<${sqlText.trim}>") +try { + parser.parseDataType(s"struct<${sqlText.trim}>") +} catch { + case _: ParseException => +throw e +} } } } private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = { -val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator( - Iterator(rel.getData.toByteArray), - TaskContext.get()) -if (structType == null) { - throw InvalidPlanInput(s"Input data for LocalRelation does not produce a schema.") +var schema: StructType = null +if (rel.hasSchema) { + val schemaType = DataType.parseTypeWithFallback( +rel.getSchema, +parseDatatypeString, +fallbackParser = DataType.fromJson) + schema = schemaType match { +case s: StructType => s +case d => StructType(Seq(StructField("value", d))) + } } -val
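The commit message's example, expanded slightly (assuming a Spark Connect session named `spark`):

    # An empty local collection is now accepted as long as a schema is supplied,
    # since there is no data from which to infer one.
    empty = spark.createDataFrame([], schema="x STRING, y INTEGER")
    print(empty.schema)   # struct with x: string, y: int
    print(empty.count())  # 0

    # Non-empty data with the same DDL schema works as before.
    spark.createDataFrame([("a", 1)], schema="x STRING, y INTEGER").show()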