I am curious about how operator state is repartitioned to subtasks when a job 
is resumed from a checkpoint or savepoint. The reason is that I am having 
issues with the ContinuousFileReaderOperator when recovering from a failure.

I consume most of my data from files off S3. I have a custom file monitor that 
understands how to walk my directory structure and outputs 
TimestampedFileSplits downstream in chronological order to the stock 
ContinuousFileReaderOperator. The reader consumes those splits and stores them 
in a priority queue ordered by last modified time, ensuring that files are read 
in chronological order, which is exactly what I want. The problem is that, on 
recovery, the unread splits partitioned out to each of the subtasks seem to be 
heavily skewed in terms of last modified time.

While each task may have a similar number of files, I find that one or two will 
have a disproportionate number of old files. This in turn holds back my 
watermark (sometimes for several hours depending on the number of unread 
splits) which keeps timers from firing, windows from purging, etc.

I was hoping there was some way I could add a custom partitioner to ensure 
that splits are distributed uniformly with respect to last modified time, or 
that someone had other ideas for how I could mitigate the problem.
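
To make concrete what I mean by a temporally uniform distribution, here is a 
minimal sketch in plain Java. It is not Flink API: the Split class and the 
distribute method are hypothetical names made up for illustration, and I do not 
know whether there is a hook that would let me apply something like this on 
restore. The idea is to sort the unread splits by modification time and deal 
them out round-robin, so every subtask gets a mix of old and new files.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TemporalSplitDistributor {

    /** Hypothetical stand-in for a file split; only the modification time matters here. */
    public static final class Split {
        public final String path;
        public final long modificationTime;

        public Split(String path, long modificationTime) {
            this.path = path;
            this.modificationTime = modificationTime;
        }
    }

    /**
     * Sorts the unread splits by modification time, then deals them out
     * round-robin so that each subtask receives an interleaved slice of
     * the timeline instead of a contiguous (possibly very old) chunk.
     */
    public static List<List<Split>> distribute(List<Split> unread, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }

        // Copy first so the caller's list is left untouched.
        List<Split> sorted = new ArrayList<>(unread);
        sorted.sort(Comparator.comparingLong((Split s) -> s.modificationTime));

        List<List<Split>> perSubtask = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }

        // The i-th oldest split goes to subtask (i mod parallelism).
        for (int i = 0; i < sorted.size(); i++) {
            perSubtask.get(i % parallelism).add(sorted.get(i));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // Six splits with modification times 6..1, deliberately out of order.
        List<Split> unread = new ArrayList<>();
        for (int t = 6; t >= 1; t--) {
            unread.add(new Split("file-" + t, t));
        }

        List<List<Split>> result = distribute(unread, 3);
        for (int i = 0; i < result.size(); i++) {
            StringBuilder line = new StringBuilder("subtask " + i + ":");
            for (Split s : result.get(i)) {
                line.append(' ').append(s.modificationTime);
            }
            System.out.println(line);
        }
    }
}
```

With a distribution like this, the oldest unread split on any subtask would 
be at most parallelism - 1 positions away from the globally oldest one in the 
sorted order, so no single subtask should end up holding back the watermark 
on its own.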

Thank you,

Seth Wiesman
