Thanks Qingsheng! This answers my doubt.

Regards,
Meghajit

On Tue, May 31, 2022 at 3:03 PM Qingsheng Ren <[email protected]> wrote:

> Hi Meghajit,
>
> Good question! The short answer: splits won’t be returned to the
> enumerator by a reader once they are assigned and *checkpointed*.
>
> As described by the JavaDoc of SplitEnumerator#addSplitsBack [1]:
>
> > Add a split back to the split enumerator. It will only happen when a
> > SourceReader fails and there are splits assigned to it after the last
> > successful checkpoint.
>
> Suppose we have split A and reader 0, and we have a flow like this:
>
> Checkpoint 100 -> Split assignment (A -> 0) -> Checkpoint 101
>
> After checkpoint 101 the state of split A is managed by reader 0. If the
> reader fails and rolls back to checkpoint 101, the state of split A is
> recovered by the reader instead of being returned to the enumerator,
> because the split has been delivered to the reader and successfully
> stored in the reader’s checkpoint 101. But if reader 0 fails before
> checkpoint 101 and rolls back to 100, reader 0 is not aware of the
> assignment of split A, so A will be added back to the enumerator and
> assigned again.
>
> In a nutshell, if a split is assigned to a reader and a checkpoint
> completes successfully, it is the reader’s responsibility to handle the
> state and recover, and the split won’t be returned to the enumerator. A
> split won’t be assigned or read twice under this pattern.
>
> Hope this is helpful!
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html
>
> Cheers,
>
> Qingsheng
>
> > On May 31, 2022, at 16:29, Meghajit Mazumdar <[email protected]> wrote:
> >
> > Hello,
> >
> > I had a question regarding the behaviour of FileSource and SourceReader
> > in case of failures. Let me know if I missed something conceptually.
> >
> > We are running a Parquet File Source. Let's say we supply the source
> > with a directory path containing 5 files and the Flink job is
> > configured to run with a parallelism of 2.
> >
> > When the job starts, 2 SourceReaders are created and when they ask for
> > splits, the split assigner assigns them one file each, which they start
> > processing.
> >
> > Now, the documentation of the FileSplitAssigner.addSplits method says
> > the following:
> >
> > Adds a set of splits to this assigner. This happens for example when
> > some split processing failed and the splits need to be re-added, or
> > when new splits got discovered.
> >
> > I understand this to mean that unprocessed splits, or splits that were
> > not processed completely due to some error in the SourceReader, get
> > added back to the split assigner to be re-assigned to some other
> > SourceReader.
> >
> > However, the documentation of FileRecordFormat.restoreReader has this
> > statement:
> >
> > Restores a reader from a checkpointed position. This method is called
> > when the reader is recovered from a checkpoint and the reader has
> > previously stored an offset into the checkpoint, by returning from the
> > FileRecordFormat.Reader.getCheckpointedPosition() a value with
> > non-negative offset. That value is supplied as the restoredOffset.
> >
> > I am somewhat confused by these 2 documentation statements. If the
> > split is added back to the split assigner when a SourceReader fails to
> > process it (maybe due to some exception or fatal error), then the split
> > could be re-assigned to any other SourceReader next. Even if the failed
> > SourceReader comes back and starts processing the file from the last
> > checkpointed offset, there would be duplicate processing, as the file
> > could have been assigned to somebody else in the meantime. Then what is
> > the purpose of `restoreReader`? Or am I missing something?
> >
> > --
> > Regards,
> > Meghajit

--
*Regards,*
*Meghajit*
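
The checkpoint flow Qingsheng describes (Checkpoint 100 -> Split assignment (A -> 0) -> Checkpoint 101) can be sketched as a toy simulation. This is a minimal model in plain Java, not Flink code: `ToyJob`, `assign`, `checkpoint`, and `failReader` are hypothetical names invented for illustration, and it does not use the real SplitEnumerator or SourceReader interfaces. It only models the rule that a failed reader keeps the splits stored in its last successful checkpoint, while splits assigned after that checkpoint go back to the enumerator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these names are part of the Flink API.
class SplitRecoveryDemo {

    static final class ToyJob {
        // Splits waiting in the (toy) enumerator.
        final List<String> unassigned = new ArrayList<>();
        // Live state: reader id -> splits the reader currently owns.
        final Map<Integer, List<String>> readerSplits = new HashMap<>();
        // Reader state as captured by the last successful checkpoint.
        final Map<Integer, List<String>> checkpointed = new HashMap<>();

        // Enumerator hands a split to a reader.
        void assign(String split, int reader) {
            unassigned.remove(split);
            readerSplits.computeIfAbsent(reader, r -> new ArrayList<>()).add(split);
        }

        // Snapshot every reader's current splits.
        void checkpoint() {
            checkpointed.clear();
            readerSplits.forEach((r, s) -> checkpointed.put(r, new ArrayList<>(s)));
        }

        // Reader fails and rolls back to the last checkpoint.
        // Returns the splits that are added back to the enumerator.
        List<String> failReader(int reader) {
            List<String> live = readerSplits.getOrDefault(reader, Collections.emptyList());
            List<String> restored =
                    new ArrayList<>(checkpointed.getOrDefault(reader, Collections.emptyList()));
            // Splits assigned after the last checkpoint are unknown to the restored reader.
            List<String> addedBack = new ArrayList<>(live);
            addedBack.removeAll(restored);
            unassigned.addAll(addedBack);
            readerSplits.put(reader, restored);
            return addedBack;
        }
    }

    public static void main(String[] args) {
        // Failure after checkpoint 101: the reader recovers split A itself.
        ToyJob afterCheckpoint = new ToyJob();
        afterCheckpoint.unassigned.add("A");
        afterCheckpoint.checkpoint();      // checkpoint 100
        afterCheckpoint.assign("A", 0);
        afterCheckpoint.checkpoint();      // checkpoint 101
        System.out.println("added back: " + afterCheckpoint.failReader(0));

        // Failure before checkpoint 101: split A returns to the enumerator.
        ToyJob beforeCheckpoint = new ToyJob();
        beforeCheckpoint.unassigned.add("A");
        beforeCheckpoint.checkpoint();     // checkpoint 100
        beforeCheckpoint.assign("A", 0);
        System.out.println("added back: " + beforeCheckpoint.failReader(0));
    }
}
```

In the first scenario nothing is added back and reader 0 still owns A after recovery, which is the case where restoring a reader from a checkpointed position applies. In the second scenario A goes back to the unassigned pool and reader 0 restarts with no splits, so the two paths never overlap for the same split.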
