Ah, I see. Currently the RocksDB backend uses one column family in RocksDB per 
registered state. The values for all keys of a given state are stored in that 
one column family.
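
As a rough mental model of that layout, here is a minimal Java sketch. It is 
not Flink or RocksDB code: plain in-memory maps stand in for column families, 
and every class and method name in it is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: plain maps stand in for RocksDB column families.
// None of these names are Flink or RocksDB APIs.
class ColumnFamilySketch {
    // One sorted map per registered state name (the "column family").
    private final Map<String, TreeMap<String, String>> columnFamilies = new HashMap<>();

    // Registering a state creates its column family once.
    void registerState(String stateName) {
        columnFamilies.putIfAbsent(stateName, new TreeMap<>());
    }

    // Values for different keys of the same state land in the same column family.
    void put(String stateName, String key, String value) {
        TreeMap<String, String> cf = columnFamilies.get(stateName);
        if (cf == null) {
            throw new IllegalStateException("state not registered: " + stateName);
        }
        cf.put(key, value);
    }

    // Returns null if the state or the key is unknown.
    String get(String stateName, String key) {
        TreeMap<String, String> cf = columnFamilies.get(stateName);
        return cf == null ? null : cf.get(key);
    }

    int columnFamilyCount() {
        return columnFamilies.size();
    }

    int entryCount(String stateName) {
        TreeMap<String, String> cf = columnFamilies.get(stateName);
        return cf == null ? 0 : cf.size();
    }

    public static void main(String[] args) {
        ColumnFamilySketch db = new ColumnFamilySketch();
        db.registerState("count");
        db.registerState("lastSeen");
        db.put("count", "user-1", "3");
        db.put("count", "user-2", "7");
        db.put("lastSeen", "user-1", "2018-01-04");
        System.out.println(db.columnFamilyCount()); // two states, two column families
        System.out.println(db.entryCount("count")); // two keys share the "count" family
    }
}
```

The point of the sketch is only the shape: the number of column families 
follows the number of registered states, not the number of keys.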

> On 4. Jan 2018, at 14:56, Jinhua Luo <luajit...@gmail.com> wrote:
> 
> ok, I see.
> 
> But as far as I know, one RocksDB instance occupies one directory, so I am
> still wondering what the relationship is between the states and RocksDB
> instances.
> 
> 2018-01-04 21:50 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>> Each operator (which runs in a TaskManager) writes its state to some 
>> location in HDFS (or any other DFS) and sends a handle to it to the 
>> CheckpointCoordinator (which runs on the JobManager). The 
>> CheckpointCoordinator collects all the handles and creates one 
>> Uber-Handle, which describes the complete checkpoint. When restoring, the 
>> CheckpointCoordinator figures out which handles need to be sent to which 
>> operators.
>> 
>> Best,
>> Aljoscha
>> 
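The handle flow described above can be sketched roughly as follows. This is a 
toy model, not Flink's implementation: the StateHandle and Coordinator classes 
and the hdfs:/// paths are hypothetical and exist only to show that the 
coordinator collects small handles while the state itself stays on the DFS.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these classes are hypothetical and are not Flink APIs.
class CheckpointHandleSketch {

    // A small pointer to state that an operator has already written to the DFS.
    static final class StateHandle {
        final String operatorId;
        final String dfsPath;

        StateHandle(String operatorId, String dfsPath) {
            this.operatorId = operatorId;
            this.dfsPath = dfsPath;
        }
    }

    // Stand-in for the coordinator: it stores handles, never the state bytes.
    static final class Coordinator {
        private final Map<String, StateHandle> handles = new LinkedHashMap<>();

        // Called once per operator after it has written its state.
        void acknowledge(StateHandle handle) {
            handles.put(handle.operatorId, handle);
        }

        // The "uber-handle": the full set of per-operator handles.
        List<StateHandle> completeCheckpoint() {
            return new ArrayList<>(handles.values());
        }

        // On restore, look up which handle goes back to which operator.
        StateHandle handleFor(String operatorId) {
            return handles.get(operatorId);
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        // Made-up paths, for illustration only.
        coordinator.acknowledge(new StateHandle("window-1", "hdfs:///checkpoints/chk-1/window-1"));
        coordinator.acknowledge(new StateHandle("source-1", "hdfs:///checkpoints/chk-1/source-1"));
        System.out.println(coordinator.completeCheckpoint().size());
        System.out.println(coordinator.handleFor("window-1").dfsPath);
    }
}
```

In this model nothing is merged into one file: each operator writes to its own 
location, and only the list of handles is gathered in one place.
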
>>> On 4. Jan 2018, at 14:44, Jinhua Luo <luajit...@gmail.com> wrote:
>>> 
>>> OK, I think I get the point.
>>> 
>>> But another question arises: how do task managers merge their RocksDB
>>> snapshots into a single global path?
>>> 
>>> 
>>> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>>>> Hi,
>>>> 
>>>> The path you give to the constructor must be a path on some distributed 
>>>> filesystem; otherwise, as you correctly pointed out, the data will be 
>>>> lost when the local machine crashes.
>>>> 
>>>> RocksDB keeps its files in a local directory (you can specify this using 
>>>> setDbStoragePath()) and, when checkpointing, writes to the checkpoint 
>>>> directory that you specified in the constructor.
>>>> 
>>>> Best,
>>>> Aljoscha
>>>> 
>>>> 
>>>>> On 4. Jan 2018, at 14:23, Jinhua Luo <luajit...@gmail.com> wrote:
>>>>> 
>>>>> I still do not understand the relationship between the RocksDB backend
>>>>> and the filesystem (here I mean any filesystem implementation,
>>>>> including local, HDFS, S3).
>>>>> 
>>>>> For example, when I specify a path for the RocksDB backend:
>>>>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>>>> 
>>>>> What does it mean?
>>>>> 
>>>>> Would each task manager save its states to /data1/flinkapp on its own
>>>>> machine? That seems to make no sense, because when one of the machines
>>>>> crashes, the job manager cannot access the states on the dead machine.
>>>>> Or does each task manager create a RocksDB instance in a temporary path
>>>>> and send snapshots to the job manager, which in turn saves them in
>>>>> /data1/flinkapp on the job manager's machine?
>>>>> 
>>>>> Could you give an example of the data flow?
>>>>> 
>>>>> Another question: when I turn off checkpointing (which is also the
>>>>> default configuration), what happens to the state handling?
>>>>> 
>>>>> 
>>>>> 
>>>>> 2018-01-03 0:06 GMT+08:00 Timo Walther <twal...@apache.org>:
>>>>>> Hi Jinhua,
>>>>>> 
>>>>>> I will try to answer your questions:
>>>>>> 
>>>>>> Flink checkpoints the state of each operator. For a Kafka consumer
>>>>>> operator this is only the offset. For other operators (such as windows
>>>>>> or a ProcessFunction) the values/lists/maps stored in the state are
>>>>>> checkpointed. If you are interested in the internals, I would recommend
>>>>>> this page [1]. Only the MemoryStateBackend sends entire states to the
>>>>>> JobManager (see [2]). But you are right, this is a bottleneck and not
>>>>>> very fault-tolerant. Usually, Flink assumes that a distributed file
>>>>>> system (such as HDFS) is available, to which each Flink operator can be
>>>>>> checkpointed in a fault-tolerant way. For the RocksDBStateBackend the
>>>>>> local files are copied to HDFS as well. At the time of writing, only
>>>>>> the RocksDBStateBackend supports incremental checkpoints. The
>>>>>> JobManager can then read from HDFS and restore the operator on a
>>>>>> different machine.
>>>>>> 
>>>>>> Feel free to ask further questions.
>>>>>> 
>>>>>> Regards,
>>>>>> Timo
>>>>>> 
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On 1/1/18 at 3:50 PM, Jinhua Luo wrote:
>>>>>> 
>>>>>>> Hi All,
>>>>>>> 
>>>>>>> I have two questions:
>>>>>>> 
>>>>>>> a) Are the records/elements themselves checkpointed, or just the
>>>>>>> record offsets? That is, what data is included in the checkpoint
>>>>>>> besides the states?
>>>>>>> 
>>>>>>> b) Where does Flink store the state globally, so that the job manager
>>>>>>> can restore it on each task manager after a failure and restart?
>>>>>>> 
>>>>>>> For the heap backend, all task managers send their states to the job
>>>>>>> manager, and the job manager keeps them in its heap, correct?
>>>>>>> 
>>>>>>> For the fs/rocksdb backend, do all task managers save states
>>>>>>> (incrementally or not) in a local path temporarily, and send them (in
>>>>>>> RocksDB snapshot format in the RocksDB case?) to the job manager at
>>>>>>> checkpoint time?
>>>>>>> 
>>>>>>> Is the path used to configure the backend a path on the job manager
>>>>>>> machine rather than on the task managers' machines? Is that then a
>>>>>>> bottleneck and single point of failure? And is it therefore better to
>>>>>>> use an HDFS path, so that the storage can scale and is highly
>>>>>>> available as well?
>>>>>>> 
>>>>>>> Thank you all.
>>>>>> 
>>>>>> 
>>>>>> 
>>>> 
>> 
