By the way, this happens when I stopped the Driver process ...

On Tue, May 19, 2015 at 12:29 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:
> You mean to say that within Runtime.getRuntime().addShutdownHook I should
> call ssc.stop(stopSparkContext = true, stopGracefully = true)?
>
> This won't work anymore in 1.4.
>
> The SparkContext got stopped before the Receiver had processed all received
> blocks, and I see the exception below in the logs. Graceful shutdown works
> only if I add Utils.addShutdownHook with the priority I mentioned; in that
> case the shutdown hooks run in priority order.
>
> *INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Sent stop signal to all 3 receivers*
> ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 0: Stopped by driver
> ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 1: Stopped by driver
> ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 2: Stopped by driver
> *INFO : org.apache.spark.SparkContext - Invoking stop() from shutdown hook*
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/batch/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/batch,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/streaming,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/static,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/executors,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/environment/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/environment,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/storage,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/stages,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
> INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped o.s.j.s.ServletContextHandler{/jobs,null}
> INFO : org.apache.spark.ui.SparkUI - Stopped Spark web UI at http://10.252.5.113:4040
> INFO : org.apache.spark.scheduler.DAGScheduler - Stopping DAGScheduler
> INFO : org.apache.spark.scheduler.DAGScheduler - Job 4 failed: start at Consumer.java:122, took 10.398746 s
> *Exception in thread "Thread-28" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down*
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:736)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:735)
> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 2 (start at Consumer.java:122) failed in 10.383 s
>         at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:735)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1468)
>         at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
>         at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1403)
>         at org.apache.spark.SparkContext.stop(SparkContext.scala:1562)
>         at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:551)
>         at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2271)
>         at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:2241)
>         at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2241)
>         at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2241)
>         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1769)
>         at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:2241)
>         at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2241)
>         at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2241)
>         at scala.util.Try$.apply(Try.scala:161)
>         at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:2241)
>         at org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2223)
>         at java.lang.Thread.run(Thread.java:744)
> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Waiting for receiver job to terminate gracefully
> INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Asking each executor to shut down
> INFO : org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint - OutputCommitCoordinator stopped!
> INFO : org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint - OutputCommitCoordinator stopped!
> INFO : org.apache.spark.MapOutputTrackerMasterEndpoint - MapOutputTrackerMasterEndpoint stopped!
> INFO : org.apache.spark.storage.MemoryStore - MemoryStore cleared
> INFO : org.apache.spark.storage.BlockManager - BlockManager stopped
> INFO : org.apache.spark.storage.BlockManagerMaster - BlockManagerMaster stopped
> *INFO : org.apache.spark.SparkContext - Successfully stopped SparkContext*
> INFO : akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
> INFO : akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
> INFO : akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
> INFO : org.apache.spark.streaming.scheduler.JobGenerator - Checkpointing graph for time 1431259082000 ms
> INFO : org.apache.spark.streaming.DStreamGraph - Updating checkpoint data for time 1431259082000 ms
> INFO : org.apache.spark.streaming.DStreamGraph - Updated checkpoint data for time 1431259082000 ms
> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error generating jobs for time 1431259082000 ms
> *java.lang.IllegalStateException: Shutdown in progress*
> *        at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)*
>         at java.lang.Runtime.addShutdownHook(Runtime.java:211)
>         at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1474)
>         at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:263)
>         at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
>         at org.apache.spark.streaming.util.HdfsUtils$.getFileSystemForPath(HdfsUtils.scala:68)
>         at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:26)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:194)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:81)
>         at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:44)
>         at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$writeToLog$2.apply(ReceivedBlockTracker.scala:219)
>         at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$writeToLog$2.apply(ReceivedBlockTracker.scala:218)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:218)
>         at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:108)
>         at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:105)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:242)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
>         at scala.util.Try$.apply(Try.scala:161)
>         at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Exception in thread "main" java.lang.IllegalStateException: Shutdown in progress
>
>
> On Tue, May 19, 2015 at 11:58 AM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> If you wanted to stop it gracefully, then why are you not calling
>> ssc.stop(stopGracefully = true, stopSparkContext = true)? Then it doesn't
>> matter whether the shutdown hook was called or not.
>>
>> TD
>>
>> On Mon, May 18, 2015 at 9:43 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I just figured out that if I want to perform a graceful shutdown of
>>> Spark Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook
>>> no longer works. In Spark 1.4 there is Utils.addShutdownHook defined in
>>> Spark Core, which gets called anyway, so the graceful shutdown of Spark
>>> Streaming fails with errors like "SparkContext already closed".
>>>
>>> To solve this, I need to explicitly register Utils.addShutdownHook in my
>>> driver with a higher priority (say 150) than Spark's shutdown priority
>>> of 50, and in that hook I call the StreamingContext stop method with the
>>> (false, true) parameters.
>>>
>>> Just curious to know: is this how we need to handle the shutdown hook
>>> going forward?
>>>
>>> Can't we make the streaming shutdown graceful by default?
>>>
>>> Also, the Java API for adding a shutdown hook via Utils looks very
>>> dirty, with methods like this:
>>>
>>> Utils.addShutdownHook(150, new Function0<BoxedUnit>() {
>>>     @Override
>>>     public BoxedUnit apply() {
>>>         return null;
>>>     }
>>>
>>>     @Override
>>>     public byte apply$mcB$sp() {
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public char apply$mcC$sp() {
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public double apply$mcD$sp() {
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public float apply$mcF$sp() {
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public int apply$mcI$sp() {
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public long apply$mcJ$sp() {
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public short apply$mcS$sp() {
>>>         return 0;
>>>     }
>>>
>>>     @Override
>>>     public void apply$mcV$sp() {
>>>         *jsc.stop(false, true);*
>>>     }
>>>
>>>     @Override
>>>     public boolean apply$mcZ$sp() {
>>>         return false;
>>>     }
>>> });
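
For readers following the thread, here is a minimal Java sketch of the plain JVM shutdown-hook approach discussed above (stopping the streaming context from Runtime.getRuntime().addShutdownHook). The class name GracefulStopWithJvmHook and the batch interval are placeholders, and the DStream setup is elided; per the discussion, in Spark 1.4 this hook is not ordered relative to Spark's own shutdown hook, so the graceful stop can fail in the way the logs above show.

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class GracefulStopWithJvmHook {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setAppName("graceful-stop-sketch");
            final JavaStreamingContext jssc =
                    new JavaStreamingContext(conf, Durations.seconds(2));

            // ... set up DStreams and output operations here ...

            // Plain JVM shutdown hook: stop the streaming context and the underlying
            // SparkContext, draining already-received data first
            // (stopSparkContext = true, stopGracefully = true).
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    jssc.stop(true, true);
                }
            }));

            jssc.start();
            jssc.awaitTermination();
        }
    }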
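And a sketch of the priority-based workaround described in the thread, assuming Spark 1.4's Utils.addShutdownHook(priority)(hook) remains callable from Java as in the quoted snippet. Extending scala.runtime.AbstractFunction0 instead of implementing Function0 directly avoids most of the boilerplate shown above; the class and method names (GracefulStopHook, register) are placeholders, while the priority 150 and the (false, true) stop arguments come from the discussion.

    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.util.Utils;
    import scala.runtime.AbstractFunction0;
    import scala.runtime.BoxedUnit;

    public final class GracefulStopHook {

        // Registers a Spark shutdown hook with priority 150 (higher than Spark's
        // SparkContext shutdown priority of 50), so the streaming context is
        // stopped gracefully before Spark's own hook tears down the SparkContext.
        public static void register(final JavaStreamingContext jssc) {
            Utils.addShutdownHook(150, new AbstractFunction0<BoxedUnit>() {
                @Override
                public BoxedUnit apply() {
                    // stopSparkContext = false (Spark's own hook handles that later),
                    // stopGracefully = true (drain received blocks and pending batches)
                    jssc.stop(false, true);
                    return BoxedUnit.UNIT;
                }
            });
        }
    }

The intent, per the thread, is that the priority-150 hook runs before Spark's priority-50 hook, so the streaming work is drained while the SparkContext is still alive; GracefulStopHook.register(jssc) would be called once in the driver, before jssc.start().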