(spark) branch master updated: [SPARK-47324][SQL] Add missing timestamp conversion for JDBC nested types

2024-03-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 72a95bcad7f1 [SPARK-47324][SQL] Add missing timestamp conversion for 
JDBC nested types
72a95bcad7f1 is described below

commit 72a95bcad7f1906c97fb0971ed6338374ec3009d
Author: Kent Yao 
AuthorDate: Mon Mar 11 09:34:12 2024 +0900

[SPARK-47324][SQL] Add missing timestamp conversion for JDBC nested types

### What changes were proposed in this pull request?

[SPARK-44280](https://issues.apache.org/jira/browse/SPARK-44280) added a 
new API, `convertJavaTimestampToTimestamp`, which is called only for plain 
timestamps.

This PR makes it work for timestamps nested in arrays as well.
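
As a rough illustration (not the actual `JdbcUtils` change), applying the dialect-level conversion from SPARK-44280 to each element of a JDBC `TIMESTAMP[]` value could look like the sketch below; the helper name and the `dialect` parameter are assumptions for the example.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.jdbc.JdbcDialect

// Illustrative sketch only: mirror the plain-timestamp path by running the
// dialect's convertJavaTimestampToTimestamp over every array element.
def convertTimestampArray(dialect: JdbcDialect, elements: Array[AnyRef]): Array[Timestamp] =
  elements.map {
    case ts: Timestamp => dialect.convertJavaTimestampToTimestamp(ts)
    case other => other.asInstanceOf[Timestamp] // unexpected for a TIMESTAMP[] column
  }
```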

### Why are the changes needed?

data consistency/correctness

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?

new tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #45435 from yaooqinn/SPARK-47324.

Authored-by: Kent Yao 
Signed-off-by: Hyukjin Kwon 
---
 .../spark/sql/jdbc/PostgresIntegrationSuite.scala  | 17 +---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 46 +-
 2 files changed, 29 insertions(+), 34 deletions(-)

diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 2d1c0314f27b..04e31679f386 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -23,8 +23,7 @@ import java.text.SimpleDateFormat
 import java.time.{LocalDateTime, ZoneOffset}
 import java.util.Properties
 
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.types.{ArrayType, DecimalType, FloatType, 
ShortType}
 import org.apache.spark.tags.DockerTest
@@ -149,9 +148,12 @@ class PostgresIntegrationSuite extends 
DockerJDBCIntegrationSuite {
   |('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate()
 
 conn.prepareStatement("CREATE TABLE infinity_timestamp" +
-  "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP);").executeUpdate()
-conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column)" +
-  " VALUES ('infinity'), ('-infinity');").executeUpdate()
+  "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP, timestamp_array 
TIMESTAMP[])")
+  .executeUpdate()
+conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column, 
timestamp_array)" +
+  " VALUES ('infinity', ARRAY[TIMESTAMP 'infinity']), " +
+"('-infinity', ARRAY[TIMESTAMP '-infinity'])")
+  .executeUpdate()
 
 conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT 
''").executeUpdate()
 conn.prepareStatement("create table custom_type(type_array 
not_null_text[]," +
@@ -447,10 +449,13 @@ class PostgresIntegrationSuite extends 
DockerJDBCIntegrationSuite {
 assert(row.length == 2)
 val infinity = row(0).getAs[Timestamp]("timestamp_column")
 val negativeInfinity = row(1).getAs[Timestamp]("timestamp_column")
+val infinitySeq = 
row(0).getAs[scala.collection.Seq[Timestamp]]("timestamp_array")
+val negativeInfinitySeq = 
row(1).getAs[scala.collection.Seq[Timestamp]]("timestamp_array")
 val minTimeStamp = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
 val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC)
-
 assert(infinity.getTime == maxTimestamp)
 assert(negativeInfinity.getTime == minTimeStamp)
+assert(infinitySeq.head.getTime == maxTimestamp)
+assert(negativeInfinitySeq.head.getTime == minTimeStamp)
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index b5e78ba32cd5..a7bbb832a839 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import java.sql.{Connection, JDBCType, PreparedStatement, ResultSet, 
ResultSetMetaData, SQLException}
+import java.math.{BigDecimal => JBigDecimal}
+import java.sql.{Connection, Date, JDBCType, PreparedStatement, ResultSet, 
ResultSetMetaData, SQLException, 

(spark) branch master updated: [SPARK-47331][SS] Serialization using case classes/primitives/POJO based on SQL encoder for Arbitrary State API v2

2024-03-10 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new afbebfbadc4b [SPARK-47331][SS] Serialization using case 
classes/primitives/POJO based on SQL encoder for Arbitrary State API v2
afbebfbadc4b is described below

commit afbebfbadc4b5e927df7c568a8afb08fc4407f58
Author: jingz-db 
AuthorDate: Mon Mar 11 09:20:44 2024 +0900

[SPARK-47331][SS] Serialization using case classes/primitives/POJO based on 
SQL encoder for Arbitrary State API v2

### What changes were proposed in this pull request?

In the new operator for arbitrary state-v2, we cannot rely on the 
session/encoder being available, since initialization of the various state 
instances happens on the executors. Hence, for state serialization, we 
propose to let users explicitly pass in an encoder for each state variable and 
serialize primitives/case classes/POJOs with the SQL encoder. Leveraging the SQL 
encoder can speed up serialization.

### Why are the changes needed?

These changes are needed to provide a dedicated serializer for state-v2.
They are part of the work on adding a new stateful streaming operator for 
arbitrary state management, which provides a number of new features listed 
in the SPIP JIRA: https://issues.apache.org/jira/browse/SPARK-45939

### Does this PR introduce _any_ user-facing change?

Users will need to specify the SQL encoder for their state variable:
`def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T]`
`def getListState[T](stateName: String, valEncoder: Encoder[T]): ListState[T]`

For a primitive type, the encoder is something like `Encoders.scalaLong`; for a case 
class, `Encoders.product[CaseClass]`; for a POJO, 
`Encoders.bean(classOf[POJOClass])`
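
For illustration, a minimal sketch of how the new signatures are meant to be used; the case class, state names, and object are made up, and since the handle trait is still `private[sql]` in this commit, the sketch sits in a Spark-internal package purely to show the shape of the API.

```scala
package org.apache.spark.sql.streaming // handle trait is private[sql] at this commit

import org.apache.spark.sql.Encoders

// Hypothetical state type for the sketch.
case class SessionState(count: Long, lastEvent: String)

object EncoderUsageSketch {
  def initState(handle: StatefulProcessorHandle): Unit = {
    // Case class state backed by a product encoder.
    val session = handle.getValueState[SessionState]("session", Encoders.product[SessionState])
    // Primitive list state backed by a primitive encoder.
    val events = handle.getListState[Long]("events", Encoders.scalaLong)
  }
}
```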

### How was this patch tested?

Unit tests for primitives, case classes, POJO separately in 
`ValueStateSuite`

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45447 from jingz-db/sql-encoder-state-v2.

Authored-by: jingz-db 
Signed-off-by: Jungtaek Lim 
---
 .../sql/streaming/StatefulProcessorHandle.scala|   7 +-
 .../sql/execution/streaming/ListStateImpl.scala|   8 +-
 .../streaming/StateTypesEncoderUtils.scala |  41 +---
 .../streaming/StatefulProcessorHandleImpl.scala|   9 +-
 .../sql/execution/streaming/ValueStateImpl.scala   |   9 +-
 .../execution/streaming/state/POJOTestClass.java   |  78 ++
 .../streaming/state/ValueStateSuite.scala  | 117 -
 .../streaming/TransformWithListStateSuite.scala|   7 +-
 .../sql/streaming/TransformWithStateSuite.scala|  11 +-
 9 files changed, 250 insertions(+), 37 deletions(-)

diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
index 5d3390f80f6d..86bf1e85f90c 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming
 import java.io.Serializable
 
 import org.apache.spark.annotation.{Evolving, Experimental}
+import org.apache.spark.sql.Encoder
 
 /**
  * Represents the operation handle provided to the stateful processor used in 
the
@@ -33,20 +34,22 @@ private[sql] trait StatefulProcessorHandle extends 
Serializable {
* The user must ensure to call this function only within the `init()` 
method of the
* StatefulProcessor.
* @param stateName - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
* @tparam T - type of state variable
* @return - instance of ValueState of type T that can be used to store 
state persistently
*/
-  def getValueState[T](stateName: String): ValueState[T]
+  def getValueState[T](stateName: String, valEncoder: Encoder[T]): 
ValueState[T]
 
   /**
* Creates new or returns existing list state associated with stateName.
* The ListState persists values of type T.
*
* @param stateName  - name of the state variable
+   * @param valEncoder - SQL encoder for state variable
* @tparam T - type of state variable
* @return - instance of ListState of type T that can be used to store state 
persistently
*/
-  def getListState[T](stateName: String): ListState[T]
+  def getListState[T](stateName: String, valEncoder: Encoder[T]): ListState[T]
 
   /** Function to return queryInfo for currently running task */
   def getQueryInfo(): QueryInfo
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala
 

(spark) branch master updated (f6df78154bac -> 5ac560c76e60)

2024-03-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from f6df78154bac [SPARK-47035][SS][CONNECT][FOLLOWUP] Additional protobuf 
change to client side listener
 add 5ac560c76e60 [MINOR][DOCS][SQL] Fix doc comment for 
coalescePartitions.parallelismFirst

No new revisions were added by this update.

Summary of changes:
 sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)





(spark) branch master updated: [SPARK-47035][SS][CONNECT][FOLLOWUP] Additional protobuf change to client side listener

2024-03-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f6df78154bac [SPARK-47035][SS][CONNECT][FOLLOWUP] Additional protobuf 
change to client side listener
f6df78154bac is described below

commit f6df78154bac826bd51d2aad185ce02a7efd36b6
Author: Wei Liu 
AuthorDate: Mon Mar 11 09:07:35 2024 +0900

[SPARK-47035][SS][CONNECT][FOLLOWUP] Additional protobuf change to client 
side listener

### What changes were proposed in this pull request?

Follow-up of the previous protocol change, 
https://github.com/apache/spark/pull/45091. Adds the request proto to `Command` and 
the response proto message to `ExecutePlanResponse`.

### Why are the changes needed?

Continuation of the client-side listener work for Spark Connect.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Will be tested in a subsequent PR; the proto change itself doesn't do any harm.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45444 from WweiL/SPARK-47035-protocol-followup.

Authored-by: Wei Liu 
Signed-off-by: Hyukjin Kwon 
---
 .../src/main/protobuf/spark/connect/base.proto |   3 +
 .../src/main/protobuf/spark/connect/commands.proto |   2 +
 python/pyspark/sql/connect/proto/base_pb2.py   | 204 ++---
 python/pyspark/sql/connect/proto/base_pb2.pyi  |  13 ++
 python/pyspark/sql/connect/proto/commands_pb2.py   | 180 +-
 python/pyspark/sql/connect/proto/commands_pb2.pyi  |  40 +++-
 6 files changed, 249 insertions(+), 193 deletions(-)

diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/base.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index f24ca0a8fc3b..cb9dbe62c193 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -357,6 +357,9 @@ message ExecutePlanResponse {
 // Response for commands on the streaming query manager.
 StreamingQueryManagerCommandResult streaming_query_manager_command_result 
= 11;
 
+// Response for commands on the client side streaming query listener.
+StreamingQueryListenerEventsResult streaming_query_listener_events_result 
= 16;
+
 // Response type informing if the stream is complete in reattachable 
execution.
 ResultComplete result_complete = 14;
 
diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
index e845d5f29061..76ac106b1de8 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -42,6 +42,7 @@ message Command {
 GetResourcesCommand get_resources_command = 8;
 StreamingQueryManagerCommand streaming_query_manager_command = 9;
 CommonInlineUserDefinedTableFunction register_table_function = 10;
+StreamingQueryListenerBusCommand streaming_query_listener_bus_command = 11;
 
 // This field is used to mark extensions to the protocol. When plugins 
generate arbitrary
 // Commands they can add them here. During the planning the correct 
resolution is done.
@@ -456,6 +457,7 @@ message StreamingQueryListenerEvent {
 
 message StreamingQueryListenerEventsResult {
   repeated StreamingQueryListenerEvent events = 1;
+  optional bool listener_bus_listener_added = 2;
 }
 
 // Command to get the output of 'SparkContext.resources'
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py 
b/python/pyspark/sql/connect/proto/base_pb2.py
index 8326ce511d56..1941900ae69d 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as 
spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01
 
\x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02
 
\x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17
 [...]
+

(spark) branch master updated: [SPARK-47334][SQL] Make `withColumnRenamed` reuse the implementation of `withColumnsRenamed`

2024-03-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a4603f134fb0 [SPARK-47334][SQL] Make `withColumnRenamed` reuse the 
implementation of `withColumnsRenamed`
a4603f134fb0 is described below

commit a4603f134fb0d496109d4c90889191c506e82691
Author: Ruifeng Zheng 
AuthorDate: Mon Mar 11 09:06:20 2024 +0900

[SPARK-47334][SQL] Make `withColumnRenamed` reuse the implementation of 
`withColumnsRenamed`

### What changes were proposed in this pull request?
Make `withColumnRenamed` reuse the implementation of `withColumnsRenamed`
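
For reference, a small usage sketch (the `DataFrame` and column names are assumptions): after this change, both calls below go through the same rename logic.

```scala
import org.apache.spark.sql.DataFrame

// Both forms now resolve the rename through withColumnsRenamed's implementation.
def renameBothWays(df: DataFrame): (DataFrame, DataFrame) = (
  df.withColumnRenamed("old", "new"),          // single-column API
  df.withColumnsRenamed(Map("old" -> "new"))   // multi-column API
)
```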

### Why are the changes needed?
to avoid any divergence in the future

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #45450 from zhengruifeng/with_rename_consistent.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 27 --
 1 file changed, 9 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f3bf6119659d..f0c9f7ae53fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2876,23 +2876,8 @@ class Dataset[T] private[sql](
* @group untypedrel
* @since 2.0.0
*/
-  def withColumnRenamed(existingName: String, newName: String): DataFrame = {
-val resolver = sparkSession.sessionState.analyzer.resolver
-val output = queryExecution.analyzed.output
-val shouldRename = output.exists(f => resolver(f.name, existingName))
-if (shouldRename) {
-  val columns = output.map { col =>
-if (resolver(col.name, existingName)) {
-  Column(col).as(newName)
-} else {
-  Column(col)
-}
-  }
-  select(columns : _*)
-} else {
-  toDF()
-}
-  }
+  def withColumnRenamed(existingName: String, newName: String): DataFrame =
+withColumnsRenamed(Seq(existingName), Seq(newName))
 
   /**
* (Scala-specific)
@@ -2921,18 +2906,24 @@ class Dataset[T] private[sql](
 
 val resolver = sparkSession.sessionState.analyzer.resolver
 val output: Seq[NamedExpression] = queryExecution.analyzed.output
+var shouldRename = false
 
 val projectList = colNames.zip(newColNames).foldLeft(output) {
   case (attrs, (existingName, newName)) =>
 attrs.map(attr =>
   if (resolver(attr.name, existingName)) {
+shouldRename = true
 Alias(attr, newName)()
   } else {
 attr
   }
 )
 }
-withPlan(Project(projectList, logicalPlan))
+if (shouldRename) {
+  withPlan(Project(projectList, logicalPlan))
+} else {
+  toDF()
+}
   }
 
   /**





(spark) branch master updated: [SPARK-47325][INFRA] Use the latest `buf-setup-action` in github workflow

2024-03-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f6a00f2dda93 [SPARK-47325][INFRA] Use the latest `buf-setup-action` in 
github workflow
f6a00f2dda93 is described below

commit f6a00f2dda9379a3b91297a556953d6f4c0f84cd
Author: panbingkun 
AuthorDate: Mon Mar 11 08:53:36 2024 +0900

[SPARK-47325][INFRA] Use the latest `buf-setup-action` in github workflow

### What changes were proposed in this pull request?
The PR aims to unpin the specific version of `buf-setup-action` in the GitHub 
workflow build.

### Why are the changes needed?
- The [last pin](https://github.com/apache/spark/pull/45205) to a 
specific version was due to a bug in `v1.29.0-1`. The latest 
version is `v1.30.0`, and testing has found that this version 
is ok.

- This latest version, `v1.30.0`, includes a change upgrading from 
`node16` to `node20`.
  https://github.com/bufbuild/buf-setup-action/compare/v1.29.0...v1.30.0
  https://github.com/apache/spark/assets/15246973/8277ac62-dc36-4237-8dc0-1522f5f248b8

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #45433 from panbingkun/test_buf-setup-action_1_30_0.

Authored-by: panbingkun 
Signed-off-by: Hyukjin Kwon 
---
 .github/workflows/build_and_test.yml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index a24309e137eb..4f2be1c04f98 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -574,9 +574,8 @@ jobs:
 git -c user.name='Apache Spark Test Account' -c 
user.email='sparktest...@gmail.com' merge --no-commit --progress --squash 
FETCH_HEAD
 git -c user.name='Apache Spark Test Account' -c 
user.email='sparktest...@gmail.com' commit -m "Merged commit" --allow-empty
 - name: Install Buf
-  uses: bufbuild/buf-setup-action@v1.29.0
+  uses: bufbuild/buf-setup-action@v1
   with:
-version: 1.29.0
 github_token: ${{ secrets.GITHUB_TOKEN }}
 - name: Protocol Buffers Linter
   uses: bufbuild/buf-lint-action@v1





(spark) branch master updated: [SPARK-47200][SS] Make Foreach batch sink user function error handling backward compatible

2024-03-10 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new edb970b8a73e [SPARK-47200][SS] Make Foreach batch sink user function 
error handling backward compatible
edb970b8a73e is described below

commit edb970b8a73e5b1e08b01f9370dadb05a3e231e3
Author: micheal-o 
AuthorDate: Mon Mar 11 08:44:30 2024 +0900

[SPARK-47200][SS] Make Foreach batch sink user function error handling 
backward compatible

### What changes were proposed in this pull request?
A previous PR (https://github.com/apache/spark/pull/45299) added handling 
and classification of exceptions thrown in user-provided functions for the 
foreach batch sink. This change makes that behavior backward compatible in order not 
to break current users, since users may depend on getting the user-code 
error from `StreamingQueryException.cause` instead of 
`StreamingQueryException.cause.cause`.
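
A hedged sketch of the usage pattern being preserved (the `reportUserError` helper is made up for the example):

```scala
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

// With this change, the user function's exception stays reachable via `cause`
// rather than only via `cause.cause`.
def reportUserError(query: StreamingQuery): Unit = {
  try {
    query.awaitTermination()
  } catch {
    case e: StreamingQueryException =>
      // Backward-compatible location of the user-code error.
      val userError = Option(e.getCause).getOrElse(e)
      println(s"foreachBatch user function failed: ${userError.getMessage}")
  }
}
```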

### Why are the changes needed?
To prevent breaking the existing usage pattern.

### Does this PR introduce _any_ user-facing change?
Yes, better error message with error class for ForeachBatchSink user 
function failures.

### How was this patch tested?
updated existing tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45449 from micheal-o/ForeachBatchExBackwardCompat.

Authored-by: micheal-o 
Signed-off-by: Jungtaek Lim 
---
 .../src/main/resources/error/error-classes.json|  2 +-
 docs/sql-error-conditions.md   |  2 +-
 .../sql/execution/streaming/StreamExecution.scala  | 29 +++---
 .../streaming/sources/ForeachBatchSink.scala   | 14 ---
 .../sql/errors/QueryExecutionErrorsSuite.scala |  2 +-
 .../streaming/sources/ForeachBatchSinkSuite.scala  | 17 +++--
 6 files changed, 43 insertions(+), 23 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 57746d6dbf1e..9717ff2ed49c 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1297,7 +1297,7 @@
   },
   "FOREACH_BATCH_USER_FUNCTION_ERROR" : {
 "message" : [
-  "An error occurred in the user provided function in foreach batch sink."
+  "An error occurred in the user provided function in foreach batch sink. 
Reason: "
 ],
 "sqlState" : "39000"
   },
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 7be01f8cb513..0be75cde968f 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -778,7 +778,7 @@ The operation `` is not allowed on the 
``: ``
 
 [SQLSTATE: 
39000](sql-error-conditions-sqlstates.html#class-39-external-routine-invocation-exception)
 
-An error occurred in the user provided function in foreach batch sink.
+An error occurred in the user provided function in foreach batch sink. Reason: 
``
 
 ### FOUND_MULTIPLE_DATA_SOURCES
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 859fce8b1154..50a73082a8c4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite, 
Table}
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, 
ReadLimit, SparkDataStream}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, 
SupportsTruncate, Write}
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
+import 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchUserFuncException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend
 import org.apache.spark.sql.streaming._
@@ -279,6 +280,7 @@ abstract class StreamExecution(
* `start()` method returns.
*/
   private def runStream(): Unit = {
+var errorClassOpt: Option[String] = None
 try {
   sparkSession.sparkContext.setJobGroup(runId.toString, 
getBatchDescriptionString,
 interruptOnCancel = true)
@@ -330,9 +332,17 @@ abstract class StreamExecution(
 getLatestExecutionContext().updateStatusMessage("Stopped")
   case e: Throwable =>
 val message = if (e.getMessage == null) "" else e.getMessage
+val cause = if (e.isInstanceOf[ForeachBatchUserFuncException]) {
+  // We want to maintain the current way users get the causing 
exception
+  // from the StreamingQueryException. Hence the ForeachBatch 

(spark) branch master updated: [SPARK-47218][SQL] XML: Changed SchemaOfXml to fail on DROPMALFORMED mode

2024-03-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ae518ecb7068 [SPARK-47218][SQL] XML: Changed SchemaOfXml to fail on 
DROPMALFORMED mode
ae518ecb7068 is described below

commit ae518ecb7068347f70d947255eb54fdfd5ec8d48
Author: Yousof Hosny 
AuthorDate: Mon Mar 11 08:40:19 2024 +0900

[SPARK-47218][SQL] XML: Changed SchemaOfXml to fail on DROPMALFORMED mode

### What changes were proposed in this pull request?

Changed `schema_of_xml` to fail with an error in DROPMALFORMED mode, to 
avoid creating schemas out of invalid XML.

### Why are the changes needed?

The DROPMALFORMED parse mode implies silently dropping the malformed record, but 
SchemaOfXml is expected to return a schema and may not have a valid schema to 
return for a malformed record. So DROPMALFORMED cannot be supported.
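
A quick sketch of the resulting behavior (the XML literal and the `demo` helper are illustrative; the real coverage is in the new `XmlSuite` tests below):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

// Under DROPMALFORMED, schema_of_xml now fails analysis instead of silently
// dropping the malformed record; PERMISSIVE and FAILFAST keep their behavior.
def demo(spark: SparkSession): Unit = {
  try {
    spark.sql("SELECT schema_of_xml('<a>1</a>', map('mode', 'DROPMALFORMED'))").collect()
  } catch {
    case e: AnalysisException => println(s"Rejected as expected: ${e.getMessage}")
  }
}
```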

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45379 from yhosny/xml-parsemode-error.

Authored-by: Yousof Hosny 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/catalyst/expressions/xmlExpressions.scala  |  8 +++--
 .../sql/execution/datasources/xml/XmlSuite.scala   | 36 ++
 2 files changed, 42 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
index 800515ca84b5..8cc1c3a89745 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, 
FailureSafeParser, GenericArrayData, PermissiveMode}
+import org.apache.spark.sql.catalyst.util.{ArrayData, DropMalformedMode, 
FailFastMode, FailureSafeParser, GenericArrayData, PermissiveMode}
 import org.apache.spark.sql.catalyst.xml.{StaxXmlGenerator, StaxXmlParser, 
ValidatorUtil, XmlInferSchema, XmlOptions}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
 import org.apache.spark.sql.internal.SQLConf
@@ -189,8 +189,12 @@ case class SchemaOfXml(
   private lazy val xmlFactory = xmlOptions.buildXmlFactory()
 
   @transient
-  private lazy val xmlInferSchema =
+  private lazy val xmlInferSchema = {
+if (xmlOptions.parseMode == DropMalformedMode) {
+  throw QueryCompilationErrors.parseModeUnsupportedError("schema_of_xml", 
xmlOptions.parseMode)
+}
 new XmlInferSchema(xmlOptions, caseSensitive = 
SQLConf.get.caseSensitiveAnalysis)
+  }
 
   @transient
   private lazy val xml = child.eval().asInstanceOf[UTF8String]
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
index 2194f76e7da6..d7dc96184dab 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
@@ -1302,6 +1302,42 @@ class XmlSuite
 
assert(result.select("decoded._corrupt_record").head().getString(0).nonEmpty)
   }
 
+  test("schema_of_xml with DROPMALFORMED parse error test") {
+val e = intercept[AnalysisException] {
+   spark.sql(s"""SELECT schema_of_xml('1', map('mode', 
'DROPMALFORMED'))""")
+ .collect()
+}
+checkError(
+  exception = e,
+  errorClass = "_LEGACY_ERROR_TEMP_1099",
+  parameters = Map(
+"funcName" -> "schema_of_xml",
+"mode" -> "DROPMALFORMED",
+"permissiveMode" -> "PERMISSIVE",
+"failFastMode" -> FailFastMode.name)
+)
+  }
+
+  test("schema_of_xml with FAILFAST parse error test") {
+val e = intercept[SparkException] {
+   spark.sql(s"""SELECT schema_of_xml('1', map('mode', 
'FAILFAST'))""")
+ .collect()
+}
+checkError(
+  exception = e,
+  errorClass = "_LEGACY_ERROR_TEMP_2165",
+  parameters = Map(
+"failFastMode" -> FailFastMode.name)
+)
+  }
+
+  test("schema_of_xml with PERMISSIVE check no error test") {
+  val s = spark.sql(s"""SELECT schema_of_xml('1', map('mode', 
'PERMISSIVE'))""")
+.collect()
+  assert(s.head.get(0) == 

(spark) branch master updated: [MINOR][SQL][TEST] Moving tests to related suites

2024-03-10 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 264e00ee12bb [MINOR][SQL][TEST] Moving tests to related suites
264e00ee12bb is described below

commit 264e00ee12bbbd822e52fa8ce79692c60f531495
Author: Mihailo Milosevic 
AuthorDate: Mon Mar 11 00:25:55 2024 +0500

[MINOR][SQL][TEST] Moving tests to related suites

### What changes were proposed in this pull request?
Tests from `QueryCompilationErrorsSuite` were moved to `DDLSuite` and 
`JDBCTableCatalogSuite`.

### Why are the changes needed?
We should move tests to related test suites in order to improve testing.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Corresponding Suites succeed.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #45439 from mihailom-db/SPARK-47326.

Authored-by: Mihailo Milosevic 
Signed-off-by: Max Gekk 
---
 .../sql/errors/QueryCompilationErrorsSuite.scala   | 74 --
 .../spark/sql/execution/command/DDLSuite.scala | 17 +
 .../v2/jdbc/JDBCTableCatalogSuite.scala| 56 
 3 files changed, 73 insertions(+), 74 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
index c9198c86c720..4574d3328d48 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
@@ -25,13 +25,11 @@ import org.apache.spark.sql.api.java.{UDF1, UDF2, UDF23Test}
 import org.apache.spark.sql.catalyst.expressions.{Coalesce, Literal, UnsafeRow}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import 
org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
-import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.expressions.SparkUserDefinedFunction
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
 
 case class StringLongClass(a: String, b: Long)
 
@@ -817,78 +815,6 @@ class QueryCompilationErrorsSuite
   parameters = Map("extraction" -> "\"array(test)\""))
   }
 
-  test("CREATE NAMESPACE with LOCATION for JDBC catalog should throw an 
error") {
-withTempDir { tempDir =>
-  val url = 
s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
-  Utils.classForName("org.h2.Driver")
-  withSQLConf(
-"spark.sql.catalog.h2" -> classOf[JDBCTableCatalog].getName,
-"spark.sql.catalog.h2.url" -> url,
-"spark.sql.catalog.h2.driver" -> "org.h2.Driver") {
-checkError(
-  exception = intercept[AnalysisException] {
-sql("CREATE NAMESPACE h2.test_namespace LOCATION './samplepath'")
-  },
-  errorClass = "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND",
-  sqlState = "0A000",
-  parameters = Map("cmd" -> toSQLStmt("CREATE NAMESPACE ... LOCATION 
...")))
-  }
-}
-  }
-
-  test("ALTER NAMESPACE with property other than COMMENT " +
-"for JDBC catalog should throw an exception") {
-withTempDir { tempDir =>
-  val url = 
s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
-  Utils.classForName("org.h2.Driver")
-  withSQLConf(
-"spark.sql.catalog.h2" -> classOf[JDBCTableCatalog].getName,
-"spark.sql.catalog.h2.url" -> url,
-"spark.sql.catalog.h2.driver" -> "org.h2.Driver") {
-val namespace = "h2.test_namespace"
-withNamespace(namespace) {
-  sql(s"CREATE NAMESPACE $namespace")
-  checkError(
-exception = intercept[AnalysisException] {
-  sql(s"ALTER NAMESPACE h2.test_namespace SET LOCATION 
'/tmp/loc_test_2'")
-},
-errorClass = "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND_WITH_PROPERTY",
-sqlState = "0A000",
-parameters = Map(
-  "cmd" -> toSQLStmt("SET NAMESPACE"),
-  "property" -> toSQLConf("location")))
-
-  checkError(
-exception = intercept[AnalysisException] {
-  sql(s"ALTER NAMESPACE h2.test_namespace SET PROPERTIES('a'='b')")
-},
-errorClass = "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND_WITH_PROPERTY",
-sqlState = "0A000",
-parameters = Map(
-  "cmd" -> toSQLStmt("SET NAMESPACE"),
-  "property" -> toSQLConf("a")))
-}
-  }
-}
-  }
-
-