Hi Sergii

If I understand correctly, you want to process all the files in some
directory without processing any of them more than once. I'm not sure
whether using `FileProcessingMode#PROCESS_CONTINUOUSLY` instead of
`FileProcessingMode#PROCESS_ONCE`[1] and keeping the job running 24/7
would satisfy your needs.

But be careful: in `FileProcessingMode#PROCESS_CONTINUOUSLY` mode, when a
file is modified, its contents are re-processed entirely. This can break the
“exactly-once” semantics, as appending data at the end of a file will lead
to all its contents being re-processed.
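
For reference, a minimal sketch of a continuously monitored file source in
the Scala API might look like the following. It assumes the `readFile`
overload that takes a monitoring interval in milliseconds and that the Flink
Scala streaming dependencies are on the classpath. The input path, the
10-second interval, the job name, and the use of `TextInputFormat` (rather
than your Parquet format) are placeholders of mine, not taken from your setup.

```scala
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object ContinuousFileRead {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical input directory; replace with your own.
    val inputDir = "file:///tmp/input"
    val format = new TextInputFormat(new Path(inputDir))

    // Re-scan the directory every 10 seconds (interval is in milliseconds).
    // The source never finishes on its own, so the job runs until it is
    // cancelled or stopped.
    val lines: DataStream[String] = env.readFile(
      format, inputDir, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000L)

    lines.print()
    env.execute("continuous-file-read")
  }
}
```

With a source like this, appending to a file that was already read causes
the whole file to be emitted again on a later scan, so writing new data as
new files is the safer pattern.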

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html#data-sources

Best,
Congxian


Sergii Mikhtoniuk <mikhton...@gmail.com> wrote on Mon, May 18, 2020 at 5:47 AM:

> Hello,
>
> I'm migrating my Spark-based stream processing application to Flink
> (Calcite SQL and temporal tables look too attractive to resist).
>
> My Spark app works as follows:
> - application is started periodically
> - it reads a directory of Parquet files as a stream
> - SQL transformations are applied
> - resulting append stream is written to another directory
> - it runs until all available data is processed
> - checkpoints its state
> - and **exits**
> - upon next run it resumes where it left off, processing only new data
>
> I'm having difficulties replicating this start-stop-resume behavior with
> Flink.
>
> When I set up my input stream using:
>
>     env.readFile[Row](..., FileProcessingMode.PROCESS_CONTINUOUSLY)
>
> ... I get an infinite stream, but the application will naturally keep
> running until aborted manually.
>
> When I use FileProcessingMode.PROCESS_ONCE - the application exits after
> exhausting all inputs, but it seems that Flink also treats the end of the
> stream as max watermark so, for example, it will close all tumbling windows
> that I don't want to be closed yet since more data will arrive upon next
> run.
>
> Is there a way not to emit a max watermark with PROCESS_ONCE? If so, can I
> still trigger a savepoint when env.execute() returns?
>
> Alternatively, if I use PROCESS_CONTINUOUSLY along with env.executeAsync()
> is there a way for me to detect when file stream was exhausted to call
> job.stopWithSavepoint()?
>
> Thanks for your help!
> - Sergii
>
>
