Hi, Sachin.

IIUC, your pipeline behaves like the second case you listed, that is:
[ reducedData(d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late d6) ]
However, depending on `table.exec.emit.late-fire.delay`, there can also be
intermediate late firings, for example:
[ reducedData(d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5),
reducedData(d1, d2, d3, late d4, late d5, late d6) ]


Actually, allowed lateness (`table.exec.emit.allow-lateness`) controls how
long after the window end late records can still update the window output.
Once it expires, the output no longer changes and the framework can
automatically clear the corresponding state.
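
To make the accumulating behaviour concrete, here is a small plain-Java
simulation (not Flink code). It assumes the reducer is a simple sum, which is
only a hypothetical stand-in for MyDataReducer, and that every late record
inside the allowed lateness causes one more firing. How many late firings you
really get depends on the trigger or emit settings; the point is only that each
late firing still contains d1, d2, d3.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of accumulating window firings; no Flink dependency.
final class LateFiringSketch {

    // Returns every result the window would emit: one firing at window end,
    // then one more firing per late record (assumed firing policy).
    // The window state is kept, so late firings include the on-time records.
    static List<Integer> firings(List<Integer> onTime, List<Integer> late) {
        List<Integer> emitted = new ArrayList<>();
        int acc = 0;
        for (int v : onTime) {
            acc += v;                 // hypothetical reducer: sum
        }
        emitted.add(acc);             // firing at the end of the window
        for (int v : late) {
            acc += v;                 // late record is merged into kept state
            emitted.add(acc);         // late firing with the updated result
        }
        return emitted;
    }

    public static void main(String[] args) {
        // d1..d3 arrive on time, d4..d6 arrive within the allowed lateness
        System.out.println(firings(List.of(1, 2, 3), List.of(4, 5, 6)));
        // prints [6, 10, 15, 21]
    }
}
```

Downstream operators therefore see updated results for the same window and
have to treat the later ones as replacements, not as additional data.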


> Also if I want the reduced data from late records to not include the data 
> emitted within the window bounds, how can I do the same ?
> or if this is handled as default case ?


Maybe side output [1] can help you collect the late data separately and
re-compute it.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/
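
In the DataStream API the relevant calls are `sideOutputLateData(OutputTag)`
on the windowed stream and `getSideOutput(OutputTag)` on the result. Only
records that arrive after the allowed lateness has expired go to the side
output. The plain-Java sketch below (again not Flink code, with a sum as a
hypothetical reducer and made-up watermark values) shows why the allowed
lateness would have to be 0 if d4, d5, d6 should be reduced on their own.

```java
import java.util.List;

// Plain-Java sketch of routing late records to a side output.
final class LateSideOutputSketch {

    // One record with the watermark that was current when it arrived.
    static final class Event {
        final long watermarkAtArrival;
        final int value;
        Event(long watermarkAtArrival, int value) {
            this.watermarkAtArrival = watermarkAtArrival;
            this.value = value;
        }
    }

    // Returns {final window result, result reduced from side-output records}.
    // A record goes to the side output once the watermark has passed
    // (windowEnd - 1) + allowedLateness; otherwise it joins the window.
    static int[] splitAndReduce(List<Event> events, long windowEnd, long allowedLateness) {
        int windowResult = 0;
        int sideResult = 0;
        for (Event e : events) {
            boolean tooLate = windowEnd - 1 + allowedLateness <= e.watermarkAtArrival;
            if (tooLate) {
                sideResult += e.value;    // reduced separately from the window
            } else {
                windowResult += e.value;  // merged into the window state
            }
        }
        return new int[] {windowResult, sideResult};
    }

    // d1..d3 arrive before the window end (60), d4..d6 after it.
    static List<Event> sample() {
        return List.of(new Event(0, 1), new Event(10, 2), new Event(30, 3),
                       new Event(70, 4), new Event(100, 5), new Event(200, 6));
    }

    public static void main(String[] args) {
        int[] strict = splitAndReduce(sample(), 60, 0);
        int[] lenient = splitAndReduce(sample(), 60, 180);
        System.out.println(strict[0] + " " + strict[1]);   // prints 6 15
        System.out.println(lenient[0] + " " + lenient[1]); // prints 21 0
    }
}
```

With an allowed lateness of 180 seconds nothing in this example reaches the
side output, because all late records are still merged into the window.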



--

    Best!
    Xuyang




At 2024-04-17 16:56:54, "Sachin Mittal" <sjmit...@gmail.com> wrote:

Hi,



Suppose my pipeline is:


data
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .allowedLateness(Time.seconds(180))
    .reduce(new MyDataReducer())


I wanted to know whether the final output stream would contain one reduced
result at the end of the window and another reduced result at the end of the
allowed lateness.
If that is the case, would the reduced result at the end of the allowed
lateness also include the data from non-late records, or only the reduced data
from late records?


Example


If I have data in sequence:


[window start], d1, d2, d3, [window end], late d4, late d5, late d6, [end of 
allowed lateness] 


The resultant stream after window and reduce operation would be:


[ reducedData(d1, d2, d3), reducedData(late d4, late d5, late d6) ]


or
[ reducedData(d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late d6) ]


or something else ?


Also if I want the reduced data from late records to not include the data 
emitted within the window bounds, how can I do the same ?
or if this is handled as default case ?


Thanks
Sachin






