Hi all,

I am trying to implement a custom TimestampPolicy that uses event time instead of processing time.
The pipeline reads from Kafka using

    .withTimestampPolicyFactory((tp, previousWatermark) -> new EventTimestampPolicy(previousWatermark));

and the custom class follows the implementation I have seen many times while searching the Beam group and docs:

    public class EventTimestampPolicy extends TimestampPolicy<String, GenericRecord> {
        protected Instant currentWatermark;

        public EventTimestampPolicy(final Optional<Instant> previousWatermark) {
            this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        @Override
        public Instant getTimestampForRecord(final PartitionContext ctx, final KafkaRecord<String, GenericRecord> record) {
            this.currentWatermark = new Instant(record.getKV().getValue().get("timestmp"));
            return this.currentWatermark;
        }

        @Override
        public Instant getWatermark(final PartitionContext ctx) {
            return this.currentWatermark;
        }
    }

I apply a fixed window of 10 seconds with .withAllowedLateness(Duration.ZERO).

When running the tests I see the following:

1. event time: 2023-03-27T11:28:13, window: [2023-03-27T11:28:10.000Z..2023-03-27T11:28:20.000Z]
2. event time: 2023-03-27T11:28:15, window: [2023-03-27T11:28:10.000Z..2023-03-27T11:28:20.000Z]
3. event time: 2023-03-27T11:28:28, new window: [2023-03-27T11:28:20.000Z..2023-03-27T11:28:30.000Z]

After 15 seconds I send a message with an "old" event timestamp:

4. event time: 2023-03-27T11:28:12

and it seems that it still joins window [2023-03-27T11:28:10.000Z..2023-03-27T11:28:20.000Z].

I am probably missing something basic about event-time processing here, but I have a few questions:

a. I am not sure why the windows have these boundaries. If the first message is at 11:28:13, why does the window start at 11:28:10?
b. Windows should close when the watermark reaches them, so in this implementation is a window closed on each message?
c. How are late messages identified in this case? My goal is to identify event 4 as late and discard it.

Any clarification would help.

Thanks in advance,
Sigalit
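
P.S. To make questions (a) and (c) concrete, here is a minimal, self-contained sketch of the arithmetic I assume is involved. It is plain Java (java.time), not Beam code. FixedWindowSketch, windowStart and isLate are hypothetical names I made up for illustration, and both the epoch alignment and the lateness rule are my assumptions rather than confirmed Beam behavior.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Beam: it only reproduces the arithmetic
// I assume a fixed window uses to place an event timestamp.
class FixedWindowSketch {

    // Start of the epoch-aligned window of length `size` that contains `eventTime`.
    // Assumption: windows are aligned to the Unix epoch with no offset.
    static Instant windowStart(Instant eventTime, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = eventTime.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch too.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
    }

    // Assumed lateness rule with allowed lateness of zero: an element is late
    // once the watermark has reached or passed the end of its window.
    static boolean isLate(Instant eventTime, Instant watermark, Duration size) {
        Instant windowEnd = windowStart(eventTime, size).plus(size);
        return !watermark.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(10);
        Instant event1 = Instant.parse("2023-03-27T11:28:13Z");
        Instant event4 = Instant.parse("2023-03-27T11:28:12Z");
        // Watermark value after event 3, if the watermark simply follows the last event time.
        Instant watermarkAfterEvent3 = Instant.parse("2023-03-27T11:28:28Z");

        System.out.println("window start for event 1: " + windowStart(event1, size));
        System.out.println("event 4 late under assumed rule: "
                + isLate(event4, watermarkAfterEvent3, size));
    }
}
```

If these assumptions hold, event 1 lands in the window starting at 11:28:10, and event 4 would count as late once the watermark is at 11:28:28, which is what I expected but not what I observe.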