Hi Juho,

As Sihua said, this shouldn't happen and indicates a bug. Did you only 
encounter this once or can you easily reproduce the problem?

Best,
Aljoscha

> On 15. May 2018, at 05:57, sihua zhou <summerle...@163.com> wrote:
> 
> Hi Juho,
> In fact, from your code I can't see any way that the MapState could be 
> inconsistent with the timer, so it looks like a bug to me. Once the 
> checkpoint is complete, and as long as you haven't queried the state 
> asynchronously from a custom thread, the result of the checkpoint should be 
> consistent. The only case I can see where the timer could be inconsistent 
> with the state is when the task is shutting down: the backend may already be 
> closed while the timer has failed to shut down, so the timer callback may 
> access a closed backend. But that shouldn't be the cause in your case. Could 
> you please give us more information, such as which state backend you are 
> using? Are you using the RocksDBBackend? I think @Stefan may be able to tell 
> more about this, and please correct me if I'm wrong.
> 
> Best,
> Sihua
> 
> On 05/15/2018 01:48, Bowen Li <bowenl...@gmail.com> wrote:
> Hi Juho,
> 
> You are right, there's no transactional guarantee on timers and state in 
> processElement(). They may end up inconsistent if your job was 
> cancelled in the middle of processing an element.
> 
> To avoid this situation, the best practice is to always check whether 
> the state you're trying to read is null.
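
For illustration, the null check suggested above could look roughly like the
sketch below. It is not Flink code: a plain java.util.Map stands in for
MapState so that the snippet runs on its own, and the class and method names
(NullSafeTimerSketch, shouldClear) are made up for this example. Treating a
missing "lastUpdated" entry as "nothing to expire" is also an assumption;
depending on the job, clearing the state in that case may be the better choice.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of a null-safe version of the onTimer check.
 * Assumption: a plain Map stands in for Flink's MapState<String, String>.
 */
final class NullSafeTimerSketch {

    /**
     * Decides whether the state should be cleared for a timer firing at
     * {@code timestamp}. A missing "lastUpdated" entry is skipped instead of
     * being passed to Long.parseLong, which would throw NumberFormatException.
     */
    static boolean shouldClear(Map<String, String> state,
                               long timestamp,
                               long stateRetentionMillis) {
        String raw = state.get("lastUpdated");
        if (raw == null) {
            // Timer fired, but the entry it relies on is absent: do nothing.
            return false;
        }
        long lastUpdated = Long.parseLong(raw);
        return timestamp >= lastUpdated + stateRetentionMillis;
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();

        // No "lastUpdated" entry: the check returns false instead of throwing.
        System.out.println(shouldClear(state, 1_000L, 100L));

        // Entry present and older than the retention period.
        state.put("lastUpdated", "500");
        System.out.println(shouldClear(state, 600L, 100L));
    }
}
```

In the real onTimer, the same check would sit in front of the
Long.parseLong(mapState.get("lastUpdated")) call. It only avoids the
NumberFormatException; it does not explain why the entry was missing after
the restore.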
> 
> I've also created https://issues.apache.org/jira/browse/FLINK-9362 to 
> document this. 
> 
> Thanks
> Bowen
> 
> 
> 
> On Mon, May 14, 2018 at 4:00 AM, Juho Autio <juho.au...@rovio.com> wrote:
> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old 
> state. After restoring state from a checkpoint, it seems that a timer was 
> restored, but not the data that should be in the related MapState whenever 
> such a timer has been registered.
> 
> The way I see it, there is a bug, and it is one of these:
> - The writing of timers & map states to Flink state is not synchronized (or 
> maybe there are no such guarantees by design?)
> - Flink may restore a checkpoint that is actually corrupted/incomplete
> 
> Our code (simplified):
> 
>     private MapState<String, String> mapState;
> 
>     public void processElement(..) {
>         mapState.put("lastUpdated", ctx.timestamp().toString());
>         ctx.timerService().registerEventTimeTimer(
>                 ctx.timestamp() + stateRetentionMillis);
>     }
> 
>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>             mapState.clear();
>         }
>     }
> 
> Normally this "just works". As you can see, it shouldn't be possible for 
> "lastUpdated" to be missing from the state if a timer was registered and 
> onTimer gets called.
> 
> However, after restoring state from a checkpoint, the job kept failing with 
> this error:
> 
> Caused by: java.lang.NumberFormatException: null
> at java.lang.Long.parseLong(Long.java:552)
> at java.lang.Long.parseLong(Long.java:631)
> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
> ..
> 
> So apparently onTimer was called but lastUpdated wasn't found in the MapState.
> 
> The background for restoring state in this case is not entirely clean. There 
> was an OS level issue "Too many open files" after running a job for ~11 days. 
> To fix that, we replaced the cluster with a new one and launched the Flink 
> job again. State was successfully restored from the latest checkpoint that 
> had been created by the "problematic execution". Now, I'm assuming that if 
> the state hadn't been created successfully, restoring wouldn't have succeeded 
> either – correct? This is just to rule out that the issue with state 
> happened because the checkpoint files were somehow corrupted due to the "Too 
> many open files" problem.
> 
> Thank you all for your continued support!
> 
> P.S. I would be very much interested to hear if there's some cleaner way to 
> achieve this kind of TTL for keyed state in Flink.
> 
> 
