Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/21356#discussion_r189152423
--- Diff: core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala ---
@@ -97,6 +98,11 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
} catch {
case ie: InterruptedException =>
logInfo(s"Stopping listener queue $name.", ie)
+ stopped.set(true)
--- End diff --
The old bus would stop the entire spark context (details below).
I dunno what the right behavior is either -- I figured this was the
intention given the logInfo. Alternatively we could (a) stop the entire spark
context, (b) skip this particular event and keep going or (c) stop the one
listener which happened to be active on the interrupt, but keep the queue
active (if there were more listeners).
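To make the behavior in this diff concrete, here is a rough, self-contained sketch of "mark the queue stopped on interrupt". It is not the real `AsyncEventQueue`: `ToyQueue`, `post`, and `drain` are hypothetical names made up for illustration, and the dispatch runs on the calling thread to keep the example small.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for a listener queue (not Spark's class).
class ToyQueue(listeners: Seq[String => Unit]) {
  val stopped = new AtomicBoolean(false)
  private val events = new LinkedBlockingQueue[String]()

  // Once the queue is stopped, new events are rejected.
  def post(event: String): Boolean =
    if (stopped.get()) false else events.offer(event)

  // Deliver queued events to every listener until the queue is empty
  // or a listener is interrupted.
  def drain(): Unit = {
    try {
      var next = events.poll()
      while (next != null) {
        val current = next
        listeners.foreach(l => l(current))
        next = events.poll()
      }
    } catch {
      case _: InterruptedException =>
        // Same idea as the added line in the diff: stop the whole queue.
        stopped.set(true)
    }
  }
}

object ToyQueueDemo {
  def main(args: Array[String]): Unit = {
    val seen = ArrayBuffer[String]()
    val listener: String => Unit = e => {
      if (e == "bad") throw new InterruptedException()
      seen += e
    }
    val q = new ToyQueue(Seq(listener))
    Seq("a", "bad", "c").foreach(q.post)
    q.drain()
    println(s"delivered=${seen.mkString(",")} stopped=${q.stopped.get()}")
  }
}
```

In this sketch the event after the interrupt ("c") is never delivered and later `post` calls are rejected. Options (b) and (c) would instead catch the interrupt inside the per-listener loop.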
More details on the 2.2 behavior:
[`ListenerBus.postToAll`](https://github.com/apache/spark/blob/branch-2.2/core/src/main/scala/org/apache/spark/util/ListenerBus.scala#L55)
wouldn't catch the exception.
And the polling thread in
[`LiveListenerBus`](https://github.com/apache/spark/blob/branch-2.2/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L77)
wraps everything in `Utils.tryOrStopSparkContext`.
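A small sketch of why the interrupt escapes, assuming the per-listener handler is a `case NonFatal(e)` catch (as I believe it is on branch-2.2): `scala.util.control.NonFatal` deliberately does not match `InterruptedException`, so the exception passes through to whatever wraps the polling loop. `postGuarded` and `outer` are hypothetical stand-ins, not Spark functions.

```scala
import scala.util.control.NonFatal

object NonFatalInterruptDemo {
  // Hypothetical stand-in for a per-listener handler that only catches NonFatal errors.
  def postGuarded(body: => Unit): String =
    try {
      body
      "ok"
    } catch {
      case NonFatal(e) => s"swallowed ${e.getClass.getSimpleName}"
    }

  // Hypothetical stand-in for the outer wrapper around the polling thread.
  def outer(body: => Unit): String =
    try postGuarded(body)
    catch {
      case t: Throwable => s"escaped ${t.getClass.getSimpleName}"
    }

  def main(args: Array[String]): Unit = {
    println(outer(throw new RuntimeException("boom"))) // swallowed RuntimeException
    println(outer(throw new InterruptedException()))   // escaped InterruptedException
  }
}
```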
I did a similar test on branch-2.2:
https://github.com/squito/spark/commit/72951bd69fca0c58c8a8b202ca59e167ebb5d71b
```
18/05/17 21:38:23.446 SparkListenerBus ERROR Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
    at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:80)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1282)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
18/05/17 21:38:23.448 SparkListenerBus ERROR Utils: throw uncaught fatal error in thread SparkListenerBus
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
    at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:80)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1282)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
```