Re: [DISCUSS] Limit size of already processed files in File Source SplitEnumerator

2021-06-08 Thread Tianxin Zhao
Thanks Till, Guowei, and Arvid for the insightful discussion! 1. Regarding size and scan performance: we are in the POC stage and have not hit an OOM issue yet; the issue was discovered by reading through the FileSource implementation. Our order of magnitude is roughly 200 B per path and ~8000
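
For scale, a back-of-envelope estimate with hypothetical totals (the retained-file count in the snippet above is truncated): at ~200 B per path, one million retained paths is on the order of 200 MB of raw string data, and ten million is ~2 GB, before counting the per-entry HashSet overhead on the JVM, which typically adds several times the raw string size.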

Re: [DISCUSS] Limit size of already processed files in File Source SplitEnumerator

2021-06-08 Thread Arvid Heise
Hi Tianxin, I have assigned you the ticket, so you can go ahead and create a POC PR. I would like to understand the issue a bit better first and then give some things to consider. In general, I see your point that in a potentially infinitely running application, keeping track of all read entities

Re: [DISCUSS] Limit size of already processed files in File Source SplitEnumerator

2021-06-08 Thread Guowei Ma
It would simplify things a lot if the modification timestamp of each newly scanned file were monotonically increasing. We would then only need to record the file list corresponding to the largest timestamp. For the timestamp of each scanned file: 1. If it is smaller than the maximum timestamp, the file has already been processed;
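
A minimal sketch of this idea, assuming strictly non-decreasing modification timestamps (hypothetical names, not Flink's actual implementation):

import java.util.HashSet;
import java.util.Set;

/**
 * Keeps only the files that share the current maximum modification
 * timestamp, instead of every path ever seen. Assumes timestamps of
 * newly scanned files never decrease. Hypothetical sketch.
 */
class MaxTimestampDeduper {
    private long maxTimestamp = Long.MIN_VALUE;
    // Only files whose modification time equals maxTimestamp.
    private final Set<String> pathsAtMaxTimestamp = new HashSet<>();

    /** Returns true if the file is new and should be processed. */
    boolean shouldProcess(String path, long modTimestamp) {
        if (modTimestamp < maxTimestamp) {
            return false; // older than the max: already processed
        }
        if (modTimestamp > maxTimestamp) {
            // The max advances; all previously recorded paths can be dropped.
            maxTimestamp = modTimestamp;
            pathsAtMaxTimestamp.clear();
        }
        // Equal to (or now defining) the max: new only if not recorded yet.
        return pathsAtMaxTimestamp.add(path);
    }
}

State is then bounded by the number of files sharing the newest timestamp rather than by the lifetime of the job.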

Re: [DISCUSS] Limit size of already processed files in File Source SplitEnumerator

2021-06-08 Thread Till Rohrmann
Hi Tianxin, thanks for starting this discussion. I am pulling in Arvid, who works on Flink's connectors. I think the problem you are describing can happen. From what I understand, you are proposing to keep track of the watermark of processed file input splits and then filter out splits based on
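
As I read the proposal, the filtering step would look roughly like the following sketch (hypothetical types and names, not Flink's actual connector code; the watermark is the largest modification timestamp among splits already handed out):

import java.util.List;
import java.util.stream.Collectors;

/**
 * Drops discovered splits already covered by the processed-files
 * watermark before they are assigned to readers. Hypothetical sketch;
 * uses a record, so it requires Java 16+.
 */
class WatermarkSplitFilter {
    /** A discovered file split: path plus its modification timestamp. */
    record FileSplit(String path, long modTimestamp) {}

    static List<FileSplit> filterProcessed(List<FileSplit> discovered, long watermark) {
        return discovered.stream()
                // Only splits strictly newer than the watermark survive; files
                // sharing exactly the watermark timestamp need the extra
                // bookkeeping shown in the earlier timestamp sketch.
                .filter(split -> split.modTimestamp() > watermark)
                .collect(Collectors.toList());
    }
}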

[DISCUSS] Limit size of already processed files in File Source SplitEnumerator

2021-06-07 Thread Tianxin Zhao
Hi! Currently, the Flink File Source relies on a Set pathsAlreadyProcessed in the SplitEnumerator to decide which files have been processed, and it avoids reprocessing a file if its path is already in this set. However, this set can grow without bound and ultimately exceed available memory if there are new files
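
For context, a minimal sketch of the pattern being described (simplified and hypothetical; the real logic lives in Flink's file source and differs in detail):

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Remembers every path ever handed out, so the set grows by one entry
 * per discovered file for the lifetime of the job. Hypothetical sketch
 * illustrating the unbounded growth, not Flink's actual code.
 */
class UnboundedDedupEnumerator {
    private final Set<String> pathsAlreadyProcessed = new HashSet<>();

    /** Keeps only files not seen before and records them as processed. */
    List<String> filterNewFiles(List<String> scannedPaths) {
        return scannedPaths.stream()
                .filter(pathsAlreadyProcessed::add) // add() is false for duplicates
                .collect(Collectors.toList());
    }
}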