Hi folks,

I'm experimenting with Kafka Streams windowed aggregation and came across
window retention behavior that I don't fully understand.
I'm using a custom timestamp extractor that gets the timestamp from the
payload. Values are aggregated using tumbling time windows and summed per
key.
I am using Kafka and Kafka Streams version 0.10.1.1.
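
For reference, the extractor looks roughly like the sketch below (the exact
version is in the gist; the class name is made up, and the payload format is
an assumption based on the sample data further down):

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.processor.TimestampExtractor
import org.joda.time.DateTime

// Simplified sketch: assumes the record value starts with an ISO-8601
// timestamp, e.g. "1970-01-01T00:00:00Z 1", and returns it as epoch millis.
class PayloadTimestampExtractor extends TimestampExtractor {
  override def extract(record: ConsumerRecord[Object, Object]): Long =
    DateTime.parse(record.value().toString.split(" ")(0)).getMillis
}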

Full code can be found at
https://gist.github.com/xdralex/845bcf8f06ab0cfcf9785d9f95450b88,
but in general I'm doing the following:

val input: KStream[String, String] =
  builder.stream(Serdes.String(), Serdes.String(), "TimeInputTopic")

val window: Windows[TimeWindow] =
  TimeWindows.of(60000).advanceBy(60000).until(30000)

val aggregated: KTable[Windowed[String], JInt] = input
  .mapValues((v: String) => parse(v)._2)
  .groupByKey(Serdes.String(), Serdes.Integer())
  .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window, "TimeStore1")

aggregated.foreach {
  (w: Windowed[String], s: JInt) =>
    val start = new DateTime(w.window().start(), DateTimeZone.UTC)
    val end = new DateTime(w.window().end(), DateTimeZone.UTC)
    println(s"Aggregated: $start..$end - ${w.key()} - $s")
}
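
In case it matters, the extractor is plugged in via the streams
configuration, roughly like this (application id and broker address are
placeholders):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "time-windows-test")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// register the payload-based extractor from the sketch above
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  classOf[PayloadTimestampExtractor].getName)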

Here is the data being sent to TimeInputTopic:
a,1970-01-01T00:00:00Z 1
b,1970-01-01T00:00:01Z 2
a,1970-01-01T00:00:02Z 3
b,1970-01-01T00:05:01Z 10
b,1970-01-01T00:00:15Z 10

Here is the output:
Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12

Here is what confuses me.
I would expect that once the event <b,05:01 10> is received, it should
advance some sort of "current" time value (a watermark?), and, because 05:01
is past the end of the 00:00..01:00 window plus the 30 seconds of retention
(see the arithmetic sketch after option B), Kafka Streams would either:

A) Drop the bucket 00:00:00..00:01:00, and once the event <b,00:15 10> is
received, recreate this bucket. This means there would be two outputs for
00:00..01:00:
Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
I'm not sure about this behavior, because
http://docs.confluent.io/3.1.1/streams/developer-guide.html#windowing-a-stream
says: "Kafka Streams guarantees to keep a window for *at least this
specified time*". So I guess the window can be kept longer...


B) Just drop the incoming event <b,00:15 10> altogether. In this case there
would be only one output for 00:00..01:00:
Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
I could expect this because
http://docs.confluent.io/3.1.1/streams/concepts.html#windowing says:
"If a record arrives after the retention period has passed, the record
cannot be processed and is dropped".
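
To make the arithmetic behind this expectation explicit (this is just my
reading of until(), with retention counted from the window end):

// Times in milliseconds since the epoch:
val windowEnd = 60000L                  // the 00:00..01:00 window ends at 60s
val retention = 30000L                  // until(30000)
val lateEvent = 5 * 60000L + 1000L      // <b,05:01 10> has timestamp 301000
assert(lateEvent > windowEnd + retention)  // 301000 > 90000, past retention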


Hope all this makes sense. A few questions:

– If a window can be kept in the store longer than the retention period, is
there a threshold after which it is finally purged? For example, it would be
nice to flush old buckets if they take up too much space.

– How is "current" time value updated / how do Kafka Streams decide that
the retention period has passed? Does it maintain a watermark with the
biggest time seen?

– What is the right mailing list for questions about Kafka Streams? I had a
choice between this one and the Confluent Platform list, and given that the
open source part of CP consists of a patched vanilla Kafka, I wasn't sure
where to write.

Thanks,
Alex
