TaskManagers themselves don't do any checkpointing, but the operators that run 
in TaskManagers do.

Each operator, of which there are multiple parallel instances running on 
multiple TMs in the cluster, will write to a unique DFS directory, something 
like:
/checkpoints/job-xyz/checkpoint-1/operator-a/1

These individual checkpoints are not merged into one directory; instead, the 
handles to those directories are sent to the CheckpointCoordinator, which 
creates a checkpoint that stores handles to all the states stored in DFS.
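
For illustration, a minimal sketch of how a job is wired up so that its 
operators checkpoint to such a DFS layout (path and job name are made up; 
the exact directory layout is an internal detail that may differ between 
versions):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a checkpoint every 60 seconds; each operator snapshots its
        // own state and reports a handle to the CheckpointCoordinator.
        env.enableCheckpointing(60_000L);
        // All operators write their checkpoint files below this DFS path.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints"));
        // ... add sources, transformations, and sinks here ...
        env.execute("checkpointed-job");
    }
}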

Best,
Aljoscha

> On 4. Jan 2018, at 15:06, Jinhua Luo <luajit...@gmail.com> wrote:
> 
> One task manager would create one RocksDB instance in its local
> temporary dir, correct?
> Since there are likely multiple task managers in one cluster, how do
> they handle directory conflicts? One RocksDB instance is one
> directory, so, as I mentioned at first, how do they merge the RocksDB
> instances and store them under a single distributed filesystem path?
> 
> 2018-01-04 22:00 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>> Ah, I see. Currently the RocksDB backend will use one column family in 
>> RocksDB per registered state. The values for all keys of one state are 
>> stored in that one column family.
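>> 
>> For illustration, a sketch (hypothetical function and state names; each 
>> descriptor below ends up as its own column family in RocksDB):
>> 
>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>> import org.apache.flink.api.common.state.ListState;
>> import org.apache.flink.api.common.state.ListStateDescriptor;
>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.util.Collector;
>> 
>> public class TwoStatesFn extends RichFlatMapFunction<String, String> {
>>     private transient ValueState<Long> count;   // one state -> one column
>>     private transient ListState<String> events; // another state -> another
>> 
>>     @Override
>>     public void open(Configuration parameters) {
>>         // Each descriptor registers one state; the values for all keys
>>         // of that state live in the same column.
>>         count = getRuntimeContext().getState(
>>                 new ValueStateDescriptor<>("count", Long.class));
>>         events = getRuntimeContext().getListState(
>>                 new ListStateDescriptor<>("events", String.class));
>>     }
>> 
>>     @Override
>>     public void flatMap(String value, Collector<String> out) throws Exception {
>>         Long current = count.value();
>>         count.update(current == null ? 1L : current + 1);
>>         events.add(value);
>>         out.collect(value);
>>     }
>> }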
>> 
>>> On 4. Jan 2018, at 14:56, Jinhua Luo <luajit...@gmail.com> wrote:
>>> 
>>> ok, I see.
>>> 
>>> But, as is known, one RocksDB instance occupies one directory, so I am
>>> still wondering what the relationship between the states and RocksDB
>>> instances is.
>>> 
>>> 2018-01-04 21:50 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>>>> Each operator (which runs in a TaskManager) will write its state to some 
>>>> location in HDFS (or any other DFS) and send a handle for it to the 
>>>> CheckpointCoordinator (which runs on the JobManager). The 
>>>> CheckpointCoordinator collects all the handles and creates one 
>>>> uber-handle, which describes the complete checkpoint. When restoring, the 
>>>> CheckpointCoordinator figures out which handles need to be sent to which 
>>>> operators.
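>>>> 
>>>> Conceptually (purely illustrative types, not Flink's actual classes), 
>>>> the uber-handle is just a map from each operator to the handles of all 
>>>> its parallel instances:
>>>> 
>>>> import java.util.Arrays;
>>>> import java.util.HashMap;
>>>> import java.util.List;
>>>> import java.util.Map;
>>>> 
>>>> public class UberHandleSketch {
>>>>     // Illustrative stand-in for a state handle: a pointer into DFS.
>>>>     static class StateHandle {
>>>>         final String dfsPath;
>>>>         StateHandle(String dfsPath) { this.dfsPath = dfsPath; }
>>>>     }
>>>> 
>>>>     public static void main(String[] args) {
>>>>         Map<String, List<StateHandle>> uberHandle = new HashMap<>();
>>>>         uberHandle.put("operator-a", Arrays.asList(
>>>>                 new StateHandle("/checkpoints/job-xyz/checkpoint-1/operator-a/1"),
>>>>                 new StateHandle("/checkpoints/job-xyz/checkpoint-1/operator-a/2")));
>>>>         // On restore, the coordinator looks up each operator's handles
>>>>         // and ships them to the TaskManagers that will run it.
>>>>         uberHandle.get("operator-a")
>>>>                 .forEach(h -> System.out.println("restore " + h.dfsPath));
>>>>     }
>>>> }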
>>>> 
>>>> Best,
>>>> Aljoscha
>>>> 
>>>>> On 4. Jan 2018, at 14:44, Jinhua Luo <luajit...@gmail.com> wrote:
>>>>> 
>>>>> OK, I think I get the point.
>>>>> 
>>>>> But another question arises: how do task managers merge their RocksDB
>>>>> snapshots into a single global path?
>>>>> 
>>>>> 
>>>>> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>>>>>> Hi,
>>>>>> 
>>>>>> The path you give to the constructor must be a path on some distributed 
>>>>>> filesystem, otherwise the data will be lost when the local machine 
>>>>>> crashes, as you correctly mentioned.
>>>>>> 
>>>>>> RocksDB will keep its files in a local directory (you can specify this 
>>>>>> using setDbStoragePath()) and, when checkpointing, will write to the 
>>>>>> checkpoint directory that you specified in the constructor.
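>>>>>> 
>>>>>> A minimal sketch of that separation (paths are made up; 
>>>>>> setDbStoragePath() controls the local working directory, the 
>>>>>> constructor argument the checkpoint directory):
>>>>>> 
>>>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>> 
>>>>>> public class LocalVsCheckpointDir {
>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>         StreamExecutionEnvironment env =
>>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>         // Fault-tolerant checkpoint copies go to this DFS path:
>>>>>>         RocksDBStateBackend backend =
>>>>>>                 new RocksDBStateBackend("hdfs:///flink/checkpoints");
>>>>>>         // The live RocksDB instance works on local disk on each TM:
>>>>>>         backend.setDbStoragePath("/data1/rocksdb-working");
>>>>>>         env.setStateBackend(backend);
>>>>>>     }
>>>>>> }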
>>>>>> 
>>>>>> Best,
>>>>>> Aljoscha
>>>>>> 
>>>>>> 
>>>>>>> On 4. Jan 2018, at 14:23, Jinhua Luo <luajit...@gmail.com> wrote:
>>>>>>> 
>>>>>>> I still do not understand the relationship between the RocksDB backend
>>>>>>> and the filesystem (here I refer to any filesystem impl, including
>>>>>>> local, HDFS, S3).
>>>>>>> 
>>>>>>> For example, when I specify the path to rocksdb backend:
>>>>>>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>>>>>> 
>>>>>>> What does it mean?
>>>>>>> 
>>>>>>> Does each task manager save states to /data1/flinkapp on its own machine?
>>>>>>> But that seems to make no sense, because when one of the machines
>>>>>>> crashes, the job manager could not access the states on the dead machine.
>>>>>>> Or does each task manager create a RocksDB instance on a temporary path
>>>>>>> and send snapshots to the job manager, which in turn saves them to
>>>>>>> /data1/flinkapp on the job manager's machine?
>>>>>>> 
>>>>>>> Could you give the data flow example?
>>>>>>> 
>>>>>>> And another question: when I turn off checkpointing (which is also the
>>>>>>> default config), what happens to state processing?
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 2018-01-03 0:06 GMT+08:00 Timo Walther <twal...@apache.org>:
>>>>>>>> Hi Jinhua,
>>>>>>>> 
>>>>>>>> I will try to answer your questions:
>>>>>>>> 
>>>>>>>> Flink checkpoints the state of each operator. For a Kafka consumer
>>>>>>>> operator this is only the offset. For other operators (such as windows
>>>>>>>> or a ProcessFunction) the values/lists/maps stored in the state are
>>>>>>>> checkpointed. If you are interested in the internals, I would
>>>>>>>> recommend this page [1].
>>>>>>>> Only the MemoryStateBackend sends entire states to the JobManager (see
>>>>>>>> [2]). But you are right, this is a bottleneck and not very
>>>>>>>> fault-tolerant. Usually, Flink assumes that some distributed file
>>>>>>>> system (such as HDFS) is available, to which each Flink operator's
>>>>>>>> state can be checkpointed in a fault-tolerant way. For the
>>>>>>>> RocksDBStateBackend the local files are copied to HDFS as well. At the
>>>>>>>> time of writing, only the RocksDB backend supports incremental
>>>>>>>> checkpoints. The JobManager can then read from HDFS and restore the
>>>>>>>> operator on a different machine.
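>>>>>>>> 
>>>>>>>> For example, incremental checkpoints can be enabled when constructing
>>>>>>>> the backend (a sketch; this assumes the two-argument
>>>>>>>> RocksDBStateBackend constructor, with env being the
>>>>>>>> StreamExecutionEnvironment):
>>>>>>>> 
>>>>>>>> // The second argument enables incremental checkpoints (supported
>>>>>>>> // only by the RocksDB backend at the time of writing):
>>>>>>>> env.setStateBackend(
>>>>>>>>         new RocksDBStateBackend("hdfs:///flink/checkpoints", true));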
>>>>>>>> 
>>>>>>>> Feel free to ask further questions.
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>> 
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>>>>>>> [2]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 1/1/18 at 3:50 PM, Jinhua Luo wrote:
>>>>>>>> 
>>>>>>>>> Hi All,
>>>>>>>>> 
>>>>>>>>> I have two questions:
>>>>>>>>> 
>>>>>>>>> a) Are the records/elements themselves checkpointed, or just the
>>>>>>>>> record offsets? That is, what data is included in the checkpoint
>>>>>>>>> besides the states?
>>>>>>>>> 
>>>>>>>>> b) Where does Flink store the state globally, so that the job manager
>>>>>>>>> can restore it on each task manager at failure restart?
>>>>>>>>> 
>>>>>>>>> For the heap backend, all task managers would send states to the job
>>>>>>>>> manager, and the job manager would save them in its heap, correct?
>>>>>>>>> 
>>>>>>>>> For the fs/rocksdb backends, would all task managers save states
>>>>>>>>> (incrementally or not) in a local path temporarily and send them (in
>>>>>>>>> RocksDB snapshot format for the RocksDB case?) to the job manager at
>>>>>>>>> checkpoint time?
>>>>>>>>> 
>>>>>>>>> Is the path we use to configure the backend a path on the job manager
>>>>>>>>> machine and not on the task managers' machines? So is that a
>>>>>>>>> bottleneck and single point of failure? So is it better to use an
>>>>>>>>> HDFS path, so that we can scale the storage and make it highly
>>>>>>>>> available as well?
>>>>>>>>> 
>>>>>>>>> Thank you all.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>> 
>> 
