Hi
   You can refer to [1] and experiment with some modifications of your own to analyze the metadata file.
[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
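Beyond the test class above, a minimal standalone sketch for dumping a _metadata file could look like this (assuming flink-runtime 1.11 on the classpath; the exact signature of `Checkpoints.loadCheckpointMetadata` differs slightly between Flink versions, and the `MetadataDump` class name is hypothetical):

```java
import java.io.DataInputStream;
import java.io.FileInputStream;

import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;

public class MetadataDump {
    public static void main(String[] args) throws Exception {
        // Path to a locally copied _metadata file; copy it off HDFS first,
        // e.g. with `hdfs dfs -get <checkpoint-dir>/chk-N/_metadata`.
        String path = args[0];
        try (DataInputStream in = new DataInputStream(new FileInputStream(path))) {
            // Three-arg signature as of Flink 1.11; verify for your version.
            CheckpointMetadata metadata = Checkpoints.loadCheckpointMetadata(
                    in, MetadataDump.class.getClassLoader(), path);
            // Listing the operator states tells you which operators actually
            // have state in this checkpoint and how many subtask states each has.
            for (OperatorState op : metadata.getOperatorStates()) {
                System.out.println("operator " + op.getOperatorID()
                        + ": " + op.getStates().size() + " subtask states");
            }
        }
    }
}
```

If the Kafka source's offsets were checkpointed, they will show up here as subtask states of the source operator.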
Best,
Congxian


Harold.Miao <[email protected]> wrote on Tue, Sep 15, 2020 at 1:58 PM:

> It is the same INSERT job; I only added this code when restarting it, to build a SavepointRestoreSettings and restore from the checkpoint.
>
> How can I verify that the checkpoint was really written to HDFS? Is there a tool that can parse the meta file?
>
> Thanks
>
> Jark Wu <[email protected]> wrote on Tue, Sep 15, 2020 at 11:31 AM:
>
> > Could it be that your checkpoint-restore code path does not execute any INSERT INTO statement?
> >
> > On Mon, 14 Sep 2020 at 20:15, Harold.Miao <[email protected]> wrote:
> >
> > > One more thing: we modified the sql-client code to let the job restore from a checkpoint, as follows:
> > >
> > > private StreamExecutionEnvironment createStreamExecutionEnvironment() {
> > >    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> > >
> > >    LOG.info("restore cp exist: {}", environment.getExecution().getRestoreSp().isPresent());
> > >    if (environment.getExecution().getRestoreSp().isPresent()) {
> > >       LOG.info("restore cp path: {}", environment.getExecution().getRestoreSp().get());
> > >       if (!environment.getExecution().getRestoreSp().get().contains("none")) {
> > >          SavepointRestoreSettings savepointRestoreSettings =
> > >             SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(), true);
> > >          env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
> > >       }
> > >    }
> > >    // for TimeCharacteristic validation in StreamTableEnvironmentImpl
> > >    env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
> > >    if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
> > >       env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
> > >    }
> > >    return env;
> > > }
> > >
> > >
> > > When we pass in the path that contains only the meta file, it fails with:
> > >
> > > Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
> > >         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> > > Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
> > >         at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
> > >         at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
> > >         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
> > >         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> > > Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
> > >         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
> > >         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
> > >         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
> > >         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
> > >         at org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
> > >         at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
> > >         at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
> > >         at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:184)
> > >         at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:137)
> > >         at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868)
> > >         ... 3 more
> > >
> > >
> > > The error clearly indicates that no operators (and hence no operator state) are defined in the topology.
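The failure above is triggered because `env.getStreamGraph()` is called while the environment is still empty, before any INSERT statement has added transformations. One possible workaround (a sketch only; it assumes Flink 1.11's cluster executors read `execution.savepoint.path` from the configuration at submission time, which should be verified for this setup, and the `RestoreConfigSketch` name and path are hypothetical) is to carry the restore settings in the configuration instead of materializing a StreamGraph early:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public class RestoreConfigSketch {
    public static Configuration restoreConfig(String checkpointPath) {
        // Writes "execution.savepoint.path" and
        // "execution.savepoint.ignore-unclaimed-state" into the configuration.
        // The executor applies these when the job is actually submitted, i.e.
        // after the INSERT has defined operators, so no StreamGraph is needed here.
        Configuration conf = new Configuration();
        SavepointRestoreSettings.toConfiguration(
                SavepointRestoreSettings.forPath(checkpointPath, true), conf);
        return conf;
    }
}
```

The configuration would then be merged into whatever configuration the sql-client uses to deploy the job, rather than calling `getStreamGraph().setSavepointRestoreSettings(...)` on an empty environment.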
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > Congxian Qiu <[email protected]> wrote on Mon, Sep 14, 2020 at 7:53 PM:
> > >
> > > > Hi
> > > >    If your state is all very small, it may be stored inline in the meta file, so _metadata ends up being the only file. You can look at the logic here [1]:
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
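Whether a state handle is inlined is controlled by a size threshold. As documented for Flink 1.11 (option name and default should be verified against your exact version), the relevant flink-conf.yaml setting is:

```yaml
# State chunks smaller than this threshold are embedded in the _metadata
# file instead of being written as separate files on HDFS. The Flink 1.11
# default is 1kb; Kafka offset state is typically below that, which is
# consistent with seeing only a single _metadata file per checkpoint.
state.backend.fs.memory-threshold: 1kb
```

Lowering the threshold (or growing the state) should make separate state files appear next to _metadata.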
> > > > Best,
> > > > Congxian
> > > >
> > > >
> > > > Harold.Miao <[email protected]> wrote on Mon, Sep 14, 2020 at 6:44 PM:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Flink version: 1.11.1
> > > > >
> > > > > We submit jobs with sql-client; flink-conf.yaml is configured as follows:
> > > > >
> > > > > state.backend: filesystem
> > > > > state.backend.fs.checkpointdir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
> > > > > state.checkpoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
> > > > > state.savepoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252
> > > > >
> > > > > execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
> > > > > execution.checkpointing.interval: 60s
> > > > > execution.checkpointing.mode: EXACTLY_ONCE
> > > > > jobmanager.execution.failover-strategy: full
> > > > > state.backend.incremental: true
> > > > >
> > > > >
> > > > > After the job runs, the UI shows every checkpoint as successful, but on HDFS there is only ever a single meta file.
> > > > >
> > > > > Something like:
> > > > >
> > > > > hdfs://10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata
> > > > >
> > > > > There is nothing besides this file.
> > > > >
> > > > > Our source is Kafka, and the Kafka source definitely keeps state.
> > > > >
> > > > > Could anyone tell me what causes this?
> > > > >
> > > > > Thanks
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best Regards,
> > > > > Harold Miao
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best Regards,
> > > Harold Miao
> > >
> >
>
>
> --
>
> Best Regards,
> Harold Miao
>
