HyukjinKwon commented on code in PR #46929:
URL: https://github.com/apache/spark/pull/46929#discussion_r1701072593


##########
connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala:
##########
@@ -238,17 +238,52 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
         }
         completed = true // no longer interruptible
 
-        if (executeHolder.reattachable) {
-          // Reattachable execution sends a ResultComplete at the end of the stream
-          // to signal that there isn't more coming.
-          executeHolder.responseObserver.onNextComplete(createResultComplete())
-        } else {
-          executeHolder.responseObserver.onCompleted()
+        // If the request starts a long running iterator (e.g. StreamingQueryListener needs
+        // a long-running iterator to continuously stream back events, it runs in a separate
+        // thread, and holds the responseObserver to send back the listener events.)
+        // In such cases, even after the ExecuteThread returns, we still want to keep the
+        // client side iterator open, i.e. don't send the ResultComplete to the client.
+        // So delegate the sending of the final ResultComplete to the listener thread itself.
+        if (!shouldDelegateCompleteResponse(executeHolder.request)) {
+          if (executeHolder.reattachable) {
+            // Reattachable execution sends a ResultComplete at the end of the stream
+            // to signal that there isn't more coming.
+            executeHolder.responseObserver.onNextComplete(createResultComplete())
+          } else {
+            executeHolder.responseObserver.onCompleted()
+          }
         }
       }
     }
   }
 
+  /**
+   * Perform a check to see if we should delegate sending ResultComplete. Currently, the
+   * ADD_LISTENER_BUS_LISTENER command creates a new thread and continuously streams back listener
+   * events to the client side StreamingQueryListenerBus. In this case, we would like to delegate
+   * the sending of the final ResultComplete to the handler thread itself.
+   * @param request
+   *   The request to check
+   * @return
+   *   True if we should delegate sending the final ResultComplete to the handler thread, i.e.
+   *   don't send a ResultComplete when the ExecuteThread returns.
+   */
+  private def shouldDelegateCompleteResponse(request: proto.ExecutePlanRequest): Boolean = {
+    request.getPlan.getOpTypeCase match {

Review Comment:
   I think it'd be easier with just an `if` condition, like:
   
   ```scala
   request.getPlan.getOpTypeCase == proto.Plan.OpTypeCase.COMMAND &&
     request.getPlan.getCommand.getCommandTypeCase == proto.Command.CommandTypeCase.STREAMING_QUERY_LISTENER_BUS_COMMAND &&
     request.getPlan.getCommand.getStreamingQueryListenerBusCommand.getCommandCase == proto.StreamingQueryListenerBusCommand.CommandCase.ADD_LISTENER_BUS_LISTENER
   ```
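A minimal, self-contained sketch of the suggested condition follows. The real Spark Connect protobuf classes are replaced by hypothetical stand-ins (`ProtoStub`, `DelegateCheck`, `viaMatch`, `viaCondition` and the `OTHER`/`ROOT` values are all made up for this sketch) that only mimic the accessor chain used above. The nested `match` is an assumed shape, since the diff shows only its first line.

```scala
// Hypothetical stand-ins for the generated protobuf classes. They only mimic the
// accessor names used in the review suggestion; they are NOT the real proto.* API.
object ProtoStub {
  sealed trait OpTypeCase
  object OpTypeCase {
    case object ROOT extends OpTypeCase
    case object COMMAND extends OpTypeCase
  }

  sealed trait CommandTypeCase
  object CommandTypeCase {
    case object STREAMING_QUERY_LISTENER_BUS_COMMAND extends CommandTypeCase
    case object OTHER extends CommandTypeCase
  }

  sealed trait ListenerBusCommandCase
  object ListenerBusCommandCase {
    case object ADD_LISTENER_BUS_LISTENER extends ListenerBusCommandCase
    case object OTHER extends ListenerBusCommandCase
  }

  final case class ListenerBusCommand(getCommandCase: ListenerBusCommandCase)
  final case class Command(
      getCommandTypeCase: CommandTypeCase,
      getStreamingQueryListenerBusCommand: ListenerBusCommand)
  final case class Plan(getOpTypeCase: OpTypeCase, getCommand: Command)
  final case class ExecutePlanRequest(getPlan: Plan)
}

object DelegateCheck {
  import ProtoStub._

  // Nested pattern match (assumed shape): every non-matching arm returns false.
  def viaMatch(request: ExecutePlanRequest): Boolean =
    request.getPlan.getOpTypeCase match {
      case OpTypeCase.COMMAND =>
        request.getPlan.getCommand.getCommandTypeCase match {
          case CommandTypeCase.STREAMING_QUERY_LISTENER_BUS_COMMAND =>
            request.getPlan.getCommand.getStreamingQueryListenerBusCommand.getCommandCase ==
              ListenerBusCommandCase.ADD_LISTENER_BUS_LISTENER
          case _ => false
        }
      case _ => false
    }

  // Single boolean condition, as in the review comment. && short-circuits, so the
  // deeper accessors are only evaluated when the outer checks pass.
  def viaCondition(request: ExecutePlanRequest): Boolean =
    request.getPlan.getOpTypeCase == OpTypeCase.COMMAND &&
      request.getPlan.getCommand.getCommandTypeCase ==
        CommandTypeCase.STREAMING_QUERY_LISTENER_BUS_COMMAND &&
      request.getPlan.getCommand.getStreamingQueryListenerBusCommand.getCommandCase ==
        ListenerBusCommandCase.ADD_LISTENER_BUS_LISTENER
}
```

Both forms agree on every combination of these stub values; the boolean chain simply drops the explicit `case _ => false` arms, and only a COMMAND plan carrying an ADD_LISTENER_BUS_LISTENER listener-bus command yields `true`.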


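The completion hand-off described in the diff's comments can be sketched as follows. Everything here is hypothetical: `RecordingObserver` stands in for `executeHolder.responseObserver`, `DelegationSketch` stands in for the execute thread plus the listener handler thread, and the `CountDownLatch` is an assumed stop signal. The sketch only shows the ordering: when delegation applies, the execute thread returns without sending `ResultComplete`, and the handler sends it after its own events.

```scala
import java.util.concurrent.CountDownLatch
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the response observer; records what the client would see.
final class RecordingObserver {
  private val sent = ArrayBuffer.empty[String]
  def onNext(msg: String): Unit = synchronized { sent += msg }
  def onNextComplete(): Unit = synchronized { sent += "ResultComplete" }
  def events: List[String] = synchronized { sent.toList }
}

object DelegationSketch {
  // End of the execute thread: send ResultComplete only when no long-running
  // handler has taken over that responsibility.
  def finishExecuteThread(observer: RecordingObserver, delegate: Boolean): Unit =
    if (!delegate) observer.onNextComplete()

  // Hypothetical listener handler thread: streams an event, waits until it is
  // told to stop, then sends the final ResultComplete itself.
  def startListenerHandler(observer: RecordingObserver, stop: CountDownLatch): Thread = {
    val t = new Thread(() => {
      observer.onNext("listener-event")
      stop.await()
      observer.onNextComplete()
    })
    t.start()
    t
  }

  // Runs one request end to end and returns the messages the client received.
  def run(delegate: Boolean): List[String] = {
    val observer = new RecordingObserver
    val stop = new CountDownLatch(1)
    val handler = if (delegate) Some(startListenerHandler(observer, stop)) else None
    finishExecuteThread(observer, delegate) // the execute thread returns here
    stop.countDown()                        // hypothetical trigger ending the handler
    handler.foreach(_.join())
    observer.events
  }
}
```

`DelegationSketch.run(delegate = true)` produces `"ResultComplete"` only after the handler's own event, which is what keeps the client-side iterator open past the end of the execute thread.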

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

