GitHub user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19211#discussion_r139748516
--- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -173,80 +159,62 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
*/
@throws(classOf[TimeoutException])
def waitUntilEmpty(timeoutMillis: Long): Unit = {
- val finishTime = System.currentTimeMillis + timeoutMillis
- while (!queueIsEmpty) {
- if (System.currentTimeMillis > finishTime) {
- throw new TimeoutException(
- s"The event queue is not empty after $timeoutMillis
milliseconds")
+ val deadline = System.currentTimeMillis + timeoutMillis
+ queues.asScala.foreach { queue =>
+ if (!queue.waitUntilEmpty(deadline)) {
+ throw new TimeoutException(s"The event queue is not empty after $timeoutMillis ms.")
}
- /* Sleep rather than using wait/notify, because this is used only
for testing and
- * wait/notify add overhead in the general case. */
- Thread.sleep(10)
}
}
/**
- * For testing only. Return whether the listener daemon thread is still
alive.
- * Exposed for testing.
- */
- def listenerThreadIsAlive: Boolean = listenerThread.isAlive
-
- /**
- * Return whether the event queue is empty.
- *
- * The use of synchronized here guarantees that all events that once
belonged to this queue
- * have already been processed by all attached listeners, if this
returns true.
- */
- private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty &&
!processingEvent }
-
- /**
* Stop the listener bus. It will wait until the queued events have been
processed, but drop the
* new events after stopping.
*/
def stop(): Unit = {
if (!started.get()) {
- throw new IllegalStateException(s"Attempted to stop $name that has
not yet started!")
+ throw new IllegalStateException(s"Attempted to stop bus that has not yet started!")
}
- if (stopped.compareAndSet(false, true)) {
- // Call eventLock.release() so that listenerThread will poll `null`
from `eventQueue` and know
- // `stop` is called.
- eventLock.release()
- listenerThread.join()
- } else {
- // Keep quiet
+
+ if (!stopped.compareAndSet(false, true)) {
+ return
}
- }
- /**
- * If the event queue exceeds its capacity, the new events will be
dropped. The subclasses will be
- * notified with the dropped events.
- *
- * Note: `onDropEvent` can be called in any thread.
- */
- def onDropEvent(event: SparkListenerEvent): Unit = {
- metrics.numDroppedEvents.inc()
- droppedEventsCounter.incrementAndGet()
- if (logDroppedEvent.compareAndSet(false, true)) {
- // Only log the following message once to avoid duplicated annoying
logs.
- logError("Dropping SparkListenerEvent because no remaining room in
event queue. " +
- "This likely means one of the SparkListeners is too slow and
cannot keep up with " +
- "the rate at which tasks are being started by the scheduler.")
+ synchronized {
+ queues.asScala.foreach(_.stop())
+ queues.clear()
}
- logTrace(s"Dropping event $event")
}
+
+ private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = {
+ queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }
+ }
+
+ private[spark] def listeners: JList[SparkListenerInterface] = {
--- End diff --
This `listeners` accessor is used by tests.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org