Thanks, I will read the code to understand it better.

Let me repeat my other question: what happens if checkpointing is
disabled (which is the default, as far as I know)? Is the state still saved?

2018-01-04 22:48 GMT+08:00 Aljoscha Krettek <[email protected]>:
> TaskManagers don't do any checkpointing but Operators that run in 
> TaskManagers do.
>
> Each operator, of which there are multiple running on multiple TMs in the 
> cluster, will write to a unique DFS directory. Something like:
> /checkpoints/job-xyz/checkpoint-1/operator-a/1
>
> These individual checkpoints are not merged together into one directory but 
> the handles to those directories are sent to the CheckpointCoordinator which 
> creates a checkpoint that stores handles to all the states stored in DFS.
>
> Best,
> Aljoscha
>
>> On 4. Jan 2018, at 15:06, Jinhua Luo <[email protected]> wrote:
>>
>> Each task manager would create one RocksDB instance in its local
>> temporary dir, correct?
>> Since a cluster is likely to have multiple task managers, how do
>> they handle directory conflicts? One RocksDB instance is one
>> directory, so, as I asked at first, how do they merge the RocksDB
>> instances and store them under a single distributed filesystem path?
>>
>> 2018-01-04 22:00 GMT+08:00 Aljoscha Krettek <[email protected]>:
>>> Ah I see. Currently the RocksDB backend will use one column family in 
>>> RocksDB per state that is registered. The values for different keys of 
>>> one state are stored in the same column family.
>>>
>>>> On 4. Jan 2018, at 14:56, Jinhua Luo <[email protected]> wrote:
>>>>
>>>> ok, I see.
>>>>
>>>> But as far as I know, one RocksDB instance occupies one directory, so
>>>> I am still wondering what the relationship is between the states and
>>>> the RocksDB instances.
>>>>
>>>> 2018-01-04 21:50 GMT+08:00 Aljoscha Krettek <[email protected]>:
>>>>> Each operator (which runs in a TaskManager) will write its state to some 
>>>>> location in HDFS (or any other DFS) and send a handle to this to the 
>>>>> CheckpointCoordinator (which is running on the JobManager). The 
>>>>> CheckpointCoordinator collects all the handles and creates one 
>>>>> Uber-Handle, which describes the complete checkpoint. When restoring, the 
>>>>> CheckpointCoordinator figures out which handles need to be sent to which 
>>>>> operators for restoring.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>> On 4. Jan 2018, at 14:44, Jinhua Luo <[email protected]> wrote:
>>>>>>
>>>>>> OK, I think I get the point.
>>>>>>
>>>>>> But another question arises: how do the task managers merge their
>>>>>> RocksDB snapshots into a single global path?
>>>>>>
>>>>>>
>>>>>> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek <[email protected]>:
>>>>>>> Hi,
>>>>>>>
>>>>>>> The path you give to the constructor must be a path on some distributed 
>>>>>>> filesystem; otherwise, as you correctly mentioned, the data will be 
>>>>>>> lost when the local machine crashes.
>>>>>>>
>>>>>>> RocksDB will keep files in a local directory (you can specify this 
>>>>>>> using setDbStoragePath()) and when checkpointing will write to the 
>>>>>>> checkpoint directory that you specified in the constructor.
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>>
>>>>>>>> On 4. Jan 2018, at 14:23, Jinhua Luo <[email protected]> wrote:
>>>>>>>>
>>>>>>>> I still do not understand the relationship between rocksdb backend and
>>>>>>>> the filesystem (here I refer to any filesystem impl, including local,
>>>>>>>> hdfs, s3).
>>>>>>>>
>>>>>>>> For example, when I specify the path to rocksdb backend:
>>>>>>>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>>>>>>>
>>>>>>>> What does it mean?
>>>>>>>>
>>>>>>>> Would each task manager save its states to /data1/flinkapp on its own
>>>>>>>> machine? That seems to make no sense, because when one of the machines
>>>>>>>> crashes, the job manager cannot access the states on the dead machine.
>>>>>>>> Or does each task manager create a RocksDB instance in a temporary
>>>>>>>> path and send snapshots to the job manager, which in turn saves them
>>>>>>>> in /data1/flinkapp on the job manager's machine?
>>>>>>>>
>>>>>>>> Could you give an example of the data flow?
>>>>>>>>
>>>>>>>> Another question: when I turn off checkpointing (which is also the
>>>>>>>> default config), what happens to state handling?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2018-01-03 0:06 GMT+08:00 Timo Walther <[email protected]>:
>>>>>>>>> Hi Jinhua,
>>>>>>>>>
>>>>>>>>> I will try to answer your questions:
>>>>>>>>>
>>>>>>>>> Flink checkpoints the state of each operator. For a Kafka consumer
>>>>>>>>> operator this is only the offset. For other operators (such as
>>>>>>>>> windows or a ProcessFunction) the values/lists/maps stored in the
>>>>>>>>> state are checkpointed. If you are interested in the internals, I
>>>>>>>>> would recommend this page [1]. Only the MemoryStateBackend sends
>>>>>>>>> entire states to the JobManager (see [2]). But you are right, this is
>>>>>>>>> a bottleneck and not very fault-tolerant. Usually, Flink assumes that
>>>>>>>>> a distributed file system (such as HDFS) is available, to which each
>>>>>>>>> operator's state can be checkpointed in a fault-tolerant way. For the
>>>>>>>>> RocksDBStateBackend the local files are copied to HDFS as well. At
>>>>>>>>> the time of writing, only the RocksDBStateBackend supports
>>>>>>>>> incremental checkpoints. The JobManager can then read from HDFS and
>>>>>>>>> restore the operator on a different machine.
>>>>>>>>>
>>>>>>>>> Feel free to ask further questions.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Timo
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 1/1/18 at 3:50 PM, Jinhua Luo wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I have two questions:
>>>>>>>>>>
>>>>>>>>>> a) Are the records/elements themselves checkpointed, or just the
>>>>>>>>>> record offsets? That is, what data is included in the checkpoint
>>>>>>>>>> besides the states?
>>>>>>>>>>
>>>>>>>>>> b) Where does Flink store the state globally, so that the job manager
>>>>>>>>>> can restore it on each task manager after a failure restart?
>>>>>>>>>>
>>>>>>>>>> For the heap backend, all task managers would send their states to
>>>>>>>>>> the job manager, and the job manager would save them in its heap,
>>>>>>>>>> correct?
>>>>>>>>>>
>>>>>>>>>> For the fs/rocksdb backend, would all task managers save states
>>>>>>>>>> (incrementally or not) in a local path temporarily, and send them (in
>>>>>>>>>> RocksDB snapshot format for the rocksdb case?) to the job manager at
>>>>>>>>>> checkpoint time?
>>>>>>>>>>
>>>>>>>>>> Is the path we use to configure the backend a path on the job manager
>>>>>>>>>> machine rather than on the task managers' machines? Is that then a
>>>>>>>>>> bottleneck and a single point of failure? Would it be better to use
>>>>>>>>>> an HDFS path, so that we can scale the storage and make it highly
>>>>>>>>>> available as well?
>>>>>>>>>>
>>>>>>>>>> Thank you all.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>
>
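
The checkpoint flow described in the replies above (each operator writes to its own DFS directory, only a handle goes to the CheckpointCoordinator, and the coordinator hands the handles back on restore) can be illustrated with a small toy model. This is a sketch in plain Java and not Flink's implementation: the class names StateHandle and Coordinator, the statePath helper, and the "operator#subtask" lookup key are all made up for illustration. Only the directory layout is taken from the example path in the thread.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model only: none of these classes are Flink classes.
class CheckpointHandleSketch {

    /** A pointer to state already written to the DFS; the state bytes do not travel with it. */
    static final class StateHandle {
        final String operatorId;
        final int subtask;
        final String path;

        StateHandle(String operatorId, int subtask, String path) {
            this.operatorId = operatorId;
            this.subtask = subtask;
            this.path = path;
        }
    }

    /** Builds a unique per-subtask directory, following the layout quoted in the thread. */
    static String statePath(String baseDir, String jobId, long checkpointId,
                            String operatorId, int subtask) {
        return baseDir + "/" + jobId + "/checkpoint-" + checkpointId
                + "/" + operatorId + "/" + subtask;
    }

    /** Hypothetical coordinator: collects one handle per subtask, nothing is merged. */
    static final class Coordinator {
        private final int expectedSubtasks;
        private final Map<String, StateHandle> handles = new TreeMap<>();

        Coordinator(int expectedSubtasks) {
            this.expectedSubtasks = expectedSubtasks;
        }

        /** Records a handle; returns true once every expected subtask has reported. */
        boolean acknowledge(StateHandle handle) {
            handles.put(handle.operatorId + "#" + handle.subtask, handle);
            return isComplete();
        }

        boolean isComplete() {
            return handles.size() == expectedSubtasks;
        }

        /** On restore, tells a subtask which DFS path to read its state from. */
        String pathFor(String operatorId, int subtask) {
            if (!isComplete()) {
                throw new IllegalStateException("checkpoint is incomplete");
            }
            StateHandle handle = handles.get(operatorId + "#" + subtask);
            if (handle == null) {
                throw new IllegalArgumentException("no handle for " + operatorId + "#" + subtask);
            }
            return handle.path;
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator(2);
        for (int subtask = 1; subtask <= 2; subtask++) {
            String path = statePath("/checkpoints", "job-xyz", 1, "operator-a", subtask);
            coordinator.acknowledge(new StateHandle("operator-a", subtask, path));
        }
        // A restarted subtask, possibly on a different machine, asks where its state lives.
        System.out.println(coordinator.pathFor("operator-a", 2));
    }
}
```

Because every subtask gets its own directory, the writers never conflict and nothing has to be merged; the coordinator's view of the checkpoint is just the set of collected paths.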
