Github user bOOm-X commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18253#discussion_r132593478
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -39,98 +40,114 @@ import org.apache.spark.util.Utils
      * has started will events be actually propagated to all attached 
listeners. This listener bus
      * is stopped when `stop()` is called, and it will drop further events 
after stopping.
      */
    -private[spark] class LiveListenerBus(conf: SparkConf) extends 
SparkListenerBus {
    -
    -  self =>
    +private[spark] class LiveListenerBus(conf: SparkConf)
    +  extends WithListenerBus[SparkListenerInterface, SparkListenerEvent] with 
Logging{
     
       import LiveListenerBus._
     
       private var sparkContext: SparkContext = _
    +  private var metricsSystem: MetricsSystem = _
     
    -  // Cap the capacity of the event queue so we get an explicit error 
(rather than
    -  // an OOM exception) if it's perpetually being added to more quickly 
than it's being drained.
    -  private val eventQueue =
    -    new 
LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
    +  private val defaultListenerPool = GroupOfListener("default")
    +  private val defaultListenerQueue = BusQueue(
    +    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY),
    +    defaultListenerPool,
    +    BusQueue.ALL_MESSAGES)
     
    -  private[spark] val metrics = new LiveListenerBusMetrics(conf, eventQueue)
    +  @volatile private var otherListenerQueues = Seq.empty[BusQueue]
     
    -  // Indicate if `start()` is called
    -  private val started = new AtomicBoolean(false)
    -  // Indicate if `stop()` is called
    +  // start, stop and add/remove listener should be mutually exclusive
    +  private val startStopAddRemoveLock = new ReentrantLock()
    +  // Will be set modified in a synchronized function
    +  @volatile private var started = false
       private val stopped = new AtomicBoolean(false)
     
    -  /** A counter for dropped events. It will be reset every time we log it. 
*/
    -  private val droppedEventsCounter = new AtomicLong(0L)
    -
    -  /** When `droppedEventsCounter` was logged last time in milliseconds. */
    -  @volatile private var lastReportTimestamp = 0L
    -
    -  // Indicate if we are processing some event
    -  // Guarded by `self`
    -  private var processingEvent = false
    -
    -  private val logDroppedEvent = new AtomicBoolean(false)
    +   /**
    +    * if isolatedIfPossible is true, add the listener to an isolated pool.
    +    * Otherwise add it to the default pool.
    +    * This method is thread-safe and can be called in any thread.
    +    */
    +  final override def addListener(listener: SparkListenerInterface,
    +                                 isolatedIfPossible: Boolean): Unit = {
    +    startStopAddRemoveLock.lock()
    +    Try{
    +      if (isolatedIfPossible) {
    +         addQueue(BusQueue(
    +          conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY),
    +          listener,
    +          BusQueue.ALL_MESSAGES))
    +      } else {
    +        defaultListenerPool.addListener(listener)
    +      }
    +    }
    +    startStopAddRemoveLock.unlock()
    +  }
     
    -  // A counter that represents the number of events produced and consumed 
in the queue
    -  private val eventLock = new Semaphore(0)
    +   /**
    +    * Add a generic listener to an isolated pool.
    +    */
    +  def addProcessor(processor: SparkListenerEvent => Unit,
    --- End diff --
    
    It is just a small technical refactoring which came almost for free with 
the new queue object. It is also very convenient to be able to handle the 
asynchronous LiveListenerBus in the test.  I think that we can keep it in this 
PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to