Hi Joe,

How often do you trigger your micro-batches? Maybe you can set the trigger interval explicitly to a low value, or even better, switch it off entirely (by default a new micro-batch starts as soon as the previous one finishes).
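Note, though, that the watermark itself only advances when new rows arrive, regardless of how often the trigger fires: Spark tracks the maximum event time it has seen and subtracts the delay you configured. A minimal pure-Python sketch of that bookkeeping (just the semantics, not Spark's actual code; the timestamps are made up to match your example):

```python
from datetime import datetime, timedelta

DELAY = timedelta(seconds=20)  # the withWatermark("eventtime", "20 seconds") delay

def watermark(event_times, delay=DELAY):
    # Watermark = max event time seen so far, minus the allowed lateness.
    # It never advances on wall-clock time alone.
    return max(event_times) - delay

t0 = datetime(2019, 5, 14, 12, 0, 0)

# First bulk: 3 events a few milliseconds apart.
seen = [t0, t0 + timedelta(milliseconds=5), t0 + timedelta(milliseconds=9)]
wm1 = watermark(seen)  # t0 + 9 ms - 20 s: still before the first window's end,
                       # so its aggregate is held back in append mode

# Three minutes of wall-clock time pass with no input:
# the watermark stays exactly where it was.
wm2 = watermark(seen)
assert wm2 == wm1

# Only when the second bulk arrives does the watermark jump past the
# first window's end, releasing that aggregate downstream.
seen.append(t0 + timedelta(minutes=3))
wm3 = watermark(seen)
assert wm3 > t0 + timedelta(seconds=20)  # first 20s window is now closed
```

This matches what you observe: without new input nothing bumps the watermark, so the last bulk sits in the state store until more data arrives.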
See: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

Best,
Anastasios

On Tue, May 14, 2019 at 3:49 PM Joe Ammann <j...@pyx.ch> wrote:
> Hi all
>
> I'm fairly new to Spark structured streaming and I'm only starting to
> develop an understanding of the watermark handling.
>
> Our application reads data from a Kafka input topic, and as one of the
> first steps it has to group incoming messages. Those messages come in
> bulks, e.g. 5 messages which belong to the same "business event" (share a
> common key), with event timestamps differing by only a few milliseconds.
> Then no messages arrive for, say, 3 minutes, and after that another bulk
> of 3 messages with very close event timestamps.
>
> I have set a watermark of 20 seconds on my streaming query, a groupBy
> on the shared common key, and a window of 20 seconds (sliding every
> 10 seconds). So something like
>
> df = inputStream.withWatermark("eventtime", "20 seconds") \
>          .groupBy("sharedId", window("eventtime", "20 seconds", "10 seconds"))
>
> The output mode is set to append, since I intend to join this stream with
> other streams later in the application.
>
> Naively, I would have expected to see any incoming bulk of messages as an
> aggregated message ~20 seconds after its event time on the output stream.
> But my observations indicate that the "latest bulk of events" always stays
> queued inside the query until a new bulk of events arrives and bumps up
> the watermark. In my example above, this means that I see the first bulk
> of events only after 3 minutes, when the second bulk comes in.
>
> This does indeed make some sense, and if I understand the documentation
> correctly, the watermark is only ever updated upon arrival of new inputs.
> The "real time" plays no role in the setting of watermarks.
>
> But to me this means that any bulk of events is prevented from being sent
> downstream until a new bulk comes in. This is not what I intended.
>
> Is my understanding more or less correct? And is there any way of bringing
> "the real time" into the calculation of the watermark (short of producing
> regular dummy messages which are then again filtered out)?
>
> --
> CU, Joe
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

--
Anastasios Zouzias <a...@zurich.ibm.com>