Hi,

> I have two questions:
>
> a) Are the records/elements themselves checkpointed, or just the record
> offsets? That is, what data is included in the checkpoint besides the
> state?

No, just the offsets (or something similar, depending on the source), which are part of the state of the source operators.
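To make this a bit more concrete, here is a minimal sketch of a source that keeps only its read position in operator state, so the checkpoint contains the offset rather than the emitted records. It is not taken from any real connector; the class name, state name, and emitted records are made up for illustration:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class OffsetTrackingSource implements SourceFunction<String>, CheckpointedFunction {

        private volatile boolean running = true;
        private long offset;                           // position in the external system (hypothetical)
        private transient ListState<Long> offsetState; // operator state holding only the offset

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                // Emit the record and advance the offset under the checkpoint lock,
                // so the checkpointed offset always matches what was emitted.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect("record-" + offset);
                    offset++;
                }
                Thread.sleep(10);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Only the offset goes into the checkpoint, not the emitted records.
            offsetState.clear();
            offsetState.add(offset);
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            offsetState = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("offset", Long.class));
            for (Long restored : offsetState.get()) {
                offset = restored;                     // resume from the checkpointed offset
            }
        }
    }

On recovery, initializeState() receives the checkpointed offset via the restored operator state, and the source simply continues reading from there.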
> b) Where does Flink store the state globally, so that the job manager
> can restore it on each task manager after a failure and restart?

In any non-test scenario, the checkpoint/savepoint state is stored in a distributed store, e.g. HDFS or S3. On recovery, the job manager only sends "pointers" (we call them state handles) to the task managers, telling them where to find the state in the distributed storage.

> For the heap backend, do all task managers send their state to the job
> manager, which then saves it in its heap?
>
> For the fs/rocksdb backend, do all task managers save their state
> (incrementally or not) to a local path temporarily and send it (in
> RocksDB snapshot format in the RocksDB case?) to the job manager at
> checkpoint time?

This is basically the inverse of the restore. In non-test scenarios, the backends write the state to the distributed store and send handles (e.g. filename + offsets) to the job manager. The only exception is the MemoryStateBackend, which is intended mainly for testing and sends state handles that contain the full state as byte arrays.

> Is the path we use to configure the backend a path on the job manager
> machine rather than on the task managers' machines? Would that make the
> job manager a bottleneck and a single point of failure? Is it better to
> use an HDFS path, so that we can scale the storage and make it highly
> available as well?

You should configure a path to the distributed storage, so there is no single point of failure. After all, the state must also remain accessible to all task managers via the state handles in case of node failures; the job manager does not collect the actual state. I have appended a small configuration sketch below my signature.

Best,
Stefan
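As mentioned above, here is a minimal sketch of pointing the checkpoint state at a distributed file system from the DataStream API. The HDFS URI and the checkpoint interval are placeholders for your own setup; depending on the Flink version, the same can also be configured cluster-wide in flink-conf.yaml (e.g. via state.backend and state.checkpoints.dir).

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointToDistributedStorage {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Take a checkpoint every 60 seconds (interval chosen only for the example).
            env.enableCheckpointing(60_000);

            // Point the backend at a distributed file system, not at a local path on
            // the job manager machine. The URI below is a placeholder for your HDFS setup.
            env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));

            // ... build the topology and call env.execute() ...
        }
    }

With such a path, all task managers can read the checkpoint files directly from HDFS on restore; the job manager only hands out the state handles.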