Each operator (which runs in a TaskManager) writes its state to some location in HDFS (or any other DFS) and sends a handle for that state to the CheckpointCoordinator (which runs on the JobManager). The CheckpointCoordinator collects all the handles and creates one uber-handle that describes the complete checkpoint. When restoring, the CheckpointCoordinator figures out which handles need to be sent to which operators.
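In configuration terms, this is why the path you hand to the backend should point at a distributed filesystem. A minimal sketch against the 1.4-era API (the HDFS URI and the local directory below are just placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000); // trigger a checkpoint every 60 s

        // Snapshots are written here by every TaskManager and read back by the
        // JobManager/new TaskManagers on restore, so it must be a shared DFS path.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints");

        // RocksDB's working files stay on a local disk of each TaskManager.
        backend.setDbStoragePath("/tmp/flink/rocksdb");

        env.setStateBackend(backend);

        // ... define sources/operators/sinks and call env.execute() ...
    }
}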
Best,
Aljoscha

> On 4. Jan 2018, at 14:44, Jinhua Luo <luajit...@gmail.com> wrote:
>
> OK, I think I get the point.
>
> But another question arises: how do the task managers merge their
> RocksDB snapshots into one global path?
>
>
> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>> Hi,
>>
>> The path you give to the constructor must be a path on some distributed
>> filesystem, otherwise the data will be lost when the local machine
>> crashes, as you correctly mentioned.
>>
>> RocksDB will keep files in a local directory (you can specify this using
>> setDbStoragePath()) and when checkpointing will write to the checkpoint
>> directory that you specified in the constructor.
>>
>> Best,
>> Aljoscha
>>
>>
>>> On 4. Jan 2018, at 14:23, Jinhua Luo <luajit...@gmail.com> wrote:
>>>
>>> I still do not understand the relationship between the RocksDB backend
>>> and the filesystem (here I refer to any filesystem impl, including
>>> local, hdfs, s3).
>>>
>>> For example, when I specify the path for the RocksDB backend:
>>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>>
>>> What does it mean?
>>>
>>> Does each task manager save its state to /data1/flinkapp on its own
>>> machine? That seems to make no sense, because when one of the machines
>>> crashes, the job manager could not access the state on the dead machine.
>>> Or does each task manager create a RocksDB instance on a temporary path
>>> and send snapshots to the job manager, which in turn saves them under
>>> /data1/flinkapp on the job manager's machine?
>>>
>>> Could you give a data flow example?
>>>
>>> And another question: when I turn off checkpointing (which is also the
>>> default configuration), what happens to the state handling?
>>>
>>>
>>>
>>> 2018-01-03 0:06 GMT+08:00 Timo Walther <twal...@apache.org>:
>>>> Hi Jinhua,
>>>>
>>>> I will try to answer your questions:
>>>>
>>>> Flink checkpoints the state of each operator. For a Kafka consumer
>>>> operator this is only the offset. For other operators (such as windows
>>>> or a ProcessFunction) the values/lists/maps stored in the state are
>>>> checkpointed. If you are interested in the internals, I would recommend
>>>> this page [1].
>>>> Only the MemoryStateBackend sends entire states to the JobManager (see
>>>> [2]).
>>>> But you are right, this is a bottleneck and not very fault-tolerant.
>>>> Usually, Flink assumes there is some distributed file system (such as
>>>> HDFS) to which each Flink operator can checkpoint in a fault-tolerant
>>>> way. For the RocksDBStateBackend the local files are copied to HDFS as
>>>> well. At the time of writing, only the RocksDB backend supports
>>>> incremental checkpoints. The JobManager can then read from HDFS and
>>>> restore the operator on a different machine.
>>>>
>>>> Feel free to ask further questions.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>>>>
>>>>
>>>>
>>>> On 1/1/18 at 3:50 PM, Jinhua Luo wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have two questions:
>>>>>
>>>>> a) Are the records/elements themselves checkpointed, or just the
>>>>> record offsets? That is, what data is included in a checkpoint
>>>>> besides the state?
>>>>>
>>>>> b) Where does Flink store the state globally, so that the job manager
>>>>> can restore it on each task manager after a failure restart?
>>>>>
>>>>> For the heap backend, all task managers would send their state to the
>>>>> job manager, and the job manager would save it in its heap, correct?
>>>>>
>>>>> For the fs/rocksdb backend, all task managers would save their state
>>>>> (incrementally or not) on a local path temporarily, and send it (in
>>>>> RocksDB snapshot format in the rocksdb case?) to the job manager at
>>>>> checkpoint time?
>>>>>
>>>>> Is the path we use to configure the backend a path on the job
>>>>> manager's machine rather than on the task managers' machines? Wouldn't
>>>>> that be a bottleneck and a single point of failure? So it would be
>>>>> better to use an HDFS path so that we can scale the storage and make
>>>>> it highly available as well?
>>>>>
>>>>> Thank you all.
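For reference, the state being discussed above is, for example, keyed state declared inside an operator. A minimal sketch (the class and state names are just placeholders, and the function has to run on a keyed stream) of a ProcessFunction whose ValueState lives in the configured backend and is included in every checkpoint:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Counts elements per key. The ValueState below is what the chosen state
// backend stores (on the heap or in RocksDB) and snapshots at checkpoints.
public class CountPerKey extends ProcessFunction<String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out)
            throws Exception {
        Long current = count.value();
        long updated = (current == null) ? 1L : current + 1L;
        count.update(updated); // kept locally, snapshotted to the checkpoint directory
        out.collect(updated);
    }
}

Usage would be something like stream.keyBy(value -> value).process(new CountPerKey()).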