Hello,

I have a question regarding the behaviour of FileSource and SourceReader
in case of failures. Please let me know if I have missed something
conceptually.

We are running a Parquet File Source. Say we supply the source with a
directory path containing 5 files, and the Flink job is configured to run
with a parallelism of 2.

When the job starts, 2 SourceReaders are created and when they ask for
splits, the split assigner assigns them one file each, which they start
processing.

Now, the documentation of FileSplitAssigner.addSplits
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/file/src/assigners/FileSplitAssigner.html#addSplits-java.util.Collection->
method says the following:
*Adds a set of splits to this assigner. This happens for example when some
split processing failed and the splits need to be re-added, or when new
splits got discovered.*

I understand this means that un-processed splits or splits that were not
processed completely due to some error with the SourceReader get added back
to the split assigner to be re-assigned to some other SourceReader.
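
To make my understanding concrete, here is a toy model of the assignment
and re-add behaviour as I picture it. It is plain Java and does not use
Flink's actual classes: ToySplitAssigner, its methods and the file names
are made up for illustration, and the real FileSplitAssigner may well
behave differently.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;

// Toy stand-in for a split assigner; NOT Flink's FileSplitAssigner.
class ToySplitAssigner {
    // Files that have not been handed to any reader yet.
    private final Queue<String> pending = new ArrayDeque<>();

    ToySplitAssigner(Collection<String> files) {
        pending.addAll(files);
    }

    // Hands out the next unassigned file, or empty if none is left.
    Optional<String> getNext() {
        return Optional.ofNullable(pending.poll());
    }

    // Puts splits back, e.g. after a reader failed while processing them.
    void addSplits(Collection<String> files) {
        pending.addAll(files);
    }

    int remaining() {
        return pending.size();
    }
}

class ToyAssignmentDemo {
    public static void main(String[] args) {
        // 5 files in the directory (hypothetical names), parallelism of 2.
        ToySplitAssigner assigner = new ToySplitAssigner(Arrays.asList(
                "f1.parquet", "f2.parquet", "f3.parquet", "f4.parquet", "f5.parquet"));

        // Each of the two readers asks for a split and gets one file.
        String reader0Split = assigner.getNext().get();
        String reader1Split = assigner.getNext().get();
        System.out.println("reader 0 -> " + reader0Split);
        System.out.println("reader 1 -> " + reader1Split);
        System.out.println("unassigned: " + assigner.remaining());

        // Assumption: reader 1 fails, so its split is handed back and can
        // later be given to whichever reader asks next.
        assigner.addSplits(Arrays.asList(reader1Split));
        System.out.println("unassigned after re-add: " + assigner.remaining());
    }
}
```

In this model a re-added split simply goes back into the pool, with no
memory of which reader held it or how far that reader had got.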

However, the documentation of FileRecordFormat.restoreReader
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/file/src/reader/FileRecordFormat.html#restoreReader-org.apache.flink.configuration.Configuration-org.apache.flink.core.fs.Path-long-long-long->
states:
*Restores a reader from a checkpointed position. This method is called when
the reader is recovered from a checkpoint and the reader has previously
stored an offset into the checkpoint, by returning from the
FileRecordFormat.Reader.getCheckpointedPosition() a value with non-negative
offset. That value is supplied as the restoredOffset.*

I am somewhat confused by these two statements. If a split is added back
to the split assigner when a SourceReader fails to process it (maybe due to
an exception or a fatal error), then the split could be re-assigned to any
other SourceReader. Even if the failed SourceReader comes back and resumes
the file from the last checkpointed offset, there would be duplicate
processing, as the file could have been assigned to another reader in the
meantime. What, then, is the purpose of `restoreReader`? Or am I missing
something?

-- 
*Regards,*
*Meghajit*
