This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 466a2155ebf [SPARK-43134][CONNECT][SS] JVM client StreamingQuery exception() API
466a2155ebf is described below

commit 466a2155ebf8507cecc297a198cb990cd3d431f2
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Wed Apr 26 21:58:21 2023 +0900

    [SPARK-43134][CONNECT][SS] JVM client StreamingQuery exception() API

    ### What changes were proposed in this pull request?

    Add StreamingQuery exception() API for JVM client

    ### Why are the changes needed?

    Development of SS Connect

    ### Does this PR introduce _any_ user-facing change?

    Yes

    ### How was this patch tested?

    Manual test:
    ```
    Spark session available as 'spark'.
       _____                  __      ______                            __
      / ___/____  ____ ______/ /__   / ____/___  ____  ____  ___  _____/ /_
      \__ \/ __ \/ __ `/ ___/ //_/  / /   / __ \/ __ \/ __ \/ _ \/ ___/ __/
     ___/ / /_/ / /_/ / /  / ,<    / /___/ /_/ / / / / / / /  __/ /__/ /_
    /____/ .___/\__,_/_/  /_/|_|   \____/\____/_/ /_/_/ /_/\___/\___/\__/
        /_/

    val q = spark.readStream.format("rate").load().writeStream.option("checkpointLocation", "/home/wei.liu/ckpt").toTable("my_table")
    q: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.streaming.RemoteStreamingQuery772f3a3f

    q.exception
    res1: Option[org.apache.spark.sql.streaming.StreamingQueryException] = None

    q.stop()
    ```

    Closes #40906 from WweiL/SPARK-43134-scala-exception.
Authored-by: Wei Liu <wei....@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../spark/sql/streaming/StreamingQuery.scala | 20 ++++++++++++ .../sql/streaming/StreamingQueryException.scala | 38 ++++++++++++++++++++++ .../CheckConnectJvmClientCompatibility.scala | 3 -- .../src/main/protobuf/spark/connect/commands.proto | 6 +++- .../sql/connect/planner/SparkConnectPlanner.scala | 3 +- .../sql/connect/service/SparkConnectService.scala | 4 ++- python/pyspark/sql/connect/proto/commands_pb2.py | 24 +++++++------- python/pyspark/sql/connect/proto/commands_pb2.pyi | 23 ++++++++++++- python/pyspark/sql/connect/streaming/query.py | 6 +++- 9 files changed, 107 insertions(+), 20 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 8bb35382162..a1bd8e264cc 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -78,6 +78,12 @@ trait StreamingQuery { */ def isActive: Boolean + /** + * Returns the [[StreamingQueryException]] if the query was terminated by an exception. + * @since 3.5.0 + */ + def exception: Option[StreamingQueryException] + /** * Returns the current status of the query. * @@ -199,6 +205,20 @@ class RemoteStreamingQuery( // scalastyle:on println } + override def exception: Option[StreamingQueryException] = { + val exception = executeQueryCmd(_.setException(true)).getException + if (exception.hasExceptionMessage) { + // TODO(SPARK-43206): Add more information to StreamingQueryException. 
+ Some( + new StreamingQueryException( + // message maps to the return value of original StreamingQueryException's toString method + message = exception.getExceptionMessage, + errorClass = exception.getErrorClass)) + } else { + None + } + } + private def executeQueryCmd( setCmdFn: StreamingQueryCommand.Builder => Unit // Sets the command field, like stop(). ): StreamingQueryCommandResult = { diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala new file mode 100644 index 00000000000..875c216a3e7 --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.SparkThrowable +import org.apache.spark.annotation.Evolving + +/** + * Exception that stopped a [[StreamingQuery]] in Spark Connect. Currently not all fields in the + * original StreamingQueryException are supported. 
+ * @param message + * Maps to return value of original StreamingQueryException's toString method + * @param errorClass + * Error class of this exception + * @since 3.5.0 + */ +@Evolving +class StreamingQueryException private[sql] (message: String, errorClass: String) + extends SparkThrowable { + + // TODO(SPARK-43206): Add stack trace + override def getErrorClass: String = errorClass +} diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala index 1b8aacebc54..c71017bb271 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala @@ -247,9 +247,6 @@ object CheckConnectJvmClientCompatibility { ProblemFilters.exclude[Problem]( "org.apache.spark.sql.streaming.StreamingQuery.awaitTermination" // TODO(SPARK-43143) ), - ProblemFilters.exclude[Problem]( - "org.apache.spark.sql.streaming.StreamingQuery.exception" // TODO(SPARK-43134) - ), ProblemFilters.exclude[Problem]( "org.apache.spark.sql.streaming.StreamingQueryProgress.*" // TODO(SPARK-43128) ), diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto index 2b648bf0f9a..0d6c29da9f8 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto @@ -308,8 +308,12 @@ message StreamingQueryCommandResult { } message ExceptionResult { - // Exception message as string + // (Optional) Exception message as string, maps to the return value of original + // StreamingQueryException's toString method optional string exception_message = 
1; + // (Optional) Exception error class as string + optional string error_class = 2; + // TODO(SPARK-43206): Add stack trace } message AwaitTerminationResult { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 11c02c72187..e7de15f62f9 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -2328,7 +2328,8 @@ class SparkConnectPlanner(val session: SparkSession) { val result = query.exception result.foreach(e => respBuilder.getExceptionBuilder - .setExceptionMessage(SparkConnectService.extractErrorMessage(e))) + .setExceptionMessage(SparkConnectService.abbreviateErrorMessage(e.toString)) + .setErrorClass(e.getErrorClass)) case StreamingQueryCommand.CommandCase.AWAIT_TERMINATION => if (command.getAwaitTermination.hasTimeoutMs) { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index b894f30990c..09a3ff39698 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -334,8 +334,10 @@ object SparkConnectService { } } + def abbreviateErrorMessage(msg: String): String = StringUtils.abbreviate(msg, 2048) + def extractErrorMessage(st: Throwable): String = { - val message = StringUtils.abbreviate(st.getMessage, 2048) + val message = abbreviateErrorMessage(st.getMessage) if (message != null) { message } else { diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py 
b/python/pyspark/sql/connect/proto/commands_pb2.py index 27de95a7aaa..73575fbed85 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.py +++ b/python/pyspark/sql/connect/proto/commands_pb2.py @@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import relations_pb2 as spark_dot_connect_dot_rel DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\x06\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x [...] + b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\x06\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x [...] 
) @@ -435,21 +435,21 @@ if _descriptor._USE_C_DESCRIPTORS == False: _STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_start = 4539 _STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_end = 4615 _STREAMINGQUERYCOMMANDRESULT._serialized_start = 4629 - _STREAMINGQUERYCOMMANDRESULT._serialized_end = 5661 + _STREAMINGQUERYCOMMANDRESULT._serialized_end = 5716 _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_start = 5212 _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_end = 5382 _STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_start = 5384 _STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_end = 5456 _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_start = 5458 _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_end = 5497 - _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_start = 5499 - _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 5588 - _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 5590 - _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 5646 - _GETRESOURCESCOMMAND._serialized_start = 5663 - _GETRESOURCESCOMMAND._serialized_end = 5684 - _GETRESOURCESCOMMANDRESULT._serialized_start = 5687 - _GETRESOURCESCOMMANDRESULT._serialized_end = 5899 - _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 5803 - _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 5899 + _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_start = 5500 + _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 5643 + _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 5645 + _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 5701 + _GETRESOURCESCOMMAND._serialized_start = 5718 + _GETRESOURCESCOMMAND._serialized_end = 5739 + _GETRESOURCESCOMMANDRESULT._serialized_start = 5742 + _GETRESOURCESCOMMANDRESULT._serialized_end = 5954 + _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 5858 + 
_GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 5954 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi index 972fe7503a1..81856352167 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.pyi +++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi @@ -1087,18 +1087,30 @@ class StreamingQueryCommandResult(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor EXCEPTION_MESSAGE_FIELD_NUMBER: builtins.int + ERROR_CLASS_FIELD_NUMBER: builtins.int exception_message: builtins.str - """Exception message as string""" + """(Optional) Exception message as string, maps to the return value of original + StreamingQueryException's toString method + """ + error_class: builtins.str + """(Optional) Exception error class as string + TODO(SPARK-43206): Add stack trace + """ def __init__( self, *, exception_message: builtins.str | None = ..., + error_class: builtins.str | None = ..., ) -> None: ... def HasField( self, field_name: typing_extensions.Literal[ + "_error_class", + b"_error_class", "_exception_message", b"_exception_message", + "error_class", + b"error_class", "exception_message", b"exception_message", ], @@ -1106,12 +1118,21 @@ class StreamingQueryCommandResult(google.protobuf.message.Message): def ClearField( self, field_name: typing_extensions.Literal[ + "_error_class", + b"_error_class", "_exception_message", b"_exception_message", + "error_class", + b"error_class", "exception_message", b"exception_message", ], ) -> None: ... + @typing.overload + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_error_class", b"_error_class"] + ) -> typing_extensions.Literal["error_class"] | None: ... 
+ @typing.overload def WhichOneof( self, oneof_group: typing_extensions.Literal["_exception_message", b"_exception_message"], diff --git a/python/pyspark/sql/connect/streaming/query.py b/python/pyspark/sql/connect/streaming/query.py index eb196971985..fc207243ff3 100644 --- a/python/pyspark/sql/connect/streaming/query.py +++ b/python/pyspark/sql/connect/streaming/query.py @@ -148,7 +148,11 @@ class StreamingQuery: cmd.exception = True exception = self._execute_streaming_query_cmd(cmd).exception if exception.HasField("exception_message"): - return CapturedStreamingQueryException(exception.exception_message) + # Drop the Java StreamingQueryException type info + # exception_message maps to the return value of original + # StreamingQueryException's toString method + msg = exception.exception_message.split(": ", 1)[1] + return CapturedStreamingQueryException(msg) else: return None --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org