lvhuyen opened a new pull request #6613:
URL: https://github.com/apache/flink/pull/6613
[FLINK-9940] Fix - File-source continuous monitoring mode - out-of-order files were missed

## Fix the issue with ContinuousFileMonitoringFunction: out-of-order files were missed in continuous directory-scanning mode

- _Cause_: The existing directory monitoring mechanism keeps the maximum last-modified timestamp of all identified files (_globalModificationTime_). On the next scan, every file whose last-modified timestamp is equal to or earlier than _globalModificationTime_ is ignored. As a result, a file that becomes visible only after a newer file has already been processed (i.e., a file that arrives out of order) is never read.
- _Fix_: This fix adds a new parameter when creating a ContinuousFileMonitoringFunction: readConsistencyOffset. Each scan now starts from the maximum last-modified timestamp minus this offset. A new list, processedFiles, is also maintained. It holds all known files whose modification timestamp falls within that offset window, so that these files are not forwarded a second time.
- To test this fix, flink-fs-tests has also been changed: the seenFiles collection is now a SortedList instead of a TreeSet. Unlike a set, a list keeps duplicates, so the tests now verify exactly-once file scanning instead of at-least-once.

## Verifying this change

This change is already covered by existing tests, with slight updates:
- ContinuousFileProcessingMigrationTest.testMonitoringSourceRestore
- ContinuousFileProcessingTest.{testFunctionRestore, testProcessContinuously}

This change also adds a test:
- ContinuousFileProcessingTest.testProcessContinuouslyWithNoteTooLateFile

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes (per-file). This is expected to have minimal impact.
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature?
yes
- If yes, how is the feature documented? JavaDocs
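The _Cause_ and _Fix_ bullets above can be illustrated with a minimal Java sketch built on a simplified model. It makes several assumptions:
- A directory listing is a list of (path, last-modified timestamp) pairs.
- A file counts as processed as soon as it is forwarded.
- `OffsetScanSketch`, its nested `FileStatus` class and `scan()` are hypothetical names for illustration. They do not come from this PR's diff or from Flink's API.

With `readConsistencyOffset = 0`, the filter reduces to the old rule: skip everything at or before `globalModificationTime`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the offset-based scan described in this PR.
// This is NOT Flink's ContinuousFileMonitoringFunction; FileStatus, scan() and the
// field layout are illustrative assumptions.
final class OffsetScanSketch {

    /** A listed file: its path and last-modified timestamp in milliseconds. */
    static final class FileStatus {
        final String path;
        final long modTime;

        FileStatus(String path, long modTime) {
            this.path = path;
            this.modTime = modTime;
        }
    }

    private final long readConsistencyOffset;
    // Maximum modification time of all files forwarded so far.
    private long globalModificationTime = Long.MIN_VALUE;
    // Forwarded files whose modTime is still inside the offset window, keyed by path.
    private final Map<String, Long> processedFiles = new HashMap<>();

    OffsetScanSketch(long readConsistencyOffset) {
        this.readConsistencyOffset = readConsistencyOffset;
    }

    /** Returns the paths forwarded for reading by one directory scan. */
    List<String> scan(List<FileStatus> listing) {
        // Nothing forwarded yet: every listed file is a candidate.
        boolean firstScan = globalModificationTime == Long.MIN_VALUE;
        long lowerBound = firstScan ? Long.MIN_VALUE : globalModificationTime - readConsistencyOffset;

        List<String> toRead = new ArrayList<>();
        for (FileStatus f : listing) {
            if (!firstScan && f.modTime <= lowerBound) {
                continue; // older than the window: assumed handled by an earlier scan
            }
            if (processedFiles.containsKey(f.path)) {
                continue; // inside the window, but already forwarded once
            }
            toRead.add(f.path);
            processedFiles.put(f.path, f.modTime);
            globalModificationTime = Math.max(globalModificationTime, f.modTime);
        }

        // Drop bookkeeping for files that have fallen out of the window.
        if (globalModificationTime != Long.MIN_VALUE) {
            long windowStart = globalModificationTime - readConsistencyOffset;
            processedFiles.values().removeIf(t -> t <= windowStart);
        }
        return toRead;
    }
}
```

Consider a scan that forwards `a` (timestamp 100) and `b` (200), followed by a listing that also contains a late file `c` (150):
- An instance created with an offset of 0 skips `c`, which is the missed-file behavior this PR reports.
- An instance created with an offset of 100 forwards `c` on the next scan and forwards nothing on the scan after that.

The `processedFiles` bookkeeping is what prevents the repeated scan from emitting `b` or `c` again.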