Hi, actually it seems like Spark dynamic allocation saves more resources in that case.
________________________________
From: Arvid Heise <ar...@ververica.com>
Sent: Monday, May 18, 2020 11:15:09 PM
To: Congxian Qiu <qcx978132...@gmail.com>
Cc: Sergii Mikhtoniuk <mikhton...@gmail.com>; user <user@flink.apache.org>
Subject: Re: Process available data and stop with savepoint

Hi Sergii,

your requirements feel a bit odd. It's neither batch nor streaming. Could you tell us why it's not possible to let the job run continuously as a streaming job? Is it just a matter of saving costs?

If so, you could monitor the number of records being processed and trigger stop/cancel-with-savepoint accordingly.

On Mon, May 18, 2020 at 7:19 AM Congxian Qiu <qcx978132...@gmail.com> wrote:

Hi Sergii,

If I understand correctly, you want to process all the files in some directory and do not want to process them multiple times. I'm not sure whether using `FileProcessingMode#PROCESS_CONTINUOUSLY` instead of `FileProcessingMode#PROCESS_ONCE`[1] can satisfy your needs and keep the job running 24/7. But be careful: under `FileProcessingMode#PROCESS_CONTINUOUSLY`, when a file is modified its contents are re-processed entirely. This can break the "exactly-once" semantics, as appending data at the end of a file will lead to all of its contents being re-processed.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html#data-sources

Best,
Congxian

Sergii Mikhtoniuk <mikhton...@gmail.com> wrote on Mon, May 18, 2020 at 5:47 AM:

Hello,

I'm migrating my Spark-based stream processing application to Flink (Calcite SQL and temporal tables look too attractive to resist).

My Spark app works as follows:
- the application is started periodically
- it reads a directory of Parquet files as a stream
- SQL transformations are applied
- the resulting append stream is written to another directory
- it runs until all available data is processed
- checkpoints its state
- and **exits**
- upon the next run it resumes where it left off, processing only new data

I'm having difficulties replicating this start-stop-resume behavior with Flink.

When I set up my input stream using:

    env.readFile[Row](..., FileProcessingMode.PROCESS_CONTINUOUSLY)

...I get an infinite stream, but the application will naturally keep running until aborted manually.

When I use FileProcessingMode.PROCESS_ONCE, the application exits after exhausting all inputs, but it seems that Flink also treats the end of the stream as the max watermark, so, for example, it will close all tumbling windows that I don't want to be closed yet, since more data will arrive upon the next run.

Is there a way not to emit a max watermark with PROCESS_ONCE? If so, can I still trigger a savepoint when env.execute() returns?

Alternatively, if I use PROCESS_CONTINUOUSLY along with env.executeAsync(), is there a way for me to detect when the file stream was exhausted so I can call job.stopWithSavepoint()?

Thanks for your help!
- Sergii

--
Arvid Heise | Senior Java Developer

https://www.ververica.com/

Follow us @VervericaData

--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
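
For what it's worth, a minimal sketch of the executeAsync() + stop-with-savepoint route Arvid and Sergii are discussing (assuming Flink 1.10+, where executeAsync() returns a JobClient per FLIP-74; the savepoint directory is a placeholder and the "input exhausted" check is left out, since that part is exactly what the thread is still debating):

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // ... build the pipeline here: readFile(..., PROCESS_CONTINUOUSLY),
    //     SQL transformations, sink ...

    // Submit without blocking; returns a JobClient handle to the running job.
    val jobClient = env.executeAsync()

    // Decide externally when the input is exhausted (e.g. by polling the
    // numRecordsIn metric over the REST API, not shown), then stop with a
    // savepoint. Passing advanceToEndOfEventTime = false avoids emitting the
    // max watermark, so open tumbling windows are carried into the next run.
    val savepointPath: String =
      jobClient.stopWithSavepoint(false, "hdfs:///path/to/savepoints").get()

This is only a sketch of one possible wiring, not a confirmed recipe from the thread.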