OK, I see.
But as far as I know, one RocksDB instance occupies one directory, so I
am still wondering what the relationship is between the states and the
RocksDB instances.
2018-01-04 21:50 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
> Each operator (which runs in a TaskManager) will write its state to some
> location in HDFS (or any other DFS) and send a handle to this to the
> CheckpointCoordinator (which is running on the JobManager). The
> CheckpointCoordinator collects all the handles and creates one
> Uber-Handle, which describes the complete checkpoint. When restoring, the
> CheckpointCoordinator figures out which handles need to be sent to which
> operators for restoring.
>
> Best,
> Aljoscha
>
>> On 4. Jan 2018, at 14:44, Jinhua Luo <luajit...@gmail.com> wrote:
>>
>> OK, I think I get the point.
>>
>> But another question arises: how do the task managers merge their
>> RocksDB snapshots into a single global path?
>>
>>
>> 2018-01-04 21:30 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>>> Hi,
>>>
>>> The path you give to the constructor must be a path on some distributed
>>> filesystem; otherwise, as you correctly mentioned, the data will be lost
>>> when the local machine crashes.
>>>
>>> RocksDB will keep its files in a local directory (you can specify this
>>> using setDbStoragePath()) and, when checkpointing, will write to the
>>> checkpoint directory that you specified in the constructor.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>>> On 4. Jan 2018, at 14:23, Jinhua Luo <luajit...@gmail.com> wrote:
>>>>
>>>> I still do not understand the relationship between the RocksDB backend
>>>> and the filesystem (here I mean any filesystem implementation,
>>>> including local, HDFS, and S3).
>>>>
>>>> For example, when I specify the path for the RocksDB backend:
>>>> env.setStateBackend(new RocksDBStateBackend("file:///data1/flinkapp"));
>>>>
>>>> What does it mean?
>>>>
>>>> Would each task manager save its states to /data1/flinkapp on its own
>>>> machine? That seems to make no sense, because when one of the machines
>>>> crashes, the job manager cannot access the states on the dead machine.
>>>> Or does each task manager create a RocksDB instance on a temporary
>>>> path and send snapshots to the job manager, which in turn saves them
>>>> to /data1/flinkapp on the job manager's machine?
>>>>
>>>> Could you give an example of the data flow?
>>>>
>>>> Another question: when checkpointing is turned off (which is also the
>>>> default configuration), what happens to state processing?
>>>>
>>>>
>>>>
>>>> 2018-01-03 0:06 GMT+08:00 Timo Walther <twal...@apache.org>:
>>>>> Hi Jinhua,
>>>>>
>>>>> I will try to answer your questions:
>>>>>
>>>>> Flink checkpoints the state of each operator. For a Kafka consumer
>>>>> operator this is only the offset. For other operators (such as windows
>>>>> or a ProcessFunction) the values/lists/maps stored in the state are
>>>>> checkpointed. If you are interested in the internals, I would recommend
>>>>> this page [1]. Only the MemoryStateBackend sends entire states to the
>>>>> JobManager (see [2]). But you are right, this is a bottleneck and not
>>>>> very fault-tolerant. Usually, Flink assumes that a distributed file
>>>>> system (such as HDFS) is available, to which each Flink operator can be
>>>>> checkpointed in a fault-tolerant way. For the RocksDBStateBackend the
>>>>> local files are copied to HDFS as well. At the time of writing, only
>>>>> the RocksDBStateBackend supports incremental checkpoints. The
>>>>> JobManager can then read from HDFS and restore the operator on a
>>>>> different machine.
>>>>>
>>>>> Feel free to ask further questions.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html
>>>>>
>>>>>
>>>>>
>>>>> On 1/1/18 at 3:50 PM, Jinhua Luo wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have two questions:
>>>>>>
>>>>>> a) Are the records/elements themselves checkpointed, or just the
>>>>>> record offsets? That is, what data is included in the checkpoint
>>>>>> apart from the states?
>>>>>>
>>>>>> b) Where does Flink store the state globally, so that the job manager
>>>>>> can restore it on each task manager when restarting after a failure?
>>>>>>
>>>>>> For the heap backend, all task managers would send their states to
>>>>>> the job manager, and the job manager would save them in its heap,
>>>>>> correct?
>>>>>>
>>>>>> For the fs/rocksdb backend, would all task managers save states
>>>>>> (incrementally or not) in a local path temporarily, and send them (in
>>>>>> RocksDB snapshot format for the rocksdb case?) to the job manager at
>>>>>> checkpoint time?
>>>>>>
>>>>>> Is the path we use to configure the backend a path on the job
>>>>>> manager's machine rather than on the task managers' machines? Would
>>>>>> that be the bottleneck and single point of failure? And is it
>>>>>> therefore better to use an HDFS path, so that we can scale the
>>>>>> storage and make it highly available as well?
>>>>>>
>>>>>> Thank you all.
>>>>>
>>>>>
>>>>>
>>>
>
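
The handle flow described in this thread (each operator writes its own
state to the DFS and sends only a handle to the CheckpointCoordinator,
which combines the handles into one description of the checkpoint) can be
sketched in plain Java with no Flink dependency. This is a toy simulation
under stated assumptions, not Flink's implementation: the names
StateHandle, CompletedCheckpoint, Coordinator and snapshot, the example
path hdfs:///flink/checkpoints, and the chk-<id>/<operator>-<subtask>
layout are all hypothetical and chosen only for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

public class CheckpointSketch {

    /** Hypothetical handle: a pointer to a file on the DFS, not the state itself. */
    static final class StateHandle {
        final String operatorId;
        final int subtask;
        final String dfsPath;

        StateHandle(String operatorId, int subtask, String dfsPath) {
            this.operatorId = operatorId;
            this.subtask = subtask;
            this.dfsPath = dfsPath;
        }

        String key() {
            return operatorId + "#" + subtask;
        }
    }

    /** Hypothetical "Uber-Handle": all handles that make up one checkpoint. */
    static final class CompletedCheckpoint {
        final Map<String, StateHandle> handles = new TreeMap<>();
    }

    /** Hypothetical coordinator (JobManager side): keeps handles, never state bytes. */
    static final class Coordinator {
        private final Map<Long, CompletedCheckpoint> checkpoints = new TreeMap<>();

        /** Called once per operator subtask after it has written its snapshot. */
        void acknowledge(long checkpointId, StateHandle handle) {
            checkpoints
                .computeIfAbsent(checkpointId, id -> new CompletedCheckpoint())
                .handles.put(handle.key(), handle);
        }

        /** On restore: look up which handle belongs to which operator subtask. */
        StateHandle handleFor(long checkpointId, String operatorId, int subtask) {
            CompletedCheckpoint cp = checkpoints.get(checkpointId);
            return cp == null ? null : cp.handles.get(operatorId + "#" + subtask);
        }
    }

    /**
     * TaskManager side. A real job would write the state bytes to this path;
     * the sketch only computes the path and returns a handle to it.
     * The path layout is an assumption made for this example.
     */
    static StateHandle snapshot(String checkpointDir, long checkpointId,
                                String operatorId, int subtask) {
        String path = checkpointDir + "/chk-" + checkpointId
                + "/" + operatorId + "-" + subtask;
        return new StateHandle(operatorId, subtask, path);
    }

    public static void main(String[] args) {
        String dir = "hdfs:///flink/checkpoints"; // assumed example path
        Coordinator coordinator = new Coordinator();
        // Two parallel subtasks of the same operator snapshot independently.
        for (int subtask = 0; subtask < 2; subtask++) {
            coordinator.acknowledge(1L, snapshot(dir, 1L, "window", subtask));
        }
        // Restore: the coordinator hands subtask 1 the handle it needs.
        System.out.println(coordinator.handleFor(1L, "window", 1).dfsPath);
    }
}
```

In this sketch nothing is merged: every subtask writes to its own file
under the shared checkpoint directory, and the coordinator only remembers
where each file is.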