[ https://issues.apache.org/jira/browse/FLINK-5083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kostas Kloudas closed FLINK-5083.
---------------------------------
    Resolution: Fixed

> Race condition in Rolling/Bucketing Sink pending files cleanup
> --------------------------------------------------------------
>
>                 Key: FLINK-5083
>                 URL: https://issues.apache.org/jira/browse/FLINK-5083
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Cliff Resnick
>
> In both the open and restore methods there is code that:
> 1. gets a recursive listing from baseDir;
> 2. iterates over the listing and checks each filename against the subtaskIndex and other criteria to find pending or in-progress files, deleting any it finds.
>
> The problem is that the recursive listing returns the files of all subtaskIndexes, not just the current one. The race occurs when #hasNext is called during iteration: it makes a hidden existence check on the "next" file, and if another task deleted that file after the listing was taken but before iteration reached it, an error is thrown and the job fails.
> Depending on the number of pending files, this condition may outlast the number of job retries, with each attempt failing on a different file.
>
> A solution would be to use #listStatus instead. The Hadoop FileSystem supports a PathFilter in its #listStatus calls, but not in the recursive #listFiles call. Since the cleanup starts from baseDir, the recursion would then have to be implemented in Flink.
>
> This touches on another issue. Over time the directory listing is bound to grow very large, and re-listing everything from baseDir may become increasingly expensive, especially if the file system is S3. Maybe we could have a Bucketer callback that returns a list of cleanup root directories based on the current file? I'm guessing most people use time-based bucketing, so cleanup only matters for a limited window of recent buckets. If so, this would also solve the recursive listing problem above.
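The fix proposed in the issue is to list each directory with a filter (Hadoop's FileSystem#listStatus with a PathFilter) and do the recursion in Flink. The sketch below shows that shape using java.nio.file instead of Hadoop, so it is self-contained: every directory is listed on its own, only files matching this subtask's naming scheme are kept, and a directory or file that another subtask removes in the meantime is skipped rather than failing the job. The class name, the `_part-<subtask>-<counter>` pattern and the `.pending`/`.in-progress` suffixes are assumptions for illustration, not the actual BucketingSink code.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper; not part of Flink.
final class PendingFileCleanup {

    private PendingFileCleanup() {}

    // Hypothetical naming scheme: "_part-<subtaskIndex>-<counter>" plus a
    // ".pending" or ".in-progress" suffix. Adjust to the sink's real settings.
    static Predicate<Path> ownedBy(int subtaskIndex) {
        String prefix = "_part-" + subtaskIndex + "-";
        return p -> {
            String name = p.getFileName().toString();
            return name.startsWith(prefix)
                    && (name.endsWith(".pending") || name.endsWith(".in-progress"));
        };
    }

    // Lists one directory at a time and applies the filter to each listing,
    // instead of iterating one recursive listing of every subtask's files.
    static List<Path> listMatching(Path dir, Predicate<Path> filter) throws IOException {
        List<Path> matches = new ArrayList<>();
        List<Path> subdirs = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                if (Files.isDirectory(entry)) {
                    subdirs.add(entry);
                } else if (filter.test(entry)) {
                    matches.add(entry);
                }
            }
        } catch (NoSuchFileException e) {
            // The directory does not exist (e.g. removed after its parent
            // was listed), so there is nothing to clean up here.
            return matches;
        }
        for (Path sub : subdirs) {
            matches.addAll(listMatching(sub, filter));
        }
        return matches;
    }

    // Deletes this subtask's leftover files and returns how many were removed.
    // A file that has already disappeared is treated as done, not as an error.
    static int cleanUp(Path baseDir, int subtaskIndex) throws IOException {
        int deleted = 0;
        for (Path p : listMatching(baseDir, ownedBy(subtaskIndex))) {
            if (Files.deleteIfExists(p)) {
                deleted++;
            }
        }
        return deleted;
    }
}
```

Calling `PendingFileCleanup.cleanUp(baseDir, subtaskIndex)` from each subtask only touches that subtask's files, and the filename check happens per directory listing rather than through an iterator that re-checks existence of files owned by other subtasks. With Hadoop, the same structure would use listStatus(Path, PathFilter) per directory; the Bucketer callback idea from the issue would simply replace baseDir with a shorter list of cleanup roots.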