Thanks for reporting back, and for the debugging advice!

Best, Fabian

2018-06-08 9:00 GMT+02:00 Juho Autio <juho.au...@rovio.com>:

> Flink was NOT at fault. It turns out our Kafka producer had OS-level clock
> sync problems :(
>
> Because of that, our Kafka topic occasionally had some messages with an
> incorrect timestamp mixed in. In practice they were about 7 days older than
> they should have been.
>
> I'm really sorry for wasting your time on this. But thank you once more
> for taking the time to answer.
>
> For any similar case, I would first advise users to compare the actual
> timestamps of their input data extra carefully. For me it was helpful to make
> this change in my Flink job: in the late data output, include the processing
> time (DateTime.now()) alongside the event time (the original timestamp), as
> sketched below.
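>
> Something roughly like this (a minimal sketch; windowedResult, MyEvent,
> getTimestamp() and lateDataTag are placeholders for our actual types, not the
> exact code we run):
>
> // Take the late-data side output of the windowed operator and log the
> // processing time next to the original event timestamp, so the two can be
> // compared for each late record.
> windowedResult
>     .getSideOutput(lateDataTag)
>     .map(event -> "eventTime=" + event.getTimestamp()
>         + " processingTime=" + DateTime.now())
>     .print();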
>
> On Mon, May 14, 2018 at 12:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Thanks for correcting me, Piotr. I didn't look closely enough at the code.
>> With the logic as currently implemented, a record should not be emitted to a
>> side output if its window hasn't been closed yet.
>>
>> 2018-05-11 14:13 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:
>>
>>> Generally speaking, the best practice is always to simplify your program as
>>> much as possible to narrow down the scope of the search: replace the data
>>> source with statically generated events, remove unnecessary components, etc.
>>> Either such a process helps you figure out what's wrong on your own, or, if
>>> not, sharing such a minimal program that reproduces the issue with us will
>>> allow us to debug it.
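>>>
>>> For example, something along these lines (just a sketch of the idea; adjust
>>> the element type and timestamps to match your job):
>>>
>>> // Replace the Kafka source with a handful of events whose timestamps you
>>> // fully control, then run the same windowing logic on top of this stream.
>>> DataStream<Long> input = env
>>>     .fromElements(1_000L, 2_000L, 7_000L, 3_000L) // deliberately out of order
>>>     .assignTimestampsAndWatermarks(
>>>         new BoundedOutOfOrdernessTimestampExtractor<Long>(Time.seconds(5)) {
>>>             @Override
>>>             public long extractTimestamp(Long element) {
>>>                 return element; // the element itself is the event timestamp
>>>             }
>>>         });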
>>>
>>> Piotrek
>>>
>>>
>>> On 11 May 2018, at 13:54, Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>> Thanks for that code snippet, I should try it out to simulate my DAG.
>>> If you have any suggestions on how to debug further what's causing late data
>>> on a production streaming job, please let me know.
>>>
>>> On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>>
>>>> Hey,
>>>>
>>>> Actually, I think Fabian's initial message was incorrect. As far as I can
>>>> see in the code of WindowOperator (the last lines of
>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement),
>>>> the element is sent to the late side output only if it is late AND it
>>>> wasn't assigned to any of the existing windows (because those were all late
>>>> as well). In other words, it should work as you, Juho, are wishing: an
>>>> element should be marked as late only once it is overdue for its window,
>>>> i.e. after one full day.
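>>>>
>>>> Roughly paraphrased (simplified, see the actual sources for details), the
>>>> end of processElement does something like this:
>>>>
>>>> // Only an element that could not be added to any window (isSkippedElement)
>>>> // AND is late with respect to the watermark ends up in the side output
>>>> // (or in the dropped-records counter if no side output is configured).
>>>> if (isSkippedElement && isElementLate(element)) {
>>>>     if (lateDataOutputTag != null) {
>>>>         sideOutput(element);
>>>>     } else {
>>>>         numLateRecordsDropped.inc();
>>>>     }
>>>> }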
>>>>
>>>> I have tested it and it works as expected. The following program:
>>>>
>>>> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a
>>>>
>>>> prints only ONE number to standard error:
>>>>
>>>> > 1394
>>>>
>>>> And there is nothing on the side output.
>>>>
>>>> Piotrek
>>>>
>>>> On 11 May 2018, at 12:32, Juho Autio <juho.au...@rovio.com> wrote:
>>>>
>>>> Thanks. What I still don't get is why my message got filtered in the
>>>> first place. Even if the allowed lateness filtering were done "on the
>>>> window", data should not be dropped as late if it's not in fact late by
>>>> more than the allowedLateness setting.
>>>>
>>>> Assuming that these conditions hold:
>>>> - messages (and thus the extracted timestamps) were not out of order by
>>>> more than 5 seconds (assuming I didn't make any mistake in my
>>>> partition-level analysis)
>>>> - allowedLateness = 1 minute
>>>> - watermarks are assigned on the Kafka consumer, meaning that they are
>>>> synchronized across all partitions
>>>>
>>>> I don't see how the watermark could ever have been more than 5 seconds
>>>> ahead of the element's timestamp when the message arrived at the
>>>> isElementLate filter. Do you have any idea about this? Is there some
>>>> existing test that simulates out-of-order input to Flink's Kafka consumer?
>>>> I could try to build a test case based on that to possibly reproduce my
>>>> problem. I'm not sure how to gather enough debug information on the
>>>> production stream so that it would clearly show the watermarks and how they
>>>> progress on each Kafka partition and later in the chain, in case
>>>> isElementLate filters something.
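>>>>
>>>> One thing I could add for debugging (just a sketch; stream, MyEvent and the
>>>> println are placeholders) is a ProcessFunction right before the window that
>>>> logs the current watermark next to each element's timestamp:
>>>>
>>>> // Prints a line whenever an element's timestamp is already behind the
>>>> // current watermark, i.e. when it would be considered late downstream.
>>>> stream.process(new ProcessFunction<MyEvent, MyEvent>() {
>>>>     @Override
>>>>     public void processElement(MyEvent value, Context ctx, Collector<MyEvent> out) {
>>>>         long watermark = ctx.timerService().currentWatermark();
>>>>         Long timestamp = ctx.timestamp();
>>>>         if (timestamp != null && timestamp < watermark) {
>>>>             System.err.println("Late element: timestamp=" + timestamp
>>>>                 + " watermark=" + watermark);
>>>>         }
>>>>         out.collect(value);
>>>>     }
>>>> });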
>>>>
>>>> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Juho,
>>>>>
>>>>> Thanks for bringing up this topic! I share your intuition.
>>>>> IMO, records should only be filtered out and sent to a side output if
>>>>> any of the windows they would be assigned to is closed already.
>>>>>
>>>>> I had a look into the code and found that records are filtered out as
>>>>> late based on the following condition:
>>>>>
>>>>> protected boolean isElementLate(StreamRecord<IN> element) {
>>>>>     return (windowAssigner.isEventTime()) &&
>>>>>         (element.getTimestamp() + allowedLateness <=
>>>>>             internalTimerService.currentWatermark());
>>>>> }
>>>>>
>>>>>
>>>>> This code shows that your analysis is correct.
>>>>> Records are filtered out based on their timestamp and the current
>>>>> watermark, even if they arrive before the window is closed.
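>>>>>
>>>>> To spell out the arithmetic for your settings: with maxOutOfOrderness = 10
>>>>> seconds, the watermark trails the highest timestamp seen so far by 10
>>>>> seconds, so with allowedLateness = 1 minute the condition above fires as
>>>>> soon as a record's timestamp is at least 70 seconds behind the highest
>>>>> timestamp seen on the stream, no matter how far away the end of its
>>>>> 24-hour window still is.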
>>>>>
>>>>> OTOH, filtering out records based on the windows they would end up in
>>>>> can also be tricky if records are assigned to multiple windows (e.g.,
>>>>> sliding windows).
>>>>> In this case, a side-outputted record could still be on time for some of
>>>>> its windows while already late for others.
>>>>>
>>>>> @Aljoscha (CC) might have an explanation for the current behavior.
>>>>>
>>>>> Thanks,
>>>>> Fabian
>>>>>
>>>>>
>>>>> 2018-05-11 10:55 GMT+02:00 Juho Autio <juho.au...@rovio.com>:
>>>>>
>>>>>> I don't understand why I'm getting some data discarded as late in my
>>>>>> Flink streaming job a long time before the window even closes.
>>>>>>
>>>>>> I cannot be 100% sure, but to me it seems like the Kafka consumer is
>>>>>> basically causing the data to be dropped as "late", not the window. I
>>>>>> didn't expect this to ever happen.
>>>>>>
>>>>>> I have a Flink streaming job that gathers distinct values using a
>>>>>> 24-hour window. It reads the data from Kafka, using a
>>>>>> BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to
>>>>>> synchronize watermarks across all Kafka partitions. The maxOutOfOrderness
>>>>>> of the extractor is set to 10 seconds.
>>>>>>
>>>>>> I have also enabled allowedLateness with 1 minute lateness on the
>>>>>> 24-hour window:
>>>>>>
>>>>>> .timeWindow(Time.days(1))
>>>>>> .allowedLateness(Time.minutes(1))
>>>>>> .sideOutputLateData(lateDataTag)
>>>>>> .reduce(new DistinctFunction())
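>>>>>>
>>>>>> For context, the source is set up roughly like this (simplified sketch;
>>>>>> FlinkKafkaConsumer010, MyEvent, MyEventSchema and kafkaProps are
>>>>>> placeholders for our actual connector version, classes and config):
>>>>>>
>>>>>> FlinkKafkaConsumer010<MyEvent> consumer =
>>>>>>     new FlinkKafkaConsumer010<>("my-topic", new MyEventSchema(), kafkaProps);
>>>>>>
>>>>>> // Assigning the extractor on the consumer makes watermarks be generated
>>>>>> // per Kafka partition and merged inside the consumer.
>>>>>> consumer.assignTimestampsAndWatermarks(
>>>>>>     new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
>>>>>>         @Override
>>>>>>         public long extractTimestamp(MyEvent event) {
>>>>>>             return event.getTimestamp();
>>>>>>         }
>>>>>>     });
>>>>>>
>>>>>> DataStream<MyEvent> events = env.addSource(consumer);
>>>>>>
>>>>>> The windowing shown above is then applied on top of this (keyed) stream.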
>>>>>>
>>>>>> I have used accumulators to see that there is some late data, and there
>>>>>> have been multiple occurrences of it.
>>>>>>
>>>>>> Now, focusing on a particular case that I was investigating more
>>>>>> closely: around ~12:15 o'clock my late data accumulator started showing
>>>>>> that 1 message had been late. That's in the middle of the time window, so
>>>>>> why would this happen? I would expect late data to be discarded only
>>>>>> sometime after 00:01, if some data arrives late for the window that just
>>>>>> closed at 00:00 and doesn't get emitted within the 1 minute of
>>>>>> allowedLateness.
>>>>>>
>>>>>> To analyze the timestamps, I read all messages in sequence separately
>>>>>> from each Kafka partition and calculated the difference in timestamps
>>>>>> between consecutive messages. I had had exactly one message categorized
>>>>>> as late by Flink in this case, and at the time I was using
>>>>>> maxOutOfOrderness = 5 seconds. I found exactly one message in one Kafka
>>>>>> partition where the timestamp difference between consecutive messages was
>>>>>> 5 seconds (they were out of order by 5 s), which makes me wonder: did
>>>>>> Flink drop the event as late because it violated maxOutOfOrderness? Have
>>>>>> I misunderstood the concept of late data somehow? I only expected late
>>>>>> data to occur in window operations. I would expect the Kafka consumer to
>>>>>> pass "late" messages onward even though the watermark doesn't advance.
>>>>>>
>>>>>> Thank you very much if you can find the time to look at this!
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>
