Aaah, interesting, you are doing a 15-minute slide duration. Yeah, internally
the streaming scheduler waits only for the last "batch" interval that has data
to be processed, but if the slide interval (here, 15 mins) is longer than the
batch interval, then the windowed output for that slide might never be run.
This is indeed a bug and should be fixed. Mind setting up a JIRA and assigning
it to me?
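To make the timing concrete, here is a minimal Java sketch of the arithmetic. It uses a simplified model, not Spark's actual scheduler code. It assumes that batches run at zeroTime + k * batchInterval and that a windowed stream only produces output for a batch whose time since zeroTime is an exact multiple of its slide duration. The class WindowTiming, its methods, and the zeroTime origin of 0 are hypothetical, for illustration only.

```java
/**
 * Simplified model (an assumption, not Spark's code) of when a windowed
 * stream produces output, given a batch timeline starting at zeroTimeMs.
 */
public class WindowTiming {

    /** True if a window with the given slide emits output for the batch at batchTimeMs. */
    static boolean windowFires(long zeroTimeMs, long batchTimeMs, long slideMs) {
        return (batchTimeMs - zeroTimeMs) % slideMs == 0;
    }

    /** Time of the first window firing at or after batchTimeMs. */
    static long nextWindowTime(long zeroTimeMs, long batchTimeMs, long slideMs) {
        long elapsed = batchTimeMs - zeroTimeMs;
        long slides = (elapsed + slideMs - 1) / slideMs; // ceiling division
        return zeroTimeMs + slides * slideMs;
    }

    public static void main(String[] args) {
        long batchMs = 60_000L;      // Durations.minutes(1)
        long slideMs = 15 * batchMs; // Durations.minutes(15)
        long zeroTimeMs = 0L;        // hypothetical origin of the batch timeline

        // Suppose the last batch with data before the graceful stop ran 37 minutes in.
        long lastBatchMs = zeroTimeMs + 37 * batchMs;

        // Does the 15-minute window emit for that last batch?
        System.out.println(windowFires(zeroTimeMs, lastBatchMs, slideMs));
        long next = nextWindowTime(zeroTimeMs, lastBatchMs, slideMs);
        // Minute at which the window would next emit.
        System.out.println((next - zeroTimeMs) / batchMs);
        // Batches the scheduler would still need to run to reach it.
        System.out.println((next - lastBatchMs) / batchMs);
    }
}
```

Running it prints `false`, `45` and `8`: the last batch at minute 37 is not a slide boundary, so the window's output (saveToMyStorage in the snippet below) would only run at minute 45, eight batches after the scheduler has already stopped waiting.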

On Wed, Oct 7, 2015 at 8:33 AM, Michal Čizmazia <mici...@gmail.com> wrote:

> After triggering the graceful shutdown on the following application, the
> application stops before the windowed stream reaches its slide duration. As
> a result, the data is not completely processed (i.e. saveToMyStorage is not
> called) before shutdown.
>
> According to the documentation, a graceful shutdown should ensure that all
> data that has been received is completely processed before shutdown.
>
> https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code
>
> Spark version: 1.4.1
>
> Code snippet:
>
> Function0<JavaStreamingContext> factory = () -> {
>     // 1-minute batch interval
>     JavaStreamingContext context =
>         new JavaStreamingContext(sparkConf, Durations.minutes(1));
>     context.checkpoint("/test");
>     JavaDStream<String> records =
>         context.receiverStream(myReliableReceiver).flatMap(...);
>     records.persist(StorageLevel.MEMORY_AND_DISK());
>     records.foreachRDD(rdd -> { rdd.count(); return null; });
>     // 15-minute window sliding every 15 minutes: output only on every 15th batch
>     records
>         .window(Durations.minutes(15), Durations.minutes(15))
>         .foreachRDD(rdd -> { saveToMyStorage(rdd); return null; });
>     return context;
> };
>
> try (JavaStreamingContext context =
>         JavaStreamingContext.getOrCreate("/test", factory)) {
>     context.start();
>     waitForShutdownSignal();
>     boolean stopSparkContext = true;
>     boolean stopGracefully = true;
>     context.stop(stopSparkContext, stopGracefully);
> }
>
>
