rangadi commented on code in PR #40785:
URL: https://github.com/apache/spark/pull/40785#discussion_r1166067405


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2221,6 +2221,31 @@ class SparkConnectPlanner(val session: SparkSession) {
           .build()
         respBuilder.setExplain(explain)
 
+      case StreamingQueryCommand.CommandCase.EXCEPTION =>
+        val result = query.exception
+        val exception = result match {
+          case Some(e) =>
+            StreamingQueryCommandResult.ExceptionResult
+              .newBuilder()
+              .setHasException(true)
+              .setErrorMessage(SparkConnectService.extractErrorMessage(e))
+              .build()
+          case None =>
+            StreamingQueryCommandResult.ExceptionResult
+              .newBuilder()
+              .setHasException(false)
+              .build()
+        }
+        respBuilder.setException(exception)
+
+      case StreamingQueryCommand.CommandCase.AWAIT_TERMINATION =>
+        val terminated = 
query.awaitTermination(command.getAwaitTermination.getTimeoutMs)

Review Comment:
   Is this tested? default timeout here is '0', but that would be an error. 



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2221,6 +2221,31 @@ class SparkConnectPlanner(val session: SparkSession) {
           .build()
         respBuilder.setExplain(explain)
 
+      case StreamingQueryCommand.CommandCase.EXCEPTION =>
+        val result = query.exception
+        val exception = result match {
+          case Some(e) =>
+            StreamingQueryCommandResult.ExceptionResult
+              .newBuilder()
+              .setHasException(true)
+              .setErrorMessage(SparkConnectService.extractErrorMessage(e))
+              .build()
+          case None =>
+            StreamingQueryCommandResult.ExceptionResult
+              .newBuilder()
+              .setHasException(false)
+              .build()
+        }
+        respBuilder.setException(exception)
+
+      case StreamingQueryCommand.CommandCase.AWAIT_TERMINATION =>
+        val terminated = 
query.awaitTermination(command.getAwaitTermination.getTimeoutMs)
+        val terminatedResult = 
StreamingQueryCommandResult.AwaitTerminationResult
+          .newBuilder()
+          .setTerminated(terminated)
+          .build()
+        respBuilder.setAwaitTermination(terminatedResult)

Review Comment:
   A simpler way to do this (can be used above for exception as well):
   ```
   respBuilder
      .getAwaitTerminationBuilder
      .setTerminated(terminated)
   ```
   The above has the same effect. I recently realized. 



##########
python/pyspark/sql/connect/streaming/query.py:
##########
@@ -66,11 +70,35 @@ def isActive(self) -> bool:
 
     isActive.__doc__ = PySparkStreamingQuery.isActive.__doc__
 
-    # TODO (SPARK-42960): Implement and uncomment the doc
+    def _execute_await_termination_cmd(self, timeout: int = 10) -> bool:
+        cmd = pb2.StreamingQueryCommand()
+        cmd.await_termination.timeout_ms = timeout
+        terminated = 
self._execute_streaming_query_cmd(cmd).await_termination.terminated
+        return terminated
+
+    def _await_termination(self, timeoutMs: Optional[int]) -> Optional[bool]:
+        terminated = False
+        if timeoutMs is None:
+            while not terminated:
+                # When no timeout is set, query the server every 10ms until 
query terminates

Review Comment:
   Every 10 milliseconds? 



##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -298,6 +306,16 @@ message StreamingQueryCommandResult {
     // Logical and physical plans as string
     string result = 1;
   }
+
+  message ExceptionResult {
+    // Exception as string
+    bool has_exception = 1;
+    optional string error_message = 2;

Review Comment:
   Are two fields necessary? Please add comment with the rational.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to