Hi all,

I am trying to implement a custom TimestampPolicy so that the pipeline uses
event time instead of processing time.

The stream is read from Kafka using:

.withTimestampPolicyFactory((tp, previousWatermark) -> new EventTimestampPolicy(previousWatermark));

The custom class is based on an implementation I saw many times when I
searched the Beam group and docs:
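import java.util.Optional;

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

public class EventTimestampPolicy extends TimestampPolicy<String, GenericRecord> {
    // Watermark reported for this partition.
    protected Instant currentWatermark;

    public EventTimestampPolicy(final Optional<Instant> previousWatermark) {
        // Resume from the checkpointed watermark, or start at the minimum timestamp.
        this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public Instant getTimestampForRecord(final PartitionContext ctx,
                                         final KafkaRecord<String, GenericRecord> record) {
        // Event time comes from the record's "timestmp" field (assumed to hold epoch millis).
        // The watermark is overwritten with the same value, so it follows the latest record read.
        this.currentWatermark = new Instant(record.getKV().getValue().get("timestmp"));
        return this.currentWatermark;
    }

    @Override
    public Instant getWatermark(final PartitionContext ctx) {
        return this.currentWatermark;
    }
}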

I applied a 10-second fixed window with .withAllowedLateness(Duration.ZERO).
When running the tests I see the following:

1. event time: 2023-03-27T11:28:13, window: [2023-03-27T11:28:10.000Z..2023-03-27T11:28:20.000Z]
2. event time: 2023-03-27T11:28:15, window: [2023-03-27T11:28:10.000Z..2023-03-27T11:28:20.000Z]
3. event time: 2023-03-27T11:28:28, new window: [2023-03-27T11:28:20.000Z..2023-03-27T11:28:30.000Z]
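My guess is that fixed windows are aligned to the epoch rather than to the first element, which would explain these boundaries. Here is a small stdlib-only Java model of that guess (FixedWindowModel is a hypothetical helper, not Beam's FixedWindows, and it assumes the default zero offset):

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of epoch-aligned fixed windows with zero offset (not Beam code).
class FixedWindowModel {
    // Start of the [start, start + size) window that contains ts.
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long ms = ts.toEpochMilli();
        // floorMod keeps the result correct for timestamps before 1970 as well.
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, sizeMs));
    }

    // Exclusive end of that window.
    static Instant windowEnd(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(10);
        String[] events = {"2023-03-27T11:28:13Z", "2023-03-27T11:28:15Z", "2023-03-27T11:28:28Z"};
        for (String t : events) {
            Instant ts = Instant.parse(t);
            // First line: 2023-03-27T11:28:13Z -> [2023-03-27T11:28:10Z..2023-03-27T11:28:20Z)
            System.out.println(ts + " -> [" + windowStart(ts, size) + ".." + windowEnd(ts, size) + ")");
        }
    }
}
```

Under this model a window's boundaries depend only on the window size, not on when the first message in it arrived.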

About 15 seconds later I send a message with an "old" event timestamp:
4. event time: 2023-03-27T11:28:12, and it still seems to join window [2023-03-27T11:28:10.000Z..2023-03-27T11:28:20.000Z]
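If I read my policy correctly, getTimestampForRecord overwrites currentWatermark with each record's timestamp, so the watermark follows arrival order and can move backward. The stdlib-only Java sketch below (WatermarkTrace and its methods are hypothetical names, not Beam code) replays events 1-4 under that "last seen" rule and under a "running max" rule that only moves forward:

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of two watermark rules over the same event times (not Beam code).
class WatermarkTrace {
    // Mirrors EventTimestampPolicy: the watermark is whatever record arrived last.
    static List<Instant> lastSeen(List<Instant> events) {
        List<Instant> out = new ArrayList<>();
        for (Instant e : events) {
            out.add(e);
        }
        return out;
    }

    // Alternative rule: the watermark is the largest event time seen so far.
    static List<Instant> runningMax(List<Instant> events) {
        List<Instant> out = new ArrayList<>();
        Instant wm = Instant.MIN;
        for (Instant e : events) {
            if (e.isAfter(wm)) {
                wm = e; // only ever move forward
            }
            out.add(wm);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Instant> events = List.of(
                Instant.parse("2023-03-27T11:28:13Z"),
                Instant.parse("2023-03-27T11:28:15Z"),
                Instant.parse("2023-03-27T11:28:28Z"),
                Instant.parse("2023-03-27T11:28:12Z")); // event 4, the "old" one
        // [2023-03-27T11:28:13Z, 2023-03-27T11:28:15Z, 2023-03-27T11:28:28Z, 2023-03-27T11:28:12Z]
        System.out.println("last seen:   " + lastSeen(events));
        // [2023-03-27T11:28:13Z, 2023-03-27T11:28:15Z, 2023-03-27T11:28:28Z, 2023-03-27T11:28:28Z]
        System.out.println("running max: " + runningMax(events));
    }
}
```

Under the "last seen" rule the watermark drops from 11:28:28 back to 11:28:12 when event 4 is read; under the "running max" rule it stays at 11:28:28.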

I am probably missing something basic about event-time processing, but I
have a few questions:
a. I am not sure why the windows have these boundaries. If the first
message is at 11:28:13, why does the window start at 11:28:10?
b. Windows should be closed when the watermark reaches their end. With this
implementation, is the window closed after each message?
c. How are late messages identified in this case? My goal is to
identify event 4 as late and discard it.
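For question c, my understanding (which may be wrong) is that with .withAllowedLateness(Duration.ZERO) an element is dropped once the watermark at the grouping step has passed the end of its window. A simplified stdlib-only Java model of that rule (LatenessModel is a hypothetical helper, not Beam's implementation):

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of the late-data check (not Beam's implementation).
class LatenessModel {
    static boolean isDroppablyLate(Instant eventTime, Instant watermark,
                                   Duration windowSize, Duration allowedLateness) {
        long sizeMs = windowSize.toMillis();
        long ms = eventTime.toEpochMilli();
        // Exclusive end of the epoch-aligned window [start, end) containing eventTime.
        Instant windowEnd = Instant.ofEpochMilli(ms - Math.floorMod(ms, sizeMs) + sizeMs);
        // Last timestamp that still belongs to the window.
        Instant maxTimestamp = windowEnd.minusMillis(1);
        // Late once the watermark has passed the window's last timestamp plus the allowed lateness.
        return maxTimestamp.plus(allowedLateness).isBefore(watermark);
    }

    public static void main(String[] args) {
        Instant event4 = Instant.parse("2023-03-27T11:28:12Z");
        Duration size = Duration.ofSeconds(10);
        // Watermark held at event 3's time (11:28:28): prints true
        System.out.println(isDroppablyLate(event4, Instant.parse("2023-03-27T11:28:28Z"), size, Duration.ZERO));
        // Watermark pulled back to event 4's own time (11:28:12): prints false
        System.out.println(isDroppablyLate(event4, Instant.parse("2023-03-27T11:28:12Z"), size, Duration.ZERO));
    }
}
```

If this model is right, event 4 can only be dropped when the watermark seen downstream is already past 11:28:20 at the moment it arrives, so the outcome depends on the watermark my policy reports at that point.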

Any clarification would help.
Thanks in advance,
Sigalit
