Yes, I'm rescaling from a checkpoint.

> that behavior is not guaranteed yet
If restoring + rescaling a checkpoint is not supported properly, I don't understand why Flink doesn't simply refuse to restore in that case?

Note that I'm using a rather new 1.5-SNAPSHOT, not release 1.4. To be exact, the package was built at Flink commit 8395508b0401353ed07375e22882e7581d46ac0e.

I also ran this additional test:
- launch again from the checkpoint with the original parallelism=8
- cancel with savepoint (this was after ~7 minutes, during which the job also created some new incremental checkpoints)
- restore the newly created savepoint with parallelism=16
- it works

Now I have restored from the original checkpoint multiple times, and it consistently fails with that java.lang.NumberFormatException when trying with parallelism=16.

I was able to enable trace-level logs with this change (nothing changed in logback.xml). I added these lines to flink-1.5-SNAPSHOT/conf/log4j.properties:

# To debug checkpoint restoring
log4j.logger.org.apache.flink.contrib.streaming.state=TRACE
log4j.logger.org.apache.flink.runtime.state=TRACE
log4j.logger.org.apache.flink.runtime.checkpoint=TRACE
log4j.logger.org.apache.flink.streaming.api.operators=TRACE
log4j.logger.org.apache.flink.streaming.runtime.tasks=TRACE

(This is confusing to me – I had understood that Flink had migrated entirely to logback configuration instead of log4j?)

I modified the logs to remove sensitive information, but because I'm not 100% sure that I caught everything, I will share the logs privately with Stefan only.
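As a side note, rescaling keyed state (from a savepoint, and the same would apply to a checkpoint) is always bounded by the job's max parallelism, i.e. the number of key groups, so the target parallelism has to stay within that limit. A minimal sketch of the relevant settings, with illustrative values rather than this job's actual configuration:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Max parallelism = number of key groups; keyed state can be redistributed
// across at most this many parallel subtasks on restore. Flink defaults to
// 128 for jobs with parallelism <= 128, and the value is fixed once state
// has been created.
env.setMaxParallelism(128);
env.setParallelism(8); // first run; restoring later with e.g. -p 16 stays within the bound

Within that bound, restoring a savepoint at a higher parallelism is supported; the open question in this thread is whether the same holds for a retained (incremental) checkpoint.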
On Wed, May 16, 2018 at 5:22 AM, sihua zhou <summerle...@163.com> wrote:

> Hi Juho,
> if I'm not misunderstanding, you said you're rescaling the job from the checkpoint? If yes, I think that behavior is not guaranteed yet; you can find this in the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints. So I'm not sure whether this is a "bug" at the current stage (personally I'd like to dig into it, because we currently also use checkpoints the way you do) ...
>
> Best, Sihua
>
> On 05/16/2018 01:46, Juho Autio <juho.au...@rovio.com> wrote:
>
> I was able to reproduce this error.
>
> I just happened to notice an important detail about the original failure:
> - the checkpoint was created with a 1-node cluster (parallelism=8)
> - it was restored on a 2-node cluster (parallelism=16), which caused that null exception
>
> I tried restoring from the problematic checkpoint again:
> - restored on a 1-node cluster: no problems
> - restored on a 2-node cluster: getting the original error!
>
> So now I have a way to reproduce the bug. To me it seems like the checkpoint itself is fine. The bug seems to be in redistributing the state of a restored checkpoint to a higher parallelism. I only tested each cluster size once (as described above), so it could also be a coincidence, but it seems at least likely now that it's about the state redistribution.
>
> I'll try to follow up with those TRACE-level logs tomorrow. Today I tried adding these to the logback.xml, but I didn't get anything other than INFO-level logs:
>
> <logger name="org.apache.flink.contrib.streaming.state" level="TRACE">
>   <appender-ref ref="file"/>
> </logger>
> <logger name="org.apache.flink.runtime.state" level="TRACE">
>   <appender-ref ref="file"/>
> </logger>
> <logger name="org.apache.flink.runtime.checkpoint" level="TRACE">
>   <appender-ref ref="file"/>
> </logger>
> <logger name="org.apache.flink.streaming.api.operators" level="TRACE">
>   <appender-ref ref="file"/>
> </logger>
> <logger name="org.apache.flink.streaming.runtime.tasks" level="TRACE">
>   <appender-ref ref="file"/>
> </logger>
>
> Maybe I need to edit log4j.properties instead(?). Indeed it's Flink 1.5-SNAPSHOT, and the package has all of these in the conf/ dir:
>
> log4j-cli.properties
> log4j-console.properties
> log4j.properties
> log4j-yarn-session.properties
> logback-console.xml
> logback.xml
> logback-yarn.xml
>
> On Tue, May 15, 2018 at 11:49 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> On 15.05.2018 at 10:34, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Ok, that should be possible to provide. Are there any specific packages to set to TRACE level? Maybe just go with org.apache.flink.* on TRACE?
>>
>> The following packages would be helpful:
>>
>> org.apache.flink.contrib.streaming.state.*
>> org.apache.flink.runtime.state.*
>> org.apache.flink.runtime.checkpoint.*
>> org.apache.flink.streaming.api.operators.*
>> org.apache.flink.streaming.runtime.tasks.*
>>
>> > did the "too many open files" problem only happen with local recovery (asking since it should actually not add to the number of open files)
>>
>> I think it happened in various places, maybe not when restoring. Anyway, if the situation is like that, the system is pretty much unusable (on the OS level), so it shouldn't matter too much which operation of the application it causes to fail? Anyway, I'll try to grab & share all log lines that say "Too Many Open Files".
>>
>> > and did you deactivate it on the second cluster for the restart or change your OS settings?
>>
>> No, I didn't change anything except for increasing the ulimit on the OS to prevent this from happening again. Note that the system only ran out of files after ~11 days of uptime. During that time there had been some local recoveries. This makes me wonder, though: could many local recoveries eventually have caused this – could it be that on the occasion of a local recovery some "old" files are left open, making the system eventually run out of files?
>>
>> From the way local recovery works with incremental RocksDB checkpoints, I would not assume that it is the cause of the problem. In this particular case, the number of open files on a local FS should not be higher than without local recovery. Maybe it is just a matter of the OS limit, the number of operators with a RocksDB backend running on the machine, and the number of files managed by all those RocksDB instances simply exceeding the limit. If you have an overview of how many parallel operator instances with keyed state were running on the machine, and assume some reasonable number of files per RocksDB instance and the limit configured in your OS, could that be the case?
>>
>> Thanks!
>>
>> Thanks for your help!
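(To make Stefan's back-of-the-envelope estimate above concrete with purely hypothetical numbers: 16 parallel operator instances with keyed state on one machine, each backed by its own RocksDB instance holding ~500 SST and metadata files, already means ~8,000 open files, well above a typical default ulimit of 1024 or 4096.)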
>> On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>
>>> Btw, having a trace-level log of a restart from a problematic checkpoint could actually be helpful if we cannot find the problem from the previous points. This can give a more detailed view of which checkpoint files are mapped to which operator.
>>>
>>> I have one more question: did the "too many open files" problem only happen with local recovery (asking since it should actually not add to the number of open files), and did you deactivate it on the second cluster for the restart or change your OS settings?
>>>
>>> On 15.05.2018 at 10:09, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>
>>> What I would like to see from the logs (also depending a bit on your log level):
>>>
>>> - all exceptions
>>> - in which context exactly the "too many open files" problem occurred, because I think for checkpoint consistency it should not matter, as a checkpoint with such a problem should never succeed
>>> - files that are written for checkpoints/savepoints
>>> - completed checkpoint/savepoint IDs
>>> - the restored checkpoint/savepoint ID
>>> - files that are loaded on restore
>>>
>>> On 15.05.2018 at 10:02, Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>> Thanks all. I'll have to see about sharing the logs & configuration.
>>>
>>> Is there something specific that you'd like to see in the logs? It may be easier for me to get specific lines and obfuscate sensitive information instead of trying to do that for the full logs.
>>>
>>> We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true, and an external state path on S3.
>>>
>>> The code that we use is:
>>>
>>> env.setStateBackend(getStateBackend(statePath, new RocksDBStateBackend(statePath, true)));
>>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 * 1000));
>>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent", 1));
>>> env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>
>>> The problematic state that we tried to use was a checkpoint created with this conf.
>>>
>>> > Are you using the local recovery feature?
>>>
>>> Yes, and in this particular case the job was constantly failing/restarting because of Too Many Open Files. So we terminated the cluster entirely, created a new one, and launched a new job by specifying the latest checkpoint path to restore state from.
>>>
>>> This is the only time I have seen this error happen with timer state. I still have that bad checkpoint data on S3, so I might be able to try to restore it again if needed for debugging. But that would require some tweaking, because I don't want to tangle with the same Kafka consumer group offsets or send old data again to the production endpoint.
>>>
>>> Please keep in mind that there was that Too Many Open Files issue on the cluster that created the problematic checkpoint, if you think that's relevant.
>>>
>>> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I agree, this looks like a bug. Can you tell us your exact configuration of the state backend, e.g. whether you are using incremental checkpoints or not? Are you using the local recovery feature? Are you restarting the job from a checkpoint or a savepoint? Can you provide logs for both the job that failed and the restarted job?
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> On 14.05.2018 at 13:00, Juho Autio <juho.au...@rovio.com> wrote:
>>>>
>>>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After restoring state from a checkpoint, it seems like a timer had been restored, but not the data that was expected to be in a related MapState if such a timer had been added.
>>>>
>>>> The way I see it, there's a bug in one of these:
>>>> - the writing of timers & map states to Flink state is not synchronized (or maybe there are no such guarantees by design?)
>>>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>>>>
>>>> Our code (simplified):
>>>>
>>>> private MapState<String, String> mapState;
>>>>
>>>> public void processElement(..) {
>>>>     mapState.put("lastUpdated", ctx.timestamp().toString());
>>>>     ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
>>>> }
>>>>
>>>> public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>>>>     long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>>>     if (timestamp >= lastUpdated + stateRetentionMillis) {
>>>>         mapState.clear();
>>>>     }
>>>> }
>>>>
>>>> Normally this "just works". As you can see, it shouldn't be possible for "lastUpdated" to be missing from the state if a timer was registered and onTimer gets called.
>>>>
>>>> However, after restoring state from a checkpoint, the job kept failing with this error:
>>>>
>>>> Caused by: java.lang.NumberFormatException: null
>>>>     at java.lang.Long.parseLong(Long.java:552)
>>>>     at java.lang.Long.parseLong(Long.java:631)
>>>>     at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
>>>>     ..
>>>>
>>>> So apparently onTimer was called, but lastUpdated wasn't found in the MapState.
>>>>
>>>> The background for restoring state in this case is not entirely clean. There was an OS-level issue, "Too many open files", after running the job for ~11 days. To fix that, we replaced the cluster with a new one and launched the Flink job again. State was successfully restored from the latest checkpoint that had been created by the "problematic execution". Now, I'm assuming that if the state hadn't been written successfully, restoring wouldn't have succeeded either – correct? This is just to rule out the possibility that the issue with the state happened because the checkpoint files were somehow corrupted due to the Too many open files problem.
>>>>
>>>> Thank you all for your continued support!
>>>>
>>>> P.S. I would be very much interested to hear if there's some cleaner way to achieve this kind of TTL for keyed state in Flink.
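For reference, here is a self-contained sketch of the timer-based TTL pattern discussed in this thread, written against the KeyedProcessFunction API available in Flink 1.5. The class name, state name, and retention value are illustrative (this is not the actual EnrichEventProcessFunction), and the null check in onTimer is only a defensive guard against the symptom, not a fix for the underlying restore problem:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative stand-in for the simplified function quoted above.
// Assumes records carry event-time timestamps (ctx.timestamp() != null).
public class TtlMapStateFunction extends KeyedProcessFunction<String, String, String> {

    private static final long STATE_RETENTION_MILLIS = 24 * 60 * 60 * 1000L; // example: 1 day

    private transient MapState<String, String> mapState;

    @Override
    public void open(Configuration parameters) {
        mapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("mapState", String.class, String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Remember when this key was last touched and arm a cleanup timer for it.
        mapState.put("lastUpdated", ctx.timestamp().toString());
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + STATE_RETENTION_MILLIS);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Defensive guard: if the timer fired but the map entry is gone (as after
        // the restore described above), clear the state instead of throwing a
        // NumberFormatException from Long.parseLong(null).
        String lastUpdated = mapState.get("lastUpdated");
        if (lastUpdated == null || timestamp >= Long.parseLong(lastUpdated) + STATE_RETENTION_MILLIS) {
            mapState.clear();
        }
    }
}

Regarding the P.S.: at the time of this thread, a cleanup timer like the above was the usual way to expire keyed state; Flink 1.6 later added built-in state TTL (StateTtlConfig), which removes the need for the hand-rolled pattern.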