Thanks for reporting back and the debugging advice!

Best,
Fabian
2018-06-08 9:00 GMT+02:00 Juho Autio <juho.au...@rovio.com>:

Flink was NOT at fault. It turns out our Kafka producer had OS-level clock sync problems :(

Because of that, our Kafka occasionally had some messages in between with an incorrect timestamp. In practice they were about 7 days older than they should have been.

I'm really sorry for wasting your time on this. But thank you once more for taking the time to answer.

For any similar case, I would first advise users to compare the actual timestamps of their input data extra carefully. For me it was helpful to make this change in my Flink job: for the late data output, include the processing time (DateTime.now()) along with the event time (the original timestamp).

On Mon, May 14, 2018 at 12:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Thanks for correcting me, Piotr. I didn't look closely enough at the code.
With the currently implemented logic, a record should not be emitted to a side output if its window hasn't been closed yet.

2018-05-11 14:13 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:

Generally speaking, the best practice is always to simplify your program as much as possible to narrow down the scope of the search: replace the data source with statically generated events, remove unnecessary components, etc. Either such a process helps you figure out what's wrong on your own, or, if not, sharing a minimal program that reproduces the issue will allow us to debug it.

Piotrek

On 11 May 2018, at 13:54, Juho Autio <juho.au...@rovio.com> wrote:

Thanks for that code snippet, I should try it out to simulate my DAG. If you have any suggestions on how to debug further what's causing late data on a production stream job, please let me know.

On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:

Hey,

Actually, I think Fabian's initial message was incorrect. As far as I can see in the code of WindowOperator (the last lines of org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement), an element is sent to the late side output if it is late AND it wasn't assigned to any of the existing windows (because those were late as well). In other words, it should work as you, Juho, are wishing: an element should be marked as late only once it is overdue/late for its window, after one full day.

I have tested it and it works as expected. The following program:

https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a

prints only ONE number to standard err:

    1394

and there is nothing on the side output.

Piotrek
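For anyone who cannot open the gist, a minimal program in the same spirit might look roughly like this. This is a sketch only, not the gist's actual code; the element values, class name, and expected output are illustrative:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.OutputTag;

    public class LateDataSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Late records (if any) would go to this side output.
            OutputTag<Tuple2<String, Long>> lateTag = new OutputTag<Tuple2<String, Long>>("late") {};

            SingleOutputStreamOperator<Tuple2<String, Long>> sums = env
                // Statically generated events; the third one is 5 s out of order.
                .fromElements(
                    Tuple2.of("a", 1_000L),
                    Tuple2.of("a", 10_000L),
                    Tuple2.of("a", 5_000L))
                .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(5)) {
                        @Override
                        public long extractTimestamp(Tuple2<String, Long> element) {
                            return element.f1;
                        }
                    })
                .keyBy(0)
                .timeWindow(Time.days(1))
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(lateTag)
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

            sums.print();                        // the single window result
            sums.getSideOutput(lateTag).print(); // expected to stay empty: the window is still open
            env.execute("late data sketch");
        }
    }

With a bounded source like fromElements, the periodic watermarks may never even fire before the final Long.MAX_VALUE watermark closes the window at the end of the job, so the out-of-order element should still land in its (still open) window and nothing should reach the side output.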
On 11 May 2018, at 12:32, Juho Autio <juho.au...@rovio.com> wrote:

Thanks. What I still don't get is why my message got filtered in the first place. Even if the allowed-lateness filtering were done "on the window", data should not be dropped as late if it is not in fact late by more than the allowedLateness setting.

Assuming that these conditions hold:
- messages (and thus the extracted timestamps) were not out of order by more than 5 seconds (as far as I didn't make any mistake in my partition-level analysis)
- allowedLateness = 1 minute
- watermarks are assigned on the Kafka consumer, meaning that they are synchronized across all partitions

I don't see how the watermark could ever have been more than 5 seconds ahead when the message arrives at the isElementLate filter. Do you have any idea on this? Is there some existing test that simulates out-of-order input to Flink's Kafka consumer? I could try to build a test case based on that to possibly reproduce my problem. I'm not sure how to gather enough debug information on the production stream so that it would clearly show the watermarks, how they progressed on each Kafka partition, and later in the chain in case isElementLate filters something.

On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Juho,

Thanks for bringing up this topic! I share your intuition.
IMO, records should only be filtered out and sent to a side output if any of the windows they would be assigned to is already closed.

I had a look into the code and found that records are filtered out as late based on the following condition:

    protected boolean isElementLate(StreamRecord<IN> element) {
        return (windowAssigner.isEventTime()) &&
            (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
    }

This code shows that your analysis is correct.
Records are filtered out based on their timestamp and the current watermark, even though they arrive before the window is closed.

OTOH, filtering out records based on the windows they would end up in can also be tricky if records are assigned to multiple windows (e.g., sliding windows).
In this case, a side-outputted record could still be in some windows and not in others.

@Aljoscha (CC) might have an explanation for the current behavior.

Thanks,
Fabian
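To put concrete numbers on the isElementLate condition above (a sketch using the configuration values mentioned in this thread; the absolute timestamp is made up): a BoundedOutOfOrdernessTimestampExtractor keeps the watermark roughly maxOutOfOrderness behind the highest timestamp seen so far, so a record is only dropped here if its timestamp is more than allowedLateness + maxOutOfOrderness behind the newest timestamp already read.

    // All values in ms. Watermark is roughly maxTimestampSeen - maxOutOfOrderness.
    long maxOutOfOrderness = 10_000L;             // 10 s, as set on the extractor
    long allowedLateness   = 60_000L;             // 1 min, as set on the window
    long maxTimestampSeen  = 1_525_995_000_000L;  // newest event time read so far (made up)
    long currentWatermark  = maxTimestampSeen - maxOutOfOrderness;

    // A record only 5 s behind the newest timestamp is NOT dropped:
    long slightlyOutOfOrder = maxTimestampSeen - 5_000L;
    boolean dropped = slightlyOutOfOrder + allowedLateness <= currentWatermark;     // false

    // A record ~7 days behind (e.g. from a producer with a broken clock) IS dropped
    // immediately, even though its daily window is still open:
    long sevenDaysBehind = maxTimestampSeen - 7L * 24 * 60 * 60 * 1000;
    boolean droppedOld = sevenDaysBehind + allowedLateness <= currentWatermark;     // true

So with correct producer clocks and out-of-orderness of only a few seconds, this condition should never fire in the middle of a day-long window.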
2018-05-11 10:55 GMT+02:00 Juho Autio <juho.au...@rovio.com>:

I don't understand why I'm getting some data discarded as late on my Flink stream job a long time before the window even closes.

I can't be 100% sure, but to me it seems like the Kafka consumer is basically causing the data to be dropped as "late", not the window. I didn't expect this to ever happen.

I have a Flink stream job that gathers distinct values using a 24-hour window. It reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to synchronize watermarks across all Kafka partitions. The maxOutOfOrderness of the extractor is set to 10 seconds.

I have also enabled allowedLateness with 1 minute of lateness on the 24-hour window:

    .timeWindow(Time.days(1))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(lateDataTag)
    .reduce(new DistinctFunction())

I have used accumulators to see that there is some late data. I have had multiple occurrences of those.

Now, focusing on a particular case that I was investigating more closely: around ~12:15 o'clock my late data accumulator started showing that 1 message had been late. That's in the middle of the time window, so why would this happen? I would expect late data to be discarded only sometime after 00:01, if some data arrives late for the window that just closed at 00:00 and doesn't get emitted as part of the 1 minute of allowedLateness.

To analyze the timestamps I read all messages in sequence separately from each Kafka partition and calculated the difference in timestamps between consecutive messages. I had had exactly one message categorized as late by Flink in this case, and at the time I was using maxOutOfOrderness = 5 seconds. I found exactly one message in one Kafka partition where the timestamp difference between consecutive messages was 5 seconds (they were out of order by 5 s), which makes me wonder: did Flink drop the event as late because it violated maxOutOfOrderness? Have I misunderstood the concept of late data somehow? I only expected late data to happen on window operations. I would expect the Kafka consumer to pass "late" messages onward even though the watermark doesn't advance.

Thank you very much if you can find the time to look at this!
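For reference, the kind of setup described above might look roughly like the sketch below. It is not the actual job: the topic name, properties, Event and EventSchema classes, their getters, and the keying are placeholders, and DistinctFunction is assumed to be a ReduceFunction over the same type. It also attaches the processing time to records on the late-data side output, as suggested at the top of this thread, so producer clock problems become visible:

    // Sketch only; Event, EventSchema, topic name, keying, and DistinctFunction's
    // exact signature are assumptions.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "distinct-values-job");

    FlinkKafkaConsumer010<Event> consumer =
        new FlinkKafkaConsumer010<>("events", new EventSchema(), props);

    // Assigning the extractor directly on the consumer tracks watermarks per
    // Kafka partition and emits the minimum across partitions.
    consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Event event) {
                return event.getTimestamp();
            }
        });

    OutputTag<Event> lateDataTag = new OutputTag<Event>("late-data") {};

    SingleOutputStreamOperator<Event> distinct = env
        .addSource(consumer)
        .keyBy(Event::getKey)
        .timeWindow(Time.days(1))
        .allowedLateness(Time.minutes(1))
        .sideOutputLateData(lateDataTag)
        .reduce(new DistinctFunction());

    // Log both event time and processing time for every late record, so a skewed
    // producer clock (e.g. timestamps 7 days in the past) is easy to spot.
    distinct.getSideOutput(lateDataTag)
        .map(e -> "late record: eventTime=" + e.getTimestamp()
                + " processingTime=" + System.currentTimeMillis())
        .print();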