Each operator (which run in a TaskManager) will write its state to some 
location in HDFS (or any other DFS) and send a handle to this to the 
CheckpointCoordinator (which is running on the JobManager). The 
CheckpointCoordinator is collecting all the handles and creating one 
Uber-Handle, which describes the complete checkpoint. When restoring, the 
CheckpointCoordinator figures out which handles need to be sent to which 
operators for restoring.

Best,
Aljoscha

> On 4. Jan 2018, at 14:44, Jinhua Luo <luajit...@gmail.com> wrote:
> 
> OK, I think I get the point.
> 
> But another question raises: how task managers merge their rocksdb
> snapshot on a global single path?
> 
> 
> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>> Hi,
>> 
>> The path you give to the constructor must be a path on some distributed 
>> filesystem, otherwise the data will be lost when the local machine crashes. 
>> As you mentioned correctly.
>> 
>> RocksDB will keep files in a local directory (you can specify this using 
>> setDbStoragePath()) and when checkpointing will write to the checkpoint 
>> directory that you specified in the constructor.
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 4. Jan 2018, at 14:23, Jinhua Luo <luajit...@gmail.com> wrote:
>>> 
>>> I still do not understand the relationship between rocksdb backend and
>>> the filesystem (here I refer to any filesystem impl, including local,
>>> hdfs, s3).
>>> 
>>> For example, when I specify the path to rocksdb backend:
>>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>> 
>>> What does it mean?
>>> 
>>> Each task manager would save states to /data1/flinkapp on its machine?
>>> But it seems no sense. Because when one of the machines crashes, the
>>> job manager could not access the states on dead machine.
>>> Or, each task manager creates rocksdb instance on temporary path, and
>>> send snapshots to job manager, then job manager in turn saves them on
>>> /data1/flinkapp on the job manager's machine?
>>> 
>>> Could you give the data flow example?
>>> 
>>> And another question is, when I turn off checkpointing (which is also
>>> default cfg), what happens to the states processing?
>>> 
>>> 
>>> 
>>> 2018-01-03 0:06 GMT+08:00 Timo Walther <twal...@apache.org>:
>>>> Hi Jinhua,
>>>> 
>>>> I will try to answer your questions:
>>>> 
>>>> Flink checkpoints the state of each operator. For a Kafka consumer operator
>>>> this is only the offset. For other operators (such as Windows or a
>>>> ProcessFunction) the values/list/maps stored in the state are checkpointed.
>>>> If you are interested in the internals, I would recommend this page [1].
>>>> Only the MemoryStateBackend sends entire states to the JobManager (see 
>>>> [2]).
>>>> But you are right, this is a bottleneck and not very fault-tolerant.
>>>> Usually, Flink assumes to have some distributed file system (such as HDFS)
>>>> to which each Flink operator can be checkpointed in a fault-tolerant way.
>>>> For the RocksDbStateBackend the local files are copied to HDFS as well. At
>>>> the time of writing, only the RocksDBBackend supports incremental
>>>> checkpoints. The JobManager can then read from HDFS and restore the 
>>>> operator
>>>> on a different machine.
>>>> 
>>>> Feel free to ask further questions.
>>>> 
>>>> Regards,
>>>> Timo
>>>> 
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>>>> 
>>>> 
>>>> 
>>>> Am 1/1/18 um 3:50 PM schrieb Jinhua Luo:
>>>> 
>>>>> Hi All,
>>>>> 
>>>>> I have two questions:
>>>>> 
>>>>> a) does the records/elements themselves would be checkpointed? or just
>>>>> record offset checkpointed? That is, what data included in the
>>>>> checkpoint except for states?
>>>>> 
>>>>> b) where flink stores the state globally? so that the job manager
>>>>> could restore them on each task manger at failure restart.
>>>>> 
>>>>> For the heap backend, all task managers would send states to job
>>>>> manager, and job manager would save it in its heap, correct?
>>>>> 
>>>>> For the fs/rocksdb backend, all task managers would save states
>>>>> (incrementally or not) in local path temporarily, and send them (in
>>>>> rocksdb snapshot format for the rocksdb case?) to the job manager at
>>>>> checkpoint?
>>>>> 
>>>>> The path we used to configure backend is the path on the job manager
>>>>> machine but not on the task managers' machines? So that's the
>>>>> bottleneck and single failure point? So it's better to use hdfs path
>>>>> so that we could scale the storage and make it high availability as
>>>>> well?
>>>>> 
>>>>> Thank you all.
>>>> 
>>>> 
>>>> 
>> 

Reply via email to