Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19211#discussion_r139458303
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf)
extends SparkListenerBus {
/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
- // Indicate if we are processing some event
- // Guarded by `self`
- private var processingEvent = false
-
- private val logDroppedEvent = new AtomicBoolean(false)
-
- // A counter that represents the number of events produced and consumed in the queue
- private val eventLock = new Semaphore(0)
-
- private val listenerThread = new Thread(name) {
- setDaemon(true)
- override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
- LiveListenerBus.withinListenerThread.withValue(true) {
- val timer = metrics.eventProcessingTime
- while (true) {
- eventLock.acquire()
- self.synchronized {
- processingEvent = true
- }
- try {
- val event = eventQueue.poll
- if (event == null) {
- // Get out of the while loop and shutdown the daemon thread
- if (!stopped.get) {
- throw new IllegalStateException("Polling `null` from eventQueue means" +
- " the listener bus has been stopped. So `stopped` must be true")
- }
- return
- }
- val timerContext = timer.time()
- try {
- postToAll(event)
- } finally {
- timerContext.stop()
- }
- } finally {
- self.synchronized {
- processingEvent = false
- }
- }
+ private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
+
+ /** Add a listener to queue shared by all non-internal listeners. */
+ def addToSharedQueue(listener: SparkListenerInterface): Unit = {
+ addToQueue(listener, SHARED_QUEUE)
+ }
+
+ /** Add a listener to the executor management queue. */
+ def addToManagementQueue(listener: SparkListenerInterface): Unit = {
+ addToQueue(listener, EXECUTOR_MGMT_QUEUE)
+ }
+
+ /** Add a listener to the application status queue. */
+ def addToStatusQueue(listener: SparkListenerInterface): Unit = {
+ addToQueue(listener, APP_STATUS_QUEUE)
+ }
+
+ /** Add a listener to the event log queue. */
+ def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
+ addToQueue(listener, EVENT_LOG_QUEUE)
+ }
+
+ /**
+ * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
+ * of each other (each one uses a separate thread for delivering events), allowing slower
+ * listeners to be somewhat isolated from others.
+ */
+ private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized {
+ if (stopped.get()) {
+ throw new IllegalStateException("LiveListenerBus is stopped.")
--- End diff --
No. It's ok and actually expected to add listeners before the bus is
started.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]