Hi Joe, As per the spark structured streaming documentation and I quote "withWatermark must be called on the same column as the timestamp column used in the aggregate. For example, df.withWatermark("time", "1 min").groupBy("time2").count() is invalid in Append output mode, as watermark is defined on a different column from the aggregation column."
*And after referring the following code * // Group the data by window and word and compute the count of each group val windowedCounts = words .withWatermark("timestamp", "10 minutes") .groupBy( window($"timestamp", "10 minutes", "5 minutes"), $"word") .count() I would suggest you to try following code df = inputStream.withWatermark("eventtime", "20 seconds").groupBy($"sharedId", window($"eventtime","20 seconds", "10 seconds")) And If this doesn't work, you can try trigger on query. Regards, Suket On Tue, 14 May 2019 at 19:18, Joe Ammann <j...@pyx.ch> wrote: > Hi all > > I'm fairly new to Spark structured streaming and I'm only starting to > develop an understanding for the watermark handling. > > Our application reads data from a Kafka input topic and as one of the > first steps, it has to group incoming messages. Those messages come in > bulks, e.g. 5 messages which belong to the same "business event" (share a > common key), with event timestamps differing in only a few millisecs. And > then no messages for say 3 minutes. And after that another bulk of 3 > messages with very close event timestamps. > > I have set a watermark of 20 seconds on my streaming query, and a groupBy > on the shared common key, and a window of 20 seconds (10 seconds sliding). > So something like > > df = inputStream.withWatermark("eventtime", "20 > seconds").groupBy("sharedId", window("20 seconds", "10 seconds") > > The output mode is set to append, since I intend to join this streams with > other streams later in the application. > > Naively, I would have expected to see any incoming bulk of messages as an > aggregated message ~20 seconds after it's eventtime on the output stream. > But my observations indicate that the "latest bulk of events" always stays > queued inside the query, until a new bulk of events arrive and bump up the > watermark. In my example above, this means that I see the first bulk of > events only after 3 minutes, when the second bulk comes in. > > This does indeed make some sense, and if I understand the documentation > correctly the watermark is only ever updated upon arrival of new inputs. > The "real time" does not play a role in the setting of watermarks. > > But to me this means that any bulk of events is prohibited from being sent > downstreams until a new bulk comes in. This is not what I intended. > > Is my understanding more or less correct? And is there any way of bringing > "the real time" into the calculation of the watermark (short of producing > regular dummy messages which are then again filtered out). > > -- > CU, Joe > > --------------------------------------------------------------------- > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >