Interesting question. I do not think watermarking is currently supported without an aggregation (groupBy) operation.
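
To make the question concrete, here is a minimal plain-Python sketch (no Spark involved) of the kind of late-record filtering being asked about. It assumes the watermark is the maximum event time seen in earlier batches minus a delay threshold, and that it only advances after a batch has been processed. The function name `filter_late_records` and the sample data are made up for illustration, and the exact boundary handling in Spark may differ.

```python
from datetime import datetime, timedelta


def filter_late_records(batches, delay):
    """Drop records whose event time is older than the current watermark.

    Assumption (illustrative only): watermark = max event time seen in
    earlier batches - delay, updated once per batch.
    """
    max_event_time = None
    kept = []
    for batch in batches:
        # The watermark used for this batch comes from earlier batches only.
        watermark = None if max_event_time is None else max_event_time - delay
        for ts, payload in batch:
            if watermark is None or ts >= watermark:
                kept.append((ts, payload))
        # Advance the maximum event time after the batch is processed.
        for ts, _ in batch:
            if max_event_time is None or ts > max_event_time:
                max_event_time = ts
    return kept


def at(hour, minute):
    # Hypothetical helper: build a timestamp on an arbitrary fixed day.
    return datetime(2018, 9, 22, hour, minute)


batches = [
    [(at(10, 0), "a"), (at(10, 5), "b")],
    # Watermark for this batch is 10:05 - 10 min = 09:55, so "c" is too late.
    [(at(9, 50), "c"), (at(10, 6), "d")],
]
kept = filter_late_records(batches, timedelta(minutes=10))
print([payload for _, payload in kept])
```
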

Reason: Watermarking in Structured Streaming is used to limit the amount of state that has to be kept in memory for intermediate results, and state only comes into the picture with stateful processing. Also, from the code it seems that records are filtered out on the basis of the watermark only in stateful operators (statefulOperators.scala).

I have not tried running the code, though, and would like to know if someone can shed more light on this.

Regards,
Chandan

On Sat, Sep 22, 2018 at 7:43 PM peay <p...@protonmail.com.invalid> wrote:

> Hello,
>
> I am trying to use watermarking without aggregation, to filter out records
> that are just too late, instead of appending them to the output. My
> understanding is that aggregation is required for `withWatermark` to have
> any effect. Is that correct?
>
> I am looking for something along the lines of
>
> ```
> df.withWatermark("ts", ...).filter(F.col("ts") < F.getCurrentWatermark())
> ```
>
> Is there any way to get the watermark value to achieve that?
>
> Thanks!

--
Chandan Prakash