Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21356#discussion_r189100255
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
    @@ -489,6 +489,38 @@ class SparkListenerSuite extends SparkFunSuite with 
LocalSparkContext with Match
         assert(bus.findListenersByClass[BasicJobCounter]().isEmpty)
       }
     
    +  test("interrupt within listener is handled correctly") {
    +    val conf = new SparkConf(false)
    +      .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
    +    val bus = new LiveListenerBus(conf)
    +    val counter1 = new BasicJobCounter()
    +    val counter2 = new BasicJobCounter()
    +    val interruptingListener = new InterruptingListener
    +    bus.addToSharedQueue(counter1)
    +    bus.addToSharedQueue(interruptingListener)
    +    bus.addToStatusQueue(counter2)
    +    assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
    +    assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
    +
    +    bus.start(mockSparkContext, mockMetricsSystem)
    +
    +    // after we post one event, the shared queue should get stopped 
because of the interrupt
    --- End diff --
    
    s/get stopped/stop


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to