[spark] branch master updated: [SPARK-41891][CONNECT][TESTS] Enable test_add_months_function, test_array_repeat, test_dayofweek, test_first_last_ignorenulls, test_inline, test_window_time, test_reciprocal_trig_functions

2023-01-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 58d5dc3d573 [SPARK-41891][CONNECT][TESTS] Enable 
test_add_months_function, test_array_repeat, test_dayofweek, 
test_first_last_ignorenulls, test_inline, test_window_time, 
test_reciprocal_trig_functions
58d5dc3d573 is described below

commit 58d5dc3d573dfbb6d21ea41d101550146756f45b
Author: Sandeep Singh 
AuthorDate: Thu Jan 5 16:59:19 2023 +0900

[SPARK-41891][CONNECT][TESTS] Enable test_add_months_function, 
test_array_repeat, test_dayofweek, test_first_last_ignorenulls, test_inline, 
test_window_time, test_reciprocal_trig_functions

### What changes were proposed in this pull request?
Enabling tests in connect/test_parity_functions.py
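
A minimal sketch of the parity-test pattern used here (class and test names are illustrative, not the actual suite): the Connect parity suite inherits shared test bodies from a mixin and overrides the ones that currently fail with `unittest.skip`; enabling a test, as this commit does, simply means deleting the skip override.

```python
import unittest


class ExampleTestsMixin:
    def test_add_months_function(self):
        # Shared test body, run by both the classic and the Connect suite.
        self.assertTrue(True)


class ExampleParityTests(ExampleTestsMixin, unittest.TestCase):
    # While a feature is unsupported in Spark Connect, the inherited test is
    # overridden and skipped; enabling it means deleting this override.
    @unittest.skip("Fails in Spark Connect, should enable.")
    def test_add_months_function(self):
        super().test_add_months_function()


if __name__ == "__main__":
    unittest.main()
```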

### Why are the changes needed?
Improved coverage

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New Tests

Closes #39400 from techaddict/SPARK-41891.

Authored-by: Sandeep Singh 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/tests/connect/test_parity_functions.py | 28 --
 1 file changed, 28 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_functions.py 
b/python/pyspark/sql/tests/connect/test_parity_functions.py
index 78ccbd49148..3c616b5c864 100644
--- a/python/pyspark/sql/tests/connect/test_parity_functions.py
+++ b/python/pyspark/sql/tests/connect/test_parity_functions.py
@@ -44,14 +44,6 @@ class FunctionsParityTests(ReusedSQLTestCase, 
FunctionsTestsMixin):
 cls.spark = cls._spark.stop()
 del os.environ["SPARK_REMOTE"]
 
-@unittest.skip("Fails in Spark Connect, should enable.")
-def test_add_months_function(self):
-super().test_add_months_function()
-
-@unittest.skip("Fails in Spark Connect, should enable.")
-def test_array_repeat(self):
-super().test_array_repeat()
-
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_assert_true(self):
 super().test_assert_true()
@@ -68,18 +60,10 @@ class FunctionsParityTests(ReusedSQLTestCase, 
FunctionsTestsMixin):
 def test_date_sub_function(self):
 super().test_date_sub_function()
 
-@unittest.skip("Fails in Spark Connect, should enable.")
-def test_dayofweek(self):
-super().test_dayofweek()
-
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_explode(self):
 super().test_explode()
 
-@unittest.skip("Fails in Spark Connect, should enable.")
-def test_first_last_ignorenulls(self):
-super().test_first_last_ignorenulls()
-
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_function_parity(self):
 super().test_function_parity()
@@ -88,10 +72,6 @@ class FunctionsParityTests(ReusedSQLTestCase, 
FunctionsTestsMixin):
 def test_functions_broadcast(self):
 super().test_functions_broadcast()
 
-@unittest.skip("Fails in Spark Connect, should enable.")
-def test_inline(self):
-super().test_inline()
-
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_input_file_name_reset_for_rdd(self):
 super().test_input_file_name_reset_for_rdd()
@@ -160,18 +140,10 @@ class FunctionsParityTests(ReusedSQLTestCase, 
FunctionsTestsMixin):
 def test_window_functions_without_partitionBy(self):
 super().test_window_functions_without_partitionBy()
 
-@unittest.skip("Fails in Spark Connect, should enable.")
-def test_window_time(self):
-super().test_window_time()
-
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_rand_functions(self):
 super().test_rand_functions()
 
-@unittest.skip("Fails in Spark Connect, should enable.")
-def test_reciprocal_trig_functions(self):
-super().test_reciprocal_trig_functions()
-
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_sampleby(self):
 super().test_sampleby()





[spark] branch master updated: [SPARK-41829][CONNECT][PYTHON] Add the missing ordering parameter in `Sort` and `sortWithinPartitions`

2023-01-04 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6ee22158f2a [SPARK-41829][CONNECT][PYTHON] Add the missing ordering 
parameter in `Sort` and `sortWithinPartitions`
6ee22158f2a is described below

commit 6ee22158f2a1891d39c4274fb6fe96d6fbb6c1fc
Author: Ruifeng Zheng 
AuthorDate: Thu Jan 5 15:26:17 2023 +0800

[SPARK-41829][CONNECT][PYTHON] Add the missing ordering parameter in `Sort` 
and `sortWithinPartitions`

### What changes were proposed in this pull request?
Add the missing ordering parameter in `Sort` and `sortWithinPartitions`
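
A hedged usage sketch of the ordering parameter this change adds; the `remote()` builder address and the sample data are assumptions, not part of the commit.

```python
from pyspark.sql import SparkSession

# Any Spark Connect session works; the address below is an assumption.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
df = spark.createDataFrame([(2, "b"), (1, "a"), (3, "c")], ["id", "name"])

# Descending sort via the keyword argument, as in classic PySpark.
df.sort("id", ascending=False).show()

# Per-column ordering: ascending on "id", descending on "name".
df.sort(["id", "name"], ascending=[True, False]).show()

# sortWithinPartitions accepts the same arguments.
df.sortWithinPartitions("id", ascending=False).show()
```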

### Why are the changes needed?
API coverage

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
enabled doctests

Closes #39398 from zhengruifeng/connect_fix_41829.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/dataframe.py | 59 -
 python/pyspark/sql/connect/plan.py  | 21 +---
 2 files changed, 59 insertions(+), 21 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index a22c2cc6421..13a421ca72a 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -22,6 +22,7 @@ from typing import (
 Optional,
 Tuple,
 Union,
+Sequence,
 TYPE_CHECKING,
 overload,
 Callable,
@@ -44,7 +45,13 @@ from pyspark.sql.connect.group import GroupedData
 from pyspark.sql.connect.readwriter import DataFrameWriter
 from pyspark.sql.connect.column import Column
 from pyspark.sql.connect.expressions import UnresolvedRegex
-from pyspark.sql.connect.functions import _invoke_function, col, lit, expr as 
sql_expression
+from pyspark.sql.connect.functions import (
+_to_col,
+_invoke_function,
+col,
+lit,
+expr as sql_expression,
+)
 from pyspark.sql.dataframe import (
 DataFrame as PySparkDataFrame,
 DataFrameNaFunctions as PySparkDataFrameNaFunctions,
@@ -342,18 +349,56 @@ class DataFrame:
 
 tail.__doc__ = PySparkDataFrame.tail.__doc__
 
-def sort(self, *cols: "ColumnOrName") -> "DataFrame":
+def _sort_cols(
+self, cols: Sequence[Union[str, Column, List[Union[str, Column]]]], kwargs: Dict[str, Any]
+) -> List[Column]:
+"""Return a JVM Seq of Columns that describes the sort order"""
+if cols is None:
+raise ValueError("should sort by at least one column")
+
+_cols: List[Column] = []
+if len(cols) == 1 and isinstance(cols[0], list):
+_cols = [_to_col(c) for c in cols[0]]
+else:
+_cols = [_to_col(cast("ColumnOrName", c)) for c in cols]
+
+ascending = kwargs.get("ascending", True)
+if isinstance(ascending, (bool, int)):
+if not ascending:
+_cols = [c.desc() for c in _cols]
+elif isinstance(ascending, list):
+_cols = [c if asc else c.desc() for asc, c in zip(ascending, 
_cols)]
+else:
+raise TypeError("ascending can only be boolean or list, but got 
%s" % type(ascending))
+
+return _cols
+
+def sort(
+self, *cols: Union[str, Column, List[Union[str, Column]]], **kwargs: 
Any
+) -> "DataFrame":
 return DataFrame.withPlan(
-plan.Sort(self._plan, columns=list(cols), is_global=True), 
session=self._session
+plan.Sort(
+self._plan,
+columns=self._sort_cols(cols, kwargs),
+is_global=True,
+),
+session=self._session,
 )
 
 sort.__doc__ = PySparkDataFrame.sort.__doc__
 
 orderBy = sort
 
-def sortWithinPartitions(self, *cols: "ColumnOrName") -> "DataFrame":
+def sortWithinPartitions(
+self, *cols: Union[str, Column, List[Union[str, Column]]], **kwargs: 
Any
+) -> "DataFrame":
 return DataFrame.withPlan(
-plan.Sort(self._plan, columns=list(cols), is_global=False), 
session=self._session
+plan.Sort(
+self._plan,
+columns=self._sort_cols(cols, kwargs),
+is_global=False,
+),
+session=self._session,
 )
 
 sortWithinPartitions.__doc__ = 
PySparkDataFrame.sortWithinPartitions.__doc__
@@ -1440,10 +1485,6 @@ def _test() -> None:
 # TODO(SPARK-41827): groupBy requires all cols be Column or str
 del pyspark.sql.connect.dataframe.DataFrame.groupBy.__doc__
 
-# TODO(SPARK-41829): Add Dataframe sort ordering
-del pyspark.sql.connect.dataframe.DataFrame.sort.__doc__
-del 
pyspark.sql.connect.dataframe.DataFrame.sortWithinPartitions.__doc__
-
 # TODO(SPARK-41830): fix sample 

[spark] branch master updated: [SPARK-41580][SQL] Assign name to _LEGACY_ERROR_TEMP_2137

2023-01-04 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4d3bc8f5b55 [SPARK-41580][SQL] Assign name to _LEGACY_ERROR_TEMP_2137
4d3bc8f5b55 is described below

commit 4d3bc8f5b55969f7c954991239ff43f9faba1346
Author: itholic 
AuthorDate: Thu Jan 5 10:58:14 2023 +0500

[SPARK-41580][SQL] Assign name to _LEGACY_ERROR_TEMP_2137

### What changes were proposed in this pull request?

This PR proposes to assign the name "INVALID_JSON_ROOT_FIELD" to 
_LEGACY_ERROR_TEMP_2137.

### Why are the changes needed?

We should assign proper names to the _LEGACY_ERROR_TEMP_* error classes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*`

Closes #39305 from itholic/LEGACY_2137.

Authored-by: itholic 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   | 10 +-
 .../org/apache/spark/sql/errors/QueryExecutionErrors.scala |  2 +-
 .../spark/sql/execution/datasources/json/JsonSuite.scala   | 14 +++---
 3 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 12f4b0f9c37..29cafdcc1b6 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -760,6 +760,11 @@
   "The identifier  is invalid. Please, consider quoting it with 
back-quotes as ``."
 ]
   },
+  "INVALID_JSON_ROOT_FIELD" : {
+"message" : [
+  "Cannot convert JSON root field to target Spark type."
+]
+  },
   "INVALID_JSON_SCHEMA_MAP_TYPE" : {
 "message" : [
   "Input schema  can only contain STRING as a key type for a 
MAP."
@@ -4110,11 +4115,6 @@
   "Failed to parse an empty string for data type "
 ]
   },
-  "_LEGACY_ERROR_TEMP_2137" : {
-"message" : [
-  "Root converter returned null"
-]
-  },
   "_LEGACY_ERROR_TEMP_2138" : {
 "message" : [
   "Cannot have circular references in bean class, but got the circular 
reference of class "
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 227e86994f5..0c92d56ed04 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1457,7 +1457,7 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
 
   def rootConverterReturnNullError(): SparkRuntimeException = {
 new SparkRuntimeException(
-  errorClass = "_LEGACY_ERROR_TEMP_2137",
+  errorClass = "INVALID_JSON_ROOT_FIELD",
   messageParameters = Map.empty)
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0d2c98316e7..a4b7df9af42 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -25,11 +25,12 @@ import java.time.{Duration, Instant, LocalDate, 
LocalDateTime, Period, ZoneId}
 import java.util.Locale
 
 import com.fasterxml.jackson.core.JsonFactory
+import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
-import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, 
TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, 
SparkUpgradeException, TestUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
@@ -3192,10 +3193,17 @@ abstract class JsonSuite
   }
 
   test("SPARK-36379: proceed parsing with root nulls in permissive mode") {
-assert(intercept[SparkException] {
+val exception = intercept[SparkException] {
   spark.read.option("mode", "failfast")
 .schema("a string").json(Seq("""[{"a": "str"}, 
null]""").toDS).collect()
-}.getMessage.contains("Malformed records are detected"))
+}
+assert(exception.getMessage.contains("Malformed records are detected"))
+
+checkError(
+  exception = 
ExceptionUtils.getRootCause(exception).asInstanceOf[SparkRuntimeException],
+  errorClass = "INVALID_JSON_ROOT_FIELD",
+  parameters = Map.empty
+)
 
 // Permissive modes should proceed parsing malformed records (null).
 // 

[spark] branch master updated: [SPARK-41576][SQL] Assign name to _LEGACY_ERROR_TEMP_2051

2023-01-04 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 76d7c857078 [SPARK-41576][SQL] Assign name to _LEGACY_ERROR_TEMP_2051
76d7c857078 is described below

commit 76d7c8570788c773720c6e143e496647dfe9ebe0
Author: itholic 
AuthorDate: Thu Jan 5 10:47:46 2023 +0500

[SPARK-41576][SQL] Assign name to _LEGACY_ERROR_TEMP_2051

### What changes were proposed in this pull request?

This PR proposes to assign the name "DATA_SOURCE_NOT_FOUND" to 
_LEGACY_ERROR_TEMP_2051.

### Why are the changes needed?

We should assign proper names to the _LEGACY_ERROR_TEMP_* error classes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*`

Closes #39281 from itholic/LEGACY_2051.

Authored-by: itholic 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json | 10 +-
 .../org/apache/spark/sql/errors/QueryExecutionErrors.scala   |  4 ++--
 .../apache/spark/sql/execution/datasources/DataSource.scala  |  2 +-
 .../org/apache/spark/sql/execution/command/DDLSuite.scala| 12 
 .../apache/spark/sql/sources/ResolvedDataSourceSuite.scala   |  9 +++--
 5 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 120925f5254..12f4b0f9c37 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -441,6 +441,11 @@
 ],
 "sqlState" : "42000"
   },
+  "DATA_SOURCE_NOT_FOUND" : {
+"message" : [
+  "Failed to find the data source: . Please find packages at 
`https://spark.apache.org/third-party-projects.html`.;
+]
+  },
   "DATETIME_OVERFLOW" : {
 "message" : [
   "Datetime operation overflow: ."
@@ -3696,11 +3701,6 @@
   "Expected exactly one path to be specified, but got: "
 ]
   },
-  "_LEGACY_ERROR_TEMP_2051" : {
-"message" : [
-  "Failed to find data source: . Please find packages at 
https://spark.apache.org/third-party-projects.html;
-]
-  },
   "_LEGACY_ERROR_TEMP_2052" : {
 "message" : [
   " was removed in Spark 2.0. Please check if your library is 
compatible with Spark 2.0"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 44a1972272f..227e86994f5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -731,10 +731,10 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
   messageParameters = Map("paths" -> allPaths.mkString(", ")))
   }
 
-  def failedToFindDataSourceError(
+  def dataSourceNotFoundError(
   provider: String, error: Throwable): SparkClassNotFoundException = {
 new SparkClassNotFoundException(
-  errorClass = "_LEGACY_ERROR_TEMP_2051",
+  errorClass = "DATA_SOURCE_NOT_FOUND",
   messageParameters = Map("provider" -> provider),
   cause = error)
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index edbdd6bbc67..9bb5191dc01 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -643,7 +643,7 @@ object DataSource extends Logging {
 } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
   throw 
QueryCompilationErrors.failedToFindKafkaDataSourceError(provider1)
 } else {
-  throw 
QueryExecutionErrors.failedToFindDataSourceError(provider1, error)
+  throw 
QueryExecutionErrors.dataSourceNotFoundError(provider1, error)
 }
 }
   } catch {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 6cc37a41210..f5d17b142e2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -24,7 +24,7 @@ import java.util.Locale
 import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
 import org.apache.hadoop.fs.permission.{AclEntry, AclStatus}
 
-import org.apache.spark.{SparkException, SparkFiles, SparkRuntimeException}
+import 

[spark] branch master updated (069fa1eebe9 -> ef0784990cc)

2023-01-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 069fa1eebe9 [MINOR][CONNECT] Fix typos in connect/plan.py
 add ef0784990cc [SPARK-41821][CONNECT][PYTHON] Fix doc test for 
DataFrame.describe

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/dataframe.py| 18 ++
 python/pyspark/sql/tests/connect/test_connect_basic.py | 15 +++
 2 files changed, 25 insertions(+), 8 deletions(-)





[spark] branch master updated: [MINOR][CONNECT] Fix typos in connect/plan.py

2023-01-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 069fa1eebe9 [MINOR][CONNECT] Fix typos in connect/plan.py
069fa1eebe9 is described below

commit 069fa1eebe97d7ca4dde7cd87ef385d4857ffaad
Author: Sandeep Singh 
AuthorDate: Thu Jan 5 14:34:41 2023 +0900

[MINOR][CONNECT] Fix typos in connect/plan.py

### What changes were proposed in this pull request?
Fixing typos in connect/plan.py

### Why are the changes needed?
Typos

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
just fixing typos

Closes #39397 from techaddict/typo-in-createview.

Authored-by: Sandeep Singh 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/plan.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/connect/plan.py 
b/python/pyspark/sql/connect/plan.py
index f63e39c7f3e..a6d1ad4068b 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -1227,7 +1227,7 @@ class CreateView(LogicalPlan):
 ) -> None:
 super().__init__(child)
 self._name = name
-self._is_gloal = is_global
+self._is_global = is_global
 self._replace = replace
 
 def command(self, session: "SparkConnectClient") -> proto.Command:
@@ -1235,7 +1235,7 @@ class CreateView(LogicalPlan):
 
 plan = proto.Command()
 plan.create_dataframe_view.replace = self._replace
-plan.create_dataframe_view.is_global = self._is_gloal
+plan.create_dataframe_view.is_global = self._is_global
 plan.create_dataframe_view.name = self._name
 plan.create_dataframe_view.input.CopyFrom(self._child.plan(session))
 return plan
@@ -1587,7 +1587,7 @@ class RecoverPartitions(LogicalPlan):
 # self._table_name = table_name
 #
 # def plan(self, session: "SparkConnectClient") -> proto.Relation:
-# plan = 
proto.Relation(catalog=proto.Catalog(is_cached=proto.IsCahed()))
+# plan = 
proto.Relation(catalog=proto.Catalog(is_cached=proto.IsCached()))
 # plan.catalog.is_cached.table_name = self._table_name
 # return plan
 #





[spark] branch master updated: [SPARK-41871][CONNECT] DataFrame hint parameter can be str, float or int

2023-01-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 501beeb93b2 [SPARK-41871][CONNECT] DataFrame hint parameter can be 
str, float or int
501beeb93b2 is described below

commit 501beeb93b2be42348fb1150204023e13ed5e35f
Author: Sandeep Singh 
AuthorDate: Thu Jan 5 14:33:03 2023 +0900

[SPARK-41871][CONNECT] DataFrame hint parameter can be str, float or int

### What changes were proposed in this pull request?
Spark Connect's DataFrame hint parameter can be str, list, float, or int. 
This is done for parity with PySpark's `DataFrame.hint`.
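
A hedged usage sketch of the widened client-side parameter checking; the remote address and the sample data are assumptions.

```python
from pyspark.sql import SparkSession

# Any Spark Connect session works; the address below is an assumption.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
df = spark.range(10)

# str, int and float hint parameters are now accepted on the client.
df.hint("my awesome hint", 1.2345, 2, "text").show()

# Unsupported parameter types (e.g. a range) still raise TypeError client-side.
try:
    df.hint("REPARTITION", range(5))
except TypeError as e:
    print(e)
```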

### Why are the changes needed?
For parity

### Does this PR introduce _any_ user-facing change?
yes, allows more types as parameters.

### How was this patch tested?
Enabling existing tests

Closes #39393 from techaddict/SPARK-41871.

Authored-by: Sandeep Singh 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/dataframe.py|  6 --
 python/pyspark/sql/connect/plan.py |  3 ++-
 python/pyspark/sql/tests/connect/test_connect_basic.py | 15 ++-
 3 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 2464441bcf2..de50e6f52ca 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -480,9 +480,11 @@ class DataFrame:
 
 def hint(self, name: str, *params: Any) -> "DataFrame":
 for param in params:
-if param is not None and not isinstance(param, (int, str)):
+# TODO(SPARK-41887): support list type as hint parameter
+if param is not None and not isinstance(param, (int, str, float)):
 raise TypeError(
-f"param should be a int or str, but got 
{type(param).__name__} {param}"
+f"param should be a str, float or int, but got 
{type(param).__name__}"
+f" {param}"
 )
 
 return DataFrame.withPlan(
diff --git a/python/pyspark/sql/connect/plan.py 
b/python/pyspark/sql/connect/plan.py
index 1f4e4192fdf..f63e39c7f3e 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -403,8 +403,9 @@ class Hint(LogicalPlan):
 
 self.name = name
 
+# TODO(SPARK-41887): support list type as hint parameter
 assert isinstance(params, list) and all(
-p is None or isinstance(p, (int, str)) for p in params
+p is not None and isinstance(p, (int, str, float)) for p in params
 )
 self.params = params
 
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index fe6c2c65e25..57d2b675065 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -1193,13 +1193,26 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
 self.spark.read.table(self.tbl_name).hint("illegal").toPandas(),
 )
 
+# Hint with all supported parameter values
+such_a_nice_list = ["itworks1", "itworks2", "itworks3"]
+self.assert_eq(
+self.connect.read.table(self.tbl_name).hint("my awesome hint", 
1.2345, 2).toPandas(),
+self.spark.read.table(self.tbl_name).hint("my awesome hint", 
1.2345, 2).toPandas(),
+)
+
 # Hint with unsupported parameter values
 with self.assertRaises(SparkConnectException):
 self.connect.read.table(self.tbl_name).hint("REPARTITION", 
"id+1").toPandas()
 
 # Hint with unsupported parameter types
 with self.assertRaises(TypeError):
-self.connect.read.table(self.tbl_name).hint("REPARTITION", 
1.1).toPandas()
+self.connect.read.table(self.tbl_name).hint("REPARTITION", 
range(5)).toPandas()
+
+# Hint with unsupported parameter types
+with self.assertRaises(TypeError):
+self.connect.read.table(self.tbl_name).hint(
+"my awesome hint", 1.2345, 2, such_a_nice_list, range(6)
+).toPandas()
 
 # Hint with wrong combination
 with self.assertRaises(SparkConnectException):





[spark] branch master updated: [SPARK-41694][CORE] Isolate RocksDB path for Live UI and automatically cleanup when `SparkContext.stop()`

2023-01-04 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fb5cf5f90c6 [SPARK-41694][CORE] Isolate RocksDB path for Live UI and 
automatically cleanup when `SparkContext.stop()`
fb5cf5f90c6 is described below

commit fb5cf5f90c6fe0860c811c0f7e06b9d8255d1772
Author: yangjie01 
AuthorDate: Wed Jan 4 20:32:43 2023 -0800

[SPARK-41694][CORE] Isolate RocksDB path for Live UI and automatically 
cleanup when `SparkContext.stop()`

### What changes were proposed in this pull request?
This PR brings two fixes:

- Add a sub-directory with the `spark-ui` prefix under `spark.ui.store.path` for 
each Spark App, so that multiple Spark Apps can run normally when using the same 
Spark Client with the same `spark.ui.store.path` configuration

- Automatically clean up Live UI data on `SparkContext.stop()`

### Why are the changes needed?
There were two issues before this PR:

1. Multiple Spark Apps can't run normally when using the same Spark Client with 
the same `spark.ui.store.path` configuration; the following exception occurs:

```
org.rocksdb.RocksDBException: While lock file: 
/${baseDir}/listing.rdb/LOCK: Resource temporarily unavailable
```

At the same time, only one Spark App at a time can use RocksDB as the Live UI 
store.

After this PR, each Spark App uses an independent RocksDB directory when 
`spark.ui.store.path` is specified as the Live UI store.

2. The `spark.ui.store.path` directory is not cleaned up when 
`SparkContext.stop()` is called:
   - The disk space occupied by the `spark.ui.store.path` directory will 
continue to grow.
   - When submitting a new App and reusing the `spark.ui.store.path` directory, 
we will see content related to the previous App, which is confusing.

After this PR, the `spark.ui.store.path` directory is automatically cleaned up 
by default when the `SparkContext` stops.
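
A minimal sketch, assuming a local illustrative path, of how the RocksDB-backed Live UI store is configured and cleaned up after this change.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("live-ui-rocksdb-demo")                       # name is illustrative
    .config("spark.ui.store.path", "/tmp/live-ui-store")   # shared root directory
    .getOrCreate()
)
spark.range(10).count()

# On stop, the per-app "spark-ui*" sub-directory under /tmp/live-ui-store is
# deleted, so the shared root no longer grows without bound.
spark.stop()
```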

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add new UTs

Closes #39226 from LuciferYang/SPARK-41694.

Lead-authored-by: yangjie01 
Co-authored-by: YangJie 
Co-authored-by: Gengliang Wang 
Signed-off-by: Gengliang Wang 
---
 .../org/apache/spark/status/AppStatusStore.scala   | 29 +--
 .../spark/status/AutoCleanupLiveUIDirSuite.scala   | 56 ++
 2 files changed, 81 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 70fcbfd2d51..6db2fa57833 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.status
 
 import java.io.File
+import java.io.IOException
 import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
@@ -36,7 +38,8 @@ import org.apache.spark.util.kvstore.KVStore
  */
 private[spark] class AppStatusStore(
 val store: KVStore,
-val listener: Option[AppStatusListener] = None) {
+val listener: Option[AppStatusListener] = None,
+val storePath: Option[File] = None) {
 
   def applicationInfo(): v1.ApplicationInfo = {
 try {
@@ -733,6 +736,11 @@ private[spark] class AppStatusStore(
 
   def close(): Unit = {
 store.close()
+cleanUpStorePath()
+  }
+
+  private def cleanUpStorePath(): Unit = {
+storePath.foreach(Utils.deleteRecursively)
   }
 
   def constructTaskDataList(taskDataWrapperIter: Iterable[TaskDataWrapper]): 
Seq[v1.TaskData] = {
@@ -761,7 +769,7 @@ private[spark] class AppStatusStore(
   }
 }
 
-private[spark] object AppStatusStore {
+private[spark] object AppStatusStore extends Logging {
 
   val CURRENT_VERSION = 2L
 
@@ -771,10 +779,23 @@ private[spark] object AppStatusStore {
   def createLiveStore(
   conf: SparkConf,
   appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
-val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).map(new File(_))
+
+def createStorePath(rootDir: String): Option[File] = {
+  try {
+val localDir = Utils.createDirectory(rootDir, "spark-ui")
+logInfo(s"Created spark ui store directory at $rootDir")
+Some(localDir)
+  } catch {
+case e: IOException =>
+  logError(s"Failed to create spark ui store path in $rootDir.", e)
+  None
+  }
+}
+
+val storePath 

[spark] branch master updated: [SPARK-41825][CONNECT][PYTHON] Enable doctests related to `DataFrame.show`

2023-01-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3c7fb30f61f [SPARK-41825][CONNECT][PYTHON] Enable doctests related to 
`DataFrame.show`
3c7fb30f61f is described below

commit 3c7fb30f61f51532dce8d15600215afd2f2ff019
Author: Ruifeng Zheng 
AuthorDate: Thu Jan 5 12:19:22 2023 +0900

[SPARK-41825][CONNECT][PYTHON] Enable doctests related to `DataFrame.show`

### What changes were proposed in this pull request?
enable a group of doctests

### Why are the changes needed?
for test coverage

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
enabled tests

Closes #39396 from zhengruifeng/connect_fix_41825.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/dataframe.py | 6 +-
 python/pyspark/sql/connect/functions.py | 4 
 2 files changed, 1 insertion(+), 9 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index fdb75d377b7..2464441bcf2 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1427,11 +1427,7 @@ def _test() -> None:
 del pyspark.sql.connect.dataframe.DataFrame.explain.__doc__
 del pyspark.sql.connect.dataframe.DataFrame.hint.__doc__
 
-# TODO(SPARK-41825): Dataframe.show formatting int as double
-del pyspark.sql.connect.dataframe.DataFrame.fillna.__doc__
-del pyspark.sql.connect.dataframe.DataFrameNaFunctions.replace.__doc__
-del pyspark.sql.connect.dataframe.DataFrameNaFunctions.fill.__doc__
-del pyspark.sql.connect.dataframe.DataFrame.replace.__doc__
+# TODO(SPARK-41886): The doctest output has different order
 del pyspark.sql.connect.dataframe.DataFrame.intersect.__doc__
 
 # TODO(SPARK-41625): Support Structured Streaming
diff --git a/python/pyspark/sql/connect/functions.py 
b/python/pyspark/sql/connect/functions.py
index 965a9a5331e..f2603d477cb 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2360,10 +2360,6 @@ def _test() -> None:
 del pyspark.sql.connect.functions.date_trunc.__doc__
 del pyspark.sql.connect.functions.from_utc_timestamp.__doc__
 
-# TODO(SPARK-41825): Dataframe.show formatting int as double
-del pyspark.sql.connect.functions.coalesce.__doc__
-del pyspark.sql.connect.functions.sum_distinct.__doc__
-
 # TODO(SPARK-41834): implement Dataframe.conf
 del pyspark.sql.connect.functions.from_unixtime.__doc__
 del pyspark.sql.connect.functions.timestamp_seconds.__doc__





[spark] branch master updated: [SPARK-41878][CONNECT][TESTS] pyspark.sql.tests.test_dataframe - Add JIRAs or messages for skipped messages

2023-01-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8cbff3d5b6f [SPARK-41878][CONNECT][TESTS] 
pyspark.sql.tests.test_dataframe - Add JIRAs or messages for skipped messages
8cbff3d5b6f is described below

commit 8cbff3d5b6f6a34e551aa42e965a16c3cb41e4c7
Author: Sandeep Singh 
AuthorDate: Thu Jan 5 08:53:08 2023 +0900

[SPARK-41878][CONNECT][TESTS] pyspark.sql.tests.test_dataframe - Add JIRAs 
or messages for skipped messages

### What changes were proposed in this pull request?
This PR enables the reused PySpark tests in Spark Connect that pass now, and 
adds JIRA references or messages to the skipped ones.

### Why are the changes needed?
To keep track of the test coverage.

### Does this PR introduce any user-facing change?
No, test-only.

### How was this patch tested?
Enabling tests

Closes #39382 from techaddict/SPARK-41878.

Authored-by: Sandeep Singh 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/tests/connect/test_parity_dataframe.py | 44 ++
 1 file changed, 36 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_dataframe.py 
b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
index ea1eb23fd4f..69f445b69ca 100644
--- a/python/pyspark/sql/tests/connect/test_parity_dataframe.py
+++ b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
@@ -41,154 +41,182 @@ class DataFrameParityTests(DataFrameTestsMixin, 
ReusedSQLTestCase):
 cls._spark.stop()
 del os.environ["SPARK_REMOTE"]
 
+# TODO(SPARK-41612): support Catalog.isCached
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_cache(self):
 super().test_cache()
 
+# TODO(SPARK-41866): createDataframe support array type
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_create_dataframe_from_array_of_long(self):
 super().test_create_dataframe_from_array_of_long()
 
+# TODO(SPARK-41868): Support data type Duration(NANOSECOND)
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_create_dataframe_from_pandas_with_day_time_interval(self):
 super().test_create_dataframe_from_pandas_with_day_time_interval()
 
+# TODO(SPARK-41842): Support data type Timestamp(NANOSECOND, null)
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_create_dataframe_from_pandas_with_dst(self):
 super().test_create_dataframe_from_pandas_with_dst()
 
+# TODO(SPARK-41842): Support data type Timestamp(NANOSECOND, null)
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_create_dataframe_from_pandas_with_timestamp(self):
 super().test_create_dataframe_from_pandas_with_timestamp()
 
-@unittest.skip("Fails in Spark Connect, should enable.")
-def test_create_dataframe_required_pandas_not_found(self):
-super().test_create_dataframe_required_pandas_not_found()
-
+# TODO(SPARK-41855): createDataFrame doesn't handle None/NaN properly
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_create_nan_decimal_dataframe(self):
 super().test_create_nan_decimal_dataframe()
 
+# TODO(SPARK-41869): DataFrame dropDuplicates should throw error on non 
list argument
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_drop_duplicates(self):
 super().test_drop_duplicates()
 
+# TODO(SPARK-41870): Handle duplicate columns in `createDataFrame`
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_duplicated_column_names(self):
 super().test_duplicated_column_names()
 
+# TODO(SPARK-41871): DataFrame hint parameter can be a float
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_extended_hint_types(self):
 super().test_extended_hint_types()
 
+# TODO(SPARK-41872): Fix DataFrame createDataframe handling of None
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_fillna(self):
 super().test_fillna()
 
+# TODO: comparing types, need to expose connect types
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_generic_hints(self):
 super().test_generic_hints()
 
+# Spark Connect does not support RDD but the tests depend on them.
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_help_command(self):
 super().test_help_command()
 
+# Spark Connect throws NotImplementedError tests expects 
IllegalArgumentException
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_invalid_join_method(self):
 super().test_invalid_join_method()
 
+# TODO(SPARK-41834): Implement SparkSession.conf
 @unittest.skip("Fails 

[spark] branch master updated: [SPARK-41833][SPARK-41881][SPARK-41815][CONNECT][PYTHON] Make `DataFrame.collect` handle None/NaN/Array/Binary properly

2023-01-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 731b89d5914 [SPARK-41833][SPARK-41881][SPARK-41815][CONNECT][PYTHON] 
Make `DataFrame.collect` handle None/NaN/Array/Binary properly
731b89d5914 is described below

commit 731b89d59143adb8a4ab3d16dd9f0e08c799abf2
Author: Ruifeng Zheng 
AuthorDate: Thu Jan 5 08:52:08 2023 +0900

[SPARK-41833][SPARK-41881][SPARK-41815][CONNECT][PYTHON] Make 
`DataFrame.collect` handle None/NaN/Array/Binary properly

### What changes were proposed in this pull request?

The existing `DataFrame.collect` directly collects the incoming Arrow batches into 
a Pandas DataFrame and then converts each series into a Row, which is problematic 
since it cannot correctly handle None/NaN/Array/Binary values.

This PR refactors `DataFrame.collect` to build rows directly from the raw Arrow 
Table (see the sketch below), in order to support:
1. None/NaN values
2. ArrayType
3. BinaryType
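
A standalone sketch of the underlying idea (not the actual client code): building rows straight from a pyarrow Table keeps None, array and binary values intact instead of letting pandas coerce them; the sample table is an assumption.

```python
import pyarrow as pa
from pyspark.sql import Row

table = pa.table(
    {"a": [1, None, 3], "b": [[1, 2], None, [3]], "c": [b"x", b"y", None]}
)

# to_pylist() keeps None as None (no NaN coercion) and preserves lists/bytes.
columns = [col.to_pylist() for col in table.columns]
rows = [Row(**dict(zip(table.column_names, values))) for values in zip(*columns)]
print(rows)
```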

### Why are the changes needed?
To be consistent with PySpark

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
enabled doctests

Closes #39386 from zhengruifeng/connect_fix_41833.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/client.py| 54 ++---
 python/pyspark/sql/connect/column.py|  2 --
 python/pyspark/sql/connect/dataframe.py | 22 +++---
 python/pyspark/sql/connect/functions.py | 31 +--
 4 files changed, 55 insertions(+), 54 deletions(-)

diff --git a/python/pyspark/sql/connect/client.py 
b/python/pyspark/sql/connect/client.py
index e78c4de0f70..832b5648676 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -21,12 +21,13 @@ import urllib.parse
 import uuid
 from typing import Iterable, Optional, Any, Union, List, Tuple, Dict, 
NoReturn, cast
 
+import pandas as pd
+import pyarrow as pa
+
 import google.protobuf.message
 from grpc_status import rpc_status
 import grpc
-import pandas
 from google.protobuf import text_format
-import pyarrow as pa
 from google.rpc import error_details_pb2
 
 import pyspark.sql.connect.proto as pb2
@@ -406,11 +407,22 @@ class SparkConnectClient(object):
 for x in metrics.metrics
 ]
 
-def to_pandas(self, plan: pb2.Plan) -> "pandas.DataFrame":
+def to_table(self, plan: pb2.Plan) -> "pa.Table":
+logger.info(f"Executing plan {self._proto_to_string(plan)}")
+req = self._execute_plan_request_with_metadata()
+req.plan.CopyFrom(plan)
+table, _ = self._execute_and_fetch(req)
+return table
+
+def to_pandas(self, plan: pb2.Plan) -> "pd.DataFrame":
 logger.info(f"Executing plan {self._proto_to_string(plan)}")
 req = self._execute_plan_request_with_metadata()
 req.plan.CopyFrom(plan)
-return self._execute_and_fetch(req)
+table, metrics = self._execute_and_fetch(req)
+pdf = table.to_pandas()
+if len(metrics) > 0:
+pdf.attrs["metrics"] = metrics
+return pdf
 
 def _proto_schema_to_pyspark_schema(self, schema: pb2.DataType) -> 
DataType:
 return types.proto_schema_to_pyspark_data_type(schema)
@@ -521,10 +533,6 @@ class SparkConnectClient(object):
 except grpc.RpcError as rpc_error:
 self._handle_error(rpc_error)
 
-def _process_batch(self, arrow_batch: pb2.ExecutePlanResponse.ArrowBatch) 
-> "pandas.DataFrame":
-with pa.ipc.open_stream(arrow_batch.data) as rd:
-return rd.read_pandas()
-
 def _execute(self, req: pb2.ExecutePlanRequest) -> None:
 """
 Execute the passed request `req` and drop all results.
@@ -546,12 +554,14 @@ class SparkConnectClient(object):
 except grpc.RpcError as rpc_error:
 self._handle_error(rpc_error)
 
-def _execute_and_fetch(self, req: pb2.ExecutePlanRequest) -> 
"pandas.DataFrame":
+def _execute_and_fetch(
+self, req: pb2.ExecutePlanRequest
+) -> Tuple["pa.Table", List[PlanMetrics]]:
 logger.info("ExecuteAndFetch")
-import pandas as pd
 
 m: Optional[pb2.ExecutePlanResponse.Metrics] = None
-result_dfs = []
+
+batches: List[pa.RecordBatch] = []
 
 try:
 for b in self._stub.ExecutePlan(req, 
metadata=self._builder.metadata()):
@@ -567,25 +577,21 @@ class SparkConnectClient(object):
 f"Received arrow batch rows={b.arrow_batch.row_count} "
 f"size={len(b.arrow_batch.data)}"
 )
-pb = self._process_batch(b.arrow_batch)
-result_dfs.append(pb)
+
+with 

[spark] branch master updated: [SPARK-41846][CONNECT][PYTHON] Enable doctests for window functions

2023-01-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a02bf2bb17e [SPARK-41846][CONNECT][PYTHON] Enable doctests for window 
functions
a02bf2bb17e is described below

commit a02bf2bb17e049eaa7aa26561876560c26396bbd
Author: Ruifeng Zheng 
AuthorDate: Thu Jan 5 08:50:09 2023 +0900

[SPARK-41846][CONNECT][PYTHON] Enable doctests for window functions

### What changes were proposed in this pull request?
 Enable doctests for window functions
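
A doctest-style sketch of one of the re-enabled window functions; the data and window spec are illustrative.

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as sf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", 1), ("a", 2), ("b", 1)], ["key", "value"])
w = Window.partitionBy("key").orderBy("value")

# rank/dense_rank/percent_rank/cume_dist are among the re-enabled doctests.
df.select("key", "value", sf.rank().over(w).alias("rank")).show()
```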

### Why are the changes needed?
for test coverage

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
enabled tests

Closes #39392 from zhengruifeng/connect_fix_41846.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/functions.py | 6 --
 1 file changed, 6 deletions(-)

diff --git a/python/pyspark/sql/connect/functions.py 
b/python/pyspark/sql/connect/functions.py
index 05ba52aa377..77c7db2d808 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2418,12 +2418,6 @@ def _test() -> None:
 # TODO(SPARK-41845): Fix count bug
 del pyspark.sql.connect.functions.count.__doc__
 
-# TODO(SPARK-41846): window functions : unresolved columns
-del pyspark.sql.connect.functions.rank.__doc__
-del pyspark.sql.connect.functions.cume_dist.__doc__
-del pyspark.sql.connect.functions.dense_rank.__doc__
-del pyspark.sql.connect.functions.percent_rank.__doc__
-
 # TODO(SPARK-41847): mapfield,structlist invalid type
 del pyspark.sql.connect.functions.element_at.__doc__
 del pyspark.sql.connect.functions.explode.__doc__





[spark] branch master updated (915e9c67a95 -> c1acbfca491)

2023-01-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 915e9c67a95 [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer 
for StreamingQueryProgressWrapper
 add c1acbfca491 [SPARK-41840][CONNECT][PYTHON] Add the missing alias 
`groupby`

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/dataframe.py | 2 ++
 python/pyspark/sql/connect/functions.py | 8 
 2 files changed, 2 insertions(+), 8 deletions(-)





[spark] branch master updated: [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper

2023-01-04 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 915e9c67a95 [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer 
for StreamingQueryProgressWrapper
915e9c67a95 is described below

commit 915e9c67a9581a1f66e70321879092d854c9fb3b
Author: yangjie01 
AuthorDate: Wed Jan 4 14:03:58 2023 -0800

[SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for 
StreamingQueryProgressWrapper

### What changes were proposed in this pull request?
Add Protobuf serializer for `StreamingQueryProgressWrapper`

### Why are the changes needed?
Support fast and compact serialization/deserialization for 
`StreamingQueryProgressWrapper` over RocksDB.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add new UT

Closes #39357 from LuciferYang/SPARK-41677.

Authored-by: yangjie01 
Signed-off-by: Gengliang Wang 
---
 .../apache/spark/status/protobuf/store_types.proto |  51 +++
 .../org.apache.spark.status.protobuf.ProtobufSerDe |   1 +
 .../org/apache/spark/sql/streaming/progress.scala  |   8 +-
 .../ui/StreamingQueryStatusListener.scala  |   2 +-
 .../protobuf/sql/SinkProgressSerializer.scala  |  42 +
 .../protobuf/sql/SourceProgressSerializer.scala|  65 
 .../sql/StateOperatorProgressSerializer.scala  |  75 +
 .../sql/StreamingQueryProgressSerializer.scala |  89 +++
 .../StreamingQueryProgressWrapperSerializer.scala  |  40 +
 .../sql/KVStoreProtobufSerializerSuite.scala   | 170 -
 10 files changed, 537 insertions(+), 6 deletions(-)

diff --git 
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto 
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 6ba1915dfa1..2a45b5da1d8 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -685,3 +685,54 @@ message ExecutorPeakMetricsDistributions {
   repeated double quantiles = 1;
   repeated ExecutorMetrics executor_metrics = 2;
 }
+
+message StateOperatorProgress {
+  string operator_name = 1;
+  int64 num_rows_total = 2;
+  int64 num_rows_updated = 3;
+  int64 all_updates_time_ms = 4;
+  int64 num_rows_removed = 5;
+  int64 all_removals_time_ms = 6;
+  int64 commit_time_ms = 7;
+  int64 memory_used_bytes = 8;
+  int64 num_rows_dropped_by_watermark = 9;
+  int64 num_shuffle_partitions = 10;
+  int64 num_state_store_instances = 11;
+  map custom_metrics = 12;
+}
+
+message SourceProgress {
+  string description = 1;
+  string start_offset = 2;
+  string end_offset = 3;
+  string latest_offset = 4;
+  int64 num_input_rows = 5;
+  double input_rows_per_second = 6;
+  double processed_rows_per_second = 7;
+  map metrics = 8;
+}
+
+message SinkProgress {
+  string description = 1;
+  int64 num_output_rows = 2;
+  map metrics = 3;
+}
+
+message StreamingQueryProgress {
+  string id = 1;
+  string run_id = 2;
+  string name = 3;
+  string timestamp = 4;
+  int64 batch_id = 5;
+  int64 batch_duration = 6;
+  map duration_ms = 7;
+  map event_time = 8;
+  repeated StateOperatorProgress state_operators = 9;
+  repeated SourceProgress sources = 10;
+  SinkProgress sink = 11;
+  map observed_metrics = 12;
+}
+
+message StreamingQueryProgressWrapper {
+  StreamingQueryProgress progress = 1;
+}
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 7beff87d7ec..e907d559349 100644
--- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -18,3 +18,4 @@
 org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
 org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer
 org.apache.spark.status.protobuf.sql.StreamingQueryDataSerializer
+org.apache.spark.status.protobuf.sql.StreamingQueryProgressWrapperSerializer
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 3d206e7780c..1b755ed70c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -40,7 +40,7 @@ import 
org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
  * Information about updates made to stateful operators in a 
[[StreamingQuery]] during a trigger.
  */
 @Evolving
-class StateOperatorProgress private[sql](
+class StateOperatorProgress 

[spark] branch master updated: [SPARK-41768][CORE] Refactor the definition of enum to follow with the code style

2023-01-04 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 492356d1646 [SPARK-41768][CORE] Refactor the definition of enum to 
follow with the code style
492356d1646 is described below

commit 492356d1646a5e7571dad7e3107a11f765ee810a
Author: panbingkun 
AuthorDate: Wed Jan 4 13:15:04 2023 -0800

[SPARK-41768][CORE] Refactor the definition of enum to follow with the code 
style

### What changes were proposed in this pull request?
This PR refactors the enum definitions in the UI protobuf serializer to follow 
the Protocol Buffers code style.

### Why are the changes needed?
Following the Protocol Buffers code style for enums:
https://developers.google.com/protocol-buffers/docs/style#enums
(screenshot: https://user-images.githubusercontent.com/15246973/209946067-4c541101-be0d-49a6-9812-768ba98423a4.png)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA and existing UTs.

Closes #39286 from panbingkun/SPARK-41768.

Authored-by: panbingkun 
Signed-off-by: Gengliang Wang 
---
 .../apache/spark/status/protobuf/store_types.proto | 21 ++-
 .../status/protobuf/JobDataWrapperSerializer.scala | 12 ++
 .../protobuf/JobExecutionStatusSerializer.scala| 43 ++
 .../RDDOperationGraphWrapperSerializer.scala   | 35 --
 .../status/protobuf/StageStatusSerializer.scala| 26 +
 .../sql/SQLExecutionUIDataSerializer.scala |  7 ++--
 6 files changed, 110 insertions(+), 34 deletions(-)

diff --git 
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto 
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 38b82518ddd..6ba1915dfa1 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -27,10 +27,10 @@ package org.apache.spark.status.protobuf;
 
 enum JobExecutionStatus {
   JOB_EXECUTION_STATUS_UNSPECIFIED = 0;
-  RUNNING = 1;
-  SUCCEEDED = 2;
-  FAILED = 3;
-  UNKNOWN = 4;
+  JOB_EXECUTION_STATUS_RUNNING = 1;
+  JOB_EXECUTION_STATUS_SUCCEEDED = 2;
+  JOB_EXECUTION_STATUS_FAILED = 3;
+  JOB_EXECUTION_STATUS_UNKNOWN = 4;
 }
 
 message JobData {
@@ -434,13 +434,14 @@ message RDDOperationEdge {
   int32 to_id = 2;
 }
 
+enum DeterministicLevel {
+  DETERMINISTIC_LEVEL_UNSPECIFIED = 0;
+  DETERMINISTIC_LEVEL_DETERMINATE = 1;
+  DETERMINISTIC_LEVEL_UNORDERED = 2;
+  DETERMINISTIC_LEVEL_INDETERMINATE = 3;
+}
+
 message RDDOperationNode {
-  enum DeterministicLevel {
-UNSPECIFIED = 0;
-DETERMINATE = 1;
-UNORDERED = 2;
-INDETERMINATE = 3;
-  }
   int32 id = 1;
   string name = 2;
   bool cached = 3;
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
index 98ac2d643c9..e2e2a1a8d89 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.status.protobuf
 
-import collection.JavaConverters._
 import java.util.Date
 
-import org.apache.spark.JobExecutionStatus
+import collection.JavaConverters._
+
 import org.apache.spark.status.JobDataWrapper
 import org.apache.spark.status.api.v1.JobData
 import org.apache.spark.status.protobuf.Utils.getOptional
@@ -55,7 +55,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe {
 val jobDataBuilder = StoreTypes.JobData.newBuilder()
 jobDataBuilder.setJobId(jobData.jobId.toLong)
   .setName(jobData.name)
-  .setStatus(serializeJobExecutionStatus(jobData.status))
+  .setStatus(JobExecutionStatusSerializer.serialize(jobData.status))
   .setNumTasks(jobData.numTasks)
   .setNumActiveTasks(jobData.numActiveTasks)
   .setNumCompletedTasks(jobData.numCompletedTasks)
@@ -89,7 +89,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe {
   getOptional(info.hasSubmissionTime, () => new 
Date(info.getSubmissionTime))
 val completionTime = getOptional(info.hasCompletionTime, () => new 
Date(info.getCompletionTime))
 val jobGroup = getOptional(info.hasJobGroup, info.getJobGroup)
-val status = JobExecutionStatus.valueOf(info.getStatus.toString)
+val status = JobExecutionStatusSerializer.deserialize(info.getStatus)
 
 new JobData(
   jobId = info.getJobId.toInt,
@@ -113,8 +113,4 @@ class JobDataWrapperSerializer extends ProtobufSerDe {
   numFailedStages = info.getNumFailedStages,
   killedTasksSummary = 
info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap)
   }
-
-  private def serializeJobExecutionStatus(j: 

[spark] branch master updated: [SPARK-41573][SQL] Assign name to _LEGACY_ERROR_TEMP_2136

2023-01-04 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f352f103ed5 [SPARK-41573][SQL] Assign name to _LEGACY_ERROR_TEMP_2136
f352f103ed5 is described below

commit f352f103ed512806abb3f642571a0c595b8b0509
Author: itholic 
AuthorDate: Thu Jan 5 00:21:32 2023 +0500

[SPARK-41573][SQL] Assign name to _LEGACY_ERROR_TEMP_2136

### What changes were proposed in this pull request?

This PR proposes to assign the name "CANNOT_PARSE_JSON_FIELD" to 
_LEGACY_ERROR_TEMP_2136.

### Why are the changes needed?

We should assign proper names to the _LEGACY_ERROR_TEMP_* error classes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*`

Closes #39284 from itholic/LEGACY_2136.

Authored-by: itholic 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json| 10 +-
 .../spark/sql/catalyst/json/JacksonParser.scala |  2 +-
 .../spark/sql/errors/QueryExecutionErrors.scala |  8 
 .../org/apache/spark/sql/JsonFunctionsSuite.scala   | 21 ++---
 4 files changed, 24 insertions(+), 17 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index a7b120ef427..120925f5254 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -75,6 +75,11 @@
 ],
 "sqlState" : "42000"
   },
+  "CANNOT_PARSE_JSON_FIELD" : {
+"message" : [
+  "Cannot parse the field name  and the value  of 
the JSON token type  to target Spark data type "
+]
+  },
   "CANNOT_PARSE_PROTOBUF_DESCRIPTOR" : {
 "message" : [
   "Error parsing file  descriptor byte[] into Descriptor 
object"
@@ -4105,11 +4110,6 @@
   "Failed to parse an empty string for data type "
 ]
   },
-  "_LEGACY_ERROR_TEMP_2136" : {
-"message" : [
-  "Failed to parse field name , field value , 
[] to target spark data type []."
-]
-  },
   "_LEGACY_ERROR_TEMP_2137" : {
 "message" : [
   "Root converter returned null"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index ee21a1e2b76..3fe26e87499 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -430,7 +430,7 @@ class JacksonParser(
 case token =>
  // We cannot parse this token based on the given data type. So, we throw a
  // RuntimeException and this exception will be caught by `parse` method.
-  throw QueryExecutionErrors.failToParseValueForDataTypeError(parser, token, dataType)
+  throw QueryExecutionErrors.cannotParseJSONFieldError(parser, token, dataType)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 3e234cfee2c..44a1972272f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1444,15 +1444,15 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
 "dataType" -> dataType.catalogString))
   }
 
-  def failToParseValueForDataTypeError(parser: JsonParser, token: JsonToken, dataType: DataType)
+  def cannotParseJSONFieldError(parser: JsonParser, jsonType: JsonToken, dataType: DataType)
   : SparkRuntimeException = {
 new SparkRuntimeException(
-  errorClass = "_LEGACY_ERROR_TEMP_2136",
+  errorClass = "CANNOT_PARSE_JSON_FIELD",
   messageParameters = Map(
 "fieldName" -> parser.getCurrentName.toString(),
 "fieldValue" -> parser.getText.toString(),
-"token" -> token.toString(),
-"dataType" -> dataType.toString()))
+"jsonType" -> jsonType.toString(),
+"dataType" -> toSQLType(dataType)))
   }
 
   def rootConverterReturnNullError(): SparkRuntimeException = {
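
For illustration, here is a minimal way to observe the renamed error class from user code, assuming a test context with a SparkSession `spark`, `import spark.implicits._`, and ScalaTest's `intercept` in scope. This is a sketch, not taken from the patch; the assertions actually added by this change are in the JsonFunctionsSuite diff below.

```
import org.apache.spark.{SparkException, SparkRuntimeException}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType

// A non-empty JSON string value cannot be converted to an INT field, so the
// JacksonParser path above raises CANNOT_PARSE_JSON_FIELD. The per-record error
// handling wraps it, so run in FAILFAST mode and walk the cause chain.
val schema = new StructType().add("a", "int")
val df = Seq("""{"a": "not a number"}""").toDF("value")
val e = intercept[SparkException] {
  df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect()
}
var cause: Throwable = e
while (cause != null && !cause.isInstanceOf[SparkRuntimeException]) { cause = cause.getCause }
assert(cause.asInstanceOf[SparkRuntimeException].getErrorClass == "CANNOT_PARSE_JSON_FIELD")
```
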
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 399665c0de6..0f282336d58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -22,8 +22,9 @@ import java.time.{Duration, LocalDateTime, Period}
 import java.util.Locale
 
 import collection.JavaConverters._
+import 

[spark] branch master updated (23e3c9b7c2f -> d0a598922e9)

2023-01-04 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 23e3c9b7c2f [SPARK-41828][CONNECT][PYTHON] Make `createDataFrame` 
support empty dataframe
 add d0a598922e9 [MINOR] Fix a typo "from from" -> "from"

No new revisions were added by this update.

Summary of changes:
 dev/ansible-for-test-node/roles/jenkins-worker/README.md | 2 +-
 docs/running-on-yarn.md  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41828][CONNECT][PYTHON] Make `createDataFrame` support empty dataframe

2023-01-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 23e3c9b7c2f [SPARK-41828][CONNECT][PYTHON] Make `createDataFrame` 
support empty dataframe
23e3c9b7c2f is described below

commit 23e3c9b7c2f08c5350992934cf660de6d2793982
Author: Ruifeng Zheng 
AuthorDate: Wed Jan 4 17:45:46 2023 +0900

[SPARK-41828][CONNECT][PYTHON] Make `createDataFrame` support empty 
dataframe

### What changes were proposed in this pull request?
Make `createDataFrame` support empty dataframe:

```
In [24]: spark.createDataFrame([], schema="x STRING, y INTEGER")
Out[24]: DataFrame[x: string, y: int]
```

### Why are the changes needed?
to be consistent with PySpark

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
added UT and enabled doctests

Closes #39379 from zhengruifeng/connect_fix_41828.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 .../main/protobuf/spark/connect/relations.proto|  18 ++--
 .../sql/connect/planner/SparkConnectPlanner.scala  |  68 -
 python/pyspark/sql/connect/dataframe.py|   3 -
 python/pyspark/sql/connect/plan.py |  34 ---
 python/pyspark/sql/connect/proto/relations_pb2.py  | 110 ++---
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  41 
 python/pyspark/sql/connect/session.py  |  32 --
 .../sql/tests/connect/test_connect_basic.py|  28 ++
 8 files changed, 193 insertions(+), 141 deletions(-)

diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index 51981714ded..c0f22dd4576 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -328,20 +328,16 @@ message Deduplicate {
 
 // A relation that does not need to be qualified by name.
 message LocalRelation {
-  // Local collection data serialized into Arrow IPC streaming format which contains
+  // (Optional) Local collection data serialized into Arrow IPC streaming format which contains
   // the schema of the data.
-  bytes data = 1;
+  optional bytes data = 1;
 
-  // (Optional) The user provided schema.
+  // (Optional) The schema of local data.
+  // It should be either a DDL-formatted type string or a JSON string.
   //
-  // The Sever side will update the column names and data types according to this schema.
-  oneof schema {
-
-DataType datatype = 2;
-
-// Server will use Catalyst parser to parse this string to DataType.
-string datatype_str = 3;
-  }
+  // The server side will update the column names and data types according to this schema.
+  // If the 'data' is not provided, then this schema will be required.
+  optional string schema = 2;
 }
 
 // Relation of type [[Sample]] that samples a fraction of the dataset.
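
The planner diff that follows parses this schema string with a DDL-first fallback chain. Below is a standalone sketch of that ordering, using CatalystSqlParser directly instead of the session's parser and omitting the DataType.fromJson fallback the server also applies; `parseSchemaString` is a hypothetical name used only for illustration.

```
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.types.DataType

// Try a full DDL table schema first, then a bare data type, then a struct
// wrapping; if everything fails, surface the original parse error.
def parseSchemaString(sqlText: String): DataType = {
  try {
    CatalystSqlParser.parseTableSchema(sqlText)            // e.g. "x STRING, y INTEGER"
  } catch {
    case e: ParseException =>
      try {
        CatalystSqlParser.parseDataType(sqlText)           // e.g. "array<string>"
      } catch {
        case _: ParseException =>
          try {
            CatalystSqlParser.parseDataType(s"struct<${sqlText.trim}>")  // e.g. "x: string, y: int"
          } catch {
            case _: ParseException => throw e
          }
      }
  }
}
```
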
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 754bb7ced9e..b4c882541e0 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -571,47 +571,61 @@ class SparkConnectPlanner(session: SparkSession) {
 try {
   parser.parseTableSchema(sqlText)
 } catch {
-  case _: ParseException =>
+  case e: ParseException =>
 try {
   parser.parseDataType(sqlText)
 } catch {
   case _: ParseException =>
-parser.parseDataType(s"struct<${sqlText.trim}>")
+try {
+  parser.parseDataType(s"struct<${sqlText.trim}>")
+} catch {
+  case _: ParseException =>
+throw e
+}
 }
 }
   }
 
   private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = {
-val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator(
-  Iterator(rel.getData.toByteArray),
-  TaskContext.get())
-if (structType == null) {
-  throw InvalidPlanInput(s"Input data for LocalRelation does not produce a 
schema.")
+var schema: StructType = null
+if (rel.hasSchema) {
+  val schemaType = DataType.parseTypeWithFallback(
+rel.getSchema,
+parseDatatypeString,
+fallbackParser = DataType.fromJson)
+  schema = schemaType match {
+case s: StructType => s
+case d => StructType(Seq(StructField("value", d)))
+  }
 }
-val