Thanks for the info. However, there's some alarming behavior here: duplicate
messages are a tricky thing to manage. I thought the retention period
('until') could work for that purpose, but here's the result

My TimeWindow is

TimeWindows.of(60000).until(60000),

And here's the input

00:00:00,metric,1
00:01:00,metric,1
00:03:00,metric,1
00:04:00,metric,1
00:05:00,metric,1
00:06:00,metric,1

<long pause>

00:00:00,metric,2

<long pause>

00:00:00,metric,3

The output below

[metric@1487894400000] , Map(1.0 -> 1)
[metric@1487894460000] , Map(1.0 -> 1)
[metric@1487894580000] , Map(1.0 -> 1)
[metric@1487894640000] , Map(1.0 -> 1)
[metric@1487894700000] , Map(1.0 -> 1)
[metric@1487894760000] , Map(1.0 -> 1)
[metric@1487894400000] , Map(2.0 -> 1, 1.0 -> 1)  <======== ??
[metric@1487894400000] , Map(2.0 -> 1, 1.0 -> 1, 3.0 -> 1) <====== ??

I don't understand why the last two happen ... I'm looking into the source
code, but I wonder if I'm doing something wrong ..
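If my reading of TimeWindows is right, a record is assigned to the window
whose start is its timestamp floored down to the window size, which would
explain why the two late records (values 2 and 3) land back in the 00:00:00
window. A toy check of that arithmetic (the helper below is my own sketch,
not the Streams API):

```python
WINDOW_SIZE_MS = 60_000  # matches TimeWindows.of(60000)

def window_start(ts_ms):
    # assumed assignment rule: floor the timestamp to the window size
    return (ts_ms // WINDOW_SIZE_MS) * WINDOW_SIZE_MS

base = 1487894400000  # 2017-02-24 00:00:00 UTC, the epoch in the output above
print(window_start(base))           # 1487894400000 -> [metric@1487894400000]
print(window_start(base + 60_000))  # 1487894460000 -> [metric@1487894460000]
```

So the question is really why the 00:00:00 window is still open and accepting
updates long after until(60000) should have let it expire.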


On Fri, Feb 24, 2017 at 8:33 AM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi Kohki,
>
> As you mentioned, this is expected behavior. However, if you are willing
> to tolerate some more latency, you can improve the chance that a message
> with the same key is overwritten by increasing the commit time. By default
> it is 30 seconds, but you can increase it:
>
> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 40000);
>
> This will make the dedup cache work better (for documentation see
> http://docs.confluent.io/3.1.2/streams/developer-guide.html#memory-management).
> However, this does not guarantee that duplicates will never happen.
>
> Thanks
> Eno
>
>
> > On 24 Feb 2017, at 15:20, Kohki Nishio <tarop...@gmail.com> wrote:
> >
> > Hello Kafka experts
> >
> > I'm trying to do windowed aggregation with Kafka Stream, however I'm
> > getting multiple messages for the same time window, I know this is an
> > expected behavior, however I really want to have a single message for
> given
> > time window.
> >
> > my test code looks like below
> >
> >    builder.stream("test-stream")
> >      .groupByKey()
> >      .aggregate(
> >        new DataPointsInitializer,
> >        new DataPointsAggregator,
> >        TimeWindows.of(60000).until(60000),
> >        new DataPointsSerde,
> >        "test-stream")
> >      .toStream()
> >      .print()
> >
> > But if data arrives like this (it has its own time field)
> >
> > 01:38:20,Metric1,10
> > 01:38:21,Metric1,10
> >
> > < long pause >
> >
> > 01:38:22,Metric1,10
> >
> > Then I get output like this
> >
> > [KTABLE-TOSTREAM-0000000002]: [Metric1@1487813880000] , Map(10.0 -> 2)
> > [KTABLE-TOSTREAM-0000000002]: [Metric1@1487813880000] , Map(10.0 -> 3)
> >
> > I want to drop the last one so that I don't have duplicate messages,
> Thanks
> > --
> > Kohki Nishio
>
>
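(For my own understanding of the commit-interval suggestion above, here is a
toy model of the record cache; this is my sketch, not the actual Streams
implementation. Per key, only the latest aggregate survives between commits,
so a longer commit interval merges more updates into one emission, but
updates that straddle a commit boundary still emit twice.)

```python
def run(updates, commit_every):
    """updates: list of (key, value); every commit_every records the
    cache is flushed downstream, emitting one record per cached key."""
    cache, emitted = {}, []
    for i, (k, v) in enumerate(updates, 1):
        cache[k] = v                      # overwrite: dedup within interval
        if i % commit_every == 0:
            emitted.extend(sorted(cache.items()))
            cache.clear()
    emitted.extend(sorted(cache.items()))  # final flush
    return emitted

# three updates to the same window key, committing every 2 records:
print(run([("w1", 1), ("w1", 2), ("w1", 3)], commit_every=2))
# -> [('w1', 2), ('w1', 3)]  (two emissions, not three)
```

With a commit interval longer than the whole burst, the same input collapses
to a single emission, which matches the "improve the chance, but no
guarantee" wording above.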


-- 
Kohki Nishio
