Ah, I see. Currently the RocksDB backend uses one RocksDB column family per state that is registered. The values for all keys of a given state are stored in that one column family.
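For illustration, here is a minimal sketch of what "registered state" means here (the function and state names are made up for this example, they are not from the thread): each descriptor registered with the keyed runtime context ends up as its own column family in RocksDB, and the per-key values of that state all live inside it, with the current key encoded into the RocksDB key by Flink.

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class TwoStatesFunction extends RichFlatMapFunction<String, String> {

        // Two registered states -> two column families in the RocksDB backend.
        private transient ValueState<Long> countState;
        private transient ValueState<String> lastSeenState;

        @Override
        public void open(Configuration parameters) {
            // "count" and "lastSeen" each back onto a separate column family;
            // the values for the different keys of one state share that
            // column family.
            countState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
            lastSeenState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastSeen", String.class));
        }

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            Long count = countState.value();
            countState.update(count == null ? 1L : count + 1);
            lastSeenState.update(value);
            out.collect(value);
        }
    }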
> On 4. Jan 2018, at 14:56, Jinhua Luo <luajit...@gmail.com> wrote:
>
> OK, I see.
>
> But as I understand it, one RocksDB instance occupies one directory, so
> I am still wondering what the relationship is between the states and the
> RocksDB instances.
>
> 2018-01-04 21:50 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>> Each operator (which runs in a TaskManager) will write its state to some
>> location in HDFS (or any other DFS) and send a handle to it to the
>> CheckpointCoordinator (which is running on the JobManager). The
>> CheckpointCoordinator collects all the handles and creates one
>> uber-handle, which describes the complete checkpoint. When restoring, the
>> CheckpointCoordinator figures out which handles need to be sent to which
>> operators for restoring.
>>
>> Best,
>> Aljoscha
>>
>>> On 4. Jan 2018, at 14:44, Jinhua Luo <luajit...@gmail.com> wrote:
>>>
>>> OK, I think I get the point.
>>>
>>> But another question arises: how do the task managers merge their
>>> RocksDB snapshots into one global path?
>>>
>>> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>>>> Hi,
>>>>
>>>> The path you give to the constructor must be a path on some distributed
>>>> filesystem, otherwise the data will be lost when the local machine
>>>> crashes, as you correctly noted.
>>>>
>>>> RocksDB will keep files in a local directory (you can specify this
>>>> using setDbStoragePath()) and when checkpointing will write to the
>>>> checkpoint directory that you specified in the constructor.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>> On 4. Jan 2018, at 14:23, Jinhua Luo <luajit...@gmail.com> wrote:
>>>>>
>>>>> I still do not understand the relationship between the RocksDB backend
>>>>> and the filesystem (here I refer to any filesystem impl, including
>>>>> local, hdfs, s3).
>>>>>
>>>>> For example, when I specify the path for the RocksDB backend:
>>>>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>>>>
>>>>> What does it mean?
>>>>>
>>>>> Does each task manager save its states to /data1/flinkapp on its own
>>>>> machine? That seems to make no sense, because when one of the machines
>>>>> crashes, the job manager could not access the states on the dead
>>>>> machine.
>>>>> Or does each task manager create a RocksDB instance on a temporary
>>>>> path and send snapshots to the job manager, which in turn saves them
>>>>> under /data1/flinkapp on the job manager's machine?
>>>>>
>>>>> Could you give a data flow example?
>>>>>
>>>>> And another question: when I turn off checkpointing (which is also the
>>>>> default configuration), what happens to the state handling?
>>>>>
>>>>> 2018-01-03 0:06 GMT+08:00 Timo Walther <twal...@apache.org>:
>>>>>> Hi Jinhua,
>>>>>>
>>>>>> I will try to answer your questions:
>>>>>>
>>>>>> Flink checkpoints the state of each operator. For a Kafka consumer
>>>>>> operator this is only the offset. For other operators (such as
>>>>>> windows or a ProcessFunction) the values/lists/maps stored in the
>>>>>> state are checkpointed. If you are interested in the internals, I
>>>>>> would recommend this page [1].
>>>>>> Only the MemoryStateBackend sends entire states to the JobManager
>>>>>> (see [2]). But you are right, this is a bottleneck and not very
>>>>>> fault-tolerant. Usually, Flink assumes some distributed file system
>>>>>> (such as HDFS) to which each Flink operator can be checkpointed in a
>>>>>> fault-tolerant way. For the RocksDBStateBackend the local files are
>>>>>> copied to HDFS as well. At the time of writing, only the RocksDB
>>>>>> backend supports incremental checkpoints. The JobManager can then
>>>>>> read from HDFS and restore the operator on a different machine.
>>>>>>
>>>>>> Feel free to ask further questions.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>>>>>>
>>>>>> On 1/1/18 at 3:50 PM, Jinhua Luo wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I have two questions:
>>>>>>>
>>>>>>> a) Are the records/elements themselves checkpointed, or just the
>>>>>>> record offsets? That is, what data is included in the checkpoint
>>>>>>> besides the states?
>>>>>>>
>>>>>>> b) Where does Flink store the state globally, so that the job
>>>>>>> manager can restore it on each task manager after a failure restart?
>>>>>>>
>>>>>>> For the heap backend, all the task managers send their states to the
>>>>>>> job manager, which saves them in its heap, correct?
>>>>>>>
>>>>>>> For the fs/rocksdb backends, do all the task managers save states
>>>>>>> (incrementally or not) on a local path temporarily and send them (in
>>>>>>> RocksDB snapshot format for the rocksdb case?) to the job manager at
>>>>>>> checkpoint time?
>>>>>>>
>>>>>>> Is the path we use to configure the backend a path on the job
>>>>>>> manager's machine, not on the task managers' machines? If so, is
>>>>>>> that a bottleneck and a single point of failure? And is it therefore
>>>>>>> better to use an HDFS path, so that we can scale the storage and
>>>>>>> make it highly available as well?
>>>>>>>
>>>>>>> Thank you all.
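
Putting the answers in this thread together as code, here is a hedged configuration sketch (the HDFS path, local path, and checkpoint interval below are placeholders, not values from the thread): the constructor path is the checkpoint location and must live on a distributed filesystem, while setDbStoragePath() sets the local working directory that each TaskManager's RocksDB instances use.

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDBBackendSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // The constructor path is the *checkpoint* location and must be
            // on a distributed filesystem, so it survives the loss of any
            // single machine. The second argument enables incremental
            // checkpoints, which only the RocksDB backend supports.
            RocksDBStateBackend backend =
                    new RocksDBStateBackend("hdfs:///flink/checkpoints", true);

            // Local working directory for the RocksDB instances on each
            // TaskManager; only snapshots of it are copied to the DFS.
            backend.setDbStoragePath("file:///data1/flink/rocksdb");

            env.setStateBackend(backend);

            // Checkpointing must be enabled for snapshots to be taken at all.
            env.enableCheckpointing(60_000);

            // ... build and execute the job ...
        }
    }

With a setup like this, each TaskManager works against its local /data1/flink/rocksdb directory, and only the (optionally incremental) snapshots are uploaded to the checkpoint directory on the DFS, which is what allows the CheckpointCoordinator to restore an operator on a different machine.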