dydeve commented on a change in pull request #24676: [SPARK-27804]
LiveListenerBus may create multiple AsyncEventQueues with same name in
concurrent scene
URL: https://github.com/apache/spark/pull/24676#discussion_r286612817
##########
File path: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
##########
@@ -86,44 +87,61 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
}
/**
- * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
- * of each other (each one uses a separate thread for delivering events), allowing slower
- * listeners to be somewhat isolated from others.
- */
+ * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
+ * of each other (each one uses a separate thread for delivering events), allowing slower
+ * listeners to be somewhat isolated from others.
+ *
+ * @param listener
+ * @param name name of AsyncEventQueue, @see [[AsyncEventQueue.name]] for more
+ */
private[spark] def addToQueue(
listener: SparkListenerInterface,
- queue: String): Unit = synchronized {
Review comment:
Thank you. I overlooked the `synchronized` keyword; that was careless of me.
I also understand that `CopyOnWriteArrayList` is used instead of `ArrayList`
to avoid `ConcurrentModificationException`.
How about narrowing the scope of `synchronized`, so that the lock is taken
only when a new queue has to be created?
```scala
private[spark] def addToQueue(
    listener: SparkListenerInterface,
    queue: String): Unit = {
  if (stopped.get()) { // read barrier
    throw new IllegalStateException("LiveListenerBus is stopped.")
  }
  // Fast path: look up an existing queue without taking the lock.
  queues.asScala.find(_.name == queue) match {
    case Some(existing) =>
      existing.addListener(listener)
    case None =>
      // Slow path: re-check under the lock so that only one thread creates the queue.
      this.synchronized {
        queues.asScala.find(_.name == queue) match {
          case Some(existing) =>
            existing.addListener(listener)
          case None =>
            val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
            newQueue.addListener(listener)
            if (started.get()) {
              newQueue.start(sparkContext)
            }
            queues.add(newQueue)
        }
      }
  }
}
```
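For illustration, here is a minimal, self-contained sketch of the same check, lock, re-check pattern outside of Spark. `NamedQueue` and `QueueRegistry` are hypothetical stand-ins for `AsyncEventQueue` and `LiveListenerBus`, listeners are plain strings, and the `stopped`/`started` handling is left out, so this only shows the locking idea and is not the actual Spark code.

```scala
import java.util.concurrent.CopyOnWriteArrayList
// JavaConverters matches the `.asScala` usage above; it is deprecated in Scala 2.13+.
import scala.collection.JavaConverters._

// Hypothetical stand-in for AsyncEventQueue: a name plus a thread-safe listener list.
final class NamedQueue(val name: String) {
  private val listeners = new CopyOnWriteArrayList[String]()
  def addListener(listener: String): Unit = listeners.add(listener)
  def listenerCount: Int = listeners.size()
}

// Hypothetical stand-in for LiveListenerBus, reduced to the queue bookkeeping.
final class QueueRegistry {
  private val queues = new CopyOnWriteArrayList[NamedQueue]()

  def addToQueue(listener: String, queueName: String): Unit = {
    // Fast path: iterating a CopyOnWriteArrayList reads a snapshot, so no lock is taken.
    queues.asScala.find(_.name == queueName) match {
      case Some(existing) =>
        existing.addListener(listener)
      case None =>
        // Slow path: another thread may have created the queue in the meantime,
        // so look it up again while holding the lock before creating it.
        this.synchronized {
          queues.asScala.find(_.name == queueName) match {
            case Some(existing) =>
              existing.addListener(listener)
            case None =>
              val created = new NamedQueue(queueName)
              created.addListener(listener)
              queues.add(created) // publish only after the listener is attached
          }
        }
    }
  }

  def queueNames: List[String] = queues.asScala.map(_.name).toList

  def listenerCount(queueName: String): Int =
    queues.asScala.find(_.name == queueName).map(_.listenerCount).getOrElse(0)
}
```

Calling `addToQueue` from several threads with the same queue name should leave exactly one queue with that name, because creation only happens after the second lookup under the lock. The sketch relies on every writer to `queues` going through the same lock; in the real class, `stop()` and the removal paths would need the same care.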