Re: Invocation of StreamingContext.stop() hangs in 1.5
Hi, Thanks to Ted Vu and Nilanjan. Stopping the streaming context asynchronously did the trick! Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Invocation-of-StreamingContext-stop-hangs-in-1-5-tp25402p25434.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Invocation of StreamingContext.stop() hangs in 1.5
Hi, We're using Spark 1.5 streaming. We've a use case where we need to stop an existing StreamingContext and start a new one primarily to handle a newly added partition to Kafka topic by creating a new Kafka DStream in the context of the new StreamingContext. We've implemented "StreamingListener" trait and invoking "StreamingContext.stop(false, false)" in "onBatchCompleted" event. In order not to stop the underlying SparkContext, we've specified ""spark.streaming.stopSparkContextByDefault" to false. The above invocation never returns. Here is the partial stack trace of the JVM "StreamingListenerBus" daemon prio=10 tid=0x7fcc1000e800 nid=0x1027 in Object.wait() [0x7fcc170ef000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Thread.join(Thread.java:1281) - locked <0x00070688b0c0> (a org.apache.spark.util.AsynchronousListenerBus$$anon$1) at java.lang.Thread.join(Thread.java:1355) at org.apache.spark.util.AsynchronousListenerBus.stop(AsynchronousListenerBus.scala:167) at org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:114) - locked <0x00070688a878> (a org.apache.spark.streaming.scheduler.JobScheduler) at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:704) - locked <0x00070624e890> (a org.apache.spark.streaming.StreamingContext) at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:683) - locked <0x00070624e890> (a org.apache.spark.streaming.StreamingContext) at com.verizon.bda.manager.ApplicationManager$.restartWorflow(ApplicationManager.scala:367) at com.verizon.bda.manager.ApplicationListener.onBatchCompleted(ApplicationListener.scala:28) at org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:45) at org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:26) at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56) at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37) at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79) at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136) at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63) "JobScheduler" daemon prio=10 tid=0x7fcc1000b000 nid=0x1026 waiting on condition [0x7fcccd25e000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0007125fd760> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:489) at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:678) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:46) Thanks, Jiten -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Invocation-of-StreamingContext-stop-hangs-in-1-5-tp25402.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Invocation of StreamingContext.stop() hangs in 1.5
I don't think you should call ssc.stop() in StreamingListenerBus thread. Please stop the context asynchronously. BTW I have a pending PR: https://github.com/apache/spark/pull/9741 On Tue, Nov 17, 2015 at 1:50 PM, jitenwrote: > Hi, > > We're using Spark 1.5 streaming. We've a use case where we need to > stop > an existing StreamingContext and start a new one primarily to handle a > newly > added partition to Kafka topic by creating a new Kafka DStream in the > context of the new StreamingContext. > > We've implemented "StreamingListener" trait and invoking > "StreamingContext.stop(false, false)" in "onBatchCompleted" event. In order > not to stop the underlying SparkContext, we've specified > ""spark.streaming.stopSparkContextByDefault" to false. The above invocation > never returns. > >Here is the partial stack trace of the JVM > > > "StreamingListenerBus" daemon prio=10 tid=0x7fcc1000e800 nid=0x1027 in > Object.wait() [0x7fcc170ef000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > at java.lang.Thread.join(Thread.java:1281) > - locked <0x00070688b0c0> (a > org.apache.spark.util.AsynchronousListenerBus$$anon$1) > at java.lang.Thread.join(Thread.java:1355) > at > > org.apache.spark.util.AsynchronousListenerBus.stop(AsynchronousListenerBus.scala:167) > at > > org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:114) > - locked <0x00070688a878> (a > org.apache.spark.streaming.scheduler.JobScheduler) > at > > org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:704) > - locked <0x00070624e890> (a > org.apache.spark.streaming.StreamingContext) > at > > org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:683) > - locked <0x00070624e890> (a > org.apache.spark.streaming.StreamingContext) > at > > com.verizon.bda.manager.ApplicationManager$.restartWorflow(ApplicationManager.scala:367) > at > > com.verizon.bda.manager.ApplicationListener.onBatchCompleted(ApplicationListener.scala:28) > at > > org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:45) > at > > org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:26) > at > org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56) > at > > org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37) > at > > org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79) > at > org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136) > at > > org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63) > > "JobScheduler" daemon prio=10 tid=0x7fcc1000b000 nid=0x1026 waiting on > condition [0x7fcccd25e000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0007125fd760> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) > at > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) > at > > java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:489) > at > java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:678) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:46) > > > Thanks, > Jiten > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Invocation-of-StreamingContext-stop-hangs-in-1-5-tp25402.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >