Do you mean that I should call ssc.stop(stopSparkContext = true, stopGracefully = true) from inside a Runtime.getRuntime().addShutdownHook hook?
That no longer works in 1.4. The SparkContext gets stopped before the receivers have processed all received blocks, and I see the exceptions below in the logs. Graceful shutdown works only if I register the hook through Utils.addShutdownHook with the priority I mentioned, because in that case the shutdown hooks run in priority order.

*INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Sent stop signal to all 3 receivers*
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 0: Stopped by driver
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 1: Stopped by driver
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 2: Stopped by driver
*INFO : org.apache.spark.SparkContext - Invoking stop() from shutdown hook*
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/batch/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/batch,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/static,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/environment/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/environment,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs,null}
INFO : org.apache.spark.ui.SparkUI - Stopped Spark web UI at http://10.252.5.113:4040
INFO : org.apache.spark.scheduler.DAGScheduler - Stopping DAGScheduler
INFO : org.apache.spark.scheduler.DAGScheduler - Job 4 failed: start at Consumer.java:122, took 10.398746 s
*Exception in thread "Thread-28" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down*
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:736)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:735)
INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 2 (start at Consumer.java:122) failed in 10.383 s
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:735)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1468)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1403)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1562)
    at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:551)
    at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2271)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:2241)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2241)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2241)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1769)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:2241)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2241)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2241)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:2241)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2223)
    at java.lang.Thread.run(Thread.java:744)
INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Waiting for receiver job to terminate gracefully
INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Asking each executor to shut down
INFO : org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint - OutputCommitCoordinator stopped!
INFO : org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint - OutputCommitCoordinator stopped!
INFO : org.apache.spark.MapOutputTrackerMasterEndpoint - MapOutputTrackerMasterEndpoint stopped!
INFO : org.apache.spark.storage.MemoryStore - MemoryStore cleared
INFO : org.apache.spark.storage.BlockManager - BlockManager stopped
INFO : org.apache.spark.storage.BlockManagerMaster - BlockManagerMaster stopped
*INFO : org.apache.spark.SparkContext - Successfully stopped SparkContext*
INFO : akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
INFO : akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
INFO : akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
INFO : org.apache.spark.streaming.scheduler.JobGenerator - Checkpointing graph for time 1431259082000 ms
INFO : org.apache.spark.streaming.DStreamGraph - Updating checkpoint data for time 1431259082000 ms
INFO : org.apache.spark.streaming.DStreamGraph - Updated checkpoint data for time 1431259082000 ms
ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error generating jobs for time 1431259082000 ms
*java.lang.IllegalStateException: Shutdown in progress*
    *at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)*
    at java.lang.Runtime.addShutdownHook(Runtime.java:211)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1474)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:263)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.spark.streaming.util.HdfsUtils$.getFileSystemForPath(HdfsUtils.scala:68)
    at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:26)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:194)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:81)
    at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:44)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$writeToLog$2.apply(ReceivedBlockTracker.scala:219)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$writeToLog$2.apply(ReceivedBlockTracker.scala:218)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:218)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:108)
    at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:105)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:242)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" java.lang.IllegalStateException: Shutdown in progress

On Tue, May 19, 2015 at 11:58 AM, Tathagata Das <t...@databricks.com> wrote:

> If you wanted to stop it gracefully, then why are you not calling
> ssc.stop(stopGracefully = true, stopSparkContext = true)? Then it doesn't
> matter whether the shutdown hook was called or not.
>
> TD
>
> On Mon, May 18, 2015 at 9:43 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi,
>>
>> I just figured out that if I want to perform a graceful shutdown of Spark
>> Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no
>> longer works. Spark 1.4 defines Utils.addShutdownHook for Spark Core, and
>> that hook gets called anyway, so the graceful shutdown from Spark
>> Streaming fails with an error like "Sparkcontext already closed".
>>
>> To solve this, I need to explicitly add a Utils.addShutdownHook in my
>> driver with a higher priority (say 150) than Spark's shutdown priority of
>> 50, and in that hook I call the StreamingContext stop method with
>> (false, true) as parameters.
>>
>> Just curious to know whether this is how we need to handle shutdown hooks
>> going forward?
>>
>> Can't we make the streaming shutdown default to a graceful shutdown?
>>
>> Also, the Java API for adding a shutdown hook in Utils looks very dirty,
>> with methods like this:
>>
>> Utils.addShutdownHook(150, new Function0<BoxedUnit>() {
>>     @Override
>>     public BoxedUnit apply() {
>>         return null;
>>     }
>>
>>     @Override
>>     public byte apply$mcB$sp() {
>>         return 0;
>>     }
>>
>>     @Override
>>     public char apply$mcC$sp() {
>>         return 0;
>>     }
>>
>>     @Override
>>     public double apply$mcD$sp() {
>>         return 0;
>>     }
>>
>>     @Override
>>     public float apply$mcF$sp() {
>>         return 0;
>>     }
>>
>>     @Override
>>     public int apply$mcI$sp() {
>>         // TODO Auto-generated method stub
>>         return 0;
>>     }
>>
>>     @Override
>>     public long apply$mcJ$sp() {
>>         return 0;
>>     }
>>
>>     @Override
>>     public short apply$mcS$sp() {
>>         return 0;
>>     }
>>
>>     @Override
>>     public void apply$mcV$sp() {
>>         // The only method that matters: stop streaming gracefully,
>>         // without stopping the SparkContext.
>>         jsc.stop(false, true);
>>     }
>>
>>     @Override
>>     public boolean apply$mcZ$sp() {
>>         // TODO Auto-generated method stub
>>         return false;
>>     }
>> });
>>
>
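
For reference, the priority ordering discussed in this thread can be sketched in plain Java, without Spark on the classpath. This is only an illustration under assumptions: the class PriorityShutdownHooks and its methods add, runAll and install are hypothetical names, not Spark's Utils.addShutdownHook or its implementation. Only the priorities 150 and 50 come from this thread, and the println calls stand in for the real stop calls.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch (not Spark code): a single JVM shutdown hook that
// runs registered callbacks in descending priority order.
public class PriorityShutdownHooks {

    private static final class Hook {
        final int priority;
        final Runnable body;

        Hook(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }
    }

    private final List<Hook> hooks = new ArrayList<>();

    // Register a callback; a higher priority runs earlier at shutdown.
    public synchronized void add(int priority, Runnable body) {
        hooks.add(new Hook(priority, body));
    }

    // Run every callback once, highest priority first. The sort is stable,
    // so callbacks with equal priority keep their registration order.
    public void runAll() {
        List<Hook> ordered;
        synchronized (this) {
            ordered = new ArrayList<>(hooks);
        }
        ordered.sort(Comparator.comparingInt((Hook h) -> h.priority).reversed());
        for (Hook h : ordered) {
            try {
                h.body.run();
            } catch (RuntimeException e) {
                // Keep going so one failing callback does not block the rest.
                System.err.println("shutdown hook failed: " + e);
            }
        }
    }

    // Install one JVM-level hook that drives the ordered callbacks.
    public void install() {
        Runtime.getRuntime().addShutdownHook(
                new Thread(this::runAll, "priority-shutdown-hooks"));
    }

    public static void main(String[] args) {
        PriorityShutdownHooks manager = new PriorityShutdownHooks();
        // Stand-ins for the real calls; the priorities mirror the thread.
        manager.add(50, () -> System.out.println("stop SparkContext (priority 50)"));
        manager.add(150, () -> System.out.println("stop StreamingContext gracefully (priority 150)"));
        manager.install();
    }
}
```

The JVM gives no ordering guarantee between hooks registered separately through Runtime.getRuntime().addShutdownHook, which is why the sketch registers a single JVM hook and orders the callbacks itself. Run as is, it executes the priority-150 callback before the priority-50 one at exit. Separately, on the verbose anonymous Function0 class: as far as I know, extending scala.runtime.AbstractFunction0<BoxedUnit> and overriding only apply() avoids having to spell out the specialized apply$mc...$sp methods from Java.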