Hi,

We have a Beam SQL pipeline running on Flink that, every 5 minutes, gets a batch of events from Kafka and should execute an SQL query over a 1-hour window. Some of the events arrive late. I'm using KafkaIO.withTimestampPolicyFactory() to set one of the object's fields as the event timestamp. For the aggregation, it's important that the window fires exactly once, with all of its events, while allowing lateness of up to 3 minutes. I defined the window as:
    final PCollection<Row> windowSelectFields = selectFields
        .apply("Windowing",
            Window.<Row>into(FixedWindows.of(Duration.standardHours(1)))
                .triggering(Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(3))))
                .withAllowedLateness(Duration.standardMinutes(3))
                .accumulatingFiredPanes());

When I tested this on a smaller window with a small number of events, I saw that the first 3 out of 10 events were discarded. From the log, it looks like the trigger fires slightly ahead of time: the timer was due at 16:30:08.079 but was received at 16:30:07.944, about 135 ms early. I suspect that, as a result, the trigger's shouldFire() method returns false, since the 3 minutes have not yet passed.

    2024-02-21 16:27:08,079 DEBUG org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator [] - Setting timer: 1:1708533008079 at 1708533008079 with output time 1708533008079.

(that is 4:30:08.079 PM)

And later on:

    2024-02-21 16:30:07,944 DEBUG org.apache.beam.sdk.util.WindowTracing [] - ReduceFnRunner: Received timer key:Row: call_direction:-1729318488 ; window:[2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z); data:TimerData{timerId=1:1708533008079, timerFamilyId=, namespace=Window([2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z)), timestamp=2024-02-21T16:30:08.079Z, outputTimestamp=2024-02-21T16:30:08.079Z, domain=PROCESSING_TIME, deleted=false} with inputWatermark:2024-02-21T16:18:04.000Z; outputWatermark:2024-02-21T16:18:04.000Z

Is my understanding correct? Did I define the window and timestamps correctly?

Any help would be appreciated.

Thanks,
Ifat
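The gap between the timer's target time and the moment ReduceFnRunner received it can be measured directly from the two log lines with java.time. This assumes the log-line clock (16:27:08,079 and 16:30:07,944) is UTC, which is consistent with the timer target 1708533008079 landing exactly 3 minutes after the "Setting timer" line. The class name TimerSkew is made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;

public class TimerSkew {
  // Timer target, copied from "Setting timer: 1:1708533008079 at 1708533008079".
  static final Instant TIMER_TARGET = Instant.ofEpochMilli(1708533008079L);
  // Log-line time of "Setting timer", assumed to be UTC.
  static final Instant TIMER_SET = Instant.parse("2024-02-21T16:27:08.079Z");
  // Log-line time of "ReduceFnRunner: Received timer", assumed to be UTC.
  static final Instant TIMER_RECEIVED = Instant.parse("2024-02-21T16:30:07.944Z");

  // Delay between setting the timer and its target (the plusDelayOf value).
  static Duration scheduledDelay() {
    return Duration.between(TIMER_SET, TIMER_TARGET);
  }

  // How long before its target time the timer was delivered.
  static Duration earliness() {
    return Duration.between(TIMER_RECEIVED, TIMER_TARGET);
  }

  public static void main(String[] args) {
    System.out.println("timer target    = " + TIMER_TARGET);
    System.out.println("scheduled delay = " + scheduledDelay().toMillis() + " ms");
    System.out.println("delivered early = " + earliness().toMillis() + " ms");
  }
}
```

Under the UTC assumption, the timer target prints as 2024-02-21T16:30:08.079Z, the scheduled delay is 180000 ms (the 3-minute plusDelayOf), and the timer was delivered 135 ms before its target.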
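For the requirement at the top of the post (one firing per window that still includes events up to 3 minutes late), one possible approach, offered only as an untested sketch, is to move the 3-minute tolerance into the watermark instead of into a processing-time trigger. It uses KafkaIO's CustomTimestampPolicyWithLimitedDelay, whose watermark is, per its Javadoc, roughly min(now, max event timestamp seen so far) minus maxDelay, together with an AfterWatermark.pastEndOfWindow() trigger, zero allowed lateness, and discarding panes, so each window should fire once when that delayed watermark passes its end. It assumes String keys and values and a hypothetical payload whose first comma-separated field is the event time in epoch millis; the class name and the eventTime parser are made up for illustration.

```java
import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class OneShotHourlyWindow {
  // Maximum out-of-orderness tolerated by the watermark (the "3 minutes").
  static final Duration MAX_DELAY = Duration.standardMinutes(3);

  // Hypothetical payload format: "<epochMillis>,<rest of the event>".
  static Instant eventTime(KafkaRecord<String, String> record) {
    String value = record.getKV().getValue();
    return new Instant(Long.parseLong(value.split(",", 2)[0]));
  }

  // One policy per partition; its watermark trails the newest event time by MAX_DELAY.
  static TimestampPolicyFactory<String, String> timestampPolicyFactory() {
    return (topicPartition, previousWatermark) ->
        new CustomTimestampPolicyWithLimitedDelay<String, String>(
            OneShotHourlyWindow::eventTime, MAX_DELAY, previousWatermark);
  }

  // Fires once when the (already delayed) watermark passes the end of the hour;
  // elements arriving after that are dropped as late.
  static Window<Row> hourlyOneShot() {
    return Window.<Row>into(FixedWindows.of(Duration.standardHours(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();
  }
}
```

In this sketch the factory would be passed to KafkaIO.<String, String>read() via withTimestampPolicyFactory(OneShotHourlyWindow.timestampPolicyFactory()) alongside the usual bootstrap-server, topic, and deserializer settings, and selectFields.apply("Windowing", OneShotHourlyWindow.hourlyOneShot()) would replace the processing-time trigger. The trade-off is that any event more than about 3 minutes behind the newest event seen on its partition is treated as late and dropped, and each window's single firing is delayed by those 3 minutes.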