Hi Sergii,

If I understand correctly, you want to process all the files in some directory and do not want to process them multiple times. I'm not sure, but using `FileProcessingMode#PROCESS_CONTINUOUSLY` instead of `FileProcessingMode#PROCESS_ONCE`[1] might satisfy your needs and keep the job running 24/7.
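For reference, a minimal sketch of what that could look like with the Scala DataStream API (the directory path, the TextInputFormat, and the 10-second monitoring interval are just placeholders for illustration, not taken from your setup):

    import org.apache.flink.api.java.io.TextInputFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode
    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical input directory; replace with your Parquet source setup.
    val inputPath = "/data/input"
    val format = new TextInputFormat(new Path(inputPath))

    // PROCESS_CONTINUOUSLY keeps the job alive and re-scans the directory
    // every 10 seconds, picking up files that appear later.
    val stream: DataStream[String] = env.readFile(
      format,
      inputPath,
      FileProcessingMode.PROCESS_CONTINUOUSLY,
      10000L) // monitoring interval in milliseconds

    stream.print()
    env.execute("continuous-file-monitoring")

The monitoring interval only controls how often the directory is re-scanned; in this mode the job never finishes on its own, so stopping it cleanly (e.g. with a savepoint) has to be triggered from outside, as you describe.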
But be careful: under `FileProcessingMode#PROCESS_CONTINUOUSLY` mode, when a file is modified, its contents are re-processed entirely. This can break the "exactly-once" semantics, as appending data at the end of a file will lead to all of its contents being re-processed.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html#data-sources

Best,
Congxian

Sergii Mikhtoniuk <mikhton...@gmail.com> wrote on Mon, May 18, 2020 at 5:47 AM:

> Hello,
>
> I'm migrating my Spark-based stream processing application to Flink
> (Calcite SQL and temporal tables look too attractive to resist).
>
> My Spark app works as follows:
> - the application is started periodically
> - it reads a directory of Parquet files as a stream
> - SQL transformations are applied
> - the resulting append stream is written to another directory
> - it runs until all available data is processed
> - it checkpoints its state
> - and **exits**
> - upon the next run it resumes where it left off, processing only new data
>
> I'm having difficulties replicating this start-stop-resume behavior with
> Flink.
>
> When I set up my input stream using:
>
>     env.readFile[Row](..., FileProcessingMode.PROCESS_CONTINUOUSLY)
>
> ...I get an infinite stream, but the application will naturally keep
> running until aborted manually.
>
> When I use FileProcessingMode.PROCESS_ONCE, the application exits after
> exhausting all inputs, but it seems that Flink also treats the end of the
> stream as the max watermark, so, for example, it will close all tumbling
> windows that I don't want closed yet, since more data will arrive on the
> next run.
>
> Is there a way not to emit a max watermark with PROCESS_ONCE? If so, can I
> still trigger a savepoint when env.execute() returns?
>
> Alternatively, if I use PROCESS_CONTINUOUSLY along with env.executeAsync(),
> is there a way for me to detect when the file stream has been exhausted so
> I can call job.stopWithSavepoint()?
>
> Thanks for your help!
> - Sergii