Hi Adam,

I have the following Scala actor-based code to do a graceful shutdown:

import scala.actors.{Actor, TIMEOUT}

import org.apache.spark.streaming.StreamingContext

// Shutdown request message (a case object here; its definition isn't part of
// the snippet I pasted earlier).
case object SHUTDOWN

// Sends SHUTDOWN to `who` once `timeout` milliseconds have elapsed.
class TimerActor (val timeout : Long, val who : Actor) extends Actor {
    def act {
        reactWithin (timeout) {
            case TIMEOUT => who ! SHUTDOWN
        }
    }
}

// Waits for SHUTDOWN and stops the StreamingContext gracefully.
// Logging is our own trait that exposes a `logger`.
class SSCReactor (val ssc : StreamingContext) extends Actor with Logging {
    def act {
        react {
            case SHUTDOWN =>
                logger.info ("Shutting down gracefully ...")
                ssc.stop (true, true)
        }
    }
}
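
The wiring is roughly as follows (the ten-minute timeout is just an illustrative value, and ssc is the StreamingContext created earlier, not shown here):

// Start the reactor so it is ready to receive SHUTDOWN.
val reactor = new SSCReactor (ssc)
reactor.start ()

// Safety net: ask for a graceful stop after ten minutes.
new TimerActor (10 * 60 * 1000L, reactor).start ()

ssc.start ()
ssc.awaitTermination ()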

I see the following messages:

14/10/22 01:40:49 INFO SSCReactor: Shutting down gracefully ...
14/10/22 01:40:49 INFO JobGenerator: Stopping JobGenerator gracefully
14/10/22 01:40:49 INFO JobGenerator: Waiting for all received blocks to be consumed for job generation
14/10/22 01:40:49 INFO JobGenerator: Waited for all received blocks to be consumed for job generation

-Soumitra.


On Mon, Dec 15, 2014 at 1:32 PM, Budde, Adam <bu...@amazon.com> wrote:
>
>  Hi all,
>
>  We are using Spark Streaming to ETL a large volume of time series datasets.
> In our current design, each dataset we ETL will have a corresponding Spark
> Streaming context + process running on our cluster. Each of these processes
> will be passed configuration options specifying the data source to process
> as well as various tuning parameters such as the number of Receiver objects
> to use, batch interval size, number of partitions, etc.
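>
>  For concreteness, each process builds its context from those options along
> the lines of the sketch below (etlConfig, createStream, and the field names
> are placeholders rather than our exact code):
>
> > val ssc = new StreamingContext(conf, Seconds(etlConfig.batchIntervalSecs))
> > val streams = (1 to etlConfig.numReceivers).map(_ => createStream(ssc, etlConfig.source))
> > val records = ssc.union(streams).repartition(etlConfig.numPartitions)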
>
>  Since the volume of data we're ingesting for each dataset will fluctuate
> over time, we'd like to be able to regularly send a SIGTERM to the Spark
> Streaming process handling the ETL, have that process gracefully complete
> processing any in-flight data, and restart the process with updated
> configuration options. The most obvious solution seems to be to call the
> stop(stopSparkContext: Boolean, stopGracefully: Boolean) method provided
> by StreamingContext in a shutdown hook, but this approach doesn't seem to
> be working for me. Here's a rough idea of what my code looks like:
>
>  > val ssc = new StreamingContext(conf, Seconds(15))
> >
> > ...
> >
> > // Add shutdown hook to exit gracefully upon termination.
> > Runtime.getRuntime().addShutdownHook(new Thread() with Logging {
> >   override def run() = {
> >     logInfo("Exiting gracefully...")
> >     ssc.stop(true, true)
> >   }
> > })
> >
> > ...
> >
> > ssc.start()
> > ssc.awaitTermination()
>
>  Whenever I try to kill the process, I don't see the "Exiting
> gracefully…" log message I've added. I tried grokking through the Spark
> source code to see if some other shutdown hook might be squashing the hook
> I've added by causing the process to exit before this hook is invoked, but
> I haven't found anything that would cause concern yet. Does anybody have
> any advice or insight on this? I'm a bit of a novice when it comes to the
> JVM and I'm afraid that I'm reaching the limits of my diagnostic abilities
> here.
>
>  Thanks,
> Adam
>
