Hi all, I am trying to shut down my Spark Streaming job gracefully. Everything appears to shut down cleanly except the checkpointing thread, which keeps running until it crashes.
I looked at the checkpoint thread in 1.3.0 (https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala) and it appears that the write method in CheckpointWriter will try to schedule a new CheckpointWriteHandler (and hit the exception below) regardless of the value of 'stopped', which would already be 'true' because the writer was stopped by the graceful shutdown. Is this a bug? Shouldn't the write method skip scheduling anything if stopped is true? Thanks!

This is what I'm doing:
=================
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s.JValue // assuming JValue is the json4s type
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import scala.collection.mutable.SynchronizedQueue

// TestInput, ReportHydrator and ApplicationPropertyGenerator are application classes (not shown).
class TestStreaming extends FunSuite with BeforeAndAfterAll {

  @transient var sc: SparkContext = _
  @transient var ssc: StreamingContext = _

  override def beforeAll() = {
    System.clearProperty("spark.driver.port")
    System.clearProperty("spark.hostPort")
    System.setProperty("spark.cleaner.ttl", "300")

    val sparkConf = new SparkConf().setAppName("testSpark").setMaster("local[4]")
    sc = new SparkContext(sparkConf)
    ssc = new StreamingContext(sc, Seconds(1))
  }

  override def afterAll() = {
    // Stop the streaming context gracefully and take the SparkContext down with it.
    val stopSparkContext = true
    val stopGracefully = true
    ssc.stop(stopSparkContext, stopGracefully)
    sc = null
    ssc = null
    System.clearProperty("spark.driver.port")
    System.clearProperty("spark.hostPort")
  }

  test("testStreaming") {
    val rddQueue = new SynchronizedQueue[RDD[JValue]]()
    val inputStream = ssc.queueStream(rddQueue)
    rddQueue += ssc.sparkContext.makeRDD(TestInput.reports("disney"))

    val hydratedReports = ReportHydrator.hydrate(inputStream)
    ApplicationPropertyGenerator.generateFrom(hydratedReports).foreachRDD(rdd => rdd.foreach(println(_)))

    ssc.checkpoint("reports/streaming")
    ssc.start()
  }
}

This is the output I get when shutting down gracefully (the exception is half-way down):
=================================================
15/03/17 12:25:34 INFO Executor: Finished task 3.0 in stage 2.0 (TID 11). 1455 bytes result sent to driver
15/03/17 12:25:34 INFO TaskSetManager: Finished task 3.0 in stage 2.0 (TID 11) in 3019 ms on localhost (4/4)
15/03/17 12:25:34 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/03/17 12:25:34 INFO DAGScheduler: Stage 2 (foreachRDD at TestStreaming.scala:42) finished in 3.027 s
15/03/17 12:25:34 INFO DAGScheduler: Job 0 finished: foreachRDD at TestStreaming.scala:42, took 4.492961 s
15/03/17 12:25:34 INFO JobScheduler: Finished job streaming job 1426620330000 ms.0 from job set of time 1426620330000 ms
15/03/17 12:25:34 INFO JobScheduler: Total delay: 4.951 s for time 1426620330000 ms (execution: 4.532 s)
15/03/17 12:25:39 WARN JobGenerator: Timed out while stopping the job generator (timeout = 10000)
15/03/17 12:25:39 INFO JobGenerator: Waited for jobs to be processed and checkpoints to be written
15/03/17 12:25:39 INFO CheckpointWriter: CheckpointWriter executor terminated ? true, waited for 0 ms.
15/03/17 12:25:39 INFO JobGenerator: Stopped JobGenerator
15/03/17 12:25:39 INFO JobGenerator: Checkpointing graph for time 1426620330000 ms
15/03/17 12:25:39 INFO DStreamGraph: Updating checkpoint data for time 1426620330000 ms
15/03/17 12:25:39 INFO DStreamGraph: Updated checkpoint data for time 1426620330000 ms
15/03/17 12:25:39 INFO JobScheduler: Stopped JobScheduler
15/03/17 12:25:39 INFO StreamingContext: StreamingContext stopped successfully
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
15/03/17 12:25:39 ERROR CheckpointWriter: Could not submit checkpoint task to the thread pool executor
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@5cd6e776 rejected from java.util.concurrent.ThreadPoolExecutor@20f62398[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:188)
    at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:285)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:176)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:85)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:83)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/03/17 12:25:39 INFO SparkUI: Stopped Spark web UI at http://192.168.241.128:4040
15/03/17 12:25:39 INFO DAGScheduler: Stopping DAGScheduler
15/03/17 12:25:39 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/03/17 12:25:39 INFO MemoryStore: MemoryStore cleared
15/03/17 12:25:39 INFO BlockManager: BlockManager stopped
15/03/17 12:25:39 INFO BlockManagerMaster: BlockManagerMaster stopped
15/03/17 12:25:39 INFO OutputCommitCoordinator$OutputCommitCoordinatorActor: OutputCommitCoordinator stopped!
15/03/17 12:25:39 INFO SparkContext: Successfully stopped SparkContext
15/03/17 12:25:39 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/03/17 12:25:39 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
[info] TestStreaming:
[info] - testStreaming
[info] Run completed in 14 seconds, 369 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 17 s, completed Mar 17, 2015 12:25:39 PM
15/03/17 12:25:39 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
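
To make the question concrete, here is a rough sketch of the kind of guard I have in mind. This is not the actual Spark source: GuardedWriter and its methods are hypothetical names I made up for illustration, and it only assumes a writer that owns a java.util.concurrent executor and a 'stopped' flag. The idea is that write checks the flag under the same lock that stop uses, so nothing is submitted to an executor that has already been shut down.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical stand-in for a checkpoint writer; NOT the real CheckpointWriter.
class GuardedWriter {
  private val executor = Executors.newFixedThreadPool(1)
  private var stopped = false // only read or written while holding this object's lock

  // Returns true if the task was scheduled, false if the writer was already stopped.
  def write(task: Runnable): Boolean = synchronized {
    if (stopped) {
      false // skip scheduling instead of letting the executor reject the task
    } else {
      executor.execute(task)
      true
    }
  }

  // Marks the writer as stopped, then waits for already-submitted tasks to finish.
  def stop(): Unit = {
    val firstCall = synchronized {
      if (stopped) false
      else {
        stopped = true
        executor.shutdown() // previously submitted tasks still run
        true
      }
    }
    // Wait outside the lock so concurrent write calls return immediately.
    if (firstCall) executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

With something like this, a write that arrives after stop would just return false rather than raising RejectedExecutionException. Whether silently dropping the last checkpoint is acceptable is a separate question.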