我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id, 因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。 注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。
方盛凯 <[email protected]> 于2020年6月9日周二 下午9:26写道: > > 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了 > 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档 > https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html > > 如有错误,欢迎补充回答。 > > 陈赋赟 <[email protected]> 于2020年6月8日周一 上午11:53写道: > >> 原先sql任务是: >> CREATE TABLE A_source(...) >> CREATE TABLE B_sink (...) >> INSERT INTO B_sink >> SELECT >> 1 >> FROM >> A_source >> ; >> 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为 >> >> >> CREATE TABLE A_source(...) >> CREATE TABLE B_sink (...) >> CREATE TABLE C_source(...) >> CREATE TABLE D_sink (...) >> INSERT INTO B_sink >> SELECT >> 1 >> FROM >> A_source >> ; >> >> >> INSERT INTO C_sink >> SELECT >> 1 >> FROM >> D_source >> ; >> 并基于Savepoint提交,结果显示 >> >> Cannot map checkpoint/savepoint state for operator >> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator >> is not available in the new program. >> If you want to allow to skip this, you can set the >> --allowNonRestoredState option on the CLI. >> >> >> 想请教一下底层是因为什么原因导致了opertor匹配不上? > >
