Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19211#discussion_r139187703
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
@@ -65,53 +60,61 @@ private[spark] class LiveListenerBus(conf: SparkConf)
extends SparkListenerBus {
/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
- // Indicate if we are processing some event
- // Guarded by `self`
- private var processingEvent = false
-
- private val logDroppedEvent = new AtomicBoolean(false)
-
- // A counter that represents the number of events produced and consumed
in the queue
- private val eventLock = new Semaphore(0)
-
- private val listenerThread = new Thread(name) {
- setDaemon(true)
- override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
- LiveListenerBus.withinListenerThread.withValue(true) {
- val timer = metrics.eventProcessingTime
- while (true) {
- eventLock.acquire()
- self.synchronized {
- processingEvent = true
- }
- try {
- val event = eventQueue.poll
- if (event == null) {
- // Get out of the while loop and shutdown the daemon thread
- if (!stopped.get) {
- throw new IllegalStateException("Polling `null` from
eventQueue means" +
- " the listener bus has been stopped. So `stopped` must
be true")
- }
- return
- }
- val timerContext = timer.time()
- try {
- postToAll(event)
- } finally {
- timerContext.stop()
- }
- } finally {
- self.synchronized {
- processingEvent = false
- }
- }
+ private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
--- End diff --
can we have something like
```
private val defaultQueue = ...
private val executorManagementQueue = ...
...
```
I think the number of queues will be small.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]