Arti,

The problem with watermarks and the File source operator will be fixed in
1.11.2 [1]. This bug was introduced in 1.10.0 and isn't related to the new
WatermarkStrategy API.

[1] https://issues.apache.org/jira/browse/FLINK-19109

David
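
For readers following this thread: the bounded-out-of-orderness logic that
both the deprecated AssignerWithPeriodicWatermarks and the newer
WatermarkStrategy.forBoundedOutOfOrderness implement boils down to tracking
the maximum event timestamp seen so far and emitting a watermark that trails
it by the allowed lateness. A minimal self-contained sketch in plain Java
(an illustration only, not Flink's actual implementation; the class name is
made up):

```java
// Illustrative sketch of bounded-out-of-orderness watermark generation.
// Not Flink code: the class name and API here are invented for clarity.
public class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrderness;               // allowed lateness, in ms
    private long currentMaxTimestamp = Long.MIN_VALUE;  // highest timestamp seen

    public BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called once per record: track the highest event timestamp seen so far.
    // A late (smaller) timestamp never moves the watermark backwards.
    public void onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
    }

    // Called periodically: the watermark trails the maximum timestamp by the
    // allowed out-of-orderness.
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(0);
        gen.onEvent(1_000);
        gen.onEvent(5_000);
        gen.onEvent(3_000);  // late event, watermark stays at 5000
        System.out.println(gen.currentWatermark());  // prints 5000
    }
}
```

Note that Flink's real generator additionally subtracts 1 ms to make the
bound exclusive; that detail is omitted above for clarity.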

On Wed, Sep 9, 2020 at 2:52 PM Arti Pande <pande.a...@gmail.com> wrote:

> Hi Aljoscha,
>
> By "checkpoints do not work" what I mean is: ever since Flink 1.9.2, up
> to 1.11.1, when using the File source the source operator (I am guessing
> the split enumerator or metadata reader) finishes immediately after
> starting (and assigning the splits to split readers). So when the first
> checkpoint is triggered, it sees the state of the first operator, i.e.
> the source, as finished and therefore does not do any checkpointing.
> That's what you can see in the logs and also on the Flink UI for
> checkpoints. It assumes that the pipeline is about to finish shortly and
> aborts the checkpoint.
>
> This along with the watermark generation problems kind of make it
> difficult to use file source in production.
>
>
> On Mon, Aug 24, 2020 at 4:01 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi Arti,
>>
>> what exactly do you mean by "checkpoints do not work"? Are there
>> exceptions being thrown? How are you writing your file-based sources,
>> and what API methods are you using?
>>
>> Best,
>> Aljoscha
>>
>> On 20.08.20 16:21, Arti Pande wrote:
>> > Hi Till,
>> >
>> > Thank you for your quick response. Both the
>> > AssignerWithPeriodicWatermarks and the WatermarkStrategy I am using
>> > are very simple ones.
>> >
>> > *Code for AssignerWithPeriodicWatermarks:*
>> >
>> > public class CustomEventTimeWatermarkGenerator implements
>> > AssignerWithPeriodicWatermarks<MyPojo> {
>> >
>> >      private final long maxOutOfOrderness = 0;
>> >      private long currentMaxTimestamp;
>> >
>> >      @Override
>> >      public long extractTimestamp(MyPojo myPojo, long previousTimestamp) {
>> >          long timestamp = myPojo.getInitiationTime().toEpochMilli();
>> >          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>> >          return timestamp;
>> >      }
>> >
>> >      @Override
>> >      public Watermark getCurrentWatermark() {
>> >          return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>> >      }
>> > }
>> >
>> >
>> > *Code for WatermarkStrategy:*
>> >
>> > WatermarkStrategy<MyPojo> watermarkStrategy =
>> >     WatermarkStrategy.<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
>> >         .withTimestampAssigner((event, timestamp) ->
>> >             event.getInitiationTime().toEpochMilli());
>> >
>> >
>> > Thanks & regards,
>> > Arti
>> >
>> >
>> > On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>> >
>> >> Hi Arti,
>> >>
>> >> thanks for sharing this feedback with us. The WatermarkStrategy has
>> >> been introduced quite recently and might have some rough edges. I am
>> >> pulling in Aljoscha and Klou who have worked on this feature and
>> >> might be able to help you. For better understanding your problem, it
>> >> would be great if you could share the
>> >> AssignerWithPeriodicWatermarks/WatermarkStrategy code with us.
>> >>
>> >> For the file source, the Flink community has recently introduced a
>> >> new source abstraction which will also support checkpoints for file
>> >> sources once the file source connector has been migrated to the new
>> >> interfaces. The community is currently working on it.
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Wed, Aug 19, 2020 at 5:38 PM Arti Pande <pande.a...@gmail.com>
>> wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> When migrating a Stream API based Flink application from 1.9.2 to
>> >>> 1.11.1, the watermark generation has issues with the file source
>> >>> alone. It works well with the Kafka source.
>> >>>
>> >>> With 1.9.2, a custom watermark generator implementing
>> >>> AssignerWithPeriodicWatermarks was used. Starting with 1.11.1 it is
>> >>> deprecated and is to be replaced with WatermarkStrategy (which
>> >>> combines WatermarkGenerator and TimestampAssigner).
>> >>>
>> >>> With Flink 1.11.1, when using the Kafka source both of the above
>> >>> options (the old AssignerWithPeriodicWatermarks and the new
>> >>> WatermarkStrategy) work perfectly well, but with the file source
>> >>> neither of them works. The watermark assigner never advances the
>> >>> watermarks, so stateful operators never clear their state, leading
>> >>> to erroneous results and continuously increasing memory usage.
>> >>>
>> >>> The same code works well with the Kafka source. Is this a known
>> >>> issue? If so, is any fix planned shortly?
>> >>>
>> >>> A side note (probably a candidate for a separate email, but I will
>> >>> write it here): even checkpoints do not work with the File source
>> >>> since 1.9.2, and this is still a problem with 1.11.1. Just wondering
>> >>> whether the File source with the Stream API is not a priority in
>> >>> Flink development? If so, we can rethink our sources.
>> >>>
>> >>> Thanks & regards,
>> >>> Arti
>> >>>
>> >>
>> >
>>
>>
