Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19211#discussion_r139342382
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf)
extends SparkListenerBus {
/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
- // Indicate if we are processing some event
- // Guarded by `self`
- private var processingEvent = false
-
- private val logDroppedEvent = new AtomicBoolean(false)
-
- // A counter that represents the number of events produced and consumed
in the queue
- private val eventLock = new Semaphore(0)
-
- private val listenerThread = new Thread(name) {
- setDaemon(true)
- override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
- LiveListenerBus.withinListenerThread.withValue(true) {
- val timer = metrics.eventProcessingTime
- while (true) {
- eventLock.acquire()
- self.synchronized {
- processingEvent = true
- }
- try {
- val event = eventQueue.poll
- if (event == null) {
- // Get out of the while loop and shutdown the daemon thread
- if (!stopped.get) {
- throw new IllegalStateException("Polling `null` from
eventQueue means" +
- " the listener bus has been stopped. So `stopped` must
be true")
- }
- return
- }
- val timerContext = timer.time()
- try {
- postToAll(event)
- } finally {
- timerContext.stop()
- }
- } finally {
- self.synchronized {
- processingEvent = false
- }
- }
+ private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
+
+ /** Add a listener to queue shared by all non-internal listeners. */
+ def addToSharedQueue(listener: SparkListenerInterface): Unit = {
+ addToQueue(listener, SHARED_QUEUE)
+ }
+
+ /** Add a listener to the executor management queue. */
+ def addToManagementQueue(listener: SparkListenerInterface): Unit = {
+ addToQueue(listener, EXECUTOR_MGMT_QUEUE)
+ }
+
+ /** Add a listener to the application status queue. */
+ def addToStatusQueue(listener: SparkListenerInterface): Unit = {
+ addToQueue(listener, APP_STATUS_QUEUE)
+ }
+
+ /** Add a listener to the event log queue. */
+ def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
+ addToQueue(listener, EVENT_LOG_QUEUE)
+ }
+
+ /**
+ * Add a listener to a specific queue, creating a new queue if needed.
Queues are independent
+ * of each other (each one uses a separate thread for delivering
events), allowing slower
+ * listeners to be somewhat isolated from others.
+ */
+ private def addToQueue(listener: SparkListenerInterface, queue: String):
Unit = synchronized {
+ if (stopped.get()) {
+ throw new IllegalStateException("LiveListenerBus is stopped.")
+ }
+
+ queues.asScala.find(_.name == queue) match {
+ case Some(queue) =>
+ queue.addListener(listener)
+
+ case None =>
+ val newQueue = new AsyncEventQueue(queue, conf, metrics)
+ newQueue.addListener(listener)
+ if (started.get() && !stopped.get()) {
+ newQueue.start(sparkContext)
}
- }
+ queues.add(newQueue)
}
}
- override protected def getTimer(listener: SparkListenerInterface):
Option[Timer] = {
-
metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
+ def removeListener(listener: SparkListenerInterface): Unit =
synchronized {
+ // Remove listener from all queues it was added to, and stop queues
that have become empty.
+ queues.asScala
+ .filter { queue =>
+ queue.removeListener(listener)
+ queue.listeners.isEmpty()
+ }
+ .foreach { toRemove =>
+ if (started.get() && !stopped.get()) {
+ toRemove.stop()
--- End diff --
This logic is a little weird: why do we stop the queue when there are no
listeners? Is it possible that we may add listeners back to this queue later?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]