Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/10779#issuecomment-172727012
  
    On the streaming side, @andrewor14 and I talked offline, and we think there can be a cleaner design with better abstractions.
    
    The current design stores the list of StreamingListeners in the SparkListenerBus (using the adaptor) and makes each StreamingListenerEvent extend SparkListenerEvent. Since there is no StreamingListenerBus anymore, it is hard to tell what gets posted where and who calls the callbacks. The public API also changes, and awkwardly so: StreamingListener does not extend SparkListener, but StreamingListenerEvent extends SparkListenerEvent.
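    To make the asymmetry concrete, here is a small type-level sketch. It is one reading of the description above, not the PR's actual code, and every name in it (SparkEventStub, SparkListenerStub, StreamingListenerAdapterStub, and so on) is a hypothetical stub rather than a Spark class. Streaming events inherit from the Spark event type, while a streaming listener does not inherit from the Spark listener type and can only reach the Spark bus through an adapter.

    ```scala
    import scala.collection.mutable.ListBuffer

    // Stubs for Spark core types (hypothetical, NOT the real Spark API).
    trait SparkEventStub
    trait SparkListenerStub { def onOtherEvent(event: SparkEventStub): Unit = () }

    // As described: every streaming event is also a Spark event...
    trait StreamingEventStub extends SparkEventStub
    case class BatchCompletedStub(batchTime: Long) extends StreamingEventStub

    // ...but a streaming listener is not a Spark listener...
    trait StreamingListenerStub { def onBatchCompleted(event: BatchCompletedStub): Unit }

    // ...so it only reaches the Spark bus through an adapter (hypothetical name).
    class StreamingListenerAdapterStub(listener: StreamingListenerStub) extends SparkListenerStub {
      override def onOtherEvent(event: SparkEventStub): Unit = event match {
        case batch: BatchCompletedStub => listener.onBatchCompleted(batch)
        case _ => // not a streaming event; ignore
      }
    }

    // The Spark-side bus only knows about SparkListenerStubs; there is no streaming bus.
    val sparkBusListeners = ListBuffer.empty[SparkListenerStub]
    val seen = ListBuffer.empty[Long]
    sparkBusListeners += new StreamingListenerAdapterStub(new StreamingListenerStub {
      def onBatchCompleted(event: BatchCompletedStub): Unit = { seen += event.batchTime; () }
    })
    // "Posting" a streaming event means handing it to the Spark listeners directly.
    sparkBusListeners.foreach(_.onOtherEvent(BatchCompletedStub(1000L)))
    println(seen.toList) // List(1000)
    ```

    Note that the streaming event type has to change its supertype (a public API change) just so it can travel on the Spark bus, which is the awkwardness described above.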
    
    I think a better design is the following. The goal is simply for the existing StreamingListenerBus to stop maintaining its own thread and instead use the SparkListenerBus's thread to post everything. To do that, the StreamingListenerBus only needs to forward its events into the SparkListenerBus, as follows:
    
    
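    ```
    package org.apache.spark.streaming.scheduler

    import java.util.concurrent.CopyOnWriteArrayList

    import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}

    class StreamingListenerForwardingBus(sparkListenerBus: LiveListenerBus)
        extends SparkListener {

      // Wraps a streaming event so it can travel through the Spark listener bus.
      case class WrappedStreamingListenerEvent(streamingListenerEvent: StreamingListenerEvent)
          extends SparkListenerEvent {
        // Keep wrapped streaming events out of the Spark event log.
        protected[spark] override def logEvent: Boolean = false
      }

      // Copy-on-write so listeners can be added while the bus thread iterates.
      private val listeners = new CopyOnWriteArrayList[StreamingListener]()

      sparkListenerBus.addListener(this)  // for getting callbacks on spark events

      def addListener(listener: StreamingListener): Unit = { listeners.add(listener); () }

      // Only enqueues; delivery happens on the Spark listener bus thread.
      def post(event: StreamingListenerEvent): Unit = {
        sparkListenerBus.post(WrappedStreamingListenerEvent(event))
      }

      // Called on the Spark listener bus thread for non-core events.
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case WrappedStreamingListenerEvent(sle) =>
          val it = listeners.iterator()
          while (it.hasNext) {
            val listener = it.next()
            sle match {
              case e: StreamingListenerBatchSubmitted => listener.onBatchSubmitted(e)
              case e: StreamingListenerBatchStarted => listener.onBatchStarted(e)
              case e: StreamingListenerBatchCompleted => listener.onBatchCompleted(e)
              case e: StreamingListenerReceiverStarted => listener.onReceiverStarted(e)
              case e: StreamingListenerReceiverError => listener.onReceiverError(e)
              case e: StreamingListenerReceiverStopped => listener.onReceiverStopped(e)
              case _ => // remaining streaming event types dispatched the same way
            }
          }
        case _ => // not a streaming event; ignore
      }
    }

    class JobScheduler {
      ...
      val listenerBus = new StreamingListenerForwardingBus(sparkContext.listenerBus)
    }
    ```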
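    To check that forwarding really funnels everything through a single thread, here is a minimal, dependency-free model of the idea. CoreBus, CoreEvent, CoreListener, BatchCompleted, StreamingListener and StreamingForwardingBus are hypothetical stand-ins, not Spark classes; CoreBus only loosely mirrors Spark's listener bus (one queue drained by one dispatcher thread).

    ```scala
    import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}
    import scala.collection.mutable.ListBuffer

    // Hypothetical stand-ins for Spark's listener-bus types (NOT the real Spark API).
    trait CoreEvent
    trait CoreListener { def onOtherEvent(event: CoreEvent): Unit }

    // One queue drained by one dispatcher thread; None is the stop signal.
    class CoreBus {
      private val queue = new LinkedBlockingQueue[Option[CoreEvent]]()
      private val listeners = new CopyOnWriteArrayList[CoreListener]()
      private val thread = new Thread(() => {
        var running = true
        while (running) {
          queue.take() match {
            case Some(event) =>
              val it = listeners.iterator()
              while (it.hasNext) it.next().onOtherEvent(event)
            case None => running = false
          }
        }
      })
      thread.setDaemon(true)
      thread.start()

      def addListener(listener: CoreListener): Unit = { listeners.add(listener); () }
      def post(event: CoreEvent): Unit = queue.put(Some(event))
      // Delivers everything already queued, then stops and joins the thread.
      def stop(): Unit = { queue.put(None); thread.join() }
    }

    // Streaming-side types stay independent of CoreEvent (no public API change).
    case class BatchCompleted(batchTime: Long)
    trait StreamingListener { def onBatchCompleted(event: BatchCompleted): Unit }

    class StreamingForwardingBus(core: CoreBus) extends CoreListener {
      // Private wrapper: the only type that needs to extend CoreEvent.
      private case class Wrapped(event: BatchCompleted) extends CoreEvent
      private val listeners = new CopyOnWriteArrayList[StreamingListener]()
      core.addListener(this)

      def addListener(listener: StreamingListener): Unit = { listeners.add(listener); () }
      def post(event: BatchCompleted): Unit = core.post(Wrapped(event))

      // Runs on the CoreBus thread: unwrap and fan out to streaming listeners.
      override def onOtherEvent(event: CoreEvent): Unit = event match {
        case Wrapped(batch) =>
          val it = listeners.iterator()
          while (it.hasNext) it.next().onBatchCompleted(batch)
        case _ => // not a streaming event; ignore
      }
    }

    val core = new CoreBus
    val streamingBus = new StreamingForwardingBus(core)
    val seen = ListBuffer.empty[Long] // written only by the dispatcher thread
    streamingBus.addListener(new StreamingListener {
      def onBatchCompleted(event: BatchCompleted): Unit = { seen += event.batchTime; () }
    })
    streamingBus.post(BatchCompleted(1000L))
    streamingBus.post(BatchCompleted(2000L))
    core.stop() // join() makes the listener's writes visible here
    println(seen.toList) // List(1000, 2000)
    ```

    Callers only ever see StreamingForwardingBus.post and StreamingListener; the wrapper type is private, so the streaming types never need to extend the core event type.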
    
    This keeps the clean abstraction that streaming events are posted to the streaming bus (which internally forwards them to the Spark bus), AND it does not require public API changes (streaming events do not have to extend Spark events).
    
    What do you think?
    
    


