GitHub user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10779#discussion_r50197399
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---
    @@ -17,19 +17,36 @@
     
     package org.apache.spark.streaming.scheduler
     
    -import java.util.concurrent.atomic.AtomicBoolean
    +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
    +import org.apache.spark.util.ListenerBus
     
    -import org.apache.spark.Logging
    -import org.apache.spark.util.AsynchronousListenerBus
    +/**
    + * A Streaming listener bus to forward events to StreamingListeners. This one will wrap received
    + * Streaming events as WrappedStreamingListenerEvent and send them to Spark listener bus. It also
    + * registers itself with Spark listener bus, so that it can receive WrappedStreamingListenerEvents,
    + * unwrap them as StreamingListenerEvent and dispatch them to StreamingListeners.
    + */
    +private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
    +  extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] {
     
    -/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    -private[spark] class StreamingListenerBus
    -  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
    -  with Logging {
    +  /**
    +   * Post a StreamingListenerEvent to the Spark listener bus asynchronously. This event will be
    +   * dispatched to all StreamingListeners in the thread of the Spark listener bus.
    +   */
    +  def post(event: StreamingListenerEvent) {
    +    sparkListenerBus.post(new WrappedStreamingListenerEvent(event))
    +  }
     
    -  private val logDroppedEvent = new AtomicBoolean(false)
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    +    event match {
    +      case WrappedStreamingListenerEvent(e) =>
    +        postToAll(e)
    +      case _ =>
    +    }
    +  }
     
    -  override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
    +  protected override def doPostEvent(
    +      listener: StreamingListener, event: StreamingListenerEvent): Unit = {
    --- End diff --
    
    style: when the parameter list wraps, put each parameter on its own line with a 4-space indent:
    ```
    protected override def doPostEvent(
        listener: StreamingListener,
        event: StreamingListenerEvent): Unit = {
    }
    ```
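
For readers following the diff, the wrap-and-forward flow described in the class comment can be sketched roughly as below. This is a minimal, synchronous toy and not the PR's code: `ToySparkBus`, `ToyStreamingBus`, `BatchStarted`, `UnrelatedSparkEvent`, and the single-method `StreamingListener` trait are hypothetical stand-ins, the real `LiveListenerBus` dispatches on its own thread, and registering in the constructor is an assumption made for brevity.

```scala
// Stand-ins for the Spark types named in the diff (bodies are assumptions).
trait SparkListenerEvent
trait StreamingListenerEvent

// Hypothetical events used only for illustration.
case class BatchStarted(id: Int) extends StreamingListenerEvent
case object UnrelatedSparkEvent extends SparkListenerEvent

// Wrapper that lets a streaming event travel on the Spark bus.
case class WrappedStreamingListenerEvent(event: StreamingListenerEvent)
  extends SparkListenerEvent

// Simplified listener interface (the real StreamingListener has many callbacks).
trait StreamingListener {
  def onEvent(event: StreamingListenerEvent): Unit
}

// Toy, synchronous replacement for LiveListenerBus.
class ToySparkBus {
  private var listeners = Vector.empty[SparkListenerEvent => Unit]
  def addListener(listener: SparkListenerEvent => Unit): Unit = listeners :+= listener
  def post(event: SparkListenerEvent): Unit = listeners.foreach(_(event))
}

// Toy streaming bus: wraps on post, unwraps on receipt, then fans out.
class ToyStreamingBus(sparkBus: ToySparkBus) {
  private var listeners = Vector.empty[StreamingListener]

  // Register with the shared bus so wrapped events come back to us.
  sparkBus.addListener(onOtherEvent)

  def addListener(listener: StreamingListener): Unit = listeners :+= listener

  // Wrap the streaming event and hand it to the shared bus.
  def post(event: StreamingListenerEvent): Unit =
    sparkBus.post(WrappedStreamingListenerEvent(event))

  // Unwrap our own events and dispatch them; ignore everything else.
  private def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case WrappedStreamingListenerEvent(e) => listeners.foreach(_.onEvent(e))
    case _ =>
  }
}
```

In this sketch, a streaming listener only ever sees events that were posted through `ToyStreamingBus.post`; other events on the shared bus fall through the `case _` branch, which mirrors the `onOtherEvent` match in the diff.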

