Aaah, interesting, you are using a 15-minute slide duration. Yes, internally the streaming scheduler waits for the last batch interval that has data to be processed, but if there is a slide interval (e.g. 15 mins) larger than the batch interval, the final window batch may never be run before shutdown. This is indeed a bug and should be fixed. Mind filing a JIRA and assigning it to me?
On Wed, Oct 7, 2015 at 8:33 AM, Michal Čizmazia <mici...@gmail.com> wrote:

> After triggering a graceful shutdown on the following application, the
> application stops before the windowed stream reaches its slide duration.
> As a result, the data is not completely processed (i.e. saveToMyStorage
> is not called) before shutdown.
>
> According to the documentation, graceful shutdown should ensure that the
> data that has been received is completely processed before shutdown.
>
> https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code
>
> Spark version: 1.4.1
>
> Code snippet:
>
> Function0<JavaStreamingContext> factory = () -> {
>     JavaStreamingContext context =
>         new JavaStreamingContext(sparkConf, Durations.minutes(1));
>     context.checkpoint("/test");
>     JavaDStream<String> records =
>         context.receiverStream(myReliableReceiver).flatMap(...);
>     records.persist(StorageLevel.MEMORY_AND_DISK());
>     records.foreachRDD(rdd -> { rdd.count(); return null; });
>     records
>         .window(Durations.minutes(15), Durations.minutes(15))
>         .foreachRDD(rdd -> { saveToMyStorage(rdd); return null; });
>     return context;
> };
>
> try (JavaStreamingContext context =
>         JavaStreamingContext.getOrCreate("/test", factory)) {
>     context.start();
>     waitForShutdownSignal();
>     Boolean stopSparkContext = true;
>     Boolean stopGracefully = true;
>     context.stop(stopSparkContext, stopGracefully);
> }
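Until a scheduler fix lands, one possible interim workaround (a sketch, not from the thread itself) is to defer the graceful stop until the next slide boundary has passed, so the pending 15-minute window batch gets a chance to run. The snippet below adapts the shutdown block from the quoted code; it assumes slide boundaries align with wall-clock multiples of the slide duration, and the one-minute margin for the final batch to be scheduled is purely illustrative.

    // Workaround sketch: sleep past the next 15-minute slide boundary
    // before stopping, so the last window batch can still be scheduled.
    try (JavaStreamingContext context =
            JavaStreamingContext.getOrCreate("/test", factory)) {
        context.start();
        waitForShutdownSignal();

        long slideMs = Durations.minutes(15).milliseconds();
        long marginMs = Durations.minutes(1).milliseconds();
        // Time remaining until the next slide boundary (assumed to be
        // aligned to wall-clock multiples of the slide duration).
        long untilBoundaryMs = slideMs - (System.currentTimeMillis() % slideMs);
        try {
            Thread.sleep(untilBoundaryMs + marginMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        context.stop(true, true); // stopSparkContext, stopGracefully
    }

Alternatively, using a slide duration equal to the batch interval sidesteps the problem entirely, since every batch then triggers the window and is covered by the graceful stop, at the cost of more frequent calls to saveToMyStorage.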