按我的理解,streaming模式去读是允许checkpoint的(具体看各个connector的checkpoint逻辑),batch模式是一个阶段一个阶段的跑的,上一个task跑完的结果会放到磁盘等待下一个task拉取,task失败了就重新拉取上一个task的结果重新跑(没有看源码里具体实现细节,纯属个人的猜测,有懂行的大佬们可以纠正)


---- 回复的原邮件 ----
| 发件人 | ha.fen...@aisino.com<ha.fen...@aisino.com> |
| 发送日期 | 2024年2月2日 17:21 |
| 收件人 | user-zh<user-zh@flink.apache.org> |
| 主题 | Re: Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job 
restart的情况避免从头读取数据 |
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new FsStateBackend("file:\\d:\\abc"));


发件人: jinzhuguang
发送时间: 2024-02-02 17:16
收件人: user-zh
主题: Re: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据
你是在batch模式下手动开启了checkpoint吗

2024年2月2日 17:11,ha.fen...@aisino.com 写道:

今天正好测试了这个问题,开启checkpoint后,读取一个文件内容,在checkpoints有记录时,停止程序,然后再从checkpoint读取启动,读取的记录并不是从最开始,这说明批处理下也会自动保存状态。

发件人: jinzhuguang
发送时间: 2024-02-02 16:47
收件人: user-zh
主题: Batch模式下,StatefulSinkWriter如何存储状态,以保证在failover或者job restart的情况避免从头读取数据
Flink 1.16.0

我在阅读FileSink的代码时发现,其依靠StatefulSinkWriter的snapshotState接口在checkpoint时存储当前的状态。

interface StatefulSinkWriter<InputT, WriterStateT> extends SinkWriter<InputT> {
/**
* @return The writer's state.
* @throws IOException if fail to snapshot writer's state.
*/
List<WriterStateT> snapshotState(long checkpointId) throws IOException;
}

然而,我了解到Flink在batch模式不会开启checkpoint机制,那我如何能够保证批任务的状态能够得到及时保存。

如果我在进行大规模数据的ETL操作,因为某些task失败导致任务重试,难道整个任务都要从头开始吗?

恳请各位大佬赐教

回复