Hi Juho, thanks for trying this out. I'm running out of ideas myself now... Let me briefly summarize:
- Have we reached a consensus that the map state is lost when restoring with rescaling? (Since you can restore successfully without rescaling.)
- The timer state is correctly restored, because for timers, restoring from a checkpoint is no different from restoring from a savepoint. (@Stefan, please correct me if I'm wrong.)

And @Juho, have you tried to rescale the job with a different parallelism (not always 16)?

Best,
Sihua

On 05/18/2018 17:14, Juho Autio <juho.au...@rovio.com> wrote:

Tested with http://people.apache.org/~trohrmann/flink-1.5.0-rc4/flink-1.5.0-bin-hadoop27-scala_2.11.tgz. It hits the same problem. Btw, why is this error logged on INFO level?

2018-05-18 09:03:52,595 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - PlatformIDsProcessFunction -> AppIdFilter([MyApp]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, FooEventMapper -> ThreadPoolFooGateway (capacity=500) -> Sink: ResponseKafkaSink) (8/16) (c2c102bcf4e1c7f523a2d00b81ed1a23) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
    at java.lang.Long.parseLong(Long.java:552)
    at java.lang.Long.parseLong(Long.java:631)
    at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more

2018-05-18 09:03:52,596 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job FooHttpStream (a54a9210341fd26d210122986f2be844) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
    at java.lang.Long.parseLong(Long.java:552)
    at java.lang.Long.parseLong(Long.java:631)
    at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
    at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more

On Fri, May 18, 2018 at 11:06 AM, Juho Autio <juho.au...@rovio.com> wrote:

Thanks Sihua, I'll give that RC a try.

On Fri, May 18, 2018 at 10:58 AM, sihua zhou <summerle...@163.com> wrote:

Hi Juho, would you like to try out the latest RC (http://people.apache.org/~trohrmann/flink-1.5.0-rc4/) to rescale the job from the "problematic" checkpoint? The latest RC includes a fix for potential silent data loss. If that is the cause, you will see a different exception when you try to recover your job.

Best,
Sihua

On 05/18/2018 15:02, Juho Autio <juho.au...@rovio.com> wrote:

I see. I appreciate keeping this option available even if it's "beta". The current situation could be documented better, though. As long as rescaling from a checkpoint is not officially supported, I would put it behind a flag similar to --allowNonRestoredState. The flag could be called --allowRescalingRestoredCheckpointState, for example. This would make sure that users are aware that what they're using is experimental and might have unexpected effects.

As for the bug I faced, indeed I was able to reproduce it consistently, and I have provided TRACE-level logs personally to Stefan. If there is no Jira ticket for this yet, would you like me to create one?

On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:

Hi,

> This raises a couple of questions:
> - Is it a bug though, that the state restoring goes wrong like it does for my
> job? Based on my experience it seems like rescaling sometimes works, but then
> you can have these random errors.

If there is a problem, I would still consider it a bug because it should work correctly.
> - If it's not supported properly, why not refuse to restore a checkpoint if
> it would require rescaling?

It should work properly, but I would prefer to keep this at the level of a "hidden feature" until it has gotten some more exposure and until some open questions about the future differences between savepoints and checkpoints are resolved.

> - We have sometimes had Flink jobs where the state has become so heavy that
> cancelling with a savepoint times out & fails. Incremental checkpoints are
> still working because they don't time out as long as the state is growing
> linearly. In that case if we want to scale up (for example to enable
> successful savepoint creation ;) ), the only thing we can do is to restore
> from the latest checkpoint. But then we have no way to scale up by increasing
> the cluster size, because we can't create a savepoint with a smaller cluster
> but on the other hand can't restore a checkpoint to a bigger cluster, if
> rescaling from a checkpoint is not supposed to be relied on. So in this case
> we're stuck and forced to start from an empty state?

IMO there is a very good chance that this will simply become a normal feature in the near future.

Best,
Stefan
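For reference, the "java.lang.NumberFormatException: null" in the log above is consistent with an onTimer callback that reads a String from keyed state and parses it with Long.parseLong, but finds the state empty after the rescaled restore (while the timer itself was restored and still fires). Below is a minimal sketch of a ProcessFunction with that failure mode; the class name, state name, field names, and the one-minute timer are assumptions for illustration only and are not taken from the actual EnrichEventProcessFunction.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch only: names below are assumed, not from the real job.
public class EnrichTimerSketch extends ProcessFunction<String, String> {

    // Keyed state written in processElement and read back in onTimer.
    private transient ValueState<String> storedTimestamp;

    @Override
    public void open(Configuration parameters) {
        storedTimestamp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("stored-timestamp", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Remember the element's timestamp and register a timer one minute later.
        storedTimestamp.update(String.valueOf(ctx.timestamp()));
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // If the timer survives the restore but the keyed value state for this key
        // does not, value() returns null and Long.parseLong(null) throws
        // "java.lang.NumberFormatException: null", matching the stack trace above.
        long enrichTime = Long.parseLong(storedTimestamp.value());
        out.collect("enriched, stored timestamp was " + enrichTime);
    }
}

If state read in onTimer can legitimately be null after a restore, a null check before parsing would at least turn the hard job failure into something easier to diagnose, though it would not explain why the state went missing in the first place.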