Thanks for the info. However, there's one behavior here that alarms me: duplicate messages are a tricky thing to manage. I thought the retention period (`until()`) could work for that purpose; however, here's the result.
My TimeWindows is TimeWindows.of(60000).until(60000), and here's the input:

00:00:00,metric,1
00:01:00,metric,1
00:03:00,metric,1
00:04:00,metric,1
00:05:00,metric,1
00:06:00,metric,1
<long pause>
00:00:00,metric,2
<long pause>
00:00:00,metric,3

The output is below:

[metric@1487894400000] , Map(1.0 -> 1)
[metric@1487894460000] , Map(1.0 -> 1)
[metric@1487894580000] , Map(1.0 -> 1)
[metric@1487894640000] , Map(1.0 -> 1)
[metric@1487894700000] , Map(1.0 -> 1)
[metric@1487894760000] , Map(1.0 -> 1)
[metric@1487894400000] , Map(2.0 -> 1, 1.0 -> 1)             <======== ??
[metric@1487894400000] , Map(2.0 -> 1, 1.0 -> 1, 3.0 -> 1)   <======== ??

I don't understand why the last two happen. I'm looking into the source code, but I wonder if I'm doing something wrong.

On Fri, Feb 24, 2017 at 8:33 AM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Kohki,
>
> As you mentioned, this is expected behavior. However, if you are willing
> to tolerate some more latency, you can improve the chance that a message
> with the same key is overwritten by increasing the commit interval. By
> default it is 30 seconds, but you can increase it:
>
> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 40000);
>
> This will make the dedup cache work better (for documentation see
> http://docs.confluent.io/3.1.2/streams/developer-guide.html#memory-management).
> However, this does not guarantee that duplicates do not happen.
>
> Thanks
> Eno
>
>
> > On 24 Feb 2017, at 15:20, Kohki Nishio <tarop...@gmail.com> wrote:
> >
> > Hello Kafka experts,
> >
> > I'm trying to do windowed aggregation with Kafka Streams; however, I'm
> > getting multiple messages for the same time window. I know this is
> > expected behavior, but I really want to have a single message for a
> > given time window.
> >
> > My test code looks like below:
> >
> > builder.stream("test-stream")
> >   .groupByKey()
> >   .aggregate(
> >     new DataPointsInitializer,
> >     new DataPointsAggregator,
> >     TimeWindows.of(60000).until(60000),
> >     new DataPointsSerde,
> >     "test-stream")
> >   .toStream()
> >   .print()
> >
> > But if data arrives like this (it has its own time field):
> >
> > 01:38:20,Metric1,10
> > 01:38:21,Metric1,10
> >
> > <long pause>
> >
> > 01:38:22,Metric1,10
> >
> > then I get output like this:
> >
> > [KTABLE-TOSTREAM-0000000002]: [Metric1@1487813880000] , Map(10.0 -> 2)
> > [KTABLE-TOSTREAM-0000000002]: [Metric1@1487813880000] , Map(10.0 -> 3)
> >
> > I want to drop the last one so that I don't have duplicate messages.
> > Thanks
> > --
> > Kohki Nishio

--
Kohki Nishio
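[A note on the window identifiers above: the number after `@` (e.g. `metric@1487894400000`) is the start timestamp of the tumbling window, in epoch milliseconds. For a tumbling window of size 60000 ms, a record with timestamp `ts` lands in the window starting at `ts - (ts % 60000)`. A minimal sketch in plain Java, with no Kafka dependency; the timestamps match the example output if the input times are read as 2017-02-24 UTC:]

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class WindowAssignment {
    static final long WINDOW_SIZE_MS = 60_000L;

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        // 2017-02-24 00:00:00 UTC, as in the example output above.
        long ts = LocalDateTime.of(2017, 2, 24, 0, 0, 0)
                .toInstant(ZoneOffset.UTC).toEpochMilli();
        System.out.println(windowStart(ts));          // 1487894400000
        System.out.println(windowStart(ts + 30_000)); // same window: 1487894400000
        System.out.println(windowStart(ts + 60_000)); // next window: 1487894460000
    }
}
```

[This is why the two surprising records both report `metric@1487894400000`: the late inputs at 00:00:00 map back to the same window start, and the aggregate for that window is updated rather than a new one emitted.]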
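[On dropping the intermediate updates: the aggregate stream is a changelog, so each record supersedes earlier records with the same windowed key. One downstream option is to deduplicate by keeping only the last value seen per windowed key, e.g. in a batch consumer before writing results out. A hypothetical sketch in plain Java; the string-keyed layout and the sample rows are illustrative, not from the thread:]

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LastUpdateWins {
    // Collapse changelog-style (key, value) updates so that only the most
    // recent value per key survives; LinkedHashMap preserves the order in
    // which keys were first seen.
    static Map<String, String> dedup(String[][] updates) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] kv : updates) {
            latest.put(kv[0], kv[1]); // a later update overwrites an earlier one
        }
        return latest;
    }

    public static void main(String[] args) {
        String[][] updates = {
            {"Metric1@1487813880000", "Map(10.0 -> 2)"},
            {"Metric1@1487813880000", "Map(10.0 -> 3)"}, // supersedes the row above
        };
        // Only the final aggregate per windowed key remains.
        System.out.println(dedup(updates));
    }
}
```

[For what it's worth, later Kafka Streams releases (2.1+) added `KTable#suppress(Suppressed.untilWindowCloses(...))`, which emits only the final result per window natively; that API did not exist at the time of this thread.]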