I return back .
Which StateStore could I use for this problem?
and another idea .I can send 'flush' message into this topic .
when received this message could update results to db.
I don't know it's work?

________________________________
funk...@live.com

From: Guozhang Wang<mailto:wangg...@gmail.com>
Date: 2018-03-12 03:58
To: users<mailto:users@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
If you want to strictly "only have one output per window", then for now
you'd probably implement that logic using a lower-level "transform"
function in which you can schedule a punctuate function to send all the
results at the end of a window.

If you just want to reduce the amount of data to your sink, but your sink
can still handle overwritten records of the same key, you can enlarge the
cache size via the cache.max.bytes.buffering config.

https://kafka.apache.org/documentation/#streamsconfigs

On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <funk...@live.com> wrote:

> thx for your reply!
> I see that it is designed to operate on an infinite, unbounded stream of
> data.
> now I want to process for  unbounded stream but divided by time interval .
> so what can I do for doing this ?
>
> ________________________________
> funk...@live.com
>
> From: Guozhang Wang<mailto:wangg...@gmail.com>
> Date: 2018-03-10 02:50
> To: users<mailto:users@kafka.apache.org>
> Subject: Re: kafka steams with TimeWindows ,incorrect result
> Hi Jie,
>
> This is by design of Kafka Streams, please read this doc for more details
> (search for "outputs of the Wordcount application is actually a continuous
> stream of updates"):
>
> https://kafka.apache.org/0110/documentation/streams/quickstart
>
> Note this semantics applies for both windowed and un-windowed tables.
>
>
> Guozhang
>
> On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <funk...@live.com> wrote:
>
> > Hi:
> > I used TimeWindow for aggregate data in kafka.
> >
> > this is code snippet ;
> >
> >   view.flatMap(new MultipleKeyValueMapper(client)
> > ).groupByKey(Serialized.with(Serdes.String(),
> >                 Serdes.serdeFrom(new CountInfoSerializer(), new
> > CountInfoDeserializer())))
> >         .windowedBy(TimeWindows.of(60000)).reduce(new
> > Reducer<CountInfo>() {
> >             @Override
> >             public CountInfo apply(CountInfo value1, CountInfo value2) {
> >                 return new CountInfo(value1.start + value2.start,
> > value1.active + value2.active, value1.fresh + value2.fresh);
> >             }
> >         }) .toStream(new KeyValueMapper<Windowed<String>, CountInfo,
> > String>() {
> >             @Override
> >             public String apply(Windowed<String> key, CountInfo value) {
> >                 return key.key();
> >             }
> >         }).print(Printed.toSysOut());
> >
> >         KafkaStreams streams = new KafkaStreams(builder.build(),
> > KStreamReducer.getConf());
> >         streams.start();
> >
> > and I test 30000 data in kafka .
> > and I print key value .
> >
> >
> > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
> > 21@1520601300000/1520601360000], CountInfo{start=12179, active=12179,
> > fresh=12179}
> > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
> 1520601300000/1520601360000],
> > CountInfo{start=12179, active=12179, fresh=12179}
> > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
> > 21@1520601300000/1520601360000], CountInfo{start=30000, active=30000,
> > fresh=30000}
> > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
> 1520601300000/1520601360000],
> > CountInfo{start=30000, active=30000, fresh=30000}
> > why in one window duration will be print two result but not one result ?
> >
> > ________________________________
> > funk...@live.com
> >
>
>
>
> --
> -- Guozhang
>



--
-- Guozhang

Reply via email to