GitHub user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19211#discussion_r139748516
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -173,80 +159,62 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
        */
       @throws(classOf[TimeoutException])
       def waitUntilEmpty(timeoutMillis: Long): Unit = {
    -    val finishTime = System.currentTimeMillis + timeoutMillis
    -    while (!queueIsEmpty) {
    -      if (System.currentTimeMillis > finishTime) {
    -        throw new TimeoutException(
    -          s"The event queue is not empty after $timeoutMillis milliseconds")
    +    val deadline = System.currentTimeMillis + timeoutMillis
    +    queues.asScala.foreach { queue =>
    +      if (!queue.waitUntilEmpty(deadline)) {
    +        throw new TimeoutException(s"The event queue is not empty after $timeoutMillis ms.")
           }
    -      /* Sleep rather than using wait/notify, because this is used only for testing and
    -       * wait/notify add overhead in the general case. */
    -      Thread.sleep(10)
         }
       }
     
       /**
    -   * For testing only. Return whether the listener daemon thread is still alive.
    -   * Exposed for testing.
    -   */
    -  def listenerThreadIsAlive: Boolean = listenerThread.isAlive
    -
    -  /**
    -   * Return whether the event queue is empty.
    -   *
    -   * The use of synchronized here guarantees that all events that once belonged to this queue
    -   * have already been processed by all attached listeners, if this returns true.
    -   */
    -  private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent }
    -
    -  /**
        * Stop the listener bus. It will wait until the queued events have been processed, but drop the
        * new events after stopping.
        */
       def stop(): Unit = {
         if (!started.get()) {
    -      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
    +      throw new IllegalStateException(s"Attempted to stop bus that has not yet started!")
         }
    -    if (stopped.compareAndSet(false, true)) {
    -      // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
    -      // `stop` is called.
    -      eventLock.release()
    -      listenerThread.join()
    -    } else {
    -      // Keep quiet
    +
    +    if (!stopped.compareAndSet(false, true)) {
    +      return
         }
    -  }
     
    -  /**
    -   * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be
    -   * notified with the dropped events.
    -   *
    -   * Note: `onDropEvent` can be called in any thread.
    -   */
    -  def onDropEvent(event: SparkListenerEvent): Unit = {
    -    metrics.numDroppedEvents.inc()
    -    droppedEventsCounter.incrementAndGet()
    -    if (logDroppedEvent.compareAndSet(false, true)) {
    -      // Only log the following message once to avoid duplicated annoying logs.
    -      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
    -        "This likely means one of the SparkListeners is too slow and cannot keep up with " +
    -        "the rate at which tasks are being started by the scheduler.")
    +    synchronized {
    +      queues.asScala.foreach(_.stop())
    +      queues.clear()
         }
    -    logTrace(s"Dropping event $event")
       }
    +
    +  private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = {
    +    queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }
    +  }
    +
    +  private[spark] def listeners: JList[SparkListenerInterface] = {
    --- End diff --
    
    Used by tests.
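
For reference, a minimal sketch of the kind of test-side check this accessor enables. `NoOpListener` and the `addToSharedQueue` registration call are illustrative assumptions, not part of this diff; only the `LiveListenerBus(conf: SparkConf)` constructor, `listeners`, and `findListenersByClass` appear in the change itself:

    package org.apache.spark.scheduler

    import scala.collection.JavaConverters._

    import org.apache.spark.SparkConf

    object ListenersAccessorSketch {
      // Hypothetical do-nothing listener, just so there is something to register.
      private class NoOpListener extends SparkListener

      def main(args: Array[String]): Unit = {
        val bus = new LiveListenerBus(new SparkConf())
        // The registration method name is an assumption about this revision of the PR.
        bus.addToSharedQueue(new NoOpListener)
        // The test-only accessors exposed by the diff above:
        assert(bus.listeners.asScala.exists(_.isInstanceOf[NoOpListener]))
        assert(bus.findListenersByClass[NoOpListener]().nonEmpty)
      }
    }

Both accessors are `private[spark]`, so a snippet like this has to live under the `org.apache.spark` package, which is how the test suites get at them.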

