我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator
ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator
id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id,
因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator
id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。
注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。

方盛凯 <[email protected]> 于2020年6月9日周二 下午9:26写道:

>
> 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
> 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
>
> 如有错误,欢迎补充回答。
>
> 陈赋赟 <[email protected]> 于2020年6月8日周一 上午11:53写道:
>
>> 原先sql任务是:
>> CREATE TABLE A_source(...)
>> CREATE TABLE B_sink (...)
>> INSERT INTO B_sink
>> SELECT
>>      1
>> FROM
>>     A_source
>> ;
>> 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
>>
>>
>> CREATE TABLE A_source(...)
>> CREATE TABLE B_sink (...)
>> CREATE TABLE C_source(...)
>> CREATE TABLE D_sink (...)
>> INSERT INTO B_sink
>> SELECT
>>      1
>> FROM
>>     A_source
>> ;
>>
>>
>> INSERT INTO C_sink
>> SELECT
>>      1
>> FROM
>>     D_source
>> ;
>> 并基于Savepoint提交,结果显示
>>
>> Cannot map checkpoint/savepoint state for operator
>> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
>> is not available in the new program.
>> If you want to allow to skip this, you can set the
>> --allowNonRestoredState option on the CLI.
>>
>>
>> 想请教一下底层是因为什么原因导致了opertor匹配不上?
>
>

回复