Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19211#discussion_r139199768
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -65,53 +60,61 @@ private[spark] class LiveListenerBus(conf: SparkConf)
extends SparkListenerBus {
/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
- // Indicate if we are processing some event
- // Guarded by `self`
- private var processingEvent = false
-
- private val logDroppedEvent = new AtomicBoolean(false)
-
- // A counter that represents the number of events produced and consumed
in the queue
- private val eventLock = new Semaphore(0)
-
- private val listenerThread = new Thread(name) {
- setDaemon(true)
- override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
- LiveListenerBus.withinListenerThread.withValue(true) {
- val timer = metrics.eventProcessingTime
- while (true) {
- eventLock.acquire()
- self.synchronized {
- processingEvent = true
- }
- try {
- val event = eventQueue.poll
- if (event == null) {
- // Get out of the while loop and shutdown the daemon thread
- if (!stopped.get) {
- throw new IllegalStateException("Polling `null` from
eventQueue means" +
- " the listener bus has been stopped. So `stopped` must
be true")
- }
- return
- }
- val timerContext = timer.time()
- try {
- postToAll(event)
- } finally {
- timerContext.stop()
- }
- } finally {
- self.synchronized {
- processingEvent = false
- }
- }
+ private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
--- End diff --
I can add specific methods for each internal queue; but I'd like to keep
the internal management of queues more generic. One of the ideas in #18253 was
to allow filtering of which events are enqueued at all (e.g. don't enqueue
`SparkListenerBlockUpdated` because it's not written to the event logs,
reducing the load on the event log queue). Leaving the internal management more
generic would allow that to be more easily / cleanly implemented later (instead
of "addToEventLogQueue", you'd have a "addCustomQueue" method with a subclass
of `AsyncEventQueue`).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]