mridulm commented on code in PR #45367:
URL: https://github.com/apache/spark/pull/45367#discussion_r1562045723


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -1014,6 +1014,16 @@ package object config {
       .timeConf(TimeUnit.NANOSECONDS)
       .createWithDefaultString("1s")
 
+  private[spark] val 
LISTENER_BUS_EVENT_QUEUE_EVENT_DISPATCH_EXIT_WAITING_TIME_ON_STOP =
+    
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.eventDispatchExitWaitingTimeOnStop")

Review Comment:
   ```suggestion
       ConfigBuilder("spark.scheduler.listenerbus.exitTimeout")
   ```



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -1014,6 +1014,16 @@ package object config {
       .timeConf(TimeUnit.NANOSECONDS)
       .createWithDefaultString("1s")
 
+  private[spark] val 
LISTENER_BUS_EVENT_QUEUE_EVENT_DISPATCH_EXIT_WAITING_TIME_ON_STOP =

Review Comment:
   ```suggestion
     private[spark] val LISTENER_BUS_EXIT_TIMEOUT =
   ```



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -1014,6 +1014,16 @@ package object config {
       .timeConf(TimeUnit.NANOSECONDS)
       .createWithDefaultString("1s")
 
+  private[spark] val 
LISTENER_BUS_EVENT_QUEUE_EVENT_DISPATCH_EXIT_WAITING_TIME_ON_STOP =
+    
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.eventDispatchExitWaitingTimeOnStop")
+      .doc("The time that event queue waits until the dispatch thread exits " +
+        "when stop is invoked. " +
+        "This is set to 0 by default for graceful shutdown of the event queue, 
" +
+        "but allow the user to configure the waiting time.")
+      .version("4.0.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(0)

Review Comment:
   ```suggestion
         .checkValue(_ >= 0, "Listener bus exit timeout must be non-negative 
duration")
         .createWithDefault(0)
   ```



##########
core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala:
##########
@@ -142,10 +142,17 @@ private class AsyncEventQueue(
       eventCount.incrementAndGet()
       eventQueue.put(POISON_PILL)
     }
-    // this thread might be trying to stop itself as part of error handling -- 
we can't join
-    // in that case.
-    if (Thread.currentThread() != dispatchThread) {
-      dispatchThread.join()
+    // 1. This thread might be trying to stop itself as part of error handling 
-- we can't join
+    //    in that case.
+    // 2. If users don't want to wait for the dispatch to end until all events 
are drained,
+    //    they can control the waiting time by themselves
+    //    or omit the thread join by set the waiting time to a negative value.
+    val waitingTimeMs =
+      
conf.get(LISTENER_BUS_EVENT_QUEUE_EVENT_DISPATCH_EXIT_WAITING_TIME_ON_STOP)
+    if (waitingTimeMs >= 0 && Thread.currentThread() != dispatchThread) {
+      // By default, the `waitingTimeMs` is set to 0,
+      // which means it will wait until all events are drained.
+      dispatchThread.join(waitingTimeMs)
     }

Review Comment:
   ```suggestion
       if (Thread.currentThread() != dispatchThread) {
         // By default, the `waitingTimeMs` is set to 0,
         // which means it will wait until all events are drained.
         dispatchThread.join(waitingTimeMs)
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to