Arti,

The problem with watermarks and the File source operator will be fixed
in 1.11.2 [1]. This bug was introduced in 1.10.0, and isn't related to
the new WatermarkStrategy API.
[1] https://issues.apache.org/jira/browse/FLINK-19109

David

On Wed, Sep 9, 2020 at 2:52 PM Arti Pande <pande.a...@gmail.com> wrote:

> Hi Aljoscha,
>
> By "checkpoints do not work" what I mean is that ever since Flink 1.9.2,
> and up to 1.11.1, when using the File source the source operator
> (guessing: the split enumerator or metadata reader) finishes immediately
> after starting (and assigning the splits to the split readers). So when
> the first checkpoint is triggered, it sees the state of the first
> operator, i.e. the source, as finished and therefore does not do any
> checkpointing. That's what you can see in the logs and also in the Flink
> UI for checkpoints. It assumes that the pipeline is about to finish
> shortly and aborts the checkpoint.
>
> This, along with the watermark generation problems, makes it difficult
> to use the file source in production.
>
>
> On Mon, Aug 24, 2020 at 4:01 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi Arti,
>>
>> what exactly do you mean by "checkpoints do not work"? Are there
>> exceptions being thrown? How are you writing your file-based sources,
>> what API methods are you using?
>>
>> Best,
>> Aljoscha
>>
>> On 20.08.20 16:21, Arti Pande wrote:
>> > Hi Till,
>> >
>> > Thank you for your quick response. Both the
>> > AssignerWithPeriodicWatermarks and the WatermarkStrategy I am using
>> > are very simple ones.
>> >
>> > *Code for AssignerWithPeriodicWatermarks:*
>> >
>> > public class CustomEventTimeWatermarkGenerator implements
>> >         AssignerWithPeriodicWatermarks<MyPojo> {
>> >
>> >     private final long maxOutOfOrderness = 0;
>> >     private long currentMaxTimestamp;
>> >
>> >     @Override
>> >     public long extractTimestamp(MyPojo myPojo, long previousTimestamp) {
>> >         long timestamp = myPojo.getInitiationTime().toEpochMilli();
>> >         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>> >         return timestamp;
>> >     }
>> >
>> >     @Override
>> >     public Watermark getCurrentWatermark() {
>> >         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>> >     }
>> > }
>> >
>> >
>> > *Code for WatermarkStrategy:*
>> >
>> > WatermarkStrategy<MyPojo> watermarkStrategy = WatermarkStrategy
>> >         .<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
>> >         .withTimestampAssigner((event, timestamp) ->
>> >                 event.getInitiationTime().toEpochMilli());
>> >
>> >
>> > Thanks & regards,
>> > Arti
>> >
>> >
>> > On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann <trohrm...@apache.org>
>> > wrote:
>> >
>> >> Hi Arti,
>> >>
>> >> thanks for sharing this feedback with us. The WatermarkStrategy has
>> >> been introduced quite recently and might have some rough edges. I am
>> >> pulling in Aljoscha and Klou who have worked on this feature and
>> >> might be able to help you. For better understanding your problem, it
>> >> would be great if you could share the
>> >> AssignerWithPeriodicWatermarks/WatermarkStrategy code with us.
>> >>
>> >> For the file source, the Flink community has recently introduced a
>> >> new source abstraction which will also support checkpoints for file
>> >> sources once the file source connector has been migrated to the new
>> >> interfaces. The community is currently working on it.
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Wed, Aug 19, 2020 at 5:38 PM Arti Pande <pande.a...@gmail.com>
>> >> wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> When migrating a Stream API based Flink application from 1.9.2 to
>> >>> 1.11.1, the watermark generation has issues with the file source
>> >>> alone. It works well with the Kafka source.
>> >>>
>> >>> With 1.9.2, a custom watermark generator implementing
>> >>> AssignerWithPeriodicWatermarks was to be used. Starting with 1.11.1
>> >>> it is deprecated and is to be replaced with WatermarkStrategy (which
>> >>> combines both WatermarkGenerator and TimestampAssigner).
>> >>>
>> >>> With Flink 1.11.1, when using the Kafka source both of the above
>> >>> options (i.e. the old AssignerWithPeriodicWatermarks and the new
>> >>> WatermarkStrategy) work perfectly well, but with the file source
>> >>> neither of them works. The watermark assigner never advances the
>> >>> watermarks, so stateful operators never clear their state, leading
>> >>> to erroneous results and continuously increasing memory usage.
>> >>>
>> >>> The same code works well with the Kafka source. Is this a known
>> >>> issue? If so, is any fix planned shortly?
>> >>>
>> >>> A side note (and probably a candidate for a separate email, but I
>> >>> will write it here): even checkpoints do not work with the File
>> >>> source since 1.9.2, and this is still a problem with 1.11.1. Just
>> >>> wondering if the File source with the Stream API is not a priority
>> >>> in Flink development? If so, we can rethink our sources.
>> >>>
>> >>> Thanks & regards,
>> >>> Arti
>> >>>
>> >>
>> >
>>
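[Editor's note] The failure mode described in this thread (the watermark never advancing, so event-time state is never cleared) can be illustrated with a small standalone sketch. This is plain Java with no Flink dependency; the class and method names below mirror the quoted AssignerWithPeriodicWatermarks code but are illustrative models, not Flink's actual API:

```java
// A simplified model of periodic watermark generation. Flink calls the
// timestamp extractor once per record and asks for the current watermark
// on a timer. If the source stops delivering records (as with the broken
// file source), the watermark is stuck, so event-time windows downstream
// never fire and never release their state.
class PeriodicWatermarkModel {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    PeriodicWatermarkModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Mirrors extractTimestamp() in the quoted code: track the max
    // timestamp seen so far and return the record's own timestamp.
    long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        return eventTimestamp;
    }

    // Mirrors getCurrentWatermark(): max timestamp minus the allowed
    // out-of-orderness.
    long getCurrentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }
}

public class WatermarkDemo {
    public static void main(String[] args) {
        PeriodicWatermarkModel wm = new PeriodicWatermarkModel(0);

        // Records arrive: the watermark follows the max timestamp seen.
        for (long ts : new long[] {1000L, 3000L, 2000L}) {
            wm.extractTimestamp(ts);
        }
        System.out.println(wm.getCurrentWatermark()); // 3000

        // No further records arrive: the watermark stays at 3000, so an
        // event-time window ending at 5000 would never fire and its
        // buffered state would never be freed.
        System.out.println(wm.getCurrentWatermark() < 5000L); // true
    }
}
```

With a healthy source the loop above keeps running and the watermark keeps climbing; the bug in the thread is equivalent to the loop stopping while windows ending after 3000 are still buffered.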