squito commented on a change in pull request #26924: [SPARK-30285][CORE]Fix race condition between LiveListenerBus#stop and AsyncEventQueue#removeListenerOnError
URL: https://github.com/apache/spark/pull/26924#discussion_r359032665
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
 ##########
 @@ -201,10 +201,24 @@ private class AsyncEventQueue(
     true
   }
 
+  override def doPostEvent(listener: SparkListenerInterface, event: SparkListenerEvent): Unit = {
+    // If listener is dead, we don't post any event to it.
+    if (!listener.dead) {
+      super.doPostEvent(listener, event)
+    }
+  }
+
   override def removeListenerOnError(listener: SparkListenerInterface): Unit = {
-    // the listener failed in an unrecoverably way, we want to remove it from the entire
-    // LiveListenerBus (potentially stopping a queue if it is empty)
-    bus.removeListener(listener)
+    if (bus.isInStop) {
+      // If bus is in the progress of stop, we just mark the listener as dead instead of removing
+      // via calling `bus.removeListener` to avoid race condition
+      // dead listeners will be removed eventually in `bus.stop`
 
 Review comment:
   some grammar nits:
   
   If we're in the middle of stopping the bus, we just mark the listener as dead,
   instead of removing, to avoid a deadlock.
   Dead listeners will be removed eventually in `bus.stop`
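A minimal, single-threaded Scala sketch of the pattern the diff and the suggested comment describe. It assumes hypothetical `SketchBus` and `SketchListener` classes that are not Spark's `LiveListenerBus`, `AsyncEventQueue`, or `SparkListenerInterface`. While the bus is stopping, a failing listener is only flagged `dead`, `doPostEvent` skips flagged listeners, and `stop` removes them at the end. Outside of `stop`, a failing listener is removed right away. The sketch does not model the dispatch thread or the locking that makes the real race dangerous.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for a listener; `dead` mirrors the flag the diff reads.
// `failsOn` lets the demo decide which events make this listener throw.
class SketchListener(val name: String, failsOn: String => Boolean) {
  @volatile var dead: Boolean = false
  val received = new CopyOnWriteArrayList[String]()

  def onEvent(event: String): Unit = {
    if (failsOn(event)) throw new IllegalStateException(s"$name cannot handle $event")
    received.add(event)
  }
}

// Hypothetical, heavily simplified bus (single-threaded model, no real locking).
class SketchBus {
  private val stopping = new AtomicBoolean(false)
  // Copy-on-write list: iteration works on a snapshot, so removing during post() is safe.
  val listeners = new CopyOnWriteArrayList[SketchListener]()

  def isInStop: Boolean = stopping.get()

  def removeListener(l: SketchListener): Unit = listeners.remove(l)

  // Same idea as the diff: never deliver events to a listener marked dead.
  def doPostEvent(l: SketchListener, event: String): Unit =
    if (!l.dead) l.onEvent(event)

  // Same idea as the diff: defer removal while stopping, remove immediately otherwise.
  def removeListenerOnError(l: SketchListener): Unit =
    if (isInStop) l.dead = true
    else removeListener(l)

  def post(event: String): Unit =
    listeners.asScala.foreach { l =>
      try doPostEvent(l, event)
      catch { case _: Exception => removeListenerOnError(l) }
    }

  def stop(): Unit = {
    stopping.set(true)
    post("shutdown") // a listener failing here is only marked dead
    // Clean up the dead listeners once draining is done.
    listeners.asScala.filter(_.dead).foreach(l => removeListener(l))
  }
}

// Usage demo
val good  = new SketchListener("good", _ => false)
val bad   = new SketchListener("bad", _ == "shutdown")   // fails only while stopping
val flaky = new SketchListener("flaky", _ == "job-start") // fails during normal operation

val bus = new SketchBus
Seq(good, bad, flaky).foreach(l => bus.listeners.add(l))

bus.post("job-start")        // flaky fails outside stop(): removed right away
bus.stop()                   // bad fails inside stop(): marked dead, removed at the end
bus.doPostEvent(bad, "late") // skipped because bad.dead is true
```

After the demo runs, only `good` remains registered. `bad` is flagged dead and never sees the late event. `flaky` was removed through the normal path, so its flag stays false.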

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 