This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 466a2155ebf [SPARK-43134][CONNECT][SS] JVM client StreamingQuery exception() API
466a2155ebf is described below

commit 466a2155ebf8507cecc297a198cb990cd3d431f2
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Wed Apr 26 21:58:21 2023 +0900

    [SPARK-43134][CONNECT][SS] JVM client StreamingQuery exception() API
    
    ### What changes were proposed in this pull request?
    
    Add StreamingQuery exception() API for JVM client
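    
    For reference, a minimal usage sketch (hedged: `q` stands for any streaming query handle obtained from the client, and the printed strings are illustrative; only `exception` and `getErrorClass` are taken from this change):
    
    ```
    // Sketch: inspect why a query terminated. The Option is empty while
    // the query is healthy or after a clean stop.
    q.exception match {
      case Some(e) => println(s"query terminated, error class: ${e.getErrorClass}")
      case None    => println("no exception recorded for this query")
    }
    ```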
    
    ### Why are the changes needed?
    
    Development of Structured Streaming support in Spark Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this adds a new `exception()` API to the JVM client's StreamingQuery.
    
    ### How was this patch tested?
    
    Manual test:
    ```
    Spark session available as 'spark'.
       _____                  __      ______                            __
      / ___/____  ____ ______/ /__   / ____/___  ____  ____  ___  _____/ /_
      \__ \/ __ \/ __ `/ ___/ //_/  / /   / __ \/ __ \/ __ \/ _ \/ ___/ __/
     ___/ / /_/ / /_/ / /  / ,<    / /___/ /_/ / / / / / / /  __/ /__/ /_
    /____/ .___/\__,_/_/  /_/|_|   \____/\____/_/ /_/_/ /_/\___/\___/\__/
        /_/
    
    val q = spark.readStream.format("rate").load().writeStream.option("checkpointLocation", "/home/wei.liu/ckpt").toTable("my_table")
    q: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.streaming.RemoteStreamingQuery@772f3a3f
    
    q.exception
    res1: Option[org.apache.spark.sql.streaming.StreamingQueryException] = None
    
    q.stop()
    
    ```
    
    Closes #40906 from WweiL/SPARK-43134-scala-exception.
    
    Authored-by: Wei Liu <wei....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/streaming/StreamingQuery.scala       | 20 ++++++++++++
 .../sql/streaming/StreamingQueryException.scala    | 38 ++++++++++++++++++++++
 .../CheckConnectJvmClientCompatibility.scala       |  3 --
 .../src/main/protobuf/spark/connect/commands.proto |  6 +++-
 .../sql/connect/planner/SparkConnectPlanner.scala  |  3 +-
 .../sql/connect/service/SparkConnectService.scala  |  4 ++-
 python/pyspark/sql/connect/proto/commands_pb2.py   | 24 +++++++-------
 python/pyspark/sql/connect/proto/commands_pb2.pyi  | 23 ++++++++++++-
 python/pyspark/sql/connect/streaming/query.py      |  6 +++-
 9 files changed, 107 insertions(+), 20 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 8bb35382162..a1bd8e264cc 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -78,6 +78,12 @@ trait StreamingQuery {
    */
   def isActive: Boolean
 
+  /**
+  /**
+   * Returns the [[StreamingQueryException]] if the query was terminated by an exception.
+   * @since 3.5.0
+   */
+  def exception: Option[StreamingQueryException]
+
   /**
    * Returns the current status of the query.
    *
@@ -199,6 +205,20 @@ class RemoteStreamingQuery(
     // scalastyle:on println
   }
 
+  override def exception: Option[StreamingQueryException] = {
+    val exception = executeQueryCmd(_.setException(true)).getException
+    if (exception.hasExceptionMessage) {
+      // TODO(SPARK-43206): Add more information to StreamingQueryException.
+      Some(
+        new StreamingQueryException(
+          // message maps to the return value of original StreamingQueryException's toString method
+          message = exception.getExceptionMessage,
+          errorClass = exception.getErrorClass))
+    } else {
+      None
+    }
+  }
+
   private def executeQueryCmd(
      setCmdFn: StreamingQueryCommand.Builder => Unit // Sets the command field, like stop().
   ): StreamingQueryCommandResult = {
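
A note on the pattern above: `executeQueryCmd` takes a callback that sets exactly one command field on the `StreamingQueryCommand` builder before the RPC fires. A hedged sketch of how a sibling method inside `RemoteStreamingQuery` would plug into the same helper (the `setStop` field name is an assumption by analogy with `setException(true)` above, not something shown in this diff):

```
// Hypothetical sibling command using the same builder-callback helper:
// flip one field on the builder, then send the command.
override def stop(): Unit = {
  executeQueryCmd(_.setStop(true)) // setStop assumed, mirroring setException
}
```
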
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
new file mode 100644
index 00000000000..875c216a3e7
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkThrowable
+import org.apache.spark.annotation.Evolving
+
+/**
+ * Exception that stopped a [[StreamingQuery]] in Spark Connect. Currently not all fields in the
+ * original StreamingQueryException are supported.
+ * @param message
+ *   Maps to return value of original StreamingQueryException's toString method
+ * @param errorClass
+ *   Error class of this exception
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryException private[sql] (message: String, errorClass: String)
+    extends SparkThrowable {
+
+  // TODO(SPARK-43206): Add stack trace
+  override def getErrorClass: String = errorClass
+}
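
Worth noting: in this commit the class implements only the `SparkThrowable` interface, so clients consume it through `getErrorClass` rather than the usual `Throwable` accessors (stack traces are deferred to SPARK-43206). A minimal consumption sketch (`q` and the error-class value are hypothetical):

```
// Read the structured error class off the returned exception, if any.
val errorClass: Option[String] = q.exception.map(_.getErrorClass)
// e.g. Some("STREAM_FAILED") (hypothetical value) or None
```
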
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 1b8aacebc54..c71017bb271 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -247,9 +247,6 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem](
         "org.apache.spark.sql.streaming.StreamingQuery.awaitTermination" // 
TODO(SPARK-43143)
       ),
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.StreamingQuery.exception" // 
TODO(SPARK-43134)
-      ),
       ProblemFilters.exclude[Problem](
         "org.apache.spark.sql.streaming.StreamingQueryProgress.*" // 
TODO(SPARK-43128)
       ),
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
index 2b648bf0f9a..0d6c29da9f8 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -308,8 +308,12 @@ message StreamingQueryCommandResult {
   }
 
   message ExceptionResult {
-    // Exception message as string
+    // (Optional) Exception message as string, maps to the return value of original
+    // StreamingQueryException's toString method
     optional string exception_message = 1;
+    // (Optional) Exception error class as string
+    optional string error_class = 2;
+    // TODO(SPARK-43206): Add stack trace
   }
 
   message AwaitTerminationResult {
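
Because both fields are declared `optional`, the generated bindings expose explicit presence checks (`hasExceptionMessage`, `hasErrorClass`). A small sketch of round-tripping the message via the standard protobuf-java builder API (the package name `org.apache.spark.connect.proto` and the literal values are assumptions for illustration):

```
import org.apache.spark.connect.proto.StreamingQueryCommandResult

// Build an ExceptionResult the way the server populates it; both
// string values here are placeholders.
val ex = StreamingQueryCommandResult.ExceptionResult
  .newBuilder()
  .setExceptionMessage("...StreamingQueryException: query failed") // placeholder
  .setErrorClass("STREAM_FAILED") // hypothetical error class
  .build()

// Presence tracking mirrors the `optional` keyword in the proto.
assert(ex.hasExceptionMessage && ex.hasErrorClass)
```
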
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 11c02c72187..e7de15f62f9 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2328,7 +2328,8 @@ class SparkConnectPlanner(val session: SparkSession) {
         val result = query.exception
         result.foreach(e =>
           respBuilder.getExceptionBuilder
-            .setExceptionMessage(SparkConnectService.extractErrorMessage(e)))
+            .setExceptionMessage(SparkConnectService.abbreviateErrorMessage(e.toString))
+            .setErrorClass(e.getErrorClass))
 
       case StreamingQueryCommand.CommandCase.AWAIT_TERMINATION =>
         if (command.getAwaitTermination.hasTimeoutMs) {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index b894f30990c..09a3ff39698 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -334,8 +334,10 @@ object SparkConnectService {
     }
   }
 
+  def abbreviateErrorMessage(msg: String): String = StringUtils.abbreviate(msg, 2048)
+
   def extractErrorMessage(st: Throwable): String = {
-    val message = StringUtils.abbreviate(st.getMessage, 2048)
+    val message = abbreviateErrorMessage(st.getMessage)
     if (message != null) {
       message
     } else {
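
`StringUtils.abbreviate` here is the Apache Commons Lang 3 helper: strings at or under the cap pass through unchanged, longer ones are truncated to exactly `maxWidth` characters ending in "...", and a null input comes back as null, which is why `extractErrorMessage` still checks for null afterwards. A quick sketch of that contract:

```
import org.apache.commons.lang3.StringUtils

// Under the cap: returned unchanged.
assert(StringUtils.abbreviate("boom", 2048) == "boom")
// Over the cap: cut to 2048 chars, the last three being "...".
val long = "a" * 5000
assert(StringUtils.abbreviate(long, 2048).length == 2048)
assert(StringUtils.abbreviate(long, 2048).endsWith("..."))
// Null passes through, hence the null check in extractErrorMessage.
assert(StringUtils.abbreviate(null, 2048) == null)
```
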
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py
index 27de95a7aaa..73575fbed85 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.py
+++ b/python/pyspark/sql/connect/proto/commands_pb2.py
@@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import relations_pb2 as spark_dot_connect_dot_rel
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\x06\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x [...]
+    b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\x06\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x [...]
 )
 
 
@@ -435,21 +435,21 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_end = 4615
     _STREAMINGQUERYCOMMANDRESULT._serialized_start = 4629
-    _STREAMINGQUERYCOMMANDRESULT._serialized_end = 5661
+    _STREAMINGQUERYCOMMANDRESULT._serialized_end = 5716
     _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_start = 5212
     _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_end = 5382
     _STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_start = 5384
     _STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_end = 5456
     _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_start = 5458
     _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_end = 5497
-    _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_start = 5499
-    _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 5588
-    _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 5590
-    _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 5646
-    _GETRESOURCESCOMMAND._serialized_start = 5663
-    _GETRESOURCESCOMMAND._serialized_end = 5684
-    _GETRESOURCESCOMMANDRESULT._serialized_start = 5687
-    _GETRESOURCESCOMMANDRESULT._serialized_end = 5899
-    _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 5803
-    _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 5899
+    _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_start = 5500
+    _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 5643
+    _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 5645
+    _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 5701
+    _GETRESOURCESCOMMAND._serialized_start = 5718
+    _GETRESOURCESCOMMAND._serialized_end = 5739
+    _GETRESOURCESCOMMANDRESULT._serialized_start = 5742
+    _GETRESOURCESCOMMANDRESULT._serialized_end = 5954
+    _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 5858
+    _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 5954
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi
index 972fe7503a1..81856352167 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi
@@ -1087,18 +1087,30 @@ class StreamingQueryCommandResult(google.protobuf.message.Message):
         DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
         EXCEPTION_MESSAGE_FIELD_NUMBER: builtins.int
+        ERROR_CLASS_FIELD_NUMBER: builtins.int
         exception_message: builtins.str
-        """Exception message as string"""
+        """(Optional) Exception message as string, maps to the return value of 
original
+        StreamingQueryException's toString method
+        """
+        error_class: builtins.str
+        """(Optional) Exception error class as string
+        TODO(SPARK-43206): Add stack trace
+        """
         def __init__(
             self,
             *,
             exception_message: builtins.str | None = ...,
+            error_class: builtins.str | None = ...,
         ) -> None: ...
         def HasField(
             self,
             field_name: typing_extensions.Literal[
+                "_error_class",
+                b"_error_class",
                 "_exception_message",
                 b"_exception_message",
+                "error_class",
+                b"error_class",
                 "exception_message",
                 b"exception_message",
             ],
@@ -1106,12 +1118,21 @@ class StreamingQueryCommandResult(google.protobuf.message.Message):
         def ClearField(
             self,
             field_name: typing_extensions.Literal[
+                "_error_class",
+                b"_error_class",
                 "_exception_message",
                 b"_exception_message",
+                "error_class",
+                b"error_class",
                 "exception_message",
                 b"exception_message",
             ],
         ) -> None: ...
+        @typing.overload
+        def WhichOneof(
+            self, oneof_group: typing_extensions.Literal["_error_class", b"_error_class"]
+        ) -> typing_extensions.Literal["error_class"] | None: ...
+        @typing.overload
         def WhichOneof(
             self,
            oneof_group: typing_extensions.Literal["_exception_message", b"_exception_message"],
diff --git a/python/pyspark/sql/connect/streaming/query.py b/python/pyspark/sql/connect/streaming/query.py
index eb196971985..fc207243ff3 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -148,7 +148,11 @@ class StreamingQuery:
         cmd.exception = True
         exception = self._execute_streaming_query_cmd(cmd).exception
         if exception.HasField("exception_message"):
-            return CapturedStreamingQueryException(exception.exception_message)
+            # Drop the Java StreamingQueryException type info
+            # exception_message maps to the return value of original
+            # StreamingQueryException's toString method
+            msg = exception.exception_message.split(": ", 1)[1]
+            return CapturedStreamingQueryException(msg)
         else:
             return None
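
The Python change above strips the leading exception class name from the `"<class>: <message>"` form that `toString` produces. The same transformation, sketched in Scala for clarity (the sample string is hypothetical; Java's `split` limit of 2 corresponds to Python's `maxsplit=1`):

```
// "<class>: <message>" -> "<message>", keeping any later ": " intact.
val raw = "org.apache.spark.sql.streaming.StreamingQueryException: boom: details"
val msg = raw.split(": ", 2)(1)
assert(msg == "boom: details")
```
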
 

