Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/10779#discussion_r50197399
--- Diff:
streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
---
@@ -17,19 +17,36 @@
package org.apache.spark.streaming.scheduler
-import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
+import org.apache.spark.util.ListenerBus
-import org.apache.spark.Logging
-import org.apache.spark.util.AsynchronousListenerBus
+/**
+ * A Streaming listener bus to forward events to StreamingListeners. This one will wrap received
+ * Streaming events as WrappedStreamingListenerEvent and send them to Spark listener bus. It also
+ * registers itself with Spark listener bus, so that it can receive WrappedStreamingListenerEvents,
+ * unwrap them as StreamingListenerEvent and dispatch them to StreamingListeners.
+ */
+private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
+ extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] {
-/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
-private[spark] class StreamingListenerBus
- extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
- with Logging {
+ /**
+ * Post a StreamingListenerEvent to the Spark listener bus asynchronously. This event will be
+ * dispatched to all StreamingListeners in the thread of the Spark listener bus.
+ */
+ def post(event: StreamingListenerEvent) {
+ sparkListenerBus.post(new WrappedStreamingListenerEvent(event))
+ }
- private val logDroppedEvent = new AtomicBoolean(false)
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ event match {
+ case WrappedStreamingListenerEvent(e) =>
+ postToAll(e)
+ case _ =>
+ }
+ }
- override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
+ protected override def doPostEvent(
+ listener: StreamingListener, event: StreamingListenerEvent): Unit = {
--- End diff --
Style nit: when the parameter list wraps, put each parameter on its own line:
```
protected override def doPostEvent(
    listener: StreamingListener,
    event: StreamingListenerEvent): Unit = {
}
```
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]