Thanks for that code snippet, I should try it out to simulate my DAG.. If
any suggestions how to debug futher what's causing late data on a
production stream job, please let me know.

On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hey,
>
> Actually I think Fabian initial message was incorrect. As far as I can see
> in the code of WindowOperator (last lines of org.apache.flink.streaming.
> runtime.operators.windowing.WindowOperator#processElement ), the element
> is sent to late side output if it is late AND it wasn’t assigned to any of
> the existing windows (because they were late as well). In other words, it
> should work as you Juho are wishing: element should be marked as late once
> they are overdue/late for the window after one full day.
>
> I have tested it and it works as expected. Following program:
>
> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a
>
> Prints only ONE number to the standard err:
>
> > 1394
>
> And there is nothing on the side output.
>
> Piotrek
>
> On 11 May 2018, at 12:32, Juho Autio <juho.au...@rovio.com> wrote:
>
> Thanks. What I still don't get is why my message got filtered in the first
> place. Even if the allowed lateness filtering would be done "on the
> window", data should not be dropped as late if it's not in fact late by
> more than the allowedLateness setting.
>
> Assuming that these conditions hold:
> - messages (and thus the extracted timestamps) were not out of order by
> more than 5 secods (as far as I didn't make any mistake in my
> partition-level analysis)
> - allowedLateness=1 minute
> - watermarks are assigned on kafka consumer meaning that they are
> synchronized across all partitions
>
> I don't see how the watermark could have ever been more than 5 seconds
> further when the message arrives on the isElementLate filter. Do you have
> any idea on this? Is there some existing test that simulates out of order
> input to flink's kafka consumer? I could try to build a test case based on
> that to possibly reproduce my problem. I'm not sure how to gather enough
> debug information on the production stream so that it would clearly show
> the watermarks, how they progressed on each kafka partition & later in the
> chain in case isElementLate filters something.
>
> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Juho,
>>
>> Thanks for bringing up this topic! I share your intuition.
>> IMO, records should only be filtered out and send to a side output if any
>> of the windows they would be assigned to is closed already.
>>
>> I had a look into the code and found that records are filtered out as
>> late based on the following condition:
>>
>> protected boolean isElementLate(StreamRecord<IN> element){
>>    return (windowAssigner.isEventTime()) &&
>>       (element.getTimestamp() + allowedLateness <=
>> internalTimerService.currentWatermark());
>> }
>>
>>
>> This code shows that your analysis is correct.
>> Records are filtered out based on their timestamp and the current
>> watermark, even though they arrive before the window is closed.
>>
>> OTOH, filtering out records based on the window they would end up in can
>> also be tricky if records are assigned to multiple windows (e.g., sliding
>> windows).
>> In this case, a side-outputted records could still be in some windows and
>> not in others.
>>
>> @Aljoscha (CC) Might have an explanation for the current behavior.
>>
>> Thanks,
>> Fabian
>>
>>
>> 2018-05-11 10:55 GMT+02:00 Juho Autio <juho.au...@rovio.com>:
>>
>>> I don't understand why I'm getting some data discarded as late on my
>>> Flink stream job a long time before the window even closes.
>>>
>>> I can not be 100% sure, but to me it seems like the kafka consumer is
>>> basically causing the data to be dropped as "late", not the window. I
>>> didn't expect this to ever happen?
>>>
>>> I have a Flink stream job that gathers distinct values using a 24-hour
>>> window. It reads the data from Kafka, using a 
>>> BoundedOutOfOrdernessTimestampExtractor
>>> on the kafka consumer to synchronize watermarks accross all kafka
>>> partitions. The maxOutOfOrderness of the extractor is set to 10 seconds.
>>>
>>> I have also enabled allowedLateness with 1 minute lateness on the
>>> 24-hour window:
>>>
>>> .timeWindow(Time.days(1))
>>> .allowedLateness(Time.minutes(1))
>>> .sideOutputLateData(lateDataTag)
>>> .reduce(new DistinctFunction())
>>>
>>> I have used accumulators to see that there is some late data. I have
>>> had multiple occurrences of those.
>>>
>>> Now focusing on a particular case that I was investigating more closely.
>>> Around ~12:15 o-clock my late data accumulator started showing that 1
>>> message had been late. That's in the middle of the time window – so why
>>> would this happen? I would expect late data to be discarded only sometime
>>> after 00:01 if some data is arriving late for the window that just closed
>>> at 00:00, and doesn't get emitted as part of 1 minute allowedLateness.
>>>
>>> To analyze the timestamps I read all messages in sequence separately
>>> from each kafka partition and calculated the difference in timestamps
>>> between consecutive messages. I had had exactly one message categorized as
>>> late by Flink in this case, and at the time i was using maxOutOfOrderness
>>> = 5 seconds. I found exactly one message in one kafka partition where the
>>> timestamp difference between messages was 5 seconds (they were out of order
>>> by 5 s), which makes me wonder, did Flink drop the event as late because it
>>> violated maxOutOfOrderness? Have I misunderstood the concept of late
>>> data somehow? I only expected late data to happen on window operations. I
>>> would expect kafka consumer to pass "late" messages onward even though
>>> watermark doesn't change.
>>>
>>> Thank you very much if you can find the time to look at this!
>>>
>>
>>
>
>

Reply via email to