juliuszsompolski commented on code in PR #46929:
URL: https://github.com/apache/spark/pull/46929#discussion_r1700355954


##########
connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala:
##########
@@ -238,17 +238,45 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
         }
         completed = true // no longer interruptible
 
-        if (executeHolder.reattachable) {
-          // Reattachable execution sends a ResultComplete at the end of the stream
-          // to signal that there isn't more coming.
-          executeHolder.responseObserver.onNextComplete(createResultComplete())
-        } else {
-          executeHolder.responseObserver.onCompleted()
+        // If the request returns a long running iterator (e.g. StreamingQueryListener needs
+        // a long-running iterator to continuously stream back events),
+        // we delegate the sending of the final ResultComplete to the handler itself.

Review Comment:
   It's not only the final ResultComplete; all the events sent from the
   listener thread are delegated as well, aren't they?
   Maybe mention in this comment that the execution thread exits, but more
   replies can still be sent from somewhere else (a background thread?), and
   that sending the ResultComplete is delegated there too.



##########
connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala:
##########
@@ -238,17 +238,45 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
         }
         completed = true // no longer interruptible
 
-        if (executeHolder.reattachable) {
-          // Reattachable execution sends a ResultComplete at the end of the stream
-          // to signal that there isn't more coming.
-          executeHolder.responseObserver.onNextComplete(createResultComplete())
-        } else {
-          executeHolder.responseObserver.onCompleted()
+        // If the request returns a long running iterator (e.g. StreamingQueryListener needs
+        // a long-running iterator to continuously stream back events),
+        // we delegate the sending of the final ResultComplete to the handler itself.
+        if (!createsLongRunningIterator(executeHolder.request)) {
+          if (executeHolder.reattachable) {
+            // Reattachable execution sends a ResultComplete at the end of the stream
+            // to signal that there isn't more coming.
+            executeHolder.responseObserver.onNextComplete(createResultComplete())
+          } else {
+            executeHolder.responseObserver.onCompleted()
+          }
         }
       }
     }
   }
 
+  /**
+   * Perform a check to see if the request creates a long running iterator. Currently, only the
+   * ADD_LISTENER_BUS_LISTENER command creates a long running iterator. This is used to
+   * continuously stream back events to the client side StreamingQueryListener.
+   * @param request The request to check
+   * @return True if the iterator is long running
+   */
+  private def createsLongRunningIterator(request: proto.ExecutePlanRequest): Boolean = {

Review Comment:
   nit: "Background" might be a better name than "LongRunning", because what
   matters is that the iterator runs somewhere in the background rather than
   on an execution thread.
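
For illustration, here is a minimal sketch of how the two suggestions above (a fuller comment and the "Background" name) might look together. It is not the PR's code: `Command`, `Request`, `ResponseObserver`, `CompletionSketch`, `finish` and `createsBackgroundIterator` are hypothetical, simplified stand-ins for the real Spark Connect types and methods, used only so the snippet compiles on its own.

```scala
// Hypothetical stand-ins for the real Spark Connect types; illustration only.
sealed trait Command
case object AddListenerBusListener extends Command
case object OtherCommand extends Command

final case class Request(command: Option[Command])

trait ResponseObserver {
  def onNextComplete(): Unit // stands in for sending the final ResultComplete
  def onCompleted(): Unit
}

object CompletionSketch {

  /**
   * True if replies for this request are produced in the background,
   * i.e. not by the execution thread. In this sketch only the
   * (stand-in) AddListenerBusListener command qualifies.
   */
  def createsBackgroundIterator(request: Request): Boolean =
    request.command.contains(AddListenerBusListener)

  def finish(request: Request, reattachable: Boolean, observer: ResponseObserver): Unit = {
    // The execution thread exits here. For a background iterator, more replies
    // (e.g. listener events) can still be sent from another thread, so sending
    // the final ResultComplete is delegated to that sender as well.
    if (!createsBackgroundIterator(request)) {
      if (reattachable) observer.onNextComplete()
      else observer.onCompleted()
    }
  }
}
```

With this shape, the comment states both halves of the delegation (later events and the final ResultComplete), and the predicate name describes where the replies come from rather than how long the iterator lives.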


