Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21356#discussion_r189100255
--- Diff:
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
@@ -489,6 +489,38 @@ class SparkListenerSuite extends SparkFunSuite with
LocalSparkContext with Match
assert(bus.findListenersByClass[BasicJobCounter]().isEmpty)
}
+ test("interrupt within listener is handled correctly") {
+ val conf = new SparkConf(false)
+ .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
+ val bus = new LiveListenerBus(conf)
+ val counter1 = new BasicJobCounter()
+ val counter2 = new BasicJobCounter()
+ val interruptingListener = new InterruptingListener
+ bus.addToSharedQueue(counter1)
+ bus.addToSharedQueue(interruptingListener)
+ bus.addToStatusQueue(counter2)
+ assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
+ assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+
+ bus.start(mockSparkContext, mockMetricsSystem)
+
+ // after we post one event, the shared queue should get stopped
because of the interrupt
--- End diff --
s/get stopped/stop
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]