Hello Kafka experts,

I'm trying to do a windowed aggregation with Kafka Streams, but I'm
getting multiple messages for the same time window. I know this is
expected behavior, but I really want a single message per time window.

My test code looks like this:

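    builder.stream("test-stream")
      .groupByKey()
      .aggregate(
        new DataPointsInitializer,
        new DataPointsAggregator,
        // 60-second tumbling windows, retained for 60 seconds
        TimeWindows.of(60000).until(60000),
        new DataPointsSerde,
        // name of the state store backing the aggregate
        "test-stream")
      .toStream()
      .print()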

But if data arrives like this (each record carries its own time field):

01:38:20,Metric1,10
01:38:21,Metric1,10

< long pause >

01:38:22,Metric1,10

Then I get output like this:

[KTABLE-TOSTREAM-0000000002]: [Metric1@1487813880000] , Map(10.0 -> 2)
[KTABLE-TOSTREAM-0000000002]: [Metric1@1487813880000] , Map(10.0 -> 3)
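
As far as I understand, both lines carry the same key because all three
records fall into the same one-minute window. Here is a minimal sketch of
that arithmetic, assuming tumbling windows aligned to the epoch and that the
time field is read as UTC. windowStart is a hypothetical helper for
illustration, not part of the Kafka Streams API.

```scala
// Hypothetical helper (not a Kafka Streams API): maps a record timestamp
// to the start of its 60-second tumbling window, assuming windows are
// aligned to the epoch.
object WindowStartSketch {
  val windowSizeMs: Long = 60000L

  def windowStart(timestampMs: Long): Long =
    timestampMs - (timestampMs % windowSizeMs)

  def main(args: Array[String]): Unit = {
    // Window key taken from the output above; assumed to be 01:38:00 UTC.
    val base = 1487813880000L
    // 01:38:20, 01:38:21 and 01:38:22 expressed as offsets from the window start.
    val timestamps = Seq(base + 20000L, base + 21000L, base + 22000L)
    timestamps.foreach(ts => println(s"$ts -> ${windowStart(ts)}"))
  }
}
```

All three timestamps map to 1487813880000, which matches the key on both
output lines, so the second line is an update to the same window rather than
a new window.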

I want to drop the last one so that I don't get duplicate messages for the
same window. Thanks,
-- 
Kohki Nishio
