GitHub user tdas commented on the pull request:
https://github.com/apache/spark/pull/10779#issuecomment-172727012
On the streaming side, @andrewor14 and I talked offline, and we think a
cleaner design with better abstractions is possible.
The current design stores the list of StreamingListeners in the
SparkListenerBus (using the adaptor) and makes each StreamingListenerEvent
extend SparkListenerEvent. Since there is no StreamingListenerBus anymore,
the abstraction makes it hard to understand what gets posted where and
who calls the callbacks. The public API also changes, which is
awkward: StreamingListener does not extend SparkListener, but
StreamingListenerEvent extends SparkListenerEvent.
I think a better design is the following. The goal is simply for the
existing StreamingListenerBus to stop maintaining its own thread and to use
the SparkListenerBus's thread to post everything. To do that, the
StreamingListenerBus only needs to forward its events into the
SparkListenerBus, as follows:
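```scala
// protected[spark] below only compiles inside the org.apache.spark package.
package org.apache.spark.streaming.scheduler

import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.JavaConverters._

import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}

class StreamingListenerForwardingBus(sparkListenerBus: LiveListenerBus)
  extends SparkListener {

  // Wraps a streaming event so it can travel through the Spark bus;
  // logEvent = false keeps it out of the Spark event log.
  case class WrappedStreamingListenerEvent(streamingListenerEvent: StreamingListenerEvent)
    extends SparkListenerEvent {
    protected[spark] override def logEvent: Boolean = false
  }

  // Copy-on-write: listeners may be added from the user thread while the
  // Spark bus thread iterates over them.
  private val listeners = new CopyOnWriteArrayList[StreamingListener]()

  sparkListenerBus.addListener(this) // for getting callbacks on Spark events

  def addListener(listener: StreamingListener): Unit = listeners.add(listener)

  // Streaming components post here; delivery happens on the Spark bus thread.
  def post(event: StreamingListenerEvent): Unit = {
    sparkListenerBus.post(WrappedStreamingListenerEvent(event))
  }

  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    event match {
      case WrappedStreamingListenerEvent(sle) =>
        listeners.asScala.foreach { listener =>
          sle match {
            // .... call the matching StreamingListener callback, e.g.
            // case e: StreamingListenerBatchCompleted => listener.onBatchCompleted(e)
            case _ =>
          }
        }
      case _ => // not a streaming event; ignore
    }
  }
}

class JobScheduler {
  // ...
  val listenerBus = new StreamingListenerForwardingBus(sparkContext.listenerBus)
}
```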
This maintains the clean abstraction that streaming events get posted to
the streaming bus (and are internally forwarded to the Spark bus), AND it does
not require public API changes (streaming events do not have to extend Spark
events).
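To make the threading and API claims concrete, here is a minimal, self-contained toy model of the forwarding idea. It assumes nothing from Spark: `ToySparkBus`, `StreamingForwardingBus`, `BatchCompleted` and the listener traits are hypothetical stand-ins, not Spark's real classes. It shows streaming events being posted to the streaming bus, wrapped in a private event type, and delivered to streaming callbacks on the single Spark bus thread, while the streaming event type itself never extends the Spark event type.

```scala
import scala.collection.mutable.ArrayBuffer
import java.util.concurrent.LinkedBlockingQueue

object ForwardingBusSketch {
  // Hypothetical stand-ins; NOT the real Spark API.
  trait SparkEvent
  trait SparkEventListener { def onOtherEvent(event: SparkEvent): Unit }

  // Toy "Spark bus": one dispatcher thread drains the queue and calls every listener.
  final class ToySparkBus {
    private val queue = new LinkedBlockingQueue[Option[SparkEvent]]()
    @volatile private var listeners = Vector.empty[SparkEventListener]
    private val thread = new Thread(new Runnable {
      def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match {
            case Some(e) => listeners.foreach(_.onOtherEvent(e))
            case None    => running = false // poison pill: stop dispatching
          }
        }
      }
    }, "toy-spark-bus")
    thread.setDaemon(true)

    def addListener(l: SparkEventListener): Unit = synchronized { listeners :+= l }
    def start(): Unit = thread.start()
    def post(e: SparkEvent): Unit = queue.put(Some(e))
    // Delivers everything already queued, then joins the dispatcher thread.
    def stop(): Unit = { queue.put(None); thread.join() }
  }

  // Streaming events do NOT extend SparkEvent; only the private wrapper does.
  trait StreamingEvent
  final case class BatchCompleted(batchTime: Long) extends StreamingEvent
  trait StreamingEventListener { def onBatchCompleted(e: BatchCompleted): Unit }

  // Has no thread of its own: it forwards wrapped events into the Spark bus
  // and unwraps them when the Spark bus thread calls back.
  final class StreamingForwardingBus(sparkBus: ToySparkBus) extends SparkEventListener {
    private case class Wrapped(event: StreamingEvent) extends SparkEvent
    @volatile private var listeners = Vector.empty[StreamingEventListener]
    sparkBus.addListener(this)

    def addListener(l: StreamingEventListener): Unit = synchronized { listeners :+= l }
    def post(e: StreamingEvent): Unit = sparkBus.post(Wrapped(e))

    override def onOtherEvent(event: SparkEvent): Unit = event match {
      case Wrapped(b: BatchCompleted) => listeners.foreach(_.onBatchCompleted(b))
      case _                          => // not a streaming event; ignore
    }
  }

  // Posts two streaming events and records which thread delivered each one.
  def run(): Seq[String] = {
    val log = ArrayBuffer.empty[String] // written only on the bus thread
    val sparkBus = new ToySparkBus
    val streamingBus = new StreamingForwardingBus(sparkBus)
    streamingBus.addListener(new StreamingEventListener {
      def onBatchCompleted(e: BatchCompleted): Unit =
        log += s"batch ${e.batchTime} on ${Thread.currentThread().getName}"
    })
    sparkBus.start()
    streamingBus.post(BatchCompleted(1000L))
    streamingBus.post(BatchCompleted(2000L))
    sparkBus.stop() // join() makes the bus thread's writes to log visible here
    log.toList
  }
}
```

Both callbacks run on `toy-spark-bus`, the only dispatcher thread, which is the property the proposal is after; the streaming side keeps its own `post`/`addListener` surface.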
What do you think?