Hi,
I took a brief look at the code related to restoring incremental checkpoints
and could not find an obvious bug. However, there is a suspicious loophole
that may lead to data loss; the code in question is pasted below.
while (iterator.isValid()) {
    int keyGroup = 0;
    for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
        keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
    }
    if (stateBackend.keyGroupRange.contains(keyGroup)) {
        stateBackend.db.put(targetColumnFamilyHandle,
            iterator.key(), iterator.value());
    }
    iterator.next();
}
We only use iterator.isValid() to check whether we have reached the end of
the iterator, but according to RocksDB's wiki
https://github.com/facebook/rocksdb/wiki/Iterator#error-handling
iterator.isValid() alone is not enough: it may also return false because of
an internal RocksDB error. So a safer way is to call iterator.status() to
check whether everything is OK. I am inclined to open a PR for this now,
because in RocksDBMapState (which we guarantee to support) we also use the
iterator without checking iterator.status().
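To make the proposal concrete, here is a minimal sketch of the pattern, not the actual Flink change. So that it runs without RocksDB on the classpath, KvIterator is a hypothetical stand-in that mirrors the RocksJava iterator methods isValid(), next(), key(), value() and status(); copyAll, fake and the in-memory target list are likewise made up for illustration, and IOException stands in for RocksDBException.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the subset of the RocksDB iterator used during restore. */
interface KvIterator {
    boolean isValid();
    void next();
    byte[] key();
    byte[] value();
    /** Throws if iteration stopped because of an internal error. */
    void status() throws IOException;
}

final class IteratorStatusSketch {

    /**
     * Copies every entry into target. isValid() == false can mean "end of data"
     * or "internal error", so status() is checked once the loop has ended.
     */
    static int copyAll(KvIterator it, List<byte[][]> target) throws IOException {
        int copied = 0;
        while (it.isValid()) {
            target.add(new byte[][] {it.key(), it.value()});
            copied++;
            it.next();
        }
        // Without this call a failed iterator looks like a short but complete one.
        it.status();
        return copied;
    }

    /** In-memory iterator that simulates an internal error after failAfter entries. */
    static KvIterator fake(final byte[][] keys, final int failAfter) {
        return new KvIterator() {
            private int pos = 0;
            public boolean isValid() { return pos < keys.length && pos < failAfter; }
            public void next() { pos++; }
            public byte[] key() { return keys[pos]; }
            public byte[] value() { return keys[pos]; }
            public void status() throws IOException {
                if (pos < keys.length) {
                    throw new IOException("iteration stopped early at entry " + pos);
                }
            }
        };
    }
}
```

With the status() call removed, the simulated failure would go unnoticed and the restore would silently keep only part of the data, which is the data-loss scenario suspected above.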
@Stefan What do you think?
Best, Sihua
On 05/16/2018 10:22, sihua zhou <[email protected]> wrote:
Hi Juho,
If I understand correctly, you said you are rescaling the job from the
checkpoint? If so, I think that behavior is not guaranteed yet; see the docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints.
So I am not sure whether this is a "bug" at the current stage (personally I'd
like to dig into it, because we currently also use checkpoints the way you do)
...
Best, Sihua
On 05/16/2018 01:46, Juho Autio <[email protected]> wrote:
I was able to reproduce this error.
I just happened to notice an important detail about the original failure:
- checkpoint was created with a 1-node cluster (parallelism=8)
- restored on a 2-node cluster (parallelism=16), caused that null exception
I tried restoring from the problematic checkpoint again:
- restored on a 1-node cluster, no problems
- restored on a 2-node cluster, getting the original error!
So now I have a way to reproduce the bug. To me it seems like the checkpoint
itself is fine. The bug seems to be in redistributing the state of a restored
checkpoint to a higher parallelism. I only tested each cluster size once (as
described above) so it could also be coincidence, but seems at least likely now
that it's about the state redistribution.
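For context on what "redistribution" means here: keyed state is split into key groups, and each parallel subtask owns a contiguous range of them. The sketch below is modeled on my understanding of Flink's key group range assignment and is not copied from it; the class name KeyGroupRangeSketch and the max parallelism of 128 are assumptions, since the job's actual max parallelism is not stated in this thread.

```java
final class KeyGroupRangeSketch {

    /** Returns {firstKeyGroup, lastKeyGroup} (inclusive) owned by one subtask. */
    static int[] rangeFor(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed, not taken from the job in question
        for (int parallelism : new int[] {8, 16}) {
            for (int i = 0; i < 2; i++) {
                int[] r = rangeFor(maxParallelism, parallelism, i);
                System.out.println("parallelism=" + parallelism + " subtask=" + i
                        + " keyGroups=[" + r[0] + ", " + r[1] + "]");
            }
        }
    }
}
```

Under these assumed numbers, subtask 0 owns key groups 0 to 15 at parallelism 8 but only 0 to 7 at parallelism 16, so after rescaling each subtask has to read state written for a wider range and keep only the entries that fall into its own, narrower one.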
I'll try to follow up with those TRACE-level logs tomorrow. Today I tried
adding these to the logback.xml, but I didn't get anything else but INFO level
logs:
<logger name="org.apache.flink.contrib.streaming.state" level="TRACE">
    <appender-ref ref="file"/>
</logger>
<logger name="org.apache.flink.runtime.state" level="TRACE">
    <appender-ref ref="file"/>
</logger>
<logger name="org.apache.flink.runtime.checkpoint" level="TRACE">
    <appender-ref ref="file"/>
</logger>
<logger name="org.apache.flink.streaming.api.operators" level="TRACE">
    <appender-ref ref="file"/>
</logger>
<logger name="org.apache.flink.streaming.runtime.tasks" level="TRACE">
    <appender-ref ref="file"/>
</logger>
Maybe I need to edit the log4j.properties instead(?). Indeed it's Flink
1.5-SNAPSHOT and the package has all of these in the conf/ dir:
log4j-cli.properties
log4j-console.properties
log4j.properties
log4j-yarn-session.properties
logback-console.xml
logback.xml
logback-yarn.xml
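If log4j.properties turns out to be the file that is actually picked up, the same loggers could be expressed roughly as below. This is only a sketch: it assumes log4j 1.x property syntax and that the root logger in that file already has a file appender attached, so no appender is named here. Which of the files listed above this distribution really reads has not been verified.

```properties
# TRACE for the state/checkpoint related packages; everything else keeps the root level
log4j.logger.org.apache.flink.contrib.streaming.state=TRACE
log4j.logger.org.apache.flink.runtime.state=TRACE
log4j.logger.org.apache.flink.runtime.checkpoint=TRACE
log4j.logger.org.apache.flink.streaming.api.operators=TRACE
log4j.logger.org.apache.flink.streaming.runtime.tasks=TRACE
```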
On Tue, May 15, 2018 at 11:49 AM, Stefan Richter <[email protected]>
wrote:
Hi,
On 15.05.2018 at 10:34, Juho Autio <[email protected]> wrote:
Ok, that should be possible to provide. Are there any specific packages to set
on trace level? Maybe just go with org.apache.flink.* on TRACE?
The following packages would be helpful:
org.apache.flink.contrib.streaming.state.*
org.apache.flink.runtime.state.*
org.apache.flink.runtime.checkpoint.*
org.apache.flink.streaming.api.operators.*
org.apache.flink.streaming.runtime.tasks.*
> did the "too many open files" problem only happen with local recovery (asking
> since it should actually not add to the amount of open files)
I think it happened in various places, maybe not when restoring. Anyway, if
the situation is like that, the system is pretty much unusable (at the OS
level), so it shouldn't matter too much which operation of the application it
causes to fail? Anyway, I'll try to grab & share all log lines that say "Too
Many Open Files".
> and did you deactivate it on the second cluster for the restart or change
> your OS settings?
No, didn't change anything except for increasing the ulimit on OS to prevent
this from happening again. Note that the system only ran out of files after ~11
days of uptime. During that time there had been some local recoveries. This
makes me wonder though, could it be that many local recoveries eventually
caused this – could it be that in the occasion of local recovery some "old"
files are left open, making the system eventually run out of files?
Given how local recovery works with incremental RocksDB checkpoints, I would
not assume that it is the cause of the problem. In this particular case, the
number of open files on a local FS should not be higher than without local
recovery. Maybe it is just a matter of the OS limit: the number of operators
with a RocksDB backend running on the machine, multiplied by the number of
files managed by each of those RocksDB instances, may simply exceed it. If you
have an overview of how many parallel operator instances with keyed state were
running on the machine, and you assume some reasonable number of files per
RocksDB instance, could that exceed the limit configured in your OS?
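The back-of-the-envelope check asked for above could be written down as follows. Every number in it is a made-up placeholder, not a measurement from the cluster in question, and OpenFilesEstimate is a hypothetical helper name.

```java
final class OpenFilesEstimate {

    /** Rough total: keyed operator instances times files held open per RocksDB instance. */
    static long estimate(int keyedOperatorInstances, int filesPerRocksDbInstance) {
        return (long) keyedOperatorInstances * filesPerRocksDbInstance;
    }

    static boolean exceedsLimit(long estimatedOpenFiles, long osLimit) {
        return estimatedOpenFiles > osLimit;
    }

    public static void main(String[] args) {
        int instances = 8 * 5;      // placeholder: 8 slots x 5 keyed operators
        int filesPerInstance = 200; // placeholder: files per RocksDB instance
        long limit = 4096;          // placeholder: value reported by `ulimit -n`
        long estimated = estimate(instances, filesPerInstance);
        System.out.println("estimated=" + estimated + " limit=" + limit
                + " exceeds=" + exceedsLimit(estimated, limit));
    }
}
```

If the estimate with real numbers lands near or above the limit, the OS setting alone would explain the failure without any file handle leak in local recovery.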
Thanks!
Thanks for your help!
On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <[email protected]>
wrote:
Btw having a trace level log of a restart from a problematic checkpoint could
actually be helpful if we cannot find the problem from the previous points.
This can give a more detailed view of what checkpoint files are mapped to which
operator.
I have one more question: did the "too many open files" problem only happen
with local recovery (asking since it should actually not add to the amount of
open files), and did you deactivate it on the second cluster for the restart
or change your OS settings?
On 15.05.2018 at 10:09, Stefan Richter <[email protected]> wrote:
What I would like to see from the logs is (also depending a bit on your log
level):
- all exceptions.
- in which context exactly the „too many open files“ problem occurred, because
I think for checkpoint consistency it should not matter as a checkpoint with
such a problem should never succeed.
- files that are written for checkpoints/savepoints.
- completed checkpoints/savepoints ids.
- the restored checkpoint/savepoint id.
- files that are loaded on restore.
On 15.05.2018 at 10:02, Juho Autio <[email protected]> wrote:
Thanks all. I'll have to see about sharing the logs & configuration..
Is there something special that you'd like to see from the logs? It may be
easier for me to get specific lines and obfuscate sensitive information instead
of trying to do that for the full logs.
We basically have: RocksDBStateBackend with
enableIncrementalCheckpointing=true, external state path on s3.
The code that we use is:
env.setStateBackend(getStateBackend(statePath, new RocksDBStateBackend(statePath, true)));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
    params.getLong("checkpoint.minPause", 60 * 1000));
env.getCheckpointConfig().setMaxConcurrentCheckpoints(
    params.getInt("checkpoint.maxConcurrent", 1));
env.getCheckpointConfig().setCheckpointTimeout(
    params.getLong("checkpoint.timeout", 10 * 60 * 1000));
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
The problematic state that we tried to use was a checkpoint created with this
conf.
> Are you using the local recovery feature?
Yes, and in this particular case the job was constantly failing/restarting
because of Too Many Open Files. So we terminated the cluster entirely, created
a new one, and launched a new job by specifying the latest checkpoint path to
restore state from.
This is the only time I have seen this error happen with timer state. I still
have that bad checkpoint data on s3, so I might be able to try to restore it
again if needed to debug it. But that would require some tweaking, because I
don't want to tangle with the same kafka consumer group offsets or send old
data again to production endpoint.
Please keep in mind that there was that Too Many Open Files issue on the
cluster that created the problematic checkpoint, if you think that's relevant.
On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <[email protected]>
wrote:
Hi,
I agree, this looks like a bug. Can you tell us your exact configuration of the
state backend, e.g. if you are using incremental checkpoints or not. Are you
using the local recovery feature? Are you restarting the job from a checkpoint
or a savepoint? Can you provide logs for both the job that failed and the
restarted job?
Best,
Stefan
On 14.05.2018 at 13:00, Juho Autio <[email protected]> wrote:
We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old
state. After restoring state from a checkpoint, it seems that a timer was
restored, but not the data that should be in the related MapState whenever
such a timer has been registered.
The way I see this is that there's a bug, either of these:
- The writing of timers & map states to Flink state is not synchronized (or
maybe there are no such guarantees by design?)
- Flink may restore a checkpoint that is actually corrupted/incomplete
Our code (simplified):
private MapState<String, String> mapState;

public void processElement(..) {
    mapState.put("lastUpdated", ctx.timestamp().toString());
    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
}

public void onTimer(long timestamp, OnTimerContext ctx, ..) {
    long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
    if (timestamp >= lastUpdated + stateRetentionMillis) {
        mapState.clear();
    }
}
Normally this "just works". As you can see, it shouldn't be possible for
"lastUpdated" to be missing from state if a timer was registered and onTimer
gets called.
However, after restoring state from a checkpoint, the job kept failing with
this error:
Caused by: java.lang.NumberFormatException: null
    at java.lang.Long.parseLong(Long.java:552)
    at java.lang.Long.parseLong(Long.java:631)
    at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
    ..
So apparently onTimer was called but lastUpdated wasn't found in the MapState.
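Independent of the root cause, the handler can be made tolerant of the missing entry. Below is a minimal sketch of that idea; a plain HashMap stands in for MapState so that it runs on its own, and RetentionSketch and shouldClear are hypothetical names. Treating a missing entry as expired is an assumption about the desired behavior. It only avoids the crash loop and does not explain why the entry is missing.

```java
import java.util.HashMap;
import java.util.Map;

final class RetentionSketch {

    /**
     * Decides whether state should be cleared when a timer fires. A missing
     * "lastUpdated" entry is treated as expired instead of being parsed,
     * because Long.parseLong(null) throws NumberFormatException.
     */
    static boolean shouldClear(Map<String, String> state, long timerTimestamp,
                               long stateRetentionMillis) {
        String raw = state.get("lastUpdated");
        if (raw == null) {
            return true; // assumption: orphaned timer, nothing useful to keep
        }
        long lastUpdated = Long.parseLong(raw);
        return timerTimestamp >= lastUpdated + stateRetentionMillis;
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        // Entry missing: the timer is handled without an exception.
        System.out.println(shouldClear(state, 2_000L, 1_000L));
        // Entry present and updated recently: state is kept.
        state.put("lastUpdated", "1500");
        System.out.println(shouldClear(state, 2_000L, 1_000L));
    }
}
```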
The background for restoring state in this case is not entirely clean. There
was an OS level issue "Too many open files" after running a job for ~11 days.
To fix that, we replaced the cluster with a new one and launched the Flink job
again. State was successfully restored from the latest checkpoint that had been
created by the "problematic execution". Now, I'm assuming that if the state
had not been created successfully, restoring would not succeed either,
correct? I mention this to rule out that the state issue happened because the
checkpoint files were somehow corrupted by the Too many open files problem.
Thank you all for your continued support!
P.S. I would be very much interested to hear if there's some cleaner way to
achieve this kind of TTL for keyed state in Flink.