Interesting question.
I do not think watermarking is currently supported without an aggregation
operation such as groupBy.

Reason:
A watermark in Structured Streaming is used to limit the size of the state
that has to be kept in memory for intermediate results, and state only comes
into the picture with stateful processing.
Also, from the code, it seems that records are filtered out on the basis of
the watermark only in stateful operators (statefulOperators.scala).
I have not tried running the code, though, and would like to know if someone
can shed more light on this.
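
To make the behaviour you are asking for concrete, here is a toy model in
plain Python. It is not Spark code and does not use any Spark API. The function
name filter_late_records, the batch layout and the ">=" boundary are my own
assumptions for illustration. It assumes the watermark is the maximum event
time seen so far minus the delay threshold, and that it only advances after a
batch has been processed.

```python
from datetime import datetime, timedelta


def filter_late_records(batches, delay):
    """Toy model (not Spark): drop records that fall behind the watermark.

    batches: list of micro-batches, each a list of (event_time, value) tuples.
    delay:   timedelta playing the role of the withWatermark delay threshold.
    """
    watermark = None       # no watermark exists before the first batch
    max_event_time = None  # largest event time observed so far
    kept = []

    for batch in batches:
        # Records are compared against the watermark from earlier batches.
        for ts, value in batch:
            if watermark is None or ts >= watermark:
                kept.append((ts, value))

        # After the batch, advance the watermark: max event time minus delay.
        for ts, _ in batch:
            if max_event_time is None or ts > max_event_time:
                max_event_time = ts
        if max_event_time is not None:
            watermark = max_event_time - delay

    return kept, watermark


t0 = datetime(2018, 9, 22, 12, 0, 0)
batches = [
    [(t0, "a"), (t0 + timedelta(minutes=10), "b")],
    [(t0 - timedelta(minutes=5), "late"), (t0 + timedelta(minutes=5), "ok")],
]
kept, watermark = filter_late_records(batches, timedelta(minutes=10))
print([value for _, value in kept])
```

In this sketch the record tagged "late" is dropped because it is older than
the watermark derived from the first batch. Whether Spark can be made to do
the same without a stateful operator is exactly the open question.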

Regards,
Chandan


On Sat, Sep 22, 2018 at 7:43 PM peay <p...@protonmail.com.invalid> wrote:

> Hello,
>
> I am trying to use watermarking without aggregation, to filter out records
> that are just too late, instead of appending them to the output. My
> understanding is that aggregation is required for `withWatermark` to have
> any effect. Is that correct?
>
> I am looking for something along the lines of
>
> ```
> df.withWatermark("ts", ...).filter(F.col("ts") < F.getCurrentWatermark())
> ```
>
> Is there any way to get the watermark value to achieve that?
>
> Thanks!
>


-- 
Chandan Prakash
