TaskManagers don't do any checkpointing themselves, but the Operators that run in TaskManagers do.
Each operator, of which there are multiple running on multiple TMs in the cluster, will write to a unique DFS directory, something like:

  /checkpoints/job-xyz/checkpoint-1/operator-a/1

These individual checkpoints are not merged together into one directory; instead, the handles to those directories are sent to the CheckpointCoordinator, which creates a checkpoint that stores handles to all the states stored in DFS.

Best,
Aljoscha

> On 4. Jan 2018, at 15:06, Jinhua Luo <luajit...@gmail.com> wrote:
>
> One task manager would create one rocksdb instance on its local
> temporary dir, correct?
> Since there are likely multiple task managers in one cluster, how do
> they handle directory conflicts? Because one rocksdb instance is one
> directory, that is, what I mentioned at first: how do they merge rocksdb
> instances and store them on a single distributed filesystem path?
>
> 2018-01-04 22:00 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>> Ah I see. Currently the RocksDB backend will use one column in RocksDB per
>> state that is registered. The states for different keys of one state are
>> stored in one column.
>>
>>> On 4. Jan 2018, at 14:56, Jinhua Luo <luajit...@gmail.com> wrote:
>>>
>>> ok, I see.
>>>
>>> But as known, one rocksdb instance occupies one directory, so I am still
>>> wondering what's the relationship between the states and rocksdb
>>> instances.
>>>
>>> 2018-01-04 21:50 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>>>> Each operator (which runs in a TaskManager) will write its state to some
>>>> location in HDFS (or any other DFS) and send a handle to this to the
>>>> CheckpointCoordinator (which is running on the JobManager). The
>>>> CheckpointCoordinator collects all the handles and creates one
>>>> Uber-Handle, which describes the complete checkpoint. When restoring, the
>>>> CheckpointCoordinator figures out which handles need to be sent to which
>>>> operators for restoring.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>> On 4.
>>>>> Jan 2018, at 14:44, Jinhua Luo <luajit...@gmail.com> wrote:
>>>>>
>>>>> OK, I think I get the point.
>>>>>
>>>>> But another question arises: how do task managers merge their rocksdb
>>>>> snapshots into a single global path?
>>>>>
>>>>> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>>>>>> Hi,
>>>>>>
>>>>>> The path you give to the constructor must be a path on some distributed
>>>>>> filesystem, otherwise the data will be lost when the local machine
>>>>>> crashes, as you mentioned correctly.
>>>>>>
>>>>>> RocksDB will keep files in a local directory (you can specify this using
>>>>>> setDbStoragePath()) and when checkpointing will write to the checkpoint
>>>>>> directory that you specified in the constructor.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>>> On 4. Jan 2018, at 14:23, Jinhua Luo <luajit...@gmail.com> wrote:
>>>>>>>
>>>>>>> I still do not understand the relationship between the rocksdb backend
>>>>>>> and the filesystem (here I refer to any filesystem impl, including
>>>>>>> local, hdfs, s3).
>>>>>>>
>>>>>>> For example, when I specify the path for the rocksdb backend:
>>>>>>>
>>>>>>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>>>>>>
>>>>>>> What does it mean?
>>>>>>>
>>>>>>> Does each task manager save states to /data1/flinkapp on its own
>>>>>>> machine? But that seems to make no sense, because when one of the
>>>>>>> machines crashes, the job manager could not access the states on the
>>>>>>> dead machine.
>>>>>>> Or does each task manager create a rocksdb instance on a temporary
>>>>>>> path and send snapshots to the job manager, which in turn saves them
>>>>>>> under /data1/flinkapp on the job manager's machine?
>>>>>>>
>>>>>>> Could you give a data flow example?
>>>>>>>
>>>>>>> And another question: when I turn off checkpointing (which is also the
>>>>>>> default config), what happens to state processing?
>>>>>>>
>>>>>>> 2018-01-03 0:06 GMT+08:00 Timo Walther <twal...@apache.org>:
>>>>>>>> Hi Jinhua,
>>>>>>>>
>>>>>>>> I will try to answer your questions:
>>>>>>>>
>>>>>>>> Flink checkpoints the state of each operator. For a Kafka consumer
>>>>>>>> operator this is only the offset. For other operators (such as Windows
>>>>>>>> or a ProcessFunction) the values/lists/maps stored in the state are
>>>>>>>> checkpointed. If you are interested in the internals, I would
>>>>>>>> recommend this page [1].
>>>>>>>> Only the MemoryStateBackend sends entire states to the JobManager (see
>>>>>>>> [2]). But you are right, this is a bottleneck and not very
>>>>>>>> fault-tolerant. Usually, Flink assumes it has some distributed file
>>>>>>>> system (such as HDFS) to which each Flink operator can be checkpointed
>>>>>>>> in a fault-tolerant way.
>>>>>>>> For the RocksDBStateBackend the local files are copied to HDFS as
>>>>>>>> well. At the time of writing, only the RocksDBBackend supports
>>>>>>>> incremental checkpoints. The JobManager can then read from HDFS and
>>>>>>>> restore the operator on a different machine.
>>>>>>>>
>>>>>>>> Feel free to ask further questions.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>>
>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>>>>>>>>
>>>>>>>> On 1/1/18 at 3:50 PM, Jinhua Luo wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I have two questions:
>>>>>>>>>
>>>>>>>>> a) are the records/elements themselves checkpointed, or just the
>>>>>>>>> record offsets? That is, what data is included in the checkpoint
>>>>>>>>> apart from the states?
>>>>>>>>>
>>>>>>>>> b) where does flink store the state globally?
>>>>>>>>> so that the job manager could restore it on each task manager at
>>>>>>>>> failure restart?
>>>>>>>>>
>>>>>>>>> For the heap backend, all task managers would send states to the job
>>>>>>>>> manager, and the job manager would save them in its heap, correct?
>>>>>>>>>
>>>>>>>>> For the fs/rocksdb backend, all task managers would save states
>>>>>>>>> (incrementally or not) in a local path temporarily, and send them (in
>>>>>>>>> rocksdb snapshot format for the rocksdb case?) to the job manager at
>>>>>>>>> checkpoint time?
>>>>>>>>>
>>>>>>>>> Is the path we use to configure the backend a path on the job manager
>>>>>>>>> machine but not on the task managers' machines? So is that a
>>>>>>>>> bottleneck and single point of failure? So is it better to use an
>>>>>>>>> hdfs path, so that we can scale the storage and make it highly
>>>>>>>>> available as well?
>>>>>>>>>
>>>>>>>>> Thank you all.
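
[Editor's note: to make the two paths discussed in this thread concrete, here is a minimal sketch of configuring the RocksDB backend, using the Flink 1.4-era API referenced above. The checkpoint URI passed to the constructor must be on a DFS shared by all TaskManagers, while setDbStoragePath() only controls each TaskManager's local RocksDB working directory. The concrete paths and the 60s interval are illustrative assumptions, not recommendations.]

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint data goes to a distributed filesystem: every operator
        // writes its own files under this base path and sends a handle to
        // the CheckpointCoordinator on the JobManager. (The hdfs path here
        // is a hypothetical example.)
        RocksDBStateBackend backend = new RocksDBStateBackend(
                "hdfs:///checkpoints/job-xyz", true /* incremental checkpoints */);

        // Local scratch directory for each TaskManager's RocksDB instance.
        // Data here is transient; losing this machine is fine, because
        // recovery reads from the DFS checkpoint, not from this directory.
        backend.setDbStoragePath("file:///tmp/flink-rocksdb");

        env.setStateBackend(backend);
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds
    }
}
```

Note that `file:///...` in the constructor (as in the snippet quoted earlier in the thread) only makes sense for single-machine testing; for a real cluster the constructor argument must be a DFS path, exactly as Aljoscha explains above.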