Hi folks,

I'm experimenting with Kafka Streams windowed aggregation and came across window retention period behavior I don't fully understand. I'm using a custom timestamp extractor which gets the timestamp from the payload. Values are aggregated using tumbling time windows and summed by key. I am using Kafka and Kafka Streams version 0.10.1.1.
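To make the setup concrete, here is a plain-Scala sketch of what a payload-based timestamp extractor and 60-second tumbling windows compute. It does not use the Kafka Streams API. `TumblingWindowSketch`, `parsePayload`, `windowStart` and `aggregate` are hypothetical names, and the payload format (an ISO-8601 timestamp, a space, then an integer) is an assumption based on the sample data in this post.

```scala
import java.time.Instant

// Toy model, NOT the Kafka Streams API: plain Scala that mimics what a
// payload-based timestamp extractor plus 60 s tumbling windows compute.
object TumblingWindowSketch {
  // Same size/advance as TimeWindows.of(60000).advanceBy(60000) in the post.
  val WindowSizeMs: Long = 60000L

  // Hypothetical parser: assumes values look like "1970-01-01T00:00:15Z 10".
  def parsePayload(value: String): (Long, Int) =
    value.trim.split("\\s+") match {
      case Array(ts, amount) => (Instant.parse(ts).toEpochMilli, amount.toInt)
      case _                 => throw new IllegalArgumentException(s"Unexpected payload: $value")
    }

  // With advance == size, every timestamp falls into exactly one window
  // [start, start + size); start is the timestamp rounded down (ts >= 0 assumed).
  def windowStart(timestampMs: Long): Long = timestampMs - (timestampMs % WindowSizeMs)

  // Sums values per (key, window start), like groupByKey + windowed reduce,
  // but ignoring retention entirely.
  def aggregate(records: Seq[(String, String)]): Map[(String, Long), Int] =
    records
      .map { case (key, value) =>
        val (ts, amount) = parsePayload(value)
        ((key, windowStart(ts)), amount)
      }
      .groupBy(_._1)
      .map { case (window, entries) => window -> entries.map(_._2).sum }

  // The sample input from the post, as (key, value) pairs.
  val sample: Seq[(String, String)] = Seq(
    "a" -> "1970-01-01T00:00:00Z 1",
    "b" -> "1970-01-01T00:00:01Z 2",
    "a" -> "1970-01-01T00:00:02Z 3",
    "b" -> "1970-01-01T00:05:01Z 10",
    "b" -> "1970-01-01T00:00:15Z 10"
  )

  def main(args: Array[String]): Unit =
    aggregate(sample).toSeq.sortBy(_._1).foreach { case ((key, start), sum) =>
      println(s"$key ${Instant.ofEpochMilli(start)}..${Instant.ofEpochMilli(start + WindowSizeMs)} $sum")
    }
}
```

Because this sketch never expires anything, it yields 4 for a and 12 for b in 00:00..01:00, plus 10 for b in 00:05..00:06, i.e. the same sums as the observed output.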
Full code can be found at https://gist.github.com/xdralex/845bcf8f06ab0cfcf9785d9f95450b88, but in general I'm doing the following:

    val input: KStream[String, String] = builder.stream(Serdes.String(), Serdes.String(), "TimeInputTopic")
    val window: Windows[TimeWindow] = TimeWindows.of(60000).advanceBy(60000).until(30000)

    val aggregated: KTable[Windowed[String], JInt] = input
      .mapValues((v: String) => parse(v)._2)
      .groupByKey(Serdes.String(), Serdes.Integer())
      .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window, "TimeStore1")

    aggregated.foreach { (w: Windowed[String], s: JInt) =>
      val start = new DateTime(w.window().start(), DateTimeZone.UTC)
      val end = new DateTime(w.window().end(), DateTimeZone.UTC)
      println(s"Aggregated: $start..$end - ${w.key()} - $s")
    }

Here is the data being sent to TimeInputTopic:

    a,1970-01-01T00:00:00Z 1
    b,1970-01-01T00:00:01Z 2
    a,1970-01-01T00:00:02Z 3
    b,1970-01-01T00:05:01Z 10
    b,1970-01-01T00:00:15Z 10

Here is the output:

    Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
    Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
    Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12

Here is what confuses me. I would expect that once the event <b,05:01 10> is received, it should update some sort of "current" time value (a watermark?), and, because 05:01 is later than the end of the 00:00..01:00 window plus 30 seconds of retention, Kafka Streams would either:

A) Drop the 00:00:00..00:01:00 bucket and, once the event <b,00:15 10> is received, recreate it. This means there would be two outputs for 00:00..01:00:

    Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
    Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10

I'm not sure about this behavior, because http://docs.confluent.io/3.1.1/streams/developer-guide.html#windowing-a-stream says: "Kafka Streams guarantees to keep a window for *at least this specified time*". So I guess the window can be kept longer...
B) Just drop the incoming event <b,00:15 10> altogether. In this case there would be only one output for 00:00..01:00:

    Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2

I could expect this because http://docs.confluent.io/3.1.1/streams/concepts.html#windowing says: "If a record arrives after the retention period has passed, the record cannot be processed and is dropped".

Hope all this makes sense. A few questions:

– If the window can be kept in the store longer, is there any threshold at which it will finally be purged? For example, it would be nice to flush old buckets if they are taking too much space.
– How is the "current" time value updated, i.e. how does Kafka Streams decide that the retention period has passed? Does it maintain a watermark with the biggest timestamp seen?
– What is the right mailing list for questions about Kafka Streams? I had a choice between this one and the Confluent Platform list, and given that the open-source part of CP consists of patched vanilla Kafka, I was not sure where to write.

Thanks,
Alex
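The two candidate behaviors A and B, and the "biggest timestamp seen" notion of current time asked about above, can be illustrated with a toy model. This is a sketch under explicit assumptions, not how Kafka Streams is implemented: the hypothetical `RetentionSketch` treats stream time as the largest record timestamp seen so far and considers a window expired once stream time exceeds window end plus retention. Since the docs quoted above only promise that a window is kept "at least" the retention time, the real store may keep windows longer than this model does.

```scala
import scala.collection.mutable

// Toy model of options A and B from the post; NOT Kafka Streams' implementation.
// Assumptions: stream time = largest timestamp seen so far; a window
// [start, start + size) is expired once start + size + retention < stream time;
// timestamps are non-negative.
object RetentionSketch {
  sealed trait LatePolicy
  case object RecreateWindow extends LatePolicy // option A: purge, then recreate on a late record
  case object DropRecord extends LatePolicy     // option B: ignore records for expired windows

  final case class Record(key: String, timestampMs: Long, value: Int)

  // Returns every emitted update as "key@windowStart=sum".
  def run(records: Seq[Record], sizeMs: Long, retentionMs: Long, policy: LatePolicy): List[String] = {
    val store = mutable.Map.empty[(String, Long), Int]
    val out = mutable.ListBuffer.empty[String]
    var streamTime = Long.MinValue

    def isExpired(windowStart: Long): Boolean = windowStart + sizeMs + retentionMs < streamTime

    for (r <- records) {
      // Stream time only moves forward, even when an out-of-order record arrives.
      streamTime = math.max(streamTime, r.timestampMs)
      // Purge all windows that fell behind stream time.
      store --= store.keys.filter { case (_, ws) => isExpired(ws) }.toList

      val start = r.timestampMs - (r.timestampMs % sizeMs)
      if (isExpired(start) && policy == DropRecord) {
        () // Option B: the late record is dropped and nothing is emitted.
      } else {
        // On-time record, or option A recreating a purged window from scratch.
        val sum = store.getOrElse((r.key, start), 0) + r.value
        store.update((r.key, start), sum)
        out += s"${r.key}@$start=$sum"
      }
    }
    out.toList
  }
}
```

Run on the five sample records with a 60000 ms window and 30000 ms retention, `RecreateWindow` (option A) emits `b@0=2` and later `b@0=10` for the 00:00..01:00 bucket, while `DropRecord` (option B) emits only `b@0=2` for it. Neither matches the observed `b - 12`, which is consistent with the window still being in the store when the late record arrived.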