rangadi commented on code in PR #40785:
URL: https://github.com/apache/spark/pull/40785#discussion_r1170374413
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2245,6 +2245,21 @@ class SparkConnectPlanner(val session: SparkSession) {
.build()
respBuilder.setExplain(explain)
+ case StreamingQueryCommand.CommandCase.EXCEPTION =>
+ val result = query.exception
+ result.foreach(e =>
+ respBuilder.getExceptionBuilder
+ .setExceptionMessage(SparkConnectService.extractErrorMessage(e)))
+
+ case StreamingQueryCommand.CommandCase.AWAIT_TERMINATION =>
+ if (command.getAwaitTermination.hasTimeoutMs) {
+ val terminated = query.awaitTermination(command.getAwaitTermination.getTimeoutMs)
+ respBuilder.getAwaitTerminationBuilder
+ .setTerminated(terminated)
+ } else {
+ query.awaitTermination()
Review Comment:
We need to set the response here, right?
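For illustration only, here is a minimal Python sketch of the control flow in question, assuming the response should be populated on both paths. `StubQuery` and `handle_await_termination` are hypothetical stand-ins, not the planner code or the Spark API; the real handler is the Scala code above.

```python
from typing import Optional


class StubQuery:
    """Hypothetical stand-in for a streaming query that has already stopped."""

    def await_termination(self, timeout_ms: Optional[int] = None) -> Optional[bool]:
        # With a timeout: report whether the query terminated in time.
        # Without a timeout: block until termination and return nothing.
        return True if timeout_ms is not None else None


def handle_await_termination(query: StubQuery, timeout_ms: Optional[int]) -> dict:
    """Build a response dict that is populated on both code paths."""
    response = {}
    if timeout_ms is not None:
        response["terminated"] = query.await_termination(timeout_ms)
    else:
        query.await_termination()
        # Returning normally from the untimed wait means the query terminated.
        response["terminated"] = True
    return response
```

The point is only that the untimed branch also writes `terminated`, so the client always receives a populated result.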
##########
python/pyspark/sql/tests/streaming/test_streaming.py:
##########
@@ -253,8 +253,12 @@ def test_stream_await_termination(self):
duration = time.time() - now
self.assertTrue(duration >= 2)
self.assertFalse(res)
- finally:
+
q.processAllAvailable()
+ q.stop()
+ q.awaitTermination()
Review Comment:
Make this a comment in the code.
##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -298,6 +309,15 @@ message StreamingQueryCommandResult {
// Logical and physical plans as string
string result = 1;
}
+
+ message ExceptionResult {
+ // Exception message as string
+ optional string exception_message = 1;
+ }
+
+ message AwaitTerminationResult {
+ optional bool terminated = 1;
Review Comment:
We don't need a separate message if this is just one field. We can use a
bool directly.
##########
python/pyspark/sql/connect/streaming/query.py:
##########
@@ -66,11 +69,25 @@ def isActive(self) -> bool:
isActive.__doc__ = PySparkStreamingQuery.isActive.__doc__
- # TODO (SPARK-42960): Implement and uncomment the doc
+ def _execute_await_termination_cmd(self, timeoutMs: Optional[int] = None) -> Optional[bool]:
+ cmd = pb2.StreamingQueryCommand()
+ if timeoutMs is not None:
+ cmd.await_termination.timeout_ms = timeoutMs
+ terminated = self._execute_streaming_query_cmd(cmd).await_termination.terminated
+ return terminated
+ else:
+ cmd.await_termination.no_timeout = True
Review Comment:
We don't really check this field on the server, so we don't need
`no_timeout`. It is implied when `timeout_ms` is not set.
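A rough sketch of what relying on field presence alone could look like. `AwaitTerminationCommand` is a hypothetical dataclass standing in for the generated protobuf message, and `server_waits_with_timeout` only models the `hasTimeoutMs` check; neither is the actual pb2 or server code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AwaitTerminationCommand:
    """Hypothetical stand-in for the protobuf await_termination message."""

    timeout_ms: Optional[int] = None  # None models an unset optional field


def build_await_termination_cmd(timeout_ms: Optional[int] = None) -> AwaitTerminationCommand:
    cmd = AwaitTerminationCommand()
    if timeout_ms is not None:
        cmd.timeout_ms = timeout_ms
    # No else branch: leaving timeout_ms unset already means "wait indefinitely".
    return cmd


def server_waits_with_timeout(cmd: AwaitTerminationCommand) -> bool:
    """Models the server-side check, which only looks at field presence."""
    return cmd.timeout_ms is not None
```

Note that a timeout of 0 still counts as set in this model, so presence, not truthiness, is what distinguishes the two cases.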
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]