Hello,
Peihui,可以参考下是不是和这个问题类似?之前我在1.10.0也遇到过。
http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html#a2239
解决方式:
1. 使用hdfs作为状态后端不会报错
2. 升级至1.10.1使用rocksdb也不会出现该问题














在 2020-07-14 14:41:53,"Peihui He" <peihu...@gmail.com> 写道:
>Hi Yun,
>
>我这边用一个word count 例子,socket -> flatmap -> keyBy -> reduce ->
>print. 在flatmap 中当出现特定word的时候就抛出一个runtimeException。在1.9.2
>里面是可以从checkpoint中自动恢复上次做checkpoint的时候的状态,但是用1.10.0 就不能。环境是flink on
>yarn。
>
>Best wishes.
>
>Yun Tang <myas...@live.com> 于2020年7月14日周二 上午11:57写道:
>
>> Hi Peihui
>>
>> 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
>> cause。
>>
>> [1]
>> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
>>
>>
>> 祝好
>> 唐云
>> ________________________________
>> From: Peihui He <peihu...@gmail.com>
>> Sent: Tuesday, July 14, 2020 10:42
>> To: user-zh@flink.apache.org <user-zh@flink.apache.org>
>> Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>>
>> hello,
>>
>>         当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>>
>>
>> Caused by: java.nio.file.NoSuchFileException:
>>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/000009.sst
>> ->
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/000009.sst
>>
>> 配置和1.9.2 一样:
>> state.backend: rocksdb
>> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
>> state.savepoints.dir: hdfs:///flink/savepoints/wc/
>> state.backend.incremental: true
>>
>> 代码上都有
>>
>> env.enableCheckpointing(10000);
>>
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
>> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>>
>>
>>           是1.10.0 需要做什么特别配置么?
>>

回复