Re: Calling stop on StreamingContext locks up

2015-11-08 Thread vonnagy
Hi Ted,

Your fix resolves the issue for me. Thanks again for your help. I also saw
the PR you submitted against master.

Ivan



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Calling-stop-on-StreamingContext-locks-up-tp15063p15073.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Calling stop on StreamingContext locks up

2015-11-07 Thread Ted Yu
Would the following change work for you?

diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index 61b5a4c..c330d25 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -66,6 +66,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
 self.synchronized {
   processingEvent = true
 }
+if (stopped.get()) return
 try {
   val event = eventQueue.poll
   if (event == null) {
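
To illustrate what checking the stop flag at the top of the polling loop changes, here is a minimal toy model. It is not Spark's AsynchronousListenerBus: the class `ToyListenerBus`, the `checkStopFlag` switch, and the simulated handling delay are all assumptions made for illustration. Without the check, the thread keeps handling queued events after stop() is requested; with it, the thread exits at the next loop iteration.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Semaphore}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Toy model of a polling listener thread. NOT Spark's implementation;
// every name in this block is hypothetical.
class ToyListenerBus(checkStopFlag: Boolean, handleMillis: Long) {
  private val eventQueue = new ConcurrentLinkedQueue[String]()
  private val eventLock = new Semaphore(0) // one permit per posted event
  private val stopped = new AtomicBoolean(false)
  val handled = new AtomicInteger(0)

  private val listenerThread = new Thread("toy-listener-bus") {
    override def run(): Unit = {
      var running = true
      while (running) {
        eventLock.acquire()
        if (checkStopFlag && stopped.get()) {
          running = false // early exit, analogous to the check in the diff
        } else {
          val event = eventQueue.poll()
          if (event == null) {
            running = false // woken by stop() with nothing left to handle
          } else {
            Thread.sleep(handleMillis) // simulate listener work
            handled.incrementAndGet()
          }
        }
      }
    }
  }
  listenerThread.setDaemon(true)
  listenerThread.start()

  def post(event: String): Unit = {
    eventQueue.offer(event)
    eventLock.release()
  }

  // Sets the flag, wakes the thread, then waits for it to finish.
  def stop(): Unit = {
    stopped.set(true)
    eventLock.release()
    listenerThread.join()
  }
}

object ToyListenerBusDemo {
  // Posts n events, requests a stop right away, and reports how many were handled.
  def handledBeforeStop(checkStopFlag: Boolean, n: Int): Int = {
    val bus = new ToyListenerBus(checkStopFlag, handleMillis = 10L)
    (1 to n).foreach(i => bus.post(s"event-$i"))
    bus.stop()
    bus.handled.get()
  }
}
```

In this model, `ToyListenerBusDemo.handledBeforeStop(checkStopFlag = false, n = 50)` returns only after all 50 events have been handled, whereas the `checkStopFlag = true` variant returns after the event in flight finishes. If a producer kept posting events, the variant without the check could keep the join in stop() waiting for as long as events arrived.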



Calling stop on StreamingContext locks up

2015-11-07 Thread vonnagy
If I have a streaming job (Spark 1.5.1) and attempt to stop the stream after
the first batch, the system locks up and never completes. The pseudocode
below shows the stream being stopped once the batch-completed notification
is received. I have traced the lockup to the call `listener.stop()` in
JobScheduler (line 114), which attempts to join the thread in
AsynchronousListenerBus. That thread never ends because it keeps receiving
`SparkListenerExecutorMetricsUpdate` messages from the DAGScheduler.

Any thoughts/ideas on how I can effectively stop the stream after the first
batch would be greatly appreciated.

Pseudo Example:

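import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class SomeJob {

  // createStreamingContext() and getStream are placeholders for application code
  val ssc = createStreamingContext()
  val listener = new MyListener(ssc)
  ssc.addStreamingListener(listener)

  val stream = getStream

  stream.foreachRDD { rdd =>
    // Do something with the data
  }
}

class MyListener(ctx: StreamingContext) extends StreamingListener {
  // Stops the context as soon as the first batch completes
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = synchronized {
    ctx.stop(false, false)
    // NOTE: I get the same results with ctx.stop(), ctx.stop(true),
    // ctx.stop(true, true), or ctx.stop(false, false)
  }
}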
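
One pattern that may be worth trying is to have the callback only signal that the first batch finished and to call stop from a different thread. This is offered as an untested assumption, not a confirmed fix for Spark 1.5.1. The sketch below shows just the handoff, using a CountDownLatch; `FakeContext`, `SignallingListener`, and `StopFromAnotherThread` are hypothetical stand-ins and not Spark APIs.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for a context; records which thread called stop().
class FakeContext {
  val stoppedBy = new AtomicReference[String](null)
  def stop(): Unit = stoppedBy.set(Thread.currentThread().getName)
}

// The callback only signals completion; it never calls stop() itself.
class SignallingListener {
  val firstBatchDone = new CountDownLatch(1)
  def onBatchCompleted(): Unit = firstBatchDone.countDown()
}

object StopFromAnotherThread {
  // Returns the name of the thread that ended up calling stop().
  def demo(): String = {
    val ctx = new FakeContext
    val listener = new SignallingListener

    // Simulates the thread that delivers listener callbacks.
    val callbackThread = new Thread("fake-listener-bus") {
      override def run(): Unit = listener.onBatchCompleted()
    }
    callbackThread.start()

    listener.firstBatchDone.await() // the caller waits for the signal...
    ctx.stop()                      // ...and stops the context on its own thread
    callbackThread.join()
    ctx.stoppedBy.get()
  }
}
```

With a real StreamingContext, the equivalent would presumably be to await the latch after `ssc.start()` and call `ssc.stop(...)` from that thread. Whether that avoids the lockup described above has not been verified here.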


