OK, thanks for the confirmation. Based on my code, I have messages with the following timestamps (converted to a more readable format), arriving in this order:
2018-02-06 12:00:00
2018-02-06 12:00:01
2018-02-06 12:00:02
2018-02-06 12:00:03
2018-02-06 11:59:00   <-- this message should not be counted, right? However, in my test it is still counted.

On Tue, Feb 6, 2018 at 2:05 PM, Vishnu Viswanath <[email protected]> wrote:

> Yes, that is correct.
>
> On Tue, Feb 6, 2018 at 4:56 PM, Jiewen Shao <[email protected]> wrote:
>
>> Vishnu, thanks for the reply.
>> So "event time" and "window end time" have nothing to do with the current
>> system timestamp; the watermark moves with the highest value of the
>> "timestamp" field seen in the input and never moves down. Is that the
>> correct understanding?
>>
>> On Tue, Feb 6, 2018 at 11:47 AM, Vishnu Viswanath <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> The 20 seconds determine when the window state should be cleared. For
>>> a late message to be dropped, it has to arrive after you have received a
>>> message with event time >= window end time + 20 seconds.
>>>
>>> I wrote a post on this recently:
>>> http://vishnuviswanath.com/spark_structured_streaming.html#watermark
>>>
>>> Thanks,
>>> Vishnu
>>>
>>> On Tue, Feb 6, 2018 at 12:11 PM Jiewen Shao <[email protected]> wrote:
>>>
>>>> Sample code:
>>>>
>>>> Let's say Xyz is a POJO with a field called timestamp.
>>>>
>>>> Regarding the code withWatermark("timestamp", "20 seconds"):
>>>>
>>>> I expect messages with a timestamp 20 seconds old or older to be dropped.
>>>> What are the 20 seconds compared to? In my test nothing was dropped, no
>>>> matter how old the timestamp was. What did I miss?
>>>>
>>>> // Parse each JSON line into an Xyz bean
>>>> Dataset<Xyz> xyz = lines
>>>>     .as(Encoders.STRING())
>>>>     .map((MapFunction<String, Xyz>) value -> mapper.readValue(value, Xyz.class),
>>>>          Encoders.bean(Xyz.class));
>>>>
>>>> // Count per 5-second tumbling window (on "timestamp") and per "x",
>>>> // tolerating data up to 20 seconds late
>>>> Dataset<Row> aggregated = xyz.withWatermark("timestamp", "20 seconds")
>>>>     .groupBy(
>>>>         functions.window(xyz.col("timestamp"), "5 seconds"),
>>>>         xyz.col("x"))
>>>>     .count();
>>>>
>>>> Thanks
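To make the explanation in this thread concrete, here is a plain-Java model (no Spark involved) of how the watermark would evolve for the timestamps above. It rests on assumptions taken from the Structured Streaming programming guide rather than from Spark's source: the watermark is the maximum event time seen minus the 20-second delay, it is recomputed only between micro-batches (triggers), and it never moves down. The class name WatermarkModel, the method processBatch, and the rule that a row is ignored once the watermark has reached its window end are hypothetical simplifications. Under these assumptions, one plausible reason the 11:59:00 message is still counted is that it arrived in the same micro-batch as 12:00:03, before the watermark had advanced.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the watermark rules discussed in this thread.
// This is NOT Spark code; it only mimics the described behavior in plain Java.
public class WatermarkModel {
    static final long DELAY_SECONDS = 20;  // mirrors withWatermark("timestamp", "20 seconds")
    static final long WINDOW_SECONDS = 5;  // mirrors window(..., "5 seconds"), tumbling

    long maxEventTime = 0; // highest event time seen so far (seconds of day)
    long watermark = 0;    // watermark in effect for the current micro-batch

    // Processes one micro-batch and returns the event times that would still be aggregated.
    List<Long> processBatch(List<String> eventTimes) {
        List<Long> kept = new ArrayList<>();
        for (String t : eventTimes) {
            long ts = LocalTime.parse(t).toSecondOfDay();
            // The max is tracked over every input row, late or not.
            maxEventTime = Math.max(maxEventTime, ts);
            long windowEnd = ts - Math.floorMod(ts, WINDOW_SECONDS) + WINDOW_SECONDS;
            // Assumption: a row is ignored once the watermark has reached the end of its window.
            if (windowEnd <= watermark) {
                continue;
            }
            kept.add(ts);
        }
        // The watermark advances only after the batch and never moves backwards.
        watermark = Math.max(watermark, maxEventTime - DELAY_SECONDS);
        return kept;
    }

    public static void main(String[] args) {
        // Scenario 1: all five messages arrive in the same micro-batch.
        WatermarkModel same = new WatermarkModel();
        System.out.println(same.processBatch(Arrays.asList(
                "12:00:00", "12:00:01", "12:00:02", "12:00:03", "11:59:00")).size()); // prints 5

        // Scenario 2: the 11:59:00 message arrives in a later micro-batch.
        WatermarkModel split = new WatermarkModel();
        split.processBatch(Arrays.asList("12:00:00", "12:00:01", "12:00:02", "12:00:03"));
        // Watermark is now 12:00:03 - 20 s = 11:59:43, past the 11:59:05 window end.
        System.out.println(split.processBatch(Arrays.asList("11:59:00")).size()); // prints 0
    }
}
```

Two further points from the programming guide may also matter here: the guarantee is one-directional (data older than the threshold is not guaranteed to be dropped), and in complete output mode all aggregate state is kept, so the watermark does not drop anything in that mode.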
