Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19211#discussion_r139458812
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf)
extends SparkListenerBus {
/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
- // Indicate if we are processing some event
- // Guarded by `self`
- private var processingEvent = false
-
- private val logDroppedEvent = new AtomicBoolean(false)
-
- // A counter that represents the number of events produced and consumed in the queue
- private val eventLock = new Semaphore(0)
-
- private val listenerThread = new Thread(name) {
- setDaemon(true)
- override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
- LiveListenerBus.withinListenerThread.withValue(true) {
- val timer = metrics.eventProcessingTime
- while (true) {
- eventLock.acquire()
- self.synchronized {
- processingEvent = true
- }
- try {
- val event = eventQueue.poll
- if (event == null) {
- // Get out of the while loop and shutdown the daemon thread
- if (!stopped.get) {
- throw new IllegalStateException("Polling `null` from
eventQueue means" +
- " the listener bus has been stopped. So `stopped` must
be true")
- }
- return
- }
- val timerContext = timer.time()
- try {
- postToAll(event)
- } finally {
- timerContext.stop()
- }
- } finally {
- self.synchronized {
- processingEvent = false
- }
- }
+ private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
+
+ /** Add a listener to the queue shared by all non-internal listeners. */
+ def addToSharedQueue(listener: SparkListenerInterface): Unit = {
+ addToQueue(listener, SHARED_QUEUE)
+ }
+
+ /** Add a listener to the executor management queue. */
+ def addToManagementQueue(listener: SparkListenerInterface): Unit = {
+ addToQueue(listener, EXECUTOR_MGMT_QUEUE)
+ }
+
+ /** Add a listener to the application status queue. */
+ def addToStatusQueue(listener: SparkListenerInterface): Unit = {
+ addToQueue(listener, APP_STATUS_QUEUE)
+ }
+
+ /** Add a listener to the event log queue. */
+ def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
+ addToQueue(listener, EVENT_LOG_QUEUE)
+ }
+
+ /**
+ * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
+ * of each other (each one uses a separate thread for delivering events), allowing slower
+ * listeners to be somewhat isolated from others.
+ *
+ * A newly created queue is started right away only if the bus has been started and not yet
+ * stopped; in every case it is appended to `queues`.
+ *
+ * NOTE(review): the `case Some(queue)` pattern below shadows the `queue` name parameter;
+ * consider renaming the bound variable for readability.
+ *
+ * @param listener the listener to register.
+ * @param queue name of the target queue; an existing queue with this name is reused.
+ * @throws IllegalStateException if the bus has already been stopped.
+ */
+ private def addToQueue(listener: SparkListenerInterface, queue: String):
Unit = synchronized {
+ if (stopped.get()) {
+ throw new IllegalStateException("LiveListenerBus is stopped.")
+ }
+
+ queues.asScala.find(_.name == queue) match {
+ case Some(queue) =>
+ queue.addListener(listener)
+
+ case None =>
+ val newQueue = new AsyncEventQueue(queue, conf, metrics)
+ newQueue.addListener(listener)
+ if (started.get() && !stopped.get()) {
+ newQueue.start(sparkContext)
}
- }
+ queues.add(newQueue)
}
}
- override protected def getTimer(listener: SparkListenerInterface):
Option[Timer] = {
-
metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
+ def removeListener(listener: SparkListenerInterface): Unit =
synchronized {
+ // Remove listener from all queues it was added to, and stop queues
that have become empty.
+ queues.asScala
+ .filter { queue =>
+ queue.removeListener(listener)
+ queue.listeners.isEmpty()
+ }
+ .foreach { toRemove =>
+ if (started.get() && !stopped.get()) {
+ toRemove.stop()
--- End diff --
That would waste resources in the normal case.
The default configuration would never add anything to the default and event
log queues, so you'd have two threads dispatching events to no listeners.
Stopping a queue when it becomes empty is a rare case, but it covers users who
add listeners and later remove them, which could leave the default queue
empty. Singling out the default queue here just complicates the code.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]