Thanks, and I will read the code to get a better understanding. Let me repeat another question: what happens if checkpointing is disabled (which is the default)? Is the state still saved?
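For context, a minimal sketch of the setting being asked about, assuming the Flink 1.4-era DataStream API (the class name and the interval are placeholder examples). Checkpointing is off unless enableCheckpointing() is called explicitly; without it, state exists only inside the running state backend and is never snapshotted anywhere:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointingSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Disabled by default: no snapshots are written anywhere and
            // state lives only in the running backend. Enabling it makes
            // Flink snapshot operator state periodically.
            env.enableCheckpointing(10_000); // every 10 seconds (example value)
        }
    }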
2018-01-04 22:48 GMT+08:00 Aljoscha Krettek <[email protected]>:
> TaskManagers don't do any checkpointing, but Operators that run in
> TaskManagers do.
>
> Each operator, of which there are multiple running on multiple TMs in the
> cluster, will write to a unique DFS directory. Something like:
> /checkpoints/job-xyz/checkpoint-1/operator-a/1
>
> These individual checkpoints are not merged together into one directory,
> but the handles to those directories are sent to the CheckpointCoordinator,
> which creates a checkpoint that stores handles to all the states stored
> in DFS.
>
> Best,
> Aljoscha
>
>> On 4. Jan 2018, at 15:06, Jinhua Luo <[email protected]> wrote:
>>
>> One task manager would create one RocksDB instance in its local
>> temporary dir, correct?
>> Since there are likely multiple task managers in one cluster, how do
>> they handle directory conflicts? One RocksDB instance is one directory,
>> so, as I asked at first: how do they merge the RocksDB instances and
>> store them under a single distributed filesystem path?
>>
>> 2018-01-04 22:00 GMT+08:00 Aljoscha Krettek <[email protected]>:
>>> Ah, I see. Currently the RocksDB backend will use one column in RocksDB
>>> per state that is registered. The states for different keys of one
>>> state are stored in one column.
>>>
>>>> On 4. Jan 2018, at 14:56, Jinhua Luo <[email protected]> wrote:
>>>>
>>>> OK, I see.
>>>>
>>>> But, as known, one RocksDB instance occupies one directory, so I am
>>>> still wondering what the relationship between the states and the
>>>> RocksDB instances is.
>>>>
>>>> 2018-01-04 21:50 GMT+08:00 Aljoscha Krettek <[email protected]>:
>>>>> Each operator (which runs in a TaskManager) will write its state to
>>>>> some location in HDFS (or any other DFS) and send a handle to this
>>>>> to the CheckpointCoordinator (which is running on the JobManager).
>>>>> The CheckpointCoordinator collects all the handles and creates one
>>>>> Uber-Handle, which describes the complete checkpoint. When restoring,
>>>>> the CheckpointCoordinator figures out which handles need to be sent
>>>>> to which operators for restoring.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>> On 4. Jan 2018, at 14:44, Jinhua Luo <[email protected]> wrote:
>>>>>>
>>>>>> OK, I think I get the point.
>>>>>>
>>>>>> But another question arises: how do the task managers merge their
>>>>>> RocksDB snapshots into one global path?
>>>>>>
>>>>>> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek <[email protected]>:
>>>>>>> Hi,
>>>>>>>
>>>>>>> The path you give to the constructor must be a path on some
>>>>>>> distributed filesystem, otherwise the data will be lost when the
>>>>>>> local machine crashes, as you mentioned correctly.
>>>>>>>
>>>>>>> RocksDB will keep files in a local directory (you can specify this
>>>>>>> using setDbStoragePath()) and when checkpointing will write to the
>>>>>>> checkpoint directory that you specified in the constructor.
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>>> On 4. Jan 2018, at 14:23, Jinhua Luo <[email protected]> wrote:
>>>>>>>>
>>>>>>>> I still do not understand the relationship between the RocksDB
>>>>>>>> backend and the filesystem (here I mean any filesystem
>>>>>>>> implementation, including local, hdfs, s3).
>>>>>>>>
>>>>>>>> For example, when I specify the path for the RocksDB backend:
>>>>>>>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>>>>>>>
>>>>>>>> What does it mean?
>>>>>>>>
>>>>>>>> Does each task manager save states to /data1/flinkapp on its own
>>>>>>>> machine? But that seems to make no sense: when one of the machines
>>>>>>>> crashes, the job manager could not access the states on the dead
>>>>>>>> machine.
>>>>>>>> Or does each task manager create a RocksDB instance on a temporary
>>>>>>>> path and send snapshots to the job manager, which in turn saves
>>>>>>>> them to /data1/flinkapp on the job manager's machine?
>>>>>>>>
>>>>>>>> Could you give a data flow example?
>>>>>>>>
>>>>>>>> And another question: when I turn off checkpointing (which is also
>>>>>>>> the default config), what happens to the state processing?
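To make the two paths in Aljoscha's answer concrete, here is a minimal sketch; the class name and both paths are placeholder examples, and the API is the Flink 1.4-era RocksDBStateBackend discussed above:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BackendPathsSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Constructor path = *checkpoint* directory; it must be on a
            // distributed filesystem so any machine can read it on restore.
            RocksDBStateBackend backend =
                    new RocksDBStateBackend("hdfs:///flink/checkpoints");
            // DB storage path = *local working* directory where each
            // TaskManager keeps its live RocksDB instance between checkpoints.
            backend.setDbStoragePath("file:///tmp/flink-rocksdb");
            env.setStateBackend(backend);
        }
    }

This separation is the answer to the "merge" question: the local RocksDB instances are never merged; only their snapshot files are uploaded to per-operator subdirectories under the one checkpoint URI.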
>>>>>>>> 2018-01-03 0:06 GMT+08:00 Timo Walther <[email protected]>:
>>>>>>>>> Hi Jinhua,
>>>>>>>>>
>>>>>>>>> I will try to answer your questions:
>>>>>>>>>
>>>>>>>>> Flink checkpoints the state of each operator. For a Kafka consumer
>>>>>>>>> operator this is only the offset. For other operators (such as
>>>>>>>>> Windows or a ProcessFunction) the values/lists/maps stored in the
>>>>>>>>> state are checkpointed. If you are interested in the internals, I
>>>>>>>>> would recommend this page [1].
>>>>>>>>> Only the MemoryStateBackend sends entire states to the JobManager
>>>>>>>>> (see [2]). But you are right, this is a bottleneck and not very
>>>>>>>>> fault-tolerant. Usually, Flink assumes there is some distributed
>>>>>>>>> file system (such as HDFS) to which each Flink operator can be
>>>>>>>>> checkpointed in a fault-tolerant way. For the RocksDBStateBackend
>>>>>>>>> the local files are copied to HDFS as well. At the time of
>>>>>>>>> writing, only the RocksDB backend supports incremental
>>>>>>>>> checkpoints. The JobManager can then read from HDFS and restore
>>>>>>>>> the operator on a different machine.
>>>>>>>>>
>>>>>>>>> Feel free to ask further questions.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Timo
>>>>>>>>>
>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>>>>>>>>>
>>>>>>>>> On 1/1/18 at 3:50 PM, Jinhua Luo wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I have two questions:
>>>>>>>>>>
>>>>>>>>>> a) Are the records/elements themselves checkpointed, or just the
>>>>>>>>>> record offsets? That is, what data is included in the checkpoint
>>>>>>>>>> besides the states?
>>>>>>>>>>
>>>>>>>>>> b) Where does Flink store the state globally, so that the job
>>>>>>>>>> manager could restore it on each task manager after a failure
>>>>>>>>>> restart?
>>>>>>>>>>
>>>>>>>>>> For the heap backend, all task managers would send states to the
>>>>>>>>>> job manager, and the job manager would save them in its heap,
>>>>>>>>>> correct?
>>>>>>>>>>
>>>>>>>>>> For the fs/rocksdb backends, all task managers would save states
>>>>>>>>>> (incrementally or not) in a local path temporarily and send them
>>>>>>>>>> (in RocksDB snapshot format for the RocksDB case?) to the job
>>>>>>>>>> manager at checkpoint time?
>>>>>>>>>>
>>>>>>>>>> The path we use to configure the backend is the path on the job
>>>>>>>>>> manager machine, not on the task managers' machines? So that's
>>>>>>>>>> the bottleneck and a single point of failure? So it's better to
>>>>>>>>>> use an HDFS path, so that we could scale the storage and make it
>>>>>>>>>> highly available as well?
>>>>>>>>>>
>>>>>>>>>> Thank you all.
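To illustrate Timo's point that "the values stored in the state are checkpointed", here is a minimal sketch of keyed state, assuming the Flink 1.4-era API; the class name, state name, and types are hypothetical examples:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Counts events per key. The Long held in "count" is exactly the kind
    // of value the state backend stores (one entry per key, all entries of
    // this registered state in one RocksDB column) and snapshots to the
    // DFS checkpoint directory at each checkpoint.
    public class CountPerKey
            extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void flatMap(Tuple2<String, Long> in,
                            Collector<Tuple2<String, Long>> out) throws Exception {
            Long current = count.value();           // null on first access for a key
            long updated = (current == null ? 0L : current) + 1L;
            count.update(updated);                  // this value is what gets checkpointed
            out.collect(Tuple2.of(in.f0, updated));
        }
    }

Such a function would run after a keyBy() on the stream; each parallel instance keeps its keys' values in its local RocksDB working directory, and a checkpoint uploads them to the distributed checkpoint path.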
