rangadi commented on code in PR #40785:
URL: https://github.com/apache/spark/pull/40785#discussion_r1166104079
##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -298,6 +306,16 @@ message StreamingQueryCommandResult {
// Logical and physical plans as string
string result = 1;
}
+
+ message ExceptionResult {
Review Comment:
We probably need at least the stack trace here, not just the error message.
Please leave a comment describing our thinking on that.
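To illustrate the stack trace point, here is a minimal Python sketch of a
result that carries the trace alongside the message. `ExceptionResult` below is
a hypothetical stand-in for the proto message, and the `stack_trace` field name
is an assumption for illustration, not something this PR defines.

```python
import traceback
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExceptionResult:
    # Hypothetical stand-in for the proto message; field names are assumptions.
    has_exception: bool
    error_message: str = ""
    stack_trace: str = ""


def to_exception_result(exc: Optional[BaseException]) -> ExceptionResult:
    """Build a result that keeps the full trace, not only the message."""
    if exc is None:
        return ExceptionResult(has_exception=False)
    # format_exception returns a list of lines; join them into one string.
    trace = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    return ExceptionResult(
        has_exception=True, error_message=str(exc), stack_trace=trace
    )
```

Keeping the trace as a separate field lets a client show the short message by
default and the full trace only on request.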
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2221,6 +2221,31 @@ class SparkConnectPlanner(val session: SparkSession) {
.build()
respBuilder.setExplain(explain)
+ case StreamingQueryCommand.CommandCase.EXCEPTION =>
+ val result = query.exception
+ val exception = result match {
+ case Some(e) =>
+ StreamingQueryCommandResult.ExceptionResult
+ .newBuilder()
+ .setHasException(true)
+ .setErrorMessage(SparkConnectService.extractErrorMessage(e))
+ .build()
+ case None =>
+ StreamingQueryCommandResult.ExceptionResult
+ .newBuilder()
+ .setHasException(false)
+ .build()
+ }
+ respBuilder.setException(exception)
+
+ case StreamingQueryCommand.CommandCase.AWAIT_TERMINATION =>
+ val terminated = query.awaitTermination(command.getAwaitTermination.getTimeoutMs)
Review Comment:
The server side should honor the API regardless of what any one client does.
That includes handling a timeout of '0' correctly.
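As a rough illustration of validating the timeout on the server rather than
trusting the client, here is a small Python sketch. The function name, the
`wait` callback, and the policy of rejecting non-positive timeouts are all
assumptions made for this example; the comment above does not say which
behavior is the right one for '0'.

```python
from typing import Callable, Optional


def await_termination_on_server(
    wait: Callable[[Optional[float]], bool],
    timeout_ms: Optional[int],
) -> bool:
    """Hypothetical server-side handler.

    `wait(seconds)` blocks until the query terminates or the timeout
    elapses and returns whether it terminated; `None` means no timeout.
    """
    if timeout_ms is None:
        # No timeout supplied: block until the query terminates.
        return wait(None)
    if timeout_ms <= 0:
        # Assumed policy: reject instead of silently treating 0 as "forever".
        raise ValueError("timeout_ms must be positive")
    return wait(timeout_ms / 1000.0)
```

For example, `threading.Event().wait` has the right shape for `wait`, so the
handler can be exercised without a real streaming query.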
##########
python/pyspark/sql/connect/streaming/query.py:
##########
@@ -66,11 +70,35 @@ def isActive(self) -> bool:
isActive.__doc__ = PySparkStreamingQuery.isActive.__doc__
- # TODO (SPARK-42960): Implement and uncomment the doc
+ def _execute_await_termination_cmd(self, timeout: int = 10) -> bool:
+ cmd = pb2.StreamingQueryCommand()
+ cmd.await_termination.timeout_ms = timeout
+ terminated = self._execute_streaming_query_cmd(cmd).await_termination.terminated
+ return terminated
+
+ def _await_termination(self, timeoutMs: Optional[int]) -> Optional[bool]:
Review Comment:
Something like this is probably better done on the server side.
For this PR, I think we can skip it. We already have long-running RPCs like
'processAllAvailable()', and we can handle them better together on the
server side.
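For context, a client-side version of this typically looks like a polling
loop that issues short RPCs until the query terminates or a deadline passes.
The sketch below is an assumption about that general pattern, not the code in
this PR; `poll` is a hypothetical callable standing in for a helper like
`_execute_await_termination_cmd`.

```python
import time
from typing import Callable, Optional


def await_termination_by_polling(
    poll: Callable[[int], bool],
    timeout_ms: Optional[int],
    poll_ms: int = 10,
) -> Optional[bool]:
    """Poll with short RPCs instead of holding one long RPC open.

    `poll(ms)` is a hypothetical callable that waits up to `ms`
    milliseconds on the server and returns True once the query terminated.
    """
    if timeout_ms is None:
        # No timeout: keep polling until termination, then return None.
        while not poll(poll_ms):
            pass
        return None
    deadline = time.monotonic() + timeout_ms / 1000.0
    while True:
        remaining_ms = int((deadline - time.monotonic()) * 1000)
        if remaining_ms <= 0:
            return False
        # Never ask the server to wait past the overall deadline.
        if poll(min(poll_ms, remaining_ms)):
            return True
```

Every client would need to repeat this loop, which is one reason to prefer a
shared server-side mechanism for long-running RPCs.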