That may very well be possible. The watermark delay guarantees that any record newer than or equal to the watermark (that is, max event time seen - 20 seconds) will be considered and never ignored. It does not guarantee the converse: it does NOT guarantee that records older than the watermark will definitely be ignored. In a distributed setting it is super hard to get strict guarantees, so we choose to err on the side of being more inclusive (that is, including some old data) rather than on the side of dropping any data that is not old.
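To make the one-sided guarantee concrete, here is a minimal plain-Java sketch (no Spark involved) that models the watermark as "max event time seen minus the delay", advanced only after each micro-batch finishes. The class and method names (`WatermarkSketch`, `watermarksPerBatch`, `guaranteedToCount`) are hypothetical, and the model is a simplification, not Spark's implementation. It replays the timestamps reported in this thread in two ways: all in one micro-batch, and with the 11:59:00 message in a later batch.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/** Simplified model (an assumption, not Spark's code) of watermark advancement. */
public class WatermarkSketch {

    /** Watermark in effect while each batch runs; null means "no watermark yet". */
    static List<LocalDateTime> watermarksPerBatch(List<List<LocalDateTime>> batches,
                                                  Duration delay) {
        List<LocalDateTime> result = new ArrayList<>();
        LocalDateTime maxEventTime = null;
        for (List<LocalDateTime> batch : batches) {
            // A batch is evaluated against the watermark computed from EARLIER batches.
            result.add(maxEventTime == null ? null : maxEventTime.minus(delay));
            // Only after the batch finishes does its data move the watermark.
            // maxEventTime only grows, so the watermark never moves backwards.
            for (LocalDateTime t : batch) {
                if (maxEventTime == null || t.isAfter(maxEventTime)) {
                    maxEventTime = t;
                }
            }
        }
        return result;
    }

    /** Records at or after the watermark are always counted; older ones MAY be dropped. */
    static boolean guaranteedToCount(LocalDateTime eventTime, LocalDateTime watermark) {
        return watermark == null || !eventTime.isBefore(watermark);
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofSeconds(20);
        LocalDateTime t0 = LocalDateTime.of(2018, 2, 6, 12, 0, 0);
        LocalDateTime late = LocalDateTime.of(2018, 2, 6, 11, 59, 0);
        List<LocalDateTime> onTime =
                List.of(t0, t0.plusSeconds(1), t0.plusSeconds(2), t0.plusSeconds(3));

        // Case 1: all five messages arrive in the same micro-batch.
        List<LocalDateTime> all = new ArrayList<>(onTime);
        all.add(late);
        LocalDateTime wm1 = watermarksPerBatch(List.of(all), delay).get(0);
        System.out.println("same batch:     watermark=" + wm1
                + ", 11:59:00 guaranteed=" + guaranteedToCount(late, wm1));

        // Case 2: the late message arrives in a later micro-batch.
        LocalDateTime wm2 = watermarksPerBatch(List.of(onTime, List.of(late)), delay).get(1);
        System.out.println("separate batch: watermark=" + wm2
                + ", 11:59:00 guaranteed=" + guaranteedToCount(late, wm2));
    }
}
```

In the same-batch case there is no watermark yet while the batch runs, so 11:59:00 is counted. In the separate-batch case the watermark is 12:00:03 - 20 s = 11:59:43, so 11:59:00 falls below it: it becomes eligible to be dropped, but under the guarantee described above it is not certain to be.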
I will update the programming guide to make this clearer.

On Feb 6, 2018 5:01 PM, "Vishnu Viswanath" wrote:
> Could it be that these messages were processed in the same micro-batch? In
> that case, the watermark is updated only after the batch finishes, so it
> has no effect on the late data in the current batch.
>
> On Tue, Feb 6, 2018 at 4:18 PM Jiewen Shao wrote:
>
>> OK, thanks for confirming.
>>
>> So, based on my code, I have messages with the following timestamps
>> (converted to a more readable format), in this order:
>>
>> 2018-02-06 12:00:00
>> 2018-02-06 12:00:01
>> 2018-02-06 12:00:02
>> 2018-02-06 12:00:03
>> 2018-02-06 11:59:00 <-- this message should not be counted, right?
>> However, in my test this one is still counted.
>>
>> On Tue, Feb 6, 2018 at 2:05 PM, Vishnu Viswanath wrote:
>>
>>> Yes, that is correct.
>>>
>>> On Tue, Feb 6, 2018 at 4:56 PM, Jiewen Shao wrote:
>>>
>>>> Vishnu, thanks for the reply.
>>>> So "event time" and "window end time" have nothing to do with the current
>>>> system timestamp; the watermark moves with the highest value of the
>>>> "timestamp" field of the input and never moves down. Is that a correct
>>>> understanding?
>>>>
>>>> On Tue, Feb 6, 2018 at 11:47 AM, Vishnu Viswanath wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> The 20 seconds determine when the window state should be cleared. For
>>>>> the late message to be dropped, it must arrive after you receive a
>>>>> message with event time >= window end time + 20 seconds.
>>>>>
>>>>> I wrote a post on this recently:
>>>>> http://vishnuviswanath.com/spark_structured_streaming.html#watermark
>>>>>
>>>>> Thanks,
>>>>> Vishnu
>>>>>
>>>>> On Tue, Feb 6, 2018 at 12:11 PM Jiewen Shao wrote:
>>>>>
>>>>>> Sample code:
>>>>>>
>>>>>> Let's say Xyz is a POJO with a field called timestamp.
>>>>>>
>>>>>> Regarding the code withWatermark("timestamp", "20 seconds"):
>>>>>> I expect messages with a timestamp 20 seconds or older to be dropped,
>>>>>> but what are the 20 seconds compared to? Based on my test, nothing was
>>>>>> dropped no matter how old the timestamp is. What did I miss?
>>>>>>
>>>>>> Dataset<Xyz> xyz = lines
>>>>>>     .as(Encoders.STRING())
>>>>>>     .map((MapFunction<String, Xyz>) value -> mapper.readValue(value, Xyz.class),
>>>>>>          Encoders.bean(Xyz.class));
>>>>>>
>>>>>> Dataset<Row> aggregated = xyz.withWatermark("timestamp", "20 seconds")
>>>>>>     .groupBy(
>>>>>>         // tumbling window of size 5 seconds (on timestamp)
>>>>>>         functions.window(xyz.col("timestamp"), "5 seconds"),
>>>>>>         xyz.col("x"))
>>>>>>     .count();
>>>>>>
>>>>>> Thanks
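Vishnu's rule (a late message is dropped only after a message with event time >= window end + 20 seconds has been seen) reduces to simple arithmetic. The plain-Java sketch below is hypothetical (`TumblingWindowSketch` and its methods are illustrative names, not Spark API). It assumes windows are aligned to the Unix epoch, which is how I understand Spark's `window` function behaves with its default start time; it also treats times as UTC and window sizes as whole seconds. It computes the 5-second window for a timestamp and the event time that must be seen, in an earlier micro-batch, before a record in that window is dropped.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical sketch: tumbling-window assignment and the late-drop threshold. */
public class TumblingWindowSketch {

    /** Start of the tumbling window containing t (aligned to the epoch, UTC, whole seconds). */
    static LocalDateTime windowStart(LocalDateTime t, Duration size) {
        long epochSec = t.toEpochSecond(ZoneOffset.UTC);
        long sizeSec = size.getSeconds();
        long start = Math.floorDiv(epochSec, sizeSec) * sizeSec;
        return LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC);
    }

    /** Exclusive end of that window. */
    static LocalDateTime windowEnd(LocalDateTime t, Duration size) {
        return windowStart(t, size).plus(size);
    }

    /**
     * Smallest event time that, once seen in an earlier micro-batch, makes a
     * record at time t late: window end + watermark delay.
     */
    static LocalDateTime dropThreshold(LocalDateTime t, Duration size, Duration delay) {
        return windowEnd(t, size).plus(delay);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(5);
        Duration delay = Duration.ofSeconds(20);
        LocalDateTime late = LocalDateTime.of(2018, 2, 6, 11, 59, 0);
        System.out.println("window = [" + windowStart(late, size) + ", "
                + windowEnd(late, size) + ")");
        System.out.println("dropped only after an earlier batch saw event time >= "
                + dropThreshold(late, size, delay));
    }
}
```

For 11:59:00 the window is [11:59:00, 11:59:05) and the threshold is 11:59:25. The 12:00:00 message exceeds it, but in this model that only triggers the drop if it was processed in an earlier micro-batch than the 11:59:00 message, which matches Vishnu's same-micro-batch explanation.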
