I’m using a single task manager with 3 task slots. Can there be clock skew in this case?
And how am I expected to handle this situation? Is there a best practice to guarantee exactly-once execution of the events in the window without events being dropped?

(BTW, to correct my example: the pastFirstElementInPane() trigger should have a delay of 1 hour + 3 minutes.)

Thanks,
Ifat

From: Reuven Lax <[email protected]>
Date: Wednesday, 21 February 2024 at 22:43
To: "[email protected]" <[email protected]>
Cc: "Ifat Afek (Nokia)" <[email protected]>
Subject: Re: Some events are discarded from a FixedWindow


On Wed, Feb 21, 2024 at 12:39 PM Ifat Afek (Nokia) via user <[email protected]> wrote:
Hi,

We have a Beam SQL pipeline running on Flink that receives a batch of events from Kafka every 5 minutes and should execute an SQL command over a 1-hour window. Some of the events arrive late.
I’m using KafkaIO.withTimestampPolicyFactory() to set one of the object’s 
fields as the timestamp.
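For context on how those extracted timestamps drive the watermark: KafkaIO also ships a `CustomTimestampPolicyWithLimitedDelay`, whose watermark is documented (as I understand its Javadoc) as roughly `min(now, max event timestamp seen so far) - maxDelay`. The plain-Java sketch below models only that formula and is not the KafkaIO class. The class name `LimitedDelayWatermark` and the sample timestamps are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical toy model, not Beam's KafkaIO class: the watermark trails the
// largest event timestamp seen so far by a fixed maxDelay, and is capped at
// the current processing time minus maxDelay.
class LimitedDelayWatermark {
    private final Duration maxDelay;
    private Instant maxEventTime; // null until the first record is observed

    LimitedDelayWatermark(Duration maxDelay) {
        this.maxDelay = maxDelay;
    }

    // Record the event timestamp extracted from a Kafka record.
    void observe(Instant eventTime) {
        if (maxEventTime == null || eventTime.isAfter(maxEventTime)) {
            maxEventTime = eventTime;
        }
    }

    // Watermark at processing time `now`; null if nothing has been observed yet.
    Instant watermark(Instant now) {
        if (maxEventTime == null) {
            return null;
        }
        Instant capped = maxEventTime.isBefore(now) ? maxEventTime : now;
        return capped.minus(maxDelay);
    }

    public static void main(String[] args) {
        LimitedDelayWatermark wm = new LimitedDelayWatermark(Duration.ofMinutes(3));
        wm.observe(Instant.parse("2024-02-21T16:25:30Z"));
        wm.observe(Instant.parse("2024-02-21T16:21:00Z")); // out of order: does not lower the max
        System.out.println(wm.watermark(Instant.parse("2024-02-21T16:30:00Z"))); // 2024-02-21T16:22:30Z
    }
}
```

Under this model, a 3-minute maxDelay holds the watermark 3 minutes (in event time) behind the newest event. That is one way to give late events a grace period without relying on a processing-time delay.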
For the aggregation, it’s important that the window fires exactly once with all the events, with an allowed lateness of 3 minutes. I defined the window as:

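        import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
        import org.apache.beam.sdk.transforms.windowing.FixedWindows;
        import org.apache.beam.sdk.transforms.windowing.Repeatedly;
        import org.apache.beam.sdk.transforms.windowing.Window;
        import org.apache.beam.sdk.values.PCollection;
        import org.apache.beam.sdk.values.Row;
        import org.joda.time.Duration;

        final PCollection<Row> windowSelectFields = selectFields
                .apply("Windowing", Window
                        .<Row>into(FixedWindows.of(Duration.standardHours(1)))
                        // Fire 3 minutes (processing time) after the first element of each pane.
                        .triggering(Repeatedly.forever(
                                AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(Duration.standardMinutes(3))))
                        .withAllowedLateness(Duration.standardMinutes(3))
                        .accumulatingFiredPanes()
                );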
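To help reason about which events the configuration above may drop, the plain-Java sketch below assigns an event timestamp to a fixed window. It assumes a zero offset, so the window start is the timestamp rounded down to a multiple of the window size. It then applies the lateness rule as I understand Beam to implement it: an element becomes droppable once the window's last instant plus the allowed lateness is before the input watermark. This is a simplified model, not Beam's code. `WindowMath` and its methods are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not Beam's API: a simplified model of FixedWindows
// assignment (zero offset) and the late-data dropping rule.
class WindowMath {
    // Start of the fixed window of length `size` that contains `ts`.
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long startMs = Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs;
        return Instant.ofEpochMilli(startMs);
    }

    // The window is [start, start + size); its last included instant is end - 1 ms.
    // The element is droppable once that instant plus allowed lateness is before the watermark.
    static boolean isDroppable(Instant ts, Duration size, Duration allowedLateness, Instant inputWatermark) {
        Instant maxTimestamp = windowStart(ts, size).plus(size).minusMillis(1);
        return maxTimestamp.plus(allowedLateness).isBefore(inputWatermark);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(2);      // the smaller test window seen in the log
        Duration lateness = Duration.ofMinutes(3);
        Instant event = Instant.parse("2024-02-21T16:25:10Z");
        System.out.println(windowStart(event, size)); // 2024-02-21T16:24:00Z
        // Input watermark taken from the log: still before the window end, so not droppable.
        System.out.println(isDroppable(event, size, lateness, Instant.parse("2024-02-21T16:18:04Z"))); // false
        // Once the watermark passes 16:26 + 3 min, the element is droppable.
        System.out.println(isDroppable(event, size, lateness, Instant.parse("2024-02-21T16:29:01Z"))); // true
    }
}
```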

When tested on a smaller window with a small number of events, I see that the first 3 out of 10 events are discarded. From the log, it looks like the trigger fires slightly ahead of time: the timer due at 16:30:08.079 is received at 16:30:07.944. I suspect that, as a result, its shouldFire() method returns false, since 3 minutes have not yet passed.

Processing-time triggers are based on the local clock on a worker, and clocks 
can skew between workers (they can even skew between different processes on the 
same worker).
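The failure mode in question can be shown with a toy model. It assumes, hypothetically, that the timer is delivered according to one clock while the firing check reads a second clock that lags it. The `shouldFire` method here is a simplified stand-in, not Beam's `AfterProcessingTime` implementation. The 135 ms lag is the gap between the timer's timestamp and the time it was received in the logs below.

```java
import java.time.Duration;
import java.time.Instant;

// Toy model (hypothetical, not Beam's code) of a processing-time trigger whose
// timer is delivered by one clock but whose firing condition reads another,
// skewed clock.
class SkewedTriggerDemo {
    // Simplified firing condition: fire only once `now` has reached the deadline.
    static boolean shouldFire(Instant now, Instant deadline) {
        return !now.isBefore(deadline);
    }

    public static void main(String[] args) {
        Instant firstElement = Instant.parse("2024-02-21T16:27:08.079Z");
        Instant deadline = firstElement.plus(Duration.ofMinutes(3)); // 16:30:08.079Z

        // The timer is delivered when the timer clock reaches the deadline,
        // but the clock consulted by the trigger reads 135 ms earlier.
        Instant checkingClockNow = deadline.minus(Duration.ofMillis(135));

        System.out.println(shouldFire(checkingClockNow, deadline)); // false: pane not emitted
        System.out.println(shouldFire(deadline, deadline));         // true when clocks agree
    }
}
```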


2024-02-21 16:27:08,079 DEBUG org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator [] - Setting timer: 1:1708533008079 at 1708533008079 with output time 1708533008079.
(that is 4:30:08.079 PM)

And later on:

2024-02-21 16:30:07,944 DEBUG org.apache.beam.sdk.util.WindowTracing [] - ReduceFnRunner: Received timer key:Row:
call_direction:-1729318488
; window:[2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z);
data:TimerData{timerId=1:1708533008079, timerFamilyId=, namespace=Window([2024-02-21T16:24:00.000Z..2024-02-21T16:26:00.000Z)), timestamp=2024-02-21T16:30:08.079Z, outputTimestamp=2024-02-21T16:30:08.079Z, domain=PROCESSING_TIME, deleted=false} with inputWatermark:2024-02-21T16:18:04.000Z; outputWatermark:2024-02-21T16:18:04.000Z
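The timer id and timestamps in these log lines are epoch milliseconds. The quick check below assumes the log-line timestamps are UTC, which the exact 3-minute gap suggests; the class name is hypothetical. It confirms that 1708533008079 is 2024-02-21T16:30:08.079Z, exactly 3 minutes after the timer was set. It also shows the timer was received 135 ms before that timestamp.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: decode the epoch-millisecond values from the log lines above.
class TimerLogCheck {
    public static void main(String[] args) {
        Instant timerTs = Instant.ofEpochMilli(1708533008079L);
        // Log-line timestamps, assumed to be UTC.
        Instant timerSetAt = Instant.parse("2024-02-21T16:27:08.079Z");
        Instant timerReceivedAt = Instant.parse("2024-02-21T16:30:07.944Z");

        System.out.println(timerTs);                                               // 2024-02-21T16:30:08.079Z
        System.out.println(Duration.between(timerSetAt, timerTs));                 // PT3M
        System.out.println(Duration.between(timerReceivedAt, timerTs).toMillis()); // 135
    }
}
```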

Is my understanding correct?
Did I define the window and timestamps correctly?
Any help would be appreciated.

Thanks,
Ifat
