Hi Meghajit,

Good question! The short answer: once a split has been assigned to a reader 
and that assignment has been *checkpointed*, the split will not be returned 
to the enumerator.

As described by the JavaDoc of SplitEnumerator#addSplitsBack [1]:

> Add a split back to the split enumerator. It will only happen when a 
> SourceReader fails and there are splits assigned to it after the last 
> successful checkpoint.

Suppose we have split A and reader 0, and we have a flow like this:

Checkpoint 100 -> Split assignment (A -> 0) -> Checkpoint 101

After checkpoint 101, the state of split A is managed by reader 0. If the 
reader fails and rolls back to checkpoint 101, split A is recovered by the 
reader rather than returned to the enumerator, because the split was delivered 
to the reader and successfully stored in the reader’s checkpoint 101. But if 
reader 0 fails before checkpoint 101 completes and rolls back to checkpoint 
100, its restored state knows nothing about the assignment of split A, so A is 
added back to the enumerator and assigned again.
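
To make the two cases concrete, here is a toy model in plain Java. It is only 
a sketch and not Flink code: the class SplitRecoverySketch, its fields and its 
methods are all names I made up for illustration, and the real bookkeeping 
inside Flink is more involved. It only assumes what the JavaDoc says, namely 
that splits assigned after the last successful checkpoint go back to the 
enumerator when a reader fails.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these names are Flink classes or Flink APIs.
public class SplitRecoverySketch {
    // Splits still owned by the enumerator (unassigned).
    final List<String> pending = new ArrayList<>();
    // Splits assigned since the last successful checkpoint, keyed by reader id.
    final Map<Integer, List<String>> inFlight = new HashMap<>();
    // Splits stored in each reader's checkpointed state, keyed by reader id.
    final Map<Integer, List<String>> readerState = new HashMap<>();

    // The enumerator hands a split to a reader.
    void assign(String split, int reader) {
        pending.remove(split);
        inFlight.computeIfAbsent(reader, r -> new ArrayList<>()).add(split);
    }

    // A checkpoint completes: in-flight assignments become reader state.
    void checkpoint() {
        inFlight.forEach((reader, splits) ->
                readerState.computeIfAbsent(reader, r -> new ArrayList<>()).addAll(splits));
        inFlight.clear();
    }

    // A reader fails: only splits that were never checkpointed go back to the
    // enumerator (the role addSplitsBack plays). Checkpointed splits stay with
    // the reader, which restores them from its own state.
    void failReader(int reader) {
        List<String> lost = inFlight.remove(reader);
        if (lost != null) {
            pending.addAll(lost);
        }
    }

    public static void main(String[] args) {
        // Case 1: reader 0 fails before checkpoint 101.
        SplitRecoverySketch early = new SplitRecoverySketch();
        early.pending.add("A");
        early.checkpoint();      // checkpoint 100
        early.assign("A", 0);
        early.failReader(0);     // roll back to checkpoint 100
        System.out.println("fail before 101: enumerator=" + early.pending
                + " readers=" + early.readerState);

        // Case 2: reader 0 fails after checkpoint 101.
        SplitRecoverySketch late = new SplitRecoverySketch();
        late.pending.add("A");
        late.checkpoint();       // checkpoint 100
        late.assign("A", 0);
        late.checkpoint();       // checkpoint 101
        late.failReader(0);      // roll back to checkpoint 101
        System.out.println("fail after 101:  enumerator=" + late.pending
                + " readers=" + late.readerState);
    }
}
```

In the first run split A ends up back in the enumerator's pending list, and in 
the second it stays in reader 0's checkpointed state and the enumerator never 
sees it again.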

In a nutshell, once a split has been assigned to a reader and a checkpoint has 
completed successfully, it is the reader’s responsibility to keep the split’s 
state and recover it, and the split won’t be returned to the enumerator. Under 
this pattern a split won’t be assigned or read in duplicate.

Hope this is helpful!

[1] https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html

Cheers, 

Qingsheng


> On May 31, 2022, at 16:29, Meghajit Mazumdar <meghajit.mazum...@gojek.com> 
> wrote:
> 
> Hello,
> 
> I had a question with regards to the behaviour of FileSource and SourceReader 
> in cases of failures. Let me know if I missed something conceptually.
> 
> We are running a Parquet File Source. Let's say, we supply the source with a 
> directory path containing 5 files and the Flink job is configured to run with 
> a parallelism of 2.
> 
> When the job starts, 2 SourceReaders are created and when they ask for 
> splits, the split assigner assigns them one file each, which they start 
> processing.
> 
> Now, the documentation of FileSplitAssigner.addSplits method says the 
> following: 
> Adds a set of splits to this assigner. This happens for example when some 
> split processing failed and the splits need to be re-added, or when new 
> splits got discovered.
> 
> I understand this means that un-processed splits or splits that were not 
> processed completely due to some error with the SourceReader get added back 
> to the split assigner to be re-assigned to some other SourceReader.
> 
> However, the documentation of FileRecordFormat.restoreReader has this 
> statement written:
> Restores a reader from a checkpointed position. This method is called when 
> the reader is recovered from a checkpoint and the reader has previously 
> stored an offset into the checkpoint, by returning from the 
> FileRecordFormat.Reader.getCheckpointedPosition() a value with non-negative 
> offset. That value is supplied as the restoredOffset.
> 
> I am somewhat confused by these 2 documentation statements. If the split is 
> added back to the split assigner when split processing got failed by a 
> SourceReader (maybe due to some exception or fatal error), then the split 
> could be re-assigned to any other SourceReader next. Even if the failed 
> SourceReader comes back and starts processing the file from the last 
> checkpointed offset, there would be duplicate processing as the file could 
> have been assigned to somebody else in the meantime. Then what is the purpose 
> of `restoreReader` ? Or, am I missing something ?
> 
> -- 
> Regards,
> Meghajit
