[spark-docker] branch master updated: [SPARK-41258][INFRA] Upgrade docker and actions to clean up warnings

2022-11-24 Thread yikun
This is an automated email from the ASF dual-hosted git repository.

yikun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-docker.git


The following commit(s) were added to refs/heads/master by this push:
 new 33abc18  [SPARK-41258][INFRA] Upgrade docker and actions to clean up warnings
33abc18 is described below

commit 33abc1894f3de135e827ce393842ca355229c117
Author: Yikun Jiang 
AuthorDate: Fri Nov 25 14:57:27 2022 +0800

[SPARK-41258][INFRA] Upgrade docker and actions to clean up warnings

### What changes were proposed in this pull request?
- Upgrade `actions/checkout` from v2 to v3
- Upgrade `docker/build-push-action` from v2 to v3

### Why are the changes needed?
Clean up the `set-output` and lower Node.js version warnings.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Test passed

Closes #24 from Yikun/upgrade-actions.

Authored-by: Yikun Jiang 
Signed-off-by: Yikun Jiang 
---
 .github/workflows/main.yml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index dfb99e9..024b853 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -71,7 +71,7 @@ jobs:
 image_suffix: [python3-ubuntu, ubuntu, r-ubuntu, python3-r-ubuntu]
 steps:
   - name: Checkout Spark Docker repository
-uses: actions/checkout@v2
+uses: actions/checkout@v3
 
   - name: Set up QEMU
 uses: docker/setup-qemu-action@v2
@@ -122,7 +122,7 @@ jobs:
   echo "PUBLISH_IMAGE_URL:"${PUBLISH_IMAGE_URL}
 
   - name: Build and push test image
-uses: docker/build-push-action@v2
+uses: docker/build-push-action@v3
 with:
   context: ${{ env.IMAGE_PATH }}
   tags: ${{ env.IMAGE_URL }}
@@ -258,7 +258,7 @@ jobs:
 
   - name: Publish - Push Image
 if: ${{ inputs.publish }}
-uses: docker/build-push-action@v2
+uses: docker/build-push-action@v3
 with:
   context: ${{ env.IMAGE_PATH }}
   push: true





[spark] branch master updated: [SPARK-41181][SQL] Migrate the map options errors onto error classes

2022-11-24 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0ae82d99d13 [SPARK-41181][SQL] Migrate the map options errors onto 
error classes
0ae82d99d13 is described below

commit 0ae82d99d13988086a297920d45a766115a70578
Author: panbingkun 
AuthorDate: Fri Nov 25 09:03:49 2022 +0300

[SPARK-41181][SQL] Migrate the map options errors onto error classes

### What changes were proposed in this pull request?
The PR aims to migrate the map options errors onto error classes.

### Why are the changes needed?
The changes improve the error framework.
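
For illustration only (not part of this commit), a minimal PySpark sketch of queries of the kind covered by the `csv-functions.sql` golden file, which now surface the new error classes; the SparkSession setup is assumed:

```python
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.getOrCreate()

# Options given as something other than map() -> INVALID_OPTIONS.NON_MAP_FUNCTION
try:
    spark.sql("SELECT from_csv('1', 'a INT', named_struct('mode', 'PERMISSIVE'))")
except AnalysisException as e:
    print(e)

# map() whose values are not strings -> INVALID_OPTIONS.NON_STRING_TYPE
try:
    spark.sql("SELECT from_csv('1', 'a INT', map('mode', 1))")
except AnalysisException as e:
    print(e)
```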

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

Closes #38730 from panbingkun/SPARK-41181.

Authored-by: panbingkun 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   | 27 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |  6 +-
 .../sql-tests/results/csv-functions.sql.out| 13 +++--
 .../sql-tests/results/json-functions.sql.out   | 12 ++--
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  | 66 --
 5 files changed, 81 insertions(+), 43 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 55a56712554..1246e870e0d 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -735,6 +735,23 @@
   "The  JOIN with LATERAL correlation is not allowed because an 
OUTER subquery cannot correlate to its join partner. Remove the LATERAL 
correlation or use an INNER JOIN, or LEFT OUTER JOIN instead."
 ]
   },
+  "INVALID_OPTIONS" : {
+"message" : [
+  "Invalid options:"
+],
+"subClass" : {
+  "NON_MAP_FUNCTION" : {
+"message" : [
+  "Must use the `map()` function for options."
+]
+  },
+  "NON_STRING_TYPE" : {
+"message" : [
+  "A type of keys and values in `map()` must be string, but got 
."
+]
+  }
+}
+  },
   "INVALID_PANDAS_UDF_PLACEMENT" : {
 "message" : [
   "The group aggregate pandas UDF  cannot be invoked 
together with as other, non-pandas aggregate functions."
@@ -2190,16 +2207,6 @@
   "Schema should be struct type but got ."
 ]
   },
-  "_LEGACY_ERROR_TEMP_1095" : {
-"message" : [
-  "A type of keys and values in map() must be string, but got ."
-]
-  },
-  "_LEGACY_ERROR_TEMP_1096" : {
-"message" : [
-  "Must use a map() function for options."
-]
-  },
   "_LEGACY_ERROR_TEMP_1097" : {
 "message" : [
   "The field for corrupt records must be string type and nullable."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index fa22c36f841..486bd21b844 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1013,13 +1013,13 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
 
   def keyValueInMapNotStringError(m: CreateMap): Throwable = {
 new AnalysisException(
-  errorClass = "_LEGACY_ERROR_TEMP_1095",
-  messageParameters = Map("map" -> m.dataType.catalogString))
+  errorClass = "INVALID_OPTIONS.NON_STRING_TYPE",
+  messageParameters = Map("mapType" -> toSQLType(m.dataType)))
   }
 
   def nonMapFunctionNotAllowedError(): Throwable = {
 new AnalysisException(
-  errorClass = "_LEGACY_ERROR_TEMP_1096",
+  errorClass = "INVALID_OPTIONS.NON_MAP_FUNCTION",
   messageParameters = Map.empty)
   }
 
diff --git 
a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
index 0b5a63c28e4..200ddd837e1 100644
--- a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out
@@ -66,7 +66,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_1096",
+  "errorClass" : "INVALID_OPTIONS.NON_MAP_FUNCTION",
   "queryContext" : [ {
 "objectType" : "",
 "objectName" : "",
@@ -84,9 +84,9 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_1095",
+  "errorClass" : "INVALID_OPTIONS.NON_STRING_TYPE",
   "messageParameters" : {
-"map" : "map"
+"mapType" : "\"MAP\""
   },
   "queryContext" : [ {
 "objectType" : "",
@@ -222,7 +222,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  

[spark] branch master updated: [SPARK-41216][CONNECT][PYTHON] Implement `DataFrame.{isLocal, isStreaming, printSchema, inputFiles}`

2022-11-24 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b84ddd5e71a [SPARK-41216][CONNECT][PYTHON] Implement 
`DataFrame.{isLocal, isStreaming, printSchema, inputFiles}`
b84ddd5e71a is described below

commit b84ddd5e71acc9ae3facdf47148becef3861d11d
Author: Ruifeng Zheng 
AuthorDate: Fri Nov 25 11:57:33 2022 +0800

[SPARK-41216][CONNECT][PYTHON] Implement `DataFrame.{isLocal, isStreaming, 
printSchema, inputFiles}`

### What changes were proposed in this pull request?
~~1, Make `AnalyzePlan` support specified multiple analysis tasks, that is, 
we can get `isLocal`, `schema`, `semanticHash` together in single RPC if we 
want.~~
2, Implement the following APIs

- isLocal
- isStreaming
- printSchema
- ~~semanticHash~~
- ~~sameSemantics~~
- inputFiles

### Why are the changes needed?
for API coverage
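
These methods mirror the long-standing PySpark `DataFrame` API; as a quick illustration (classic PySpark, not the Connect client itself), here is what each newly wired method reports:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(5)

df.printSchema()        # tree string of the schema
print(df.isLocal())     # whether collect()/take() can run locally
print(df.isStreaming)   # whether the plan contains a streaming source
print(df.inputFiles())  # best-effort snapshot of backing files ([] for an in-memory range)
```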

### Does this PR introduce _any_ user-facing change?
yes, new APIs

### How was this patch tested?
added UTs

Closes #38742 from zhengruifeng/connect_df_print_schema.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .../src/main/protobuf/spark/connect/base.proto | 13 
 .../sql/connect/service/SparkConnectService.scala  | 11 ++-
 .../connect/planner/SparkConnectServiceSuite.scala |  9 +++
 python/pyspark/sql/connect/client.py   | 23 ++-
 python/pyspark/sql/connect/dataframe.py| 78 +-
 python/pyspark/sql/connect/proto/base_pb2.py   | 36 +-
 python/pyspark/sql/connect/proto/base_pb2.pyi  | 36 +-
 .../sql/tests/connect/test_connect_basic.py| 40 +++
 8 files changed, 221 insertions(+), 25 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/base.proto 
b/connector/connect/src/main/protobuf/spark/connect/base.proto
index d6dac4854ef..5f9a4411ecd 100644
--- a/connector/connect/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/base.proto
@@ -112,6 +112,19 @@ message AnalyzePlanResponse {
 
   // The extended explain string as produced by Spark.
   string explain_string = 3;
+
+  // Get the tree string of the schema.
+  string tree_string = 4;
+
+  // Whether the 'collect' and 'take' methods can be run locally.
+  bool is_local = 5;
+
+  // Whether this plan contains one or more sources that continuously
+  // return data as it arrives.
+  bool is_streaming = 6;
+
+  // A best-effort snapshot of the files that compose this Dataset
+  repeated string input_files = 7;
 }
 
 // A request to be executed by the service.
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 0c7a2ad2690..3046c8eebfc 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.connect.service
 
 import java.util.concurrent.TimeUnit
 
+import scala.collection.JavaConverters._
+
 import com.google.common.base.Ticker
 import com.google.common.cache.CacheBuilder
 import io.grpc.{Server, Status}
@@ -127,10 +129,13 @@ class SparkConnectService(debug: Boolean)
 val ds = Dataset.ofRows(session, logicalPlan)
 val explainString = ds.queryExecution.explainString(explainMode)
 
-val response = proto.AnalyzePlanResponse
-  .newBuilder()
-  .setExplainString(explainString)
+val response = proto.AnalyzePlanResponse.newBuilder()
 response.setSchema(DataTypeProtoConverter.toConnectProtoType(ds.schema))
+response.setExplainString(explainString)
+response.setTreeString(ds.schema.treeString)
+response.setIsLocal(ds.isLocal)
+response.setIsStreaming(ds.isStreaming)
+response.addAllInputFiles(ds.inputFiles.toSeq.asJava)
   }
 }
 
diff --git 
a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
 
b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index 6ca3c2430c4..e5cd84fb504 100644
--- 
a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++ 
b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -65,6 +65,15 @@ class SparkConnectServiceSuite extends SharedSparkSession {
   assert(
 schema.getFields(1).getName == "col2"
   && schema.getFields(1).getDataType.getKindCase == 
proto.DataType.KindCase.STRING)
+
+  assert(!response.getIsLocal)
+  

[spark] branch master updated (a205e97ad9a -> 575b8f00faf)

2022-11-24 Thread yikun
This is an automated email from the ASF dual-hosted git repository.

yikun pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from a205e97ad9a [SPARK-41230][CONNECT][PYTHON] Remove `str` from Aggregate 
expression type
 add 575b8f00faf [SPARK-41257][INFRA] Upgrade actions/labeler to v4

No new revisions were added by this update.

Summary of changes:
 .github/workflows/labeler.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)





[spark] branch master updated: [SPARK-41230][CONNECT][PYTHON] Remove `str` from Aggregate expression type

2022-11-24 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a205e97ad9a [SPARK-41230][CONNECT][PYTHON] Remove `str` from Aggregate 
expression type
a205e97ad9a is described below

commit a205e97ad9ae7894b2ec27e5253da21c4500fc8c
Author: Rui Wang 
AuthorDate: Fri Nov 25 09:32:41 2022 +0800

[SPARK-41230][CONNECT][PYTHON] Remove `str` from Aggregate expression type

### What changes were proposed in this pull request?

This PR proposes that Relations (e.g. Aggregate in this PR) should only deal with `Expression` rather than `str`. A `str` could be mapped to different expressions (e.g. a SQL expression, an unresolved_attribute, etc.). Relations are not supposed to understand those differences, but DataFrame should.

This PR specifically changes for `Aggregate`.
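
A standalone sketch of the boundary being drawn (illustrative names only, not the actual `pyspark.sql.connect` classes): user-facing code accepts either a column name (`str`) or an expression, normalizes the `str` immediately, and the plan/Relation layer below only ever sees expressions:

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass
class Expression:
    name: str


@dataclass
class ScalarFunctionExpression(Expression):
    args: Tuple[Expression, ...]


def to_expression(col: Union[Expression, str]) -> Expression:
    # The DataFrame layer decides how a bare string is interpreted;
    # the Relation/plan layer below it never has to.
    return Expression(col) if isinstance(col, str) else col


def agg_min(col: Union[Expression, str]) -> ScalarFunctionExpression:
    return ScalarFunctionExpression("min", (to_expression(col),))


print(agg_min("price"))            # a str is normalized at the API boundary
print(agg_min(Expression("qty")))  # an expression passes through unchanged
```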

### Why are the changes needed?

Codebase refactoring.

### Does this PR introduce _any_ user-facing change?

 No

### How was this patch tested?

UT

Closes #38768 from amaliujia/SPARK-41230.

Authored-by: Rui Wang 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/dataframe.py| 61 --
 python/pyspark/sql/connect/plan.py | 19 ++-
 .../sql/tests/connect/test_connect_basic.py|  7 +++
 3 files changed, 43 insertions(+), 44 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index bd374dcf814..e3a7e8c7335 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -37,6 +37,7 @@ from pyspark.sql.connect.column import (
 Expression,
 LiteralExpression,
 SQLExpression,
+ScalarFunctionExpression,
 )
 from pyspark.sql.types import (
 StructType,
@@ -48,25 +49,13 @@ if TYPE_CHECKING:
 from pyspark.sql.connect.client import RemoteSparkSession
 
 
-class GroupingFrame(object):
-
-MeasuresType = Union[Sequence[Tuple["ExpressionOrString", str]], Dict[str, 
str]]
-OptMeasuresType = Optional[MeasuresType]
-
+class GroupedData(object):
 def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> 
None:
 self._df = df
 self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in 
grouping_cols]
 
-def agg(self, exprs: Optional[MeasuresType] = None) -> "DataFrame":
-
-# Normalize the dictionary into a list of tuples.
-if isinstance(exprs, Dict):
-measures = list(exprs.items())
-elif isinstance(exprs, List):
-measures = exprs
-else:
-measures = []
-
+def agg(self, measures: Sequence[Expression]) -> "DataFrame":
+assert len(measures) > 0, "exprs should not be empty"
 res = DataFrame.withPlan(
 plan.Aggregate(
 child=self._df._plan,
@@ -77,23 +66,27 @@ class GroupingFrame(object):
 )
 return res
 
-def _map_cols_to_dict(self, fun: str, cols: List[Union[Column, str]]) -> 
Dict[str, str]:
-return {x if isinstance(x, str) else x.name(): fun for x in cols}
+def _map_cols_to_expression(
+self, fun: str, col: Union[Expression, str]
+) -> Sequence[Expression]:
+return [
+ScalarFunctionExpression(fun, Column(col)) if isinstance(col, str) 
else col,
+]
 
-def min(self, *cols: Union[Column, str]) -> "DataFrame":
-expr = self._map_cols_to_dict("min", list(cols))
+def min(self, col: Union[Expression, str]) -> "DataFrame":
+expr = self._map_cols_to_expression("min", col)
 return self.agg(expr)
 
-def max(self, *cols: Union[Column, str]) -> "DataFrame":
-expr = self._map_cols_to_dict("max", list(cols))
+def max(self, col: Union[Expression, str]) -> "DataFrame":
+expr = self._map_cols_to_expression("max", col)
 return self.agg(expr)
 
-def sum(self, *cols: Union[Column, str]) -> "DataFrame":
-expr = self._map_cols_to_dict("sum", list(cols))
+def sum(self, col: Union[Expression, str]) -> "DataFrame":
+expr = self._map_cols_to_expression("sum", col)
 return self.agg(expr)
 
 def count(self) -> "DataFrame":
-return self.agg([(LiteralExpression(1), "count")])
+return self.agg([ScalarFunctionExpression("count", 
LiteralExpression(1))])
 
 
 class DataFrame(object):
@@ -162,8 +155,18 @@ class DataFrame(object):
 
 return DataFrame.withPlan(plan.Project(self._plan, *sql_expr), 
session=self._session)
 
-def agg(self, exprs: Optional[GroupingFrame.MeasuresType]) -> "DataFrame":
-return self.groupBy().agg(exprs)
+def agg(self, *exprs: Union[Expression, Dict[str, str]]) -> "DataFrame":
+if not exprs:
+raise 

[spark] branch master updated: [SPARK-41238][CONNECT][PYTHON] Support more built-in datatypes

2022-11-24 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4006d195111 [SPARK-41238][CONNECT][PYTHON] Support more built-in 
datatypes
4006d195111 is described below

commit 4006d195111334b4b795680e547dea9dd0acda22
Author: Ruifeng Zheng 
AuthorDate: Fri Nov 25 09:26:25 2022 +0800

[SPARK-41238][CONNECT][PYTHON] Support more built-in datatypes

### What changes were proposed in this pull request?
1, on the server side, make the `proto_datatype` <-> `catalyst_datatype` conversion support all the built-in SQL datatypes;

2, on the client side, make the `proto_datatype` <-> `pyspark_catalyst_datatype` conversion support [all the datatypes that are supported in PySpark now.](https://github.com/apache/spark/blob/master/python/pyspark/sql/types.py#L60-L83)

### Why are the changes needed?
Right now, only `long`, `string`, and `struct` are supported:

```
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated 
with:
status = StatusCode.UNKNOWN
details = "Does not support convert float to connect proto types."
debug_error_string = 
"{"created":"1669206685.760099000","description":"Error received from peer 
ipv6:[::1]:15002","file":"src/core/lib/surface/call.cc","file_line":1064,"grpc_message":"Does
 not support convert float to connect proto types.","grpc_status":2}"
```

This PR makes the schema and literal expr support more datatypes.
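
As a rough illustration (classic PySpark, not the Connect client; the values and schema string are made up), the sort of built-in types the conversion now has to round-trip:

```python
from datetime import date, datetime
from decimal import Decimal

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(True, 1, 2, 3.5, Decimal("2.50"), "a", bytearray(b"\x01"),
      date(2022, 11, 25), datetime(2022, 11, 25, 9, 26, 25))],
    "b boolean, i int, l long, d double, dec decimal(10,2), s string, "
    "bin binary, dt date, ts timestamp",
)
df.printSchema()  # boolean/int/long/double/decimal/string/binary/date/timestamp fields
```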

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
added UT

Closes #38770 from zhengruifeng/connect_support_more_datatypes.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .../main/protobuf/spark/connect/expressions.proto  |   2 +-
 .../src/main/protobuf/spark/connect/types.proto| 123 +++--
 .../org/apache/spark/sql/connect/dsl/package.scala |   5 +-
 .../connect/planner/DataTypeProtoConverter.scala   | 281 ++--
 .../connect/planner/SparkConnectServiceSuite.scala |   4 +-
 python/pyspark/sql/connect/client.py   | 108 -
 .../pyspark/sql/connect/proto/expressions_pb2.py   |  66 +--
 .../pyspark/sql/connect/proto/expressions_pb2.pyi  |  16 +-
 python/pyspark/sql/connect/proto/types_pb2.py  | 261 ++-
 python/pyspark/sql/connect/proto/types_pb2.pyi | 507 +
 .../sql/tests/connect/test_connect_basic.py|  67 +++
 11 files changed, 972 insertions(+), 468 deletions(-)

diff --git 
a/connector/connect/src/main/protobuf/spark/connect/expressions.proto 
b/connector/connect/src/main/protobuf/spark/connect/expressions.proto
index ac5fe24d349..7ff06aeb196 100644
--- a/connector/connect/src/main/protobuf/spark/connect/expressions.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/expressions.proto
@@ -68,7 +68,7 @@ message Expression {
   bytes uuid = 28;
   DataType null = 29; // a typed null literal
   List list = 30;
-  DataType.List empty_list = 31;
+  DataType.Array empty_array = 31;
   DataType.Map empty_map = 32;
   UserDefined user_defined = 33;
 }
diff --git a/connector/connect/src/main/protobuf/spark/connect/types.proto 
b/connector/connect/src/main/protobuf/spark/connect/types.proto
index ad043d85947..56dbf28665e 100644
--- a/connector/connect/src/main/protobuf/spark/connect/types.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/types.proto
@@ -26,31 +26,46 @@ option java_package = "org.apache.spark.connect.proto";
 // itself but only describes it.
 message DataType {
   oneof kind {
-Boolean bool = 1;
-I8 i8 = 2;
-I16 i16 = 3;
-I32 i32 = 5;
-I64 i64 = 7;
-FP32 fp32 = 10;
-FP64 fp64 = 11;
-String string = 12;
-Binary binary = 13;
-Timestamp timestamp = 14;
-Date date = 16;
-Time time = 17;
-IntervalYear interval_year = 19;
-IntervalDay interval_day = 20;
-TimestampTZ timestamp_tz = 29;
-UUID uuid = 32;
-
-FixedChar fixed_char = 21;
-VarChar varchar = 22;
-FixedBinary fixed_binary = 23;
-Decimal decimal = 24;
-
-Struct struct = 25;
-List list = 27;
-Map map = 28;
+NULL null = 1;
+
+Binary binary = 2;
+
+Boolean boolean = 3;
+
+// Numeric types
+Byte byte = 4;
+Short short = 5;
+Integer integer = 6;
+Long long = 7;
+
+Float float = 8;
+Double double = 9;
+Decimal decimal = 10;
+
+// String types
+String string = 11;
+Char char = 12;
+VarChar var_char = 13;
+
+// Datetime types
+Date date = 14;
+Timestamp timestamp = 15;
+TimestampNTZ timestamp_ntz = 16;
+
+// Interval types
+CalendarInterval calendar_interval = 17;
+YearMonthInterval year_month_interval = 18;
+DayTimeInterval day_time_interval 

[spark] branch master updated (033dbe604bc -> 71b5c5bde75)

2022-11-24 Thread yikun
This is an automated email from the ASF dual-hosted git repository.

yikun pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 033dbe604bc [SPARK-41247][BUILD] Unify the Protobuf versions in Spark 
connect and Protobuf connector
 add 71b5c5bde75 [SPARK-41251][PS][INFRA] Upgrade pandas from 1.5.1 to 1.5.2

No new revisions were added by this update.

Summary of changes:
 dev/infra/Dockerfile   | 4 ++--
 python/pyspark/pandas/supported_api_gen.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)





[spark] branch master updated (483e3c93ddb -> 033dbe604bc)

2022-11-24 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 483e3c93ddb [SPARK-41097][CORE][SQL][SS][PROTOBUF] Remove redundant 
collection conversion base on Scala 2.13 code
 add 033dbe604bc [SPARK-41247][BUILD] Unify the Protobuf versions in Spark 
connect and Protobuf connector

No new revisions were added by this update.

Summary of changes:
 connector/connect/pom.xml  | 1 -
 connector/protobuf/pom.xml | 1 -
 pom.xml| 7 +--
 3 files changed, 5 insertions(+), 4 deletions(-)





[spark] branch master updated (ac029d6ec0f -> 483e3c93ddb)

2022-11-24 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from ac029d6ec0f [SPARK-41224][SPARK-41165][SPARK-41184] Optimized 
Arrow-based collect implementation to stream from server to client
 add 483e3c93ddb [SPARK-41097][CORE][SQL][SS][PROTOBUF] Remove redundant 
collection conversion base on Scala 2.13 code

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala  | 2 +-
 .../org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala   | 2 +-
 .../org/apache/spark/sql/protobuf/utils/SchemaConverters.scala  | 2 +-
 .../src/main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +-
 core/src/main/scala/org/apache/spark/status/api/v1/api.scala| 2 +-
 .../src/main/scala/org/apache/spark/sql/types/StructType.scala  | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala  | 2 +-
 .../apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala  | 2 +-
 .../apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala | 6 +++---
 .../main/scala/org/apache/spark/sql/execution/command/tables.scala  | 2 +-
 .../spark/sql/execution/datasources/v2/ShowFunctionsExec.scala  | 2 +-
 .../sql/execution/datasources/v2/ShowTablePropertiesExec.scala  | 2 +-
 .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 4 ++--
 13 files changed, 16 insertions(+), 16 deletions(-)





[spark] branch master updated: [SPARK-41224][SPARK-41165][SPARK-41184] Optimized Arrow-based collect implementation to stream from server to client

2022-11-24 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ac029d6ec0f [SPARK-41224][SPARK-41165][SPARK-41184] Optimized 
Arrow-based collect implementation to stream from server to client
ac029d6ec0f is described below

commit ac029d6ec0fc4d9d80c3ef098954ced6918fa80f
Author: Hyukjin Kwon 
AuthorDate: Thu Nov 24 09:28:30 2022 -0400

[SPARK-41224][SPARK-41165][SPARK-41184] Optimized Arrow-based collect 
implementation to stream from server to client

### What changes were proposed in this pull request?

This PR proposes an optimized Arrow-based collect; it is virtually https://github.com/apache/spark/pull/38720, implementing the same logic except for a couple of nits.

### Why are the changes needed?

To stream the Arrow batches from the server to the client instead of waiting for all the jobs to finish.
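
The diff below is Scala; as a language-neutral sketch of the hand-off it implements (tasks may finish out of order, but the server streams batches to the client strictly in partition order, starting as soon as partition 0 is ready), here is a small self-contained Python illustration under those assumptions:

```python
import random
import threading
import time

num_partitions = 4
partitions = [None] * num_partitions   # filled by callbacks, indexed by partition id
signal = threading.Condition()


def result_handler(partition_id, batch):
    # Invoked when a partition finishes, possibly out of order.
    with signal:
        partitions[partition_id] = batch
        signal.notify()


def fake_task(partition_id):
    time.sleep(random.random() / 10)   # simulate uneven task durations
    result_handler(partition_id, f"arrow-batch-{partition_id}")


workers = [threading.Thread(target=fake_task, args=(i,)) for i in range(num_partitions)]
for w in workers:
    w.start()

# Main thread: wait for partition 0, send it, then wait for partition 1, and so on.
for next_id in range(num_partitions):
    with signal:
        while partitions[next_id] is None:
            signal.wait()
        print("sending", partitions[next_id])
        partitions[next_id] = None     # drop the batch once it has been sent

for w in workers:
    w.join()
```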

### Does this PR introduce _any_ user-facing change?

No, this feature isn't released yet.

### How was this patch tested?

Unittest added.

Closes #38720

Closes #38759 from HyukjinKwon/SPARK-41165-followup.

Authored-by: Hyukjin Kwon 
Signed-off-by: Herman van Hovell 
---
 .../service/SparkConnectStreamHandler.scala| 87 ---
 .../connect/planner/SparkConnectServiceSuite.scala | 99 +++---
 2 files changed, 163 insertions(+), 23 deletions(-)

diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 092bdd00dc1..b5d100e894d 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connect.service
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import com.google.protobuf.ByteString
 import io.grpc.stub.StreamObserver
@@ -32,6 +33,7 @@ import org.apache.spark.sql.execution.{SparkPlan, 
SQLExecution}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AdaptiveSparkPlanHelper, QueryStageExec}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
 
 class SparkConnectStreamHandler(responseObserver: 
StreamObserver[ExecutePlanResponse])
 extends Logging {
@@ -71,20 +73,83 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[ExecutePlanResp
   var numSent = 0
 
   if (numPartitions > 0) {
+type Batch = (Array[Byte], Long)
+
 val batches = rows.mapPartitionsInternal(
   SparkConnectStreamHandler
 .rowToArrowConverter(schema, maxRecordsPerBatch, maxBatchSize, 
timeZoneId))
 
-batches.collect().foreach { case (bytes, count) =>
-  val response = 
proto.ExecutePlanResponse.newBuilder().setClientId(clientId)
-  val batch = proto.ExecutePlanResponse.ArrowBatch
-.newBuilder()
-.setRowCount(count)
-.setData(ByteString.copyFrom(bytes))
-.build()
-  response.setArrowBatch(batch)
-  responseObserver.onNext(response.build())
-  numSent += 1
+val signal = new Object
+val partitions = new Array[Array[Batch]](numPartitions)
+var error: Option[Throwable] = None
+
+// This callback is executed by the DAGScheduler thread.
+// After fetching a partition, it inserts the partition into the Map, 
and then
+// wakes up the main thread.
+val resultHandler = (partitionId: Int, partition: Array[Batch]) => {
+  signal.synchronized {
+partitions(partitionId) = partition
+signal.notify()
+  }
+  ()
+}
+
+val future = spark.sparkContext.submitJob(
+  rdd = batches,
+  processPartition = (iter: Iterator[Batch]) => iter.toArray,
+  partitions = Seq.range(0, numPartitions),
+  resultHandler = resultHandler,
+  resultFunc = () => ())
+
+// Collect errors and propagate them to the main thread.
+future.onComplete { result =>
+  result.failed.foreach { throwable =>
+signal.synchronized {
+  error = Some(throwable)
+  signal.notify()
+}
+  }
+}(ThreadUtils.sameThread)
+
+// The main thread will wait until 0-th partition is available,
+// then send it to client and wait for the next partition.
+// Different from the implementation of 
[[Dataset#collectAsArrowToPython]], it sends
+ 

[spark] branch master updated: [SPARK-41249][SS][TEST] Add acceptance test for self-union on streaming query

2022-11-24 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3cd7f1166fc [SPARK-41249][SS][TEST] Add acceptance test for self-union 
on streaming query
3cd7f1166fc is described below

commit 3cd7f1166fc949abd79d3fb430e855b2925b038c
Author: Jungtaek Lim 
AuthorDate: Thu Nov 24 20:27:18 2022 +0900

[SPARK-41249][SS][TEST] Add acceptance test for self-union on streaming 
query

### What changes were proposed in this pull request?

This PR proposes to add a new test suite specifically for self-union tests 
on streaming query. The test cases are acceptance tests for 4 different cases, 
DSv1 vs DSv2 / DataStreamReader API vs table API.

### Why are the changes needed?

This PR brings more test coverage for streaming workloads. We would have caught an issue during the work on [SPARK-39564](https://issues.apache.org/jira/browse/SPARK-39564) if we had had this test suite.
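
For readers unfamiliar with the pattern, a classic-PySpark sketch of a streaming self-union (the suite below is Scala; the temp path and query name here are made up): the same stream sits on both sides of the union, so each of the 3 source rows appears twice in the output, matching the `CheckLastBatch(1, 2, 3, 1, 2, 3)` expectation in the new suite:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
path = "/tmp/self_union_demo"                 # hypothetical location
spark.range(1, 4).write.mode("overwrite").parquet(path)

stream_df = spark.readStream.schema("id LONG").parquet(path)
unioned_df = stream_df.union(stream_df)       # the same stream on both sides

query = (
    unioned_df.writeStream
    .format("memory")
    .queryName("self_union_demo")
    .trigger(availableNow=True)
    .start()
)
query.awaitTermination()

# Each of the 3 source rows appears twice after the self-union.
spark.sql("SELECT * FROM self_union_demo").show()
```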

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test suite.

Closes #38785 from HeartSaVioR/SPARK-41249.

Authored-by: Jungtaek Lim 
Signed-off-by: Jungtaek Lim 
---
 .../sql/streaming/StreamingSelfUnionSuite.scala| 160 +
 1 file changed, 160 insertions(+)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
new file mode 100644
index 000..8f099c31e6b
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.test.{InMemoryStreamTable, 
InMemoryStreamTableCatalog}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  before {
+spark.conf.set("spark.sql.catalog.teststream", 
classOf[InMemoryStreamTableCatalog].getName)
+  }
+
+  after {
+spark.sessionState.catalogManager.reset()
+spark.sessionState.conf.clear()
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("self-union, DSv1, read via DataStreamReader API") {
+withTempPath { dir =>
+  val dataLocation = dir.getAbsolutePath
+  spark.range(1, 4).write.format("parquet").save(dataLocation)
+
+  val streamDf = spark.readStream.format("parquet")
+.schema(StructType(Seq(StructField("id", LongType))))
+.load(dataLocation)
+  val unionedDf = streamDf.union(streamDf)
+
+  testStream(unionedDf)(
+ProcessAllAvailable(),
+CheckLastBatch(1, 2, 3, 1, 2, 3),
+AssertOnQuery { q =>
+  val lastProgress = getLastProgressWithData(q)
+  assert(lastProgress.nonEmpty)
+  assert(lastProgress.get.numInputRows == 6)
+  assert(lastProgress.get.sources.length == 1)
+  assert(lastProgress.get.sources(0).numInputRows == 6)
+  true
+}
+  )
+}
+  }
+
+  test("self-union, DSv1, read via table API") {
+withTable("parquet_streaming_tbl") {
+  spark.sql("CREATE TABLE parquet_streaming_tbl (key integer) USING 
parquet")
+
+  val streamDf = spark.readStream.table("parquet_streaming_tbl")
+  val unionedDf = streamDf.union(streamDf)
+
+  val clock = new StreamManualClock()
+  testStream(unionedDf)(
+StartStream(triggerClock = clock, trigger = 
Trigger.ProcessingTime(100)),
+Execute { _ =>
+  spark.range(1, 

[spark] branch master updated (1f90e416314 -> 02e7fadc553)

2022-11-24 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 1f90e416314 [SPARK-41182][SQL] Assign a name to the error class 
_LEGACY_ERROR_TEMP_1102
 add 02e7fadc553 [SPARK-41250][CONNECT][PYTHON] DataFrame.toPandas should not return optional pandas dataframe

No new revisions were added by this update.

Summary of changes:
 .../src/main/protobuf/spark/connect/base.proto |  2 +
 python/pyspark/sql/connect/client.py   | 51 +++---
 python/pyspark/sql/connect/dataframe.py|  2 +-
 python/pyspark/sql/connect/proto/base_pb2_grpc.py  |  5 ++-
 4 files changed, 33 insertions(+), 27 deletions(-)
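
As a hedged sketch of what the subject line implies for callers (illustrative signatures, not the actual `pyspark.sql.connect` code):

```python
from typing import Optional

import pandas as pd


# Before: the Connect client could hand back None, so callers had to guard.
def to_pandas_before() -> Optional[pd.DataFrame]:
    ...


# After: a pandas DataFrame is always returned (failures raise instead).
def to_pandas_after() -> pd.DataFrame:
    ...
```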





[spark] branch master updated (074444bd71f -> 1f90e416314)

2022-11-24 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 074444bd71f [SPARK-41179][SQL] Assign a name to the error class
_LEGACY_ERROR_TEMP_1092
 add 1f90e416314 [SPARK-41182][SQL] Assign a name to the error class 
_LEGACY_ERROR_TEMP_1102

No new revisions were added by this update.

Summary of changes:
 core/src/main/resources/error/error-classes.json   | 10 ++--
 .../spark/sql/errors/QueryCompilationErrors.scala  |  6 +--
 .../resources/sql-tests/results/extract.sql.out| 56 +++---
 3 files changed, 35 insertions(+), 37 deletions(-)

