Hi Sachin,

First, apologies for my mistake about watermarking in my last email.
When you configure a bounded-out-of-orderness watermark strategy with
a tolerance of B, the next watermark emitted after a record with
timestamp T is T - B (strictly, T - B - 1 ms), not T as I described in
my last email.

Now, back to your question. As the Flink job receives records with
timestamps Tn, it sets the timestamp of the next watermark to
max(Tn) - B - 1 ms, and that watermark is emitted when the next
autoWatermarkInterval elapses. So a record with timestamp T does not
immediately affect records with timestamps less than T - B; instead it
affects the next watermark, and that watermark in turn affects those
records.
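
To make the two steps above concrete, here is a minimal plain-Java
sketch of a bounded-out-of-orderness generator. It has no Flink
dependency; the class name BoundedWatermarkSketch and the timestamps in
main are made up for illustration, and the real implementation inside
Flink may differ in detail.

```java
/** Plain-Java sketch of a bounded-out-of-orderness watermark generator. */
public class BoundedWatermarkSketch {
    private final long boundMillis; // the tolerance B, in milliseconds
    private long maxTimestamp;      // max(Tn) over all records seen so far

    public BoundedWatermarkSketch(long boundMillis) {
        this.boundMillis = boundMillis;
        // Chosen so that the first watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    /** Called for every record: only tracks the largest timestamp, emits nothing. */
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Called each time the auto-watermark interval elapses: returns the watermark. */
    public long onPeriodicEmit() {
        return maxTimestamp - boundMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch gen = new BoundedWatermarkSketch(60_000L); // B = 60 s
        gen.onEvent(100_000L);
        gen.onEvent(90_000L);  // out of order, does not lower the maximum
        gen.onEvent(130_000L);
        // The watermark only changes at the periodic emit, not per record.
        System.out.println(gen.onPeriodicEmit()); // 130000 - 60000 - 1 = 69999
    }
}
```

Note that onEvent never emits anything: records only move the maximum,
and the watermark itself advances on the periodic call.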

As for what happens to those late records: Flink window operators drop
them by default, but you can also collect them as a side output for
downstream processing. Please refer to
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output

Based on the analysis above, if you configure allowed lateness to A,
records with timestamps less than T - A - B will be dropped or
gathered as side outputs.
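
As a rough illustration of how A and B combine, here is a small
plain-Java sketch (no Flink dependency). It assumes tumbling windows
with zero offset and non-negative timestamps, and it assumes, as far as
I understand the window operator, that a record is treated as late once
the last timestamp of its window plus A is at or below the current
watermark. The class and method names are hypothetical. Because the
check is made per window, T - A - B is best read as a rule of thumb: the
exact cutoff also depends on where the window boundary falls.

```java
/** Plain-Java sketch of the late-record check for tumbling event-time windows. */
public class LatenessSketch {

    /** Exclusive end of the tumbling window containing ts (zero offset, ts >= 0). */
    static long windowEnd(long ts, long windowSize) {
        return ts - (ts % windowSize) + windowSize;
    }

    /** True if the record would be dropped (or sent to the late side output). */
    static boolean isLate(long ts, long windowSize, long allowedLateness, long watermark) {
        long windowMaxTimestamp = windowEnd(ts, windowSize) - 1;
        return windowMaxTimestamp + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long b = 60_000L;          // out-of-orderness bound B
        long a = 30_000L;          // allowed lateness A
        long size = 60_000L;       // tumbling window size
        long maxSeen = 200_000L;   // largest timestamp T seen so far
        long watermark = maxSeen - b - 1; // 139999

        // Window [0, 60000): 59999 + 30000 <= 139999, so the record is late.
        System.out.println(isLate(50_000L, size, a, watermark));  // true
        // Window [60000, 120000): 119999 + 30000 > 139999, so it is still accepted,
        // even though 100000 < T - A - B = 110000.
        System.out.println(isLate(100_000L, size, a, watermark)); // false
    }
}
```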

Best,
Yunfeng

On Fri, Apr 12, 2024 at 6:34 PM Sachin Mittal <sjmit...@gmail.com> wrote:
>
> Hi Yunfeng,
> I have a question around the tolerance for out of order bound watermarking,
>
> What I understand is that when consuming from a source with the out-of-order
> bound set to B, let's say it gets a record with timestamp T.
> After that it will drop all subsequent records that arrive with a
> timestamp less than T - B.
>
> Please let me know if I understood this correctly.
>
> If this is correct, then how does allowed lateness work when performing event
> time windowing? Say allowed lateness is set to A;
> does this mean that the value of A should be less than that of B, because records
> with timestamps less than T - B would have already been dropped at the source?
>
> If this is not the case, then how does lateness work with out-of-order
> boundedness?
>
> Thanks
> Sachin
>
>
> On Fri, Apr 12, 2024 at 12:30 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> 
> wrote:
>>
>> Hi Sachin,
>>
>> 1. When your Flink job performs an operation like map or flatmap, the
>> output records would be automatically assigned with the same timestamp
>> as the input record. You don't need to manually assign the timestamp
>> in each step. So the windowing result in your example should be as you
>> have expected.
>>
>> 2. The frequency of watermarks can be configured by
>> pipeline.auto-watermark-interval in flink-conf.yaml, or
>> ExecutionConfig#setAutoWatermarkInterval in Java API. In your example,
>> the event time related to the Watermark is still T, just that the job
>> will tolerate any records whose timestamp is in range [T-B, T].
>>
>> Best,
>> Yunfeng
>>
>> On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal <sjmit...@gmail.com> wrote:
>> >
>> > Hello folks,
>> > I have a few questions:
>> >
>> > Say I have a source like this:
>> >
>> > final DataStream<Data> data =
>> >     env.fromSource(
>> >         source,
>> >         WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(60))
>> >             .withTimestampAssigner((event, timestamp) -> event.timestamp));
>> >
>> >
>> > My pipeline after this is as follows:
>> >
>> >     data.flatMap(new MyFlattendData())
>> >         .keyBy(new MyKeySelector())
>> >         .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>> >         .reduce(new MyReducer());
>> >
>> >
>> > First question I have is that the timestamp I assign from the source, 
>> > would it get carried to all steps below to my window ?
>> > Example say I have timestamped data from source as:
>> > => [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]
>> >
>> >  would this get flattened to say:
>> > => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
>> >
>> > then keyed to say:
>> > => [ (10, [key1, flatdata1]),   (12, [key1, flatdata2]),   (61, [key1, flatdata4]),    ...    ]
>> >
>> > windows:
>> > 1st => [ flatdata1, flatdata2 ]
>> > 2nd => [ flatdata4, ... ]
>> >
>> > Would the windows created before the reduce function be applied be like I 
>> > have illustrated or to have it this way, do I need to output a record at 
>> > each step with the timestamp assigned for that record ?
>> >
>> > Basically is the timestamp assigned when reading from the source pushed 
>> > (retained) down to all the steps below when doing event time window 
>> > operation ?
>> >
>> >
>> > Next question is about my watermark strategy: how do I set the
>> > watermarking period?
>> > Basically, an out-of-order bound B means that once an event with
>> > timestamp T was encountered, no events older than T - B will follow any
>> > more when the watermarking is done.
>> >
>> > However, how frequently is watermarking done? And if, when watermarking,
>> > the last encountered event had timestamp T, does this mean the watermark
>> > timestamp would be T - B?
>> >
>> > How can we control the watermarking period ?
>> >
>> > Thanks
>> > Sachin
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
