Hi Arti,
what exactly do you mean by "checkpoints do not work"? Are there
exceptions being thrown? How are you writing your file-based sources, i.e.
which API methods are you using?
Best,
Aljoscha
On 20.08.20 16:21, Arti Pande wrote:
Hi Till,
Thank you for your quick response. Both the AssignerWithPeriodicWatermarks
and WatermarkStrategy I am using are very simple ones.
*Code for AssignerWithPeriodicWatermarks:*
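import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomEventTimeWatermarkGenerator implements
        AssignerWithPeriodicWatermarks<MyPojo> {

    // No out-of-orderness is tolerated: the watermark equals the max timestamp seen.
    private final long maxOutOfOrderness = 0;
    // Highest event timestamp seen so far (an unset long field starts at 0).
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyPojo myPojo, long previousTimestamp) {
        long timestamp = myPojo.getInitiationTime().toEpochMilli();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    // Called periodically by Flink (interval set via ExecutionConfig#setAutoWatermarkInterval).
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}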
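For reasoning about this assigner outside a running job, here is a Flink-free model of the same logic. It is only a sketch under stated assumptions: `PeriodicAssignerModel` and `simulate` are hypothetical names, events are reduced to plain epoch-millisecond `long` values instead of `MyPojo`, and the periodic `getCurrentWatermark()` call that Flink makes is simulated by calling `currentWatermark()` after every element. It shows that the watermark only moves when a new maximum timestamp passes through `extractTimestamp`; an out-of-order element leaves it unchanged, and without any elements it stays at the initial value of 0.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Flink-free model of CustomEventTimeWatermarkGenerator. */
public class PeriodicAssignerModel {
    private final long maxOutOfOrderness = 0;
    // Same initial value as the original class: an unset long field is 0.
    private long currentMaxTimestamp;

    /** Mirrors extractTimestamp(): remembers the largest timestamp seen so far. */
    public long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        return eventTimestamp;
    }

    /** Mirrors getCurrentWatermark(), which Flink would call periodically. */
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    /** Feeds the timestamps in order and records the watermark after each one. */
    public static List<Long> simulate(long[] timestamps) {
        PeriodicAssignerModel model = new PeriodicAssignerModel();
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestamps) {
            model.extractTimestamp(ts);
            // Pretend a periodic emit happens after every element.
            watermarks.add(model.currentWatermark());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        System.out.println(simulate(new long[] {1_000L, 3_000L, 2_000L, 5_000L}));
    }
}
```

Running `main` prints `[1000, 3000, 3000, 5000]`.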
*Code for WatermarkStrategy:*
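import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Tolerate no out-of-orderness; take the event timestamp from the POJO.
WatermarkStrategy<MyPojo> watermarkStrategy = WatermarkStrategy
        .<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
        .withTimestampAssigner((event, timestamp) ->
                event.getInitiationTime().toEpochMilli());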
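The two snippets are close but, as far as I can tell from Flink's `BoundedOutOfOrdernessWatermarks` (the generator behind `forBoundedOutOfOrderness`), not identical: the built-in generator emits `maxTimestamp - outOfOrderness - 1` and starts so that its lowest watermark is `Long.MIN_VALUE`, whereas the custom assigner emits `currentMaxTimestamp - 0` starting from 0. The plain-Java model below is a sketch, not Flink code: `BoundedOutOfOrdernessModel` is a hypothetical name, events are plain `Long` epoch millis instead of `MyPojo`, and it mirrors the `WatermarkStrategy` split by taking a timestamp-assigner function and keeping the generator's max-tracking and periodic-emit logic separate.

```java
import java.time.Duration;
import java.util.function.ToLongFunction;

/** Hypothetical, Flink-free model of a bounded-out-of-orderness watermark generator. */
public class BoundedOutOfOrdernessModel<T> {
    private final ToLongFunction<T> timestampAssigner;
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    public BoundedOutOfOrdernessModel(Duration maxOutOfOrderness,
                                      ToLongFunction<T> timestampAssigner) {
        this.timestampAssigner = timestampAssigner;
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start so that the lowest possible watermark is Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Plays the roles of the timestamp assigner and the generator's onEvent. */
    public void onEvent(T event) {
        long ts = timestampAssigner.applyAsLong(event);
        maxTimestamp = Math.max(maxTimestamp, ts);
    }

    /** Plays the role of onPeriodicEmit: returns the watermark that would be emitted. */
    public long onPeriodicEmit() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        // Events are plain epoch-millis Longs here; the assigner is the identity.
        BoundedOutOfOrdernessModel<Long> gen =
                new BoundedOutOfOrdernessModel<>(Duration.ofMillis(0), event -> event);
        System.out.println(gen.onPeriodicEmit() == Long.MIN_VALUE); // before any event
        gen.onEvent(1_000L);
        gen.onEvent(3_000L);
        gen.onEvent(2_000L);
        System.out.println(gen.onPeriodicEmit());
    }
}
```

Running `main` prints `true` and then `2999`: with a 0 ms bound the built-in generator trails the custom assigner by 1 ms, and before the first event it sits at `Long.MIN_VALUE` rather than 0. Neither difference by itself would keep the watermark from advancing, but both are worth knowing when comparing the two variants.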
Thanks & regards,
Arti
On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann <trohrm...@apache.org> wrote:
Hi Arti,
thanks for sharing this feedback with us. The WatermarkStrategy has been
introduced quite recently and might have some rough edges. I am pulling in
Aljoscha and Klou who have worked on this feature and might be able to help
you. To better understand your problem, it would be great if you could
share the AssignerWithPeriodicWatermarks/WatermarkStrategy code with us.
For the file source, the Flink community has recently introduced a new
source abstraction which will also support checkpoints for file sources
once the file source connector has been migrated to the new interfaces. The
community is currently working on it.
Cheers,
Till
On Wed, Aug 19, 2020 at 5:38 PM Arti Pande <pande.a...@gmail.com> wrote:
Hi,
When migrating a DataStream API based Flink application from 1.9.2 to
1.11.1, watermark generation has issues with the file source only. It works
well with the Kafka source.
With 1.9.2, a custom watermark generator implementing
AssignerWithPeriodicWatermarks had to be used. As of 1.11.1 it is
deprecated and is to be replaced by WatermarkStrategy (which combines both
WatermarkGenerator and TimestampAssigner).
With Flink 1.11.1, when using the Kafka source, both of the above options
(i.e. the old AssignerWithPeriodicWatermarks and the new WatermarkStrategy)
work perfectly well, but with the file source neither of them works. The
watermark assigner never advances the watermark, so stateful operators never
clear their state, which leads to erroneous results and continuously
increasing memory usage.
The same code works well with the Kafka source. Is this a known issue? If
so, is a fix planned soon?
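To illustrate why a stuck watermark shows up as ever-growing state: event-time timers in Flink fire only once the watermark reaches their timestamp, so cleanup logic that relies on them never runs if the watermark does not advance. The sketch below is a hypothetical, Flink-free model (`TimerCleanupModel`, `register`, `advanceWatermark` and `stateSize` are made-up names): each key registers a cleanup time, and advancing the watermark purges every key whose cleanup time is at or below it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of keyed state cleaned up by event-time timers. */
public class TimerCleanupModel {
    // key -> cleanup timer timestamp (event time, epoch millis)
    private final Map<String, Long> cleanupTimers = new HashMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Keep state for a key until the watermark reaches cleanupTime. */
    public void register(String key, long cleanupTime) {
        cleanupTimers.put(key, cleanupTime);
    }

    /** Watermarks never move backwards; timers with time <= watermark fire and clear state. */
    public void advanceWatermark(long watermark) {
        if (watermark <= currentWatermark) {
            return;
        }
        currentWatermark = watermark;
        cleanupTimers.values().removeIf(time -> time <= currentWatermark);
    }

    public int stateSize() {
        return cleanupTimers.size();
    }

    public static void main(String[] args) {
        TimerCleanupModel stuck = new TimerCleanupModel();
        TimerCleanupModel healthy = new TimerCleanupModel();
        for (int i = 0; i < 5; i++) {
            long eventTime = 1_000L * i;
            stuck.register("key-" + i, eventTime + 500);
            healthy.register("key-" + i, eventTime + 500);
            // The stuck watermark stays at its initial value; the healthy one follows event time.
            stuck.advanceWatermark(Long.MIN_VALUE);
            healthy.advanceWatermark(eventTime);
        }
        System.out.println(stuck.stateSize() + " vs " + healthy.stateSize());
    }
}
```

`main` prints `5 vs 1`: with the stuck watermark every key's state is retained, while the advancing watermark leaves only the most recent key. For brevity the model merges the timer and the cleanup code that would normally live in `onTimer` into a single map removal.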
A side note (probably a candidate for a separate email, but I will write it
here): even checkpoints have not worked with the file source since 1.9.2,
and the problem persists in 1.11.1. I am wondering whether the file source
with the DataStream API is not a priority in Flink development. If so, we
can rethink our sources.
Thanks & regards,
Arti