By the way, this happens when I stop the Driver process ...

On Tue, May 19, 2015 at 12:29 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> You mean to say that within Runtime.getRuntime().addShutdownHook I should call
> ssc.stop(stopSparkContext = true, stopGracefully = true)?
>
> This won't work anymore in 1.4.
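>
> For reference, this is roughly what that approach looks like with the Java API; a
> minimal sketch only, assuming a JavaStreamingContext named jssc (not my actual code):
>
> // Plain JVM shutdown hook that asks streaming to stop gracefully.
> // In 1.4 Spark Core's own shutdown hook may stop the SparkContext
> // before this hook runs, which is why this is no longer reliable.
> Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
>     @Override
>     public void run() {
>         // stopSparkContext = true, stopGracefully = true
>         jssc.stop(true, true);
>     }
> }));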
>
> The SparkContext got stopped before the receivers had processed all received
> blocks, and I see the exception below in the logs. Graceful shutdown only works
> if I add Utils.addShutdownHook with the priority I mentioned; in that case the
> shutdown hooks run in priority order.
>
>
>
> *INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Sent stop
> signal to all 3 receivers*
> ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
> receiver for stream 0: Stopped by driver
> ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
> receiver for stream 1: Stopped by driver
> ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
> receiver for stream 2: Stopped by driver
> *INFO : org.apache.spark.SparkContext - Invoking stop() from shutdown hook*
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/streaming/batch/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/streaming/batch,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/streaming/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/streaming,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/metrics/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/static,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/executors/threadDump,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/executors/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/executors,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/environment/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/environment,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/storage/rdd,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/storage/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/storage,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages/pool/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages/pool,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages/stage/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages/stage,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/stages,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/jobs/job/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/jobs/job,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/jobs/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
> o.s.j.s.ServletContextHandler{/jobs,null}
> INFO : org.apache.spark.ui.SparkUI - Stopped Spark web UI at
> http://10.252.5.113:4040
> INFO : org.apache.spark.scheduler.DAGScheduler - Stopping DAGScheduler
> INFO : org.apache.spark.scheduler.DAGScheduler - Job 4 failed: start at
> Consumer.java:122, took 10.398746 s
> *Exception in thread "Thread-28" org.apache.spark.SparkException: Job
> cancelled because SparkContext was shut down*
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:736)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:735)
> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 2 (start at
> Consumer.java:122) failed in 10.383 s
>
> at
> org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:735)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1468)
> at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
> at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1403)
> at org.apache.spark.SparkContext.stop(SparkContext.scala:1562)
> at
> org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:551)
> at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2271)
> at
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:2241)
> at
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2241)
> at
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2241)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1769)
> at
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:2241)
> at
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2241)
> at
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2241)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:2241)
> at
> org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2223)
> at java.lang.Thread.run(Thread.java:744)
> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Waiting for
> receiver job to terminate gracefully
> INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -
> Asking each executor to shut down
> INFO :
> org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint
> - OutputCommitCoordinator stopped!
> INFO :
> org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint
> - OutputCommitCoordinator stopped!
> INFO : org.apache.spark.MapOutputTrackerMasterEndpoint -
> MapOutputTrackerMasterEndpoint stopped!
> INFO : org.apache.spark.storage.MemoryStore - MemoryStore cleared
> INFO : org.apache.spark.storage.BlockManager - BlockManager stopped
> INFO : org.apache.spark.storage.BlockManagerMaster - BlockManagerMaster
> stopped
> *INFO : org.apache.spark.SparkContext - Successfully stopped SparkContext*
> INFO : akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting
> down remote daemon.
> INFO : akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote
> daemon shut down; proceeding with flushing remote transports.
> INFO : akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting
> shut down.
> INFO : org.apache.spark.streaming.scheduler.JobGenerator - Checkpointing
> graph for time 1431259082000 ms
> INFO : org.apache.spark.streaming.DStreamGraph - Updating checkpoint data
> for time 1431259082000 ms
> INFO : org.apache.spark.streaming.DStreamGraph - Updated checkpoint data
> for time 1431259082000 ms
> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error
> generating jobs for time 1431259082000 ms
> *java.lang.IllegalStateException: Shutdown in progress*
> at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
> at java.lang.Runtime.addShutdownHook(Runtime.java:211)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1474)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:263)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
> at
> org.apache.spark.streaming.util.HdfsUtils$.getFileSystemForPath(HdfsUtils.scala:68)
> at
> org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:26)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org
> $apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org
> $apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:194)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:81)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:44)
> at
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$writeToLog$2.apply(ReceivedBlockTracker.scala:219)
> at
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$writeToLog$2.apply(ReceivedBlockTracker.scala:218)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:218)
> at
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:108)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:105)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:242)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
> at scala.util.Try$.apply(Try.scala:161)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
> at org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Exception in thread "main" java.lang.IllegalStateException: Shutdown in
> progress
>
>
>
>
>
>
> On Tue, May 19, 2015 at 11:58 AM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> If you wanted to stop it gracefully, then why are you not calling
>> ssc.stop(stopGracefully = true, stopSparkContext = true)? Then it doesn't
>> matter whether the shutdown hook was called or not.
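>>
>> In the Java API that would look roughly like the sketch below; it assumes a
>> JavaStreamingContext named jssc and an application-specific stopRequested()
>> check (hypothetical, e.g. polling a marker file):
>>
>> jssc.start();
>> // Poll for an external stop signal instead of relying on a JVM shutdown hook.
>> while (!stopRequested()) {                  // stopRequested() is hypothetical
>>     jssc.awaitTerminationOrTimeout(10000);  // re-check every 10 seconds
>> }
>> // stopSparkContext = true, stopGracefully = true
>> jssc.stop(true, true);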
>>
>> TD
>>
>> On Mon, May 18, 2015 at 9:43 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Just figured out that if I want to perform a graceful shutdown of Spark
>>> Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no longer
>>> works. In Spark 1.4, Spark Core registers its own Utils.addShutdownHook, which
>>> gets called anyway, so the graceful shutdown of Spark Streaming fails with a
>>> "SparkContext already closed" error.
>>>
>>> To solve this, I need to explicitly add Utils.addShutdownHook in my driver with
>>> a higher priority (say 150) than Spark's shutdown priority of 50, and in that
>>> hook call the StreamingContext stop method with (false, true) as parameters, as
>>> shown in the Java snippet below.
>>>
>>> Just curious to know: is this how we are supposed to handle the shutdown hook
>>> going forward?
>>>
>>> Can't we make the streaming shutdown graceful by default?
>>>
>>> Also, the Java API for adding a shutdown hook via Utils looks very dirty, with
>>> methods like this:
>>>
>>>
>>>
>>> Utils.addShutdownHook(150, new Function0<BoxedUnit>() {
>>>
>>>     @Override
>>>     public BoxedUnit apply() {
>>>         return null;
>>>     }
>>>
>>>     @Override
>>>     public byte apply$mcB$sp() {
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public char apply$mcC$sp() {
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public double apply$mcD$sp() {
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public float apply$mcF$sp() {
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public int apply$mcI$sp() {
>>>         // TODO Auto-generated method stub
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public long apply$mcJ$sp() {
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public short apply$mcS$sp() {
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public void apply$mcV$sp() {
>>>         jsc.stop(false, true);
>>>     }
>>>
>>>     @Override
>>>     public boolean apply$mcZ$sp() {
>>>         // TODO Auto-generated method stub
>>>         return false;
>>>     }
>>> });
>>>
>>
>>
>
