I am curious about how operator state is repartitioned to subtasks when a job is resumed from a checkpoint or savepoint. The reason is that I am having issues with the ContinuousFileReaderOperator when recovering from a failure.
I consume most of my data from files on S3. I have a custom file monitor that understands how to walk my directory structure and emits TimestampedFileSplits downstream, in chronological order, to the stock ContinuousFileReaderOperator. The reader consumes those splits and stores them in a priority queue ordered by last modified time, which ensures that files are read in chronological order. That is exactly what I want.

The problem is that on recovery, the unread splits partitioned out to each of the subtasks seem to be heavily skewed in terms of last modified time. While each subtask may have a similar number of files, I find that one or two will have a disproportionate number of old files. This in turn holds back my watermark (sometimes for several hours, depending on the number of unread splits), which keeps timers from firing, windows from purging, etc.

I was hoping there was some way I could add a custom partitioner to ensure that splits are distributed uniformly over time, or that someone has other ideas for how I could mitigate the problem.

Thank you,
Seth Wiesman
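
P.S. To make concrete what I mean by a temporally uniform distribution of splits, here is a rough sketch in plain Java. It is not Flink API and not how the reader works today: TemporalSplitAssigner, Split, and assign are names made up for illustration, and Split is only a stand-in for a TimestampedFileSplit carrying a path and a modification time. The idea is to sort all unread splits by modification time and deal them out round-robin, so that every subtask gets a similar mix of old and new files.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative only: a hypothetical helper, not part of Flink. */
public class TemporalSplitAssigner {

    /** Hypothetical stand-in for a file split: a path plus its last-modified time. */
    public static final class Split {
        public final String path;
        public final long modificationTime;

        public Split(String path, long modificationTime) {
            this.path = path;
            this.modificationTime = modificationTime;
        }
    }

    /**
     * Sorts the unread splits by modification time (oldest first) and deals
     * them out round-robin, so the i-th oldest split goes to subtask
     * i % parallelism. Each subtask ends up with a similar spread of ages.
     */
    public static List<List<Split>> assign(List<Split> unread, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }

        // Copy before sorting so the caller's list is left untouched.
        List<Split> sorted = new ArrayList<>(unread);
        sorted.sort(Comparator.comparingLong((Split s) -> s.modificationTime));

        // One bucket per subtask.
        List<List<Split>> buckets = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            buckets.add(new ArrayList<>());
        }

        // Round-robin over the globally sorted splits.
        for (int i = 0; i < sorted.size(); i++) {
            buckets.get(i % parallelism).add(sorted.get(i));
        }
        return buckets;
    }
}
```

With this kind of assignment, the oldest unread split on any subtask would be at most a few positions behind the globally oldest one, so no single subtask should hold the watermark back far more than the others. What I do not know is where, if anywhere, I could hook logic like this into the restore path.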