Hi Adam,

I have the following Scala actor-based code to do a graceful shutdown:
class TimerActor (val timeout : Long, val who : Actor) extends Actor {
  def act {
    reactWithin (timeout) {
      case TIMEOUT => who ! SHUTDOWN
    }
  }
}

class SSCReactor (val ssc : StreamingContext) extends Actor with Logging {
  def act {
    react {
      case SHUTDOWN =>
        logger.info (s"Shutting down gracefully ...")
        ssc.stop (true, true)
    }
  }
}

I see the following messages:

14/10/22 01:40:49 INFO SSCReactor: Shutting down gracefully ...
14/10/22 01:40:49 INFO JobGenerator: Stopping JobGenerator gracefully
14/10/22 01:40:49 INFO JobGenerator: Waiting for all received blocks to be consumed for job generation
14/10/22 01:40:49 INFO JobGenerator: Waited for all received blocks to be consumed for job generation

-Soumitra.

On Mon, Dec 15, 2014 at 1:32 PM, Budde, Adam <bu...@amazon.com> wrote:
>
> Hi all,
>
> We are using Spark Streaming to ETL a large volume of time series datasets.
> In our current design, each dataset we ETL will have a corresponding Spark
> Streaming context + process running on our cluster. Each of these processes
> will be passed configuration options specifying the data source to process
> as well as various tuning parameters such as the number of Receiver objects
> to use, batch interval size, number of partitions, etc.
>
> Since the volume of data we're ingesting for each dataset will fluctuate
> over time, we'd like to be able to regularly send a SIGTERM to the Spark
> Streaming process handling the ETL, have that process gracefully complete
> processing any in-flight data, and restart the process with updated
> configuration options. The most obvious solution seems to be to call the
> stop(stopSparkContext: Boolean, stopGracefully: Boolean) method provided
> by StreamingContext in a shutdown hook, but this approach doesn't seem to
> be working for me. Here's a rough idea of what my code looks like:
>
>     val ssc = new StreamingContext(conf, Seconds(15))
>
>     ...
>
>     // Add shutdown hook to exit gracefully upon termination.
>     Runtime.getRuntime().addShutdownHook(new Thread() with Logging {
>       override def run() = {
>         logInfo("Exiting gracefully...")
>         ssc.stop(true, true)
>       }
>     })
>
>     ...
>
>     ssc.start()
>     ssc.awaitTermination()
>
> Whenever I try to kill the process, I don't see the "Exiting
> gracefully…" log message I've added. I tried grokking through the Spark
> source code to see if some other shutdown hook might be squashing the hook
> I've added by causing the process to exit before this hook is invoked, but
> I haven't found anything that would cause concern yet. Does anybody have
> any advice or insight on this? I'm a bit of a novice when it comes to the
> JVM and I'm afraid that I'm reaching the limits of my diagnostic abilities
> here.
>
> Thanks,
> Adam
>
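
P.S. For completeness, here is a minimal sketch of how the two actors above might be wired into the driver. It assumes an application-defined SHUTDOWN case object and an illustrative SparkConf/app name of my own; the one-hour timeout is only an example value, not something from the code above:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Application-defined shutdown message (an assumption here;
    // not part of Spark or scala.actors).
    case object SHUTDOWN

    val conf = new SparkConf().setAppName("graceful-shutdown-example")
    val ssc = new StreamingContext(conf, Seconds(15))

    // Actor that calls ssc.stop(true, true) when it receives SHUTDOWN.
    val reactor = new SSCReactor(ssc)
    reactor.start()

    // Timer actor that sends SHUTDOWN to the reactor after the
    // timeout elapses (here: one hour, purely illustrative).
    val timer = new TimerActor(60 * 60 * 1000L, reactor)
    timer.start()

    ssc.start()
    ssc.awaitTermination()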