You mean that within Runtime.getRuntime().addShutdownHook I should call
ssc.stop(stopSparkContext = true, stopGracefully = true)?
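The pattern in question, sketched with a stand-in interface (hypothetical, not Spark's API) so it compiles and runs without Spark on the classpath:

```java
// Stand-in for JavaStreamingContext.stop(boolean, boolean) -- hypothetical,
// only here so the sketch is self-contained.
interface StreamingContextLike {
    void stop(boolean stopSparkContext, boolean stopGracefully);
}

public class ShutdownHookSketch {
    // Plain JVM shutdown hooks run concurrently and in no defined order,
    // which is why this can race with Spark's own stop() hook in 1.4.
    static Thread makeHook(StreamingContextLike ssc) {
        return new Thread(() -> ssc.stop(true, true));
    }

    public static void main(String[] args) {
        StreamingContextLike ssc = (stopCtx, graceful) ->
            System.out.println("stop(" + stopCtx + ", " + graceful + ") called");
        Runtime.getRuntime().addShutdownHook(makeHook(ssc));
        System.out.println("main exiting; hook fires during JVM shutdown");
    }
}
```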

This no longer works in 1.4.

The SparkContext got stopped before the receivers had processed all
received blocks, and I see the exception below in the logs. But if I add
Utils.addShutdownHook with the higher priority I mentioned, graceful
shutdown does work, because the shutdown hooks run in priority order.
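To illustrate why the priority matters: Spark 1.4's Utils.addShutdownHook installs a single JVM hook that drains a priority-ordered set of tasks, so a hook registered at priority 150 runs before the SparkContext stop hook at 50. A minimal sketch of that mechanism (my own simplification, not Spark's actual code):

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Minimal sketch of a priority-ordered shutdown-hook manager, loosely
// modeled on what Spark's Utils.addShutdownHook provides in 1.4.
// This is an illustration, NOT Spark's implementation.
public class PriorityHooks {
    static final class Hook {
        final int priority;
        final Runnable task;
        Hook(int priority, Runnable task) { this.priority = priority; this.task = task; }
    }

    // Higher priority drains first, so a 150 hook runs before a 50 hook.
    private final PriorityQueue<Hook> hooks = new PriorityQueue<>(
        Comparator.comparingInt((Hook h) -> h.priority).reversed());

    public synchronized void add(int priority, Runnable task) {
        hooks.add(new Hook(priority, task));
    }

    // Spark registers one real JVM hook that does the equivalent of this drain.
    public synchronized void runAll() {
        while (!hooks.isEmpty()) hooks.poll().task.run();
    }

    public static void main(String[] args) {
        PriorityHooks mgr = new PriorityHooks();
        mgr.add(50, () -> System.out.println("priority 50: SparkContext.stop()"));
        mgr.add(150, () -> System.out.println("priority 150: ssc.stop(false, true)"));
        mgr.runAll(); // prints the priority-150 line first
    }
}
```

With the streaming stop registered at 150, the batches can drain before the priority-50 hook tears the SparkContext down.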



*INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Sent stop
signal to all 3 receivers*
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
receiver for stream 0: Stopped by driver
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
receiver for stream 1: Stopped by driver
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
receiver for stream 2: Stopped by driver
*INFO : org.apache.spark.SparkContext - Invoking stop() from shutdown hook*
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/streaming/batch/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/streaming/batch,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/streaming/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/streaming,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/metrics/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/static,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/executors/threadDump,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/executors/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/executors,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/environment/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/environment,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/storage/rdd,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/storage/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/storage,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages/pool/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages/pool,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages/stage/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages/stage,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/jobs/job/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/jobs/job,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/jobs/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/jobs,null}
INFO : org.apache.spark.ui.SparkUI - Stopped Spark web UI at
http://10.252.5.113:4040
INFO : org.apache.spark.scheduler.DAGScheduler - Stopping DAGScheduler
INFO : org.apache.spark.scheduler.DAGScheduler - Job 4 failed: start at
Consumer.java:122, took 10.398746 s
*Exception in thread "Thread-28" org.apache.spark.SparkException: Job
cancelled because SparkContext was shut down*
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:736)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:735)
INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 2 (start at
Consumer.java:122) failed in 10.383 s

at
org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:735)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1468)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1403)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1562)
at
org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:551)
at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2271)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:2241)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2241)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2241)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1769)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:2241)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2241)
at
org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2241)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:2241)
at
org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2223)
at java.lang.Thread.run(Thread.java:744)
INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Waiting for
receiver job to terminate gracefully
INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -
Asking each executor to shut down
INFO :
org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint
- OutputCommitCoordinator stopped!
INFO :
org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint
- OutputCommitCoordinator stopped!
INFO : org.apache.spark.MapOutputTrackerMasterEndpoint -
MapOutputTrackerMasterEndpoint stopped!
INFO : org.apache.spark.storage.MemoryStore - MemoryStore cleared
INFO : org.apache.spark.storage.BlockManager - BlockManager stopped
INFO : org.apache.spark.storage.BlockManagerMaster - BlockManagerMaster
stopped
*INFO : org.apache.spark.SparkContext - Successfully stopped SparkContext*
INFO : akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting
down remote daemon.
INFO : akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote
daemon shut down; proceeding with flushing remote transports.
INFO : akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting
shut down.
INFO : org.apache.spark.streaming.scheduler.JobGenerator - Checkpointing
graph for time 1431259082000 ms
INFO : org.apache.spark.streaming.DStreamGraph - Updating checkpoint data
for time 1431259082000 ms
INFO : org.apache.spark.streaming.DStreamGraph - Updated checkpoint data
for time 1431259082000 ms
ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error generating
jobs for time 1431259082000 ms
*java.lang.IllegalStateException: Shutdown in progress*
at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
at java.lang.Runtime.addShutdownHook(Runtime.java:211)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1474)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:263)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at
org.apache.spark.streaming.util.HdfsUtils$.getFileSystemForPath(HdfsUtils.scala:68)
at
org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:26)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org
$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org
$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
at
org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:194)
at
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:81)
at
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:44)
at
org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$writeToLog$2.apply(ReceivedBlockTracker.scala:219)
at
org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$writeToLog$2.apply(ReceivedBlockTracker.scala:218)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:218)
at
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:108)
at
org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:105)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:242)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
at scala.util.Try$.apply(Try.scala:161)
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
at org.apache.spark.streaming.scheduler.JobGenerator.org
$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" java.lang.IllegalStateException: Shutdown in
progress

On Tue, May 19, 2015 at 11:58 AM, Tathagata Das <t...@databricks.com> wrote:

> If you wanted to stop it gracefully, then why are you not calling
> ssc.stop(stopGracefully = true, stopSparkContext = true)? Then it doesn't
> matter whether the shutdown hook was called or not.
>
> TD
>
> On Mon, May 18, 2015 at 9:43 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi,
>>
>> Just figured out that if I want to perform a graceful shutdown of Spark
>> Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no
>> longer works. In Spark 1.4 there is a Utils.addShutdownHook defined in
>> Spark Core that gets called anyway, which causes the graceful shutdown of
>> Spark Streaming to fail with a "SparkContext already closed" error.
>>
>> To solve this, I had to explicitly add Utils.addShutdownHook in my driver
>> with a higher priority (say 150) than Spark's shutdown priority of 50,
>> and in it call the streaming context's stop method with the parameters
>> (false, true).
>>
>> Just curious to know: is this how we need to handle the shutdown hook
>> going forward?
>>
>> Can't we make the streaming shutdown graceful by default?
>>
>> Also, the Java API for adding a shutdown hook in Utils looks very dirty,
>> with methods like this:
>>
>>
>>
>> Utils.addShutdownHook(150, new Function0<BoxedUnit>() {
>>     @Override
>>     public BoxedUnit apply() {
>>         return null;
>>     }
>>
>>     @Override
>>     public byte apply$mcB$sp() {
>>         return 0;
>>     }
>>
>>     @Override
>>     public char apply$mcC$sp() {
>>         return 0;
>>     }
>>
>>     @Override
>>     public double apply$mcD$sp() {
>>         return 0;
>>     }
>>
>>     @Override
>>     public float apply$mcF$sp() {
>>         return 0;
>>     }
>>
>>     @Override
>>     public int apply$mcI$sp() {
>>         return 0;
>>     }
>>
>>     @Override
>>     public long apply$mcJ$sp() {
>>         return 0;
>>     }
>>
>>     @Override
>>     public short apply$mcS$sp() {
>>         return 0;
>>     }
>>
>>     @Override
>>     public void apply$mcV$sp() {
>>         jsc.stop(false, true);
>>     }
>>
>>     @Override
>>     public boolean apply$mcZ$sp() {
>>         return false;
>>     }
>> });
>>
>
>
