Thanks, Qingsheng! This answers my question.
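
To check my understanding, here is a minimal sketch of the rule in plain Java. It is a toy model, not Flink code: the class SplitRecoverySketch and its fields and methods (assign, checkpoint, failReader) are hypothetical names made up for illustration, and it ignores everything a real recovery does apart from deciding who ends up owning a split.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of split ownership across checkpoints. Hypothetical, not Flink API. */
public class SplitRecoverySketch {

    /** Splits the enumerator still has to hand out. */
    final List<String> enumeratorPending = new ArrayList<>();

    /** Splits assigned to each reader since the last successful checkpoint. */
    final Map<Integer, List<String>> assignedSinceCheckpoint = new HashMap<>();

    /** Splits stored in each reader's last successful checkpoint. */
    final Map<Integer, List<String>> readerCheckpointedSplits = new HashMap<>();

    /** The enumerator hands a split to a reader. */
    void assign(String split, int reader) {
        enumeratorPending.remove(split);
        assignedSinceCheckpoint.computeIfAbsent(reader, r -> new ArrayList<>()).add(split);
    }

    /** A checkpoint completes: every assigned split now lives in its reader's state. */
    void checkpoint() {
        for (Map.Entry<Integer, List<String>> e : assignedSinceCheckpoint.entrySet()) {
            readerCheckpointedSplits
                    .computeIfAbsent(e.getKey(), r -> new ArrayList<>())
                    .addAll(e.getValue());
        }
        assignedSinceCheckpoint.clear();
    }

    /**
     * A reader fails and rolls back to the last successful checkpoint.
     * Splits assigned after that checkpoint go back to the enumerator
     * (the situation addSplitsBack is described for); the returned list
     * is what the reader restores from its own checkpointed state.
     */
    List<String> failReader(int reader) {
        List<String> uncheckpointed = assignedSinceCheckpoint.remove(reader);
        if (uncheckpointed != null) {
            enumeratorPending.addAll(uncheckpointed);
        }
        return new ArrayList<>(readerCheckpointedSplits.getOrDefault(reader, new ArrayList<>()));
    }

    public static void main(String[] args) {
        // Case 1: assignment (A -> 0), checkpoint 101 succeeds, then reader 0 fails.
        SplitRecoverySketch afterCheckpoint = new SplitRecoverySketch();
        afterCheckpoint.enumeratorPending.add("A");
        afterCheckpoint.assign("A", 0);
        afterCheckpoint.checkpoint();
        System.out.println("restored by reader: " + afterCheckpoint.failReader(0)
                + ", back at enumerator: " + afterCheckpoint.enumeratorPending);

        // Case 2: assignment (A -> 0), reader 0 fails before checkpoint 101.
        SplitRecoverySketch beforeCheckpoint = new SplitRecoverySketch();
        beforeCheckpoint.enumeratorPending.add("A");
        beforeCheckpoint.assign("A", 0);
        System.out.println("restored by reader: " + beforeCheckpoint.failReader(0)
                + ", back at enumerator: " + beforeCheckpoint.enumeratorPending);
    }
}
```

In the first case split A stays with reader 0, which is where I understand restoreReader comes in; in the second case A goes back to the enumerator. In neither case is A owned by both at once, so there is no duplicate processing.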

Regards,
Meghajit

On Tue, May 31, 2022 at 3:03 PM Qingsheng Ren <[email protected]> wrote:

> Hi Meghajit,
>
> Good question! The short answer: a split is not returned to the
> enumerator once it has been assigned to a reader and *checkpointed*.
>
> As described by the JavaDoc of SplitEnumerator#addSplitsBack [1]:
>
> > Add a split back to the split enumerator. It will only happen when a
> SourceReader fails and there are splits assigned to it after the last
> successful checkpoint.
>
> Suppose we have split A and reader 0, and we have a flow like this:
>
> Checkpoint 100 -> Split assignment (A -> 0) -> Checkpoint 101
>
> After checkpoint 101, the state of split A is managed by reader 0. If the
> reader fails and rolls back to checkpoint 101, split A is recovered by the
> reader rather than returned to the enumerator, because the split was
> delivered to the reader and successfully stored in the reader’s state for
> checkpoint 101. But if reader 0 fails before checkpoint 101 and rolls back
> to checkpoint 100, it is not aware of the assignment of split A, so A is
> added back to the enumerator and assigned again.
>
> In a nutshell, if a split is assigned to a reader and a checkpoint then
> completes successfully, it is the reader’s responsibility to handle the
> split’s state and recover it, and the split won’t be returned to the
> enumerator. A split won’t be assigned or read in duplicate under this
> pattern.
>
> Hope this is helpful!
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html
>
> Cheers,
>
> Qingsheng
>
>
> > On May 31, 2022, at 16:29, Meghajit Mazumdar <
> [email protected]> wrote:
> >
> > Hello,
> >
> > I have a question regarding the behaviour of FileSource and
> SourceReader in case of failures. Let me know if I have missed something
> conceptually.
> >
> > We are running a Parquet File Source. Let's say, we supply the source
> with a directory path containing 5 files and the Flink job is configured to
> run with a parallelism of 2.
> >
> > When the job starts, 2 SourceReaders are created and when they ask for
> splits, the split assigner assigns them one file each, which they start
> processing.
> >
> > Now, the documentation of FileSplitAssigner.addSplits method says the
> following:
> > Adds a set of splits to this assigner. This happens for example when
> some split processing failed and the splits need to be re-added, or when
> new splits got discovered.
> >
> > I understand this means that un-processed splits or splits that were not
> processed completely due to some error with the SourceReader get added back
> to the split assigner to be re-assigned to some other SourceReader.
> >
> > However, the documentation of FileRecordFormat.restoreReader has this
> statement written:
> > Restores a reader from a checkpointed position. This method is called
> when the reader is recovered from a checkpoint and the reader has
> previously stored an offset into the checkpoint, by returning from the
> FileRecordFormat.Reader.getCheckpointedPosition() a value with non-negative
> offset. That value is supplied as the restoredOffset.
> >
> > I am somewhat confused by these two documentation statements. If a split
> is added back to the split assigner when a SourceReader fails while
> processing it (maybe due to some exception or fatal error), then the split
> could next be re-assigned to any other SourceReader. Even if the failed
> SourceReader comes back and resumes processing the file from the last
> checkpointed offset, there would be duplicate processing, as the file could
> have been assigned to another reader in the meantime. Then what is the
> purpose of `restoreReader`? Or am I missing something?
> >
> > --
> > Regards,
> > Meghajit
>
>

-- 
*Regards,*
*Meghajit*
