Hi,

I think what you’re trying to achieve is not possible with the out-of-the-box 
file source. The problem is that it is hard to know when a file can be deleted: 
a file may be divided into multiple splits, and those may be read by different 
parallel reader instances. In addition, deleting or moving a file has to happen 
after a checkpoint is confirmed; otherwise, if the job fails, it would have to 
re-read files that are no longer there.

You could get this working by implementing your own versions of the continuous 
file monitor and file reader operators. You would have to ensure that one split 
always covers one complete file (maybe make your files small enough for that). 
Then, in the reader operator, after reading a split, you would add the path of 
the finished file to a list that you checkpoint. When a checkpoint is confirmed, 
i.e. in the notifyCheckpointComplete() method, you can delete the files in that 
list.
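
To make the bookkeeping concrete, here is a rough sketch in plain Java of the 
"delete only after a confirmed checkpoint" logic. It is not Flink code: the 
class FinishedFileTracker and its methods fileFinished() and snapshot() are 
hypothetical names I made up for illustration. In a custom reader operator you 
would call snapshot() from your state snapshot method and forward Flink's 
notifyCheckpointComplete(long checkpointId) callback to the method of the same 
name here. It assumes one split per file, as described above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: remembers fully read files until a checkpoint covering them is confirmed. */
class FinishedFileTracker {

    // Files that were fully read since the last snapshot was taken.
    private final List<Path> finishedSinceLastSnapshot = new ArrayList<>();

    // Files captured by a snapshot, keyed by checkpoint ID, still waiting for confirmation.
    private final TreeMap<Long, List<Path>> pendingByCheckpoint = new TreeMap<>();

    /** Call after the reader has consumed the (single) split of a file. */
    void fileFinished(Path file) {
        finishedSinceLastSnapshot.add(file);
    }

    /** Call when a snapshot is taken; returns all not-yet-deleted files to store in state. */
    List<Path> snapshot(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(finishedSinceLastSnapshot));
        finishedSinceLastSnapshot.clear();
        List<Path> all = new ArrayList<>();
        for (List<Path> files : pendingByCheckpoint.values()) {
            all.addAll(files);
        }
        return all;
    }

    /** Call when a checkpoint is confirmed; deletes files covered by it or by earlier ones. */
    void notifyCheckpointComplete(long checkpointId) throws IOException {
        // headMap is a live view, so clearing it also removes the entries from the TreeMap.
        Map<Long, List<Path>> confirmed = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<Path> files : confirmed.values()) {
            for (Path file : files) {
                Files.deleteIfExists(file);
            }
        }
        confirmed.clear();
    }
}
```

Files that finish after a snapshot was taken are not touched when that 
checkpoint is confirmed; they are only deleted once a later checkpoint that 
includes them completes.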

I hope that helps.

Best,
Aljoscha

> On 20. May 2019, at 09:53, Hanan Yehudai <[email protected]> wrote:
> 
> Hi,
> I'm looking for a way to delete / rename files that are done loading.
>  
> I'm using env.readFile, monitoring a directory for all new files. Once a 
> file is done with, I would like to delete it.
> Is there a way to monitor the closed splits in the continuous reader? Is 
> there a different way to do this?
>  
>  
> Regards,
> Hanan
