Re: Re: kafka streams with TimeWindows ,incorrect result

2018-04-27 Thread
the value is a user-defined type which implements Serializer and Deserializer
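
For reference, a minimal sketch of what such a value type with its Serializer and
Deserializer could look like; the CountInfo fields come from the thread below, while
the comma-delimited encoding is only an assumption, not the poster's actual
implementation:

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class CountInfo {
    public long start, active, fresh;

    public CountInfo(long start, long active, long fresh) {
        this.start = start; this.active = active; this.fresh = fresh;
    }
}

class CountInfoSerializer implements Serializer<CountInfo> {
    @Override public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override public byte[] serialize(String topic, CountInfo data) {
        if (data == null) return null;
        // encode the three counters as "start,active,fresh"
        return (data.start + "," + data.active + "," + data.fresh)
                .getBytes(StandardCharsets.UTF_8);
    }

    @Override public void close() {}
}

class CountInfoDeserializer implements Deserializer<CountInfo> {
    @Override public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override public CountInfo deserialize(String topic, byte[] data) {
        if (data == null) return null;
        String[] p = new String(data, StandardCharsets.UTF_8).split(",");
        return new CountInfo(Long.parseLong(p[0]), Long.parseLong(p[1]), Long.parseLong(p[2]));
    }

    @Override public void close() {}
}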


funk...@live.com

From: Ted Yu<mailto:yuzhih...@gmail.com>
Date: 2018-04-27 16:39
To: users<mailto:users@kafka.apache.org>; 杰 杨<mailto:funk...@live.com>
Subject: Re: Re: kafka streams with TimeWindows ,incorrect result
Noticed a typo in streams in subject.

Corrected it in this reply.

 Original message 
From: 杰 杨 <funk...@live.com>
Date: 4/27/18 1:28 AM (GMT-08:00)
To: 杰 杨 <funk...@live.com>, users <users@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result

I checked the WindowStore interface and found it has a put method but no get method.
Within one second the stream has records with the same key but different
values, and I must update the value stored for that key.
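
A sketch of the read-modify-write this implies, assuming a WindowStore<String, CountInfo>
created with retainDuplicates=false (so put() overwrites); in the 1.x API, reads go
through fetch(key, timeFrom, timeTo) rather than get():

import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Called from a Transformer's transform() for each record; "store" is the
// windowed store registered with the topology.
static CountInfo mergeIntoStore(WindowStore<String, CountInfo> store,
                                String key, CountInfo value, long recordTs) {
    long windowStart = recordTs - (recordTs % 1000); // align to a 1-second window (assumption)
    CountInfo merged = value;
    // a single-point fetch range returns the existing value for that window, if any
    try (WindowStoreIterator<CountInfo> it = store.fetch(key, windowStart, windowStart)) {
        if (it.hasNext()) {
            CountInfo old = it.next().value;
            merged = new CountInfo(old.start + value.start,
                    old.active + value.active, old.fresh + value.fresh);
        }
    }
    store.put(key, merged, windowStart); // overwrites the previous value for this key/window
    return merged;
}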



funk...@live.com

From: funk...@live.com<mailto:funk...@live.com>
Date: 2018-04-27 16:08
To: users<mailto:users@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Hi:
I don't know what to do with the transform function.
The stream is prepared well, like this below:
key:
44_14_2018-04-27
value:
CountInfo(start=1, active=0, fresh=0)

There is a large amount of data like that.
How do I aggregate it per 1 second using the transform function?
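
One possible shape of the per-second aggregation being asked about, in the spirit of
the transform-and-punctuate approach suggested in the replies below; the class name
and the plain in-memory buffer are assumptions for illustration:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class PerSecondAggregator
        implements Transformer<String, CountInfo, KeyValue<String, CountInfo>> {
    private final Map<String, CountInfo> buffer = new HashMap<>();
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // emit the accumulated totals once per second of wall-clock time
        context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            for (Map.Entry<String, CountInfo> e : buffer.entrySet()) {
                this.context.forward(e.getKey(), e.getValue());
            }
            buffer.clear();
            this.context.commit();
        });
    }

    @Override
    public KeyValue<String, CountInfo> transform(String key, CountInfo value) {
        // merge per key; nothing is emitted per record
        buffer.merge(key, value, (a, b) -> new CountInfo(
                a.start + b.start, a.active + b.active, a.fresh + b.fresh));
        return null;
    }

    @Override
    public KeyValue<String, CountInfo> punctuate(long timestamp) {
        return null; // deprecated in 1.x; the scheduled Punctuator above replaces it
    }

    @Override
    public void close() {}
}

It would be attached with stream.transform(PerSecondAggregator::new); note that a
plain HashMap is lost on restart, which is why the replies below suggest backing
this with a windowed state store.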



funk...@live.com

From: Guozhang Wang<mailto:wangg...@gmail.com>
Date: 2018-04-27 01:50
To: users<mailto:users@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Using a control message to flush results to downstream (in your case to the
result db) looks good to me as well.
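
A sketch of that control-message idea, reusing the buffer from the PerSecondAggregator
sketch earlier; the "flush" key and writeToDb() are hypothetical names, not from the
thread:

@Override
public KeyValue<String, CountInfo> transform(String key, CountInfo value) {
    if ("flush".equals(key)) {
        // control record: drain the buffered aggregates to the DB instead of merging
        for (Map.Entry<String, CountInfo> e : buffer.entrySet()) {
            writeToDb(e.getKey(), e.getValue()); // hypothetical DB sink
        }
        buffer.clear();
        return null; // the control record itself produces no output
    }
    buffer.merge(key, value, (a, b) -> new CountInfo(
            a.start + b.start, a.active + b.active, a.fresh + b.fresh));
    return null;
}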

On Thu, Apr 26, 2018 at 10:49 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> If you're talking about which store to use in your transform function, it
> should be a windowed store.
>
> You can create such a store with the `Stores` factory, and suppose your
> old code has `windowedBy(TimeWindows.of(6))`, then you can do
>
> `
> windows = TimeWindows.of(6);
>
> Stores.windowStoreBuilder(
>     Stores.persistentWindowStore("Counts",
>         windows.maintainMs(),
>         windows.segments,
>         windows.size(),
>         true)
> )
> `
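
A hedged completion of that fragment against the 1.x API; countInfoSerde and the
transform() wiring are assumptions, and retainDuplicates is set to false here (the
fragment above passes true) so that put() overwrites the per-window value:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

TimeWindows windows = TimeWindows.of(6); // window size as written in the thread

Serde<CountInfo> countInfoSerde =
        Serdes.serdeFrom(new CountInfoSerializer(), new CountInfoDeserializer());

StoreBuilder<WindowStore<String, CountInfo>> countsStore =
        Stores.windowStoreBuilder(
                Stores.persistentWindowStore("Counts",
                        windows.maintainMs(), // retention
                        windows.segments,     // number of segments
                        windows.size(),       // window size
                        false),               // retainDuplicates=false so put() overwrites
                Serdes.String(),
                countInfoSerde);

builder.addStateStore(countsStore);                   // builder is the StreamsBuilder
stream.transform(PerSecondAggregator::new, "Counts"); // make the store available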
>
>
> Guozhang
>
>
>
> On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨 <funk...@live.com> wrote:
>
>> I'm back.
>> Which StateStore could I use for this problem?
>> Another idea: I could send a 'flush' message into this topic, and when
>> this message is received, update the results to the DB.
>> Would that work?
>>
>> 
>> funk...@live.com
>>
>> From: Guozhang Wang<mailto:wangg...@gmail.com>
>> Date: 2018-03-12 03:58
>> To: users<mailto:users@kafka.apache.org>
>> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
>> If you want to strictly "only have one output per window", then for now
>> you'd probably implement that logic using a lower-level "transform"
>> function in which you can schedule a punctuate function to send all the
>> results at the end of a window.
>>
>> If you just want to reduce the amount of data to your sink, but your sink
>> can still handle overwritten records of the same key, you can enlarge the
>> cache size via the cache.max.bytes.buffering config.
>>
>> https://kafka.apache.org/documentation/#streamsconfigs
>>
>> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <funk...@live.com> wrote:
>>
>> > thanks for your reply!
>> > I see that it is designed to operate on an infinite, unbounded stream of
>> > data.
>> > Now I want to process an unbounded stream divided by time intervals.
>> > What can I do to achieve this?
>> >
>> > 
>> > funk...@live.com
>> >
>> > From: Guozhang Wang<mailto:wangg...@gmail.com>
>> > Date: 2018-03-10 02:50
>> > To: users<mailto:users@kafka.apache.org>
>> > Subject: Re: kafka steams with TimeWindows ,incorrect result
>> > Hi Jie,
>> >
>> > This is by design of Kafka Streams, please read this doc for more
>> details
>> > (search for "outputs of the Wordcount application is actually a
>> continuous
>> > stream of updates"):
>> >
>> > https://kafka.apache.org/0110/documentation/streams/quickstart
>> >
>> > Note this semantics applies for both windowed and un-windowed tables.
>> >
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <funk...@live.com> wrote:
>>

Re: Re: kafka steams with TimeWindows ,incorrect result

2018-04-26 Thread
I'm back.
Which StateStore could I use for this problem?
Another idea: I could send a 'flush' message into this topic, and when this
message is received, update the results to the DB.
Would that work?


funk...@live.com

From: Guozhang Wang<mailto:wangg...@gmail.com>
Date: 2018-03-12 03:58
To: users<mailto:users@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
If you want to strictly "only have one output per window", then for now
you'd probably implement that logic using a lower-level "transform"
function in which you can schedule a punctuate function to send all the
results at the end of a window.

If you just want to reduce the amount of data to your sink, but your sink
can still handle overwritten records of the same key, you can enlarge the
cache size via the cache.max.bytes.buffering config.

https://kafka.apache.org/documentation/#streamsconfigs

On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <funk...@live.com> wrote:

> thanks for your reply!
> I see that it is designed to operate on an infinite, unbounded stream of
> data.
> Now I want to process an unbounded stream divided by time intervals.
> What can I do to achieve this?
>
> 
> funk...@live.com
>
> From: Guozhang Wang<mailto:wangg...@gmail.com>
> Date: 2018-03-10 02:50
> To: users<mailto:users@kafka.apache.org>
> Subject: Re: kafka steams with TimeWindows ,incorrect result
> Hi Jie,
>
> This is by design of Kafka Streams, please read this doc for more details
> (search for "outputs of the Wordcount application is actually a continuous
> stream of updates"):
>
> https://kafka.apache.org/0110/documentation/streams/quickstart
>
> Note this semantics applies for both windowed and un-windowed tables.
>
>
> Guozhang
>
> On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <funk...@live.com> wrote:
>
> > Hi:
> > I used TimeWindow for aggregate data in kafka.
> >
> > this is code snippet ;
> >
> >   view.flatMap(new MultipleKeyValueMapper(client))
> >     .groupByKey(Serialized.with(Serdes.String(),
> >         Serdes.serdeFrom(new CountInfoSerializer(), new CountInfoDeserializer())))
> >     .windowedBy(TimeWindows.of(6))
> >     .reduce(new Reducer<CountInfo>() {
> >         @Override
> >         public CountInfo apply(CountInfo value1, CountInfo value2) {
> >             return new CountInfo(value1.start + value2.start,
> >                     value1.active + value2.active, value1.fresh + value2.fresh);
> >         }
> >     })
> >     .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
> >         @Override
> >         public String apply(Windowed<String> key, CountInfo value) {
> >             return key.key();
> >         }
> >     })
> >     .print(Printed.toSysOut());
> >
> > KafkaStreams streams = new KafkaStreams(builder.build(), KStreamReducer.getConf());
> > streams.start();
> >
> > and I tested 3 data records in kafka, and printed the keys and values:
> >
> > [KTABLE-TOSTREAM-07]: [9_9_2018-03-09_hour_21@152060130/152060136], CountInfo{start=12179, active=12179, fresh=12179}
> > [KTABLE-TOSTREAM-07]: [9_9_2018-03-09@152060130/152060136], CountInfo{start=12179, active=12179, fresh=12179}
> > [KTABLE-TOSTREAM-07]: [9_9_2018-03-09_hour_21@152060130/152060136], CountInfo{start=3, active=3, fresh=3}
> > [KTABLE-TOSTREAM-07]: [9_9_2018-03-09@152060130/152060136], CountInfo{start=3, active=3, fresh=3}
> >
> > why are two results printed in one window duration instead of one?
> >
> > 
> > funk...@live.com
> >
>
>
>
> --
> -- Guozhang
>



--
-- Guozhang


Re: kafka streams consumer ERROR The coordinator is not aware of this member.

2018-04-10 Thread
Kafka 1.0.0

Kafka Streams 1.0.0





Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for Windows 10




From: Guozhang Wang <wangg...@gmail.com>
Date: Wednesday, April 11, 2018 1:39:17 AM
To: users@kafka.apache.org
Subject: Re: kafka streams consumer ERROR The coordinator is not aware of this 
member.

1) It means the consumer has not called poll() for quite some time and hence
was kicked out by the consumer coordinator (the new consumer uses heartbeats
as the failure detection protocol; you can read more in the wiki docs about
its design). If you have high variance in record processing latency, e.g.
sometimes a single record takes a very long time to process, or if your
application sometimes has a GC pause, it will trigger this situation. In newer
versions of Streams this can be auto-handled and will not be thrown to the
user. Which Kafka version are you using?

2) This WARN log entry is normal and can be caused by multiple reasons.
For example, brokers actively close connections that have been idle
for some time, and the client will just reconnect to the broker when
encountering this issue.
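
A sketch of the knobs this answer points at, passed through Streams' consumer
prefix; the values are placeholders to tune, not recommendations from the thread:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter");         // groupId from the log above
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
// fewer records per poll() means less work between polls
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
// allow more time between polls before the member is evicted from the group
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600000);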

On Tue, Apr 10, 2018 at 7:34 AM, 杰 杨 <funk...@live.com> wrote:

> Hi:
> When I run Kafka Streams applications for a period of time,
> this ERROR is thrown:
>
> 1) first question
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.error.301|[Consumer
> clientId=filter-StreamThread-1-consumer, groupId=filter] Offset commit
> failed on partition sample-1 at offset 68067604: The coordinator is not
> aware of this member.
> I checked the broker logs and found some warnings:
>
>  Member filter-StreamThread-3-consumer-7ce502f2-51d6-4042-9ddb-ffdb0ed37894
> in group filter has failed, removing it from the group
> (kafka.coordinator.group.GroupCoordinator)
>
> All the brokers are healthy.
> Why is this error sometimes thrown?
>
> 2) second question
>
> brokers sometimes throw this WARNING:
>
> java.io.IOException: Connection to 1 was disconnected before the response
> was read
>
> Does it mean that the broker whose broker.id is 1 isn't healthy?
> But when I check the topic describe output, all brokers look fine.
>
> Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for Windows 10
>
>


--
-- Guozhang


kafka streams consumer ERROR The coordinator is not aware of this member.

2018-04-10 Thread
Hi:
When I run Kafka Streams applications for a period of time,
this ERROR is thrown:

1) first question

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.error.301|[Consumer
 clientId=filter-StreamThread-1-consumer, groupId=filter] Offset commit failed 
on partition sample-1 at offset 68067604: The coordinator is not aware of this 
member.
I checked the broker logs and found some warnings:

 Member filter-StreamThread-3-consumer-7ce502f2-51d6-4042-9ddb-ffdb0ed37894 in 
group filter has failed, removing it from the group 
(kafka.coordinator.group.GroupCoordinator)

All the brokers are healthy.
Why is this error sometimes thrown?

2) second question

brokers sometimes throw this WARNING:

java.io.IOException: Connection to 1 was disconnected before the response was 
read

Does it mean that the broker whose broker.id is 1 isn't healthy?
But when I check the topic describe output, all brokers look fine.

Sent from Mail for Windows 10



Re: Re: kafka streams coordinator error

2018-03-29 Thread
there are no errors in the broker server.logs for that time


funk...@live.com

From: Sameer Kumar
Date: 2018-03-30 13:33
To: users
Subject: Re: kafka streams coordinator error
Check the Kafka broker logs as well; see if there is some error there.

On Fri, Mar 30, 2018, 10:57 AM 杰 杨 wrote:

> Hi:
> I have used kafka streams for days,
> and I met a problem today. When I pushed 2400W (24 million) records through
> Kafka Streams and then wrote the data to HDFS,
> I found the final result was bigger than 24 million, and in the console I
> found this error:
>
>  Offset commit failed on partition -2 at offset 63514037: The
> coordinator is not aware of this member
>
> Kafka commits in an internal topic.
> I set max.poll.records to 300, and I checked that max.poll.interval.ms is
> the default 30,
> and I checked that one record needs 1 ms to be processed.
> I wonder why this error is thrown?
>
> 
>
> funk...@live.com
>


kafka producer send failed

2018-03-12 Thread
Hi:
I used a producer to send data to Kafka.
Due to network and other problems, some sends failed.
I registered a callback on the send.
I tested 300W (3 million) records, but only 200W (2 million) are in Kafka,
and 100W (1 million) are in local files.

I did configure retries=10 and acks='all' in the producer.

It seems Kafka didn't retry sending the failed data?

this is my configuration on producer.

acks=all
retries=3
batch.size=16384
linger.ms=10
buffer.memory=67108864
max.request.size=1048576
request.timeout.ms=5
compression.type=gzip
Can anybody help me avoid this problem?
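
A sketch of a send callback that captures such failures for replay. Note that an
"Expiring N record(s)" TimeoutException means the batch timed out in the client
buffer before it was ever sent, a failure mode the retries setting does not cover;
saveForReplay() is a hypothetical recovery hook:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

static void sendWithRecovery(KafkaProducer<String, String> producer,
                             String topic, String key, String value) {
    producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
        if (exception != null) {
            // e.g. TimeoutException: the batch expired in the accumulator,
            // so the retries= setting never applied to it
            saveForReplay(topic, key, value);
        }
    });
}

static void saveForReplay(String topic, String key, String value) {
    // hypothetical: persist the record locally for a later re-send
}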

funk...@live.com


Re: Re: kafka steams with TimeWindows ,incorrect result

2018-03-09 Thread
thanks for your reply!
I see that it is designed to operate on an infinite, unbounded stream of data.
Now I want to process an unbounded stream divided by time intervals.
What can I do to achieve this?


funk...@live.com

From: Guozhang Wang<mailto:wangg...@gmail.com>
Date: 2018-03-10 02:50
To: users<mailto:users@kafka.apache.org>
Subject: Re: kafka steams with TimeWindows ,incorrect result
Hi Jie,

This is by design of Kafka Streams, please read this doc for more details
(search for "outputs of the Wordcount application is actually a continuous
stream of updates"):

https://kafka.apache.org/0110/documentation/streams/quickstart

Note this semantics applies for both windowed and un-windowed tables.


Guozhang

On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <funk...@live.com> wrote:

> Hi:
> I used TimeWindow for aggregate data in kafka.
>
> this is code snippet ;
>
>   view.flatMap(new MultipleKeyValueMapper(client))
>     .groupByKey(Serialized.with(Serdes.String(),
>         Serdes.serdeFrom(new CountInfoSerializer(), new CountInfoDeserializer())))
>     .windowedBy(TimeWindows.of(6))
>     .reduce(new Reducer<CountInfo>() {
>         @Override
>         public CountInfo apply(CountInfo value1, CountInfo value2) {
>             return new CountInfo(value1.start + value2.start,
>                     value1.active + value2.active, value1.fresh + value2.fresh);
>         }
>     })
>     .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
>         @Override
>         public String apply(Windowed<String> key, CountInfo value) {
>             return key.key();
>         }
>     })
>     .print(Printed.toSysOut());
>
> KafkaStreams streams = new KafkaStreams(builder.build(), KStreamReducer.getConf());
> streams.start();
>
> and I tested 3 data records in kafka, and printed the keys and values:
>
> [KTABLE-TOSTREAM-07]: [9_9_2018-03-09_hour_21@152060130/152060136], CountInfo{start=12179, active=12179, fresh=12179}
> [KTABLE-TOSTREAM-07]: [9_9_2018-03-09@152060130/152060136], CountInfo{start=12179, active=12179, fresh=12179}
> [KTABLE-TOSTREAM-07]: [9_9_2018-03-09_hour_21@152060130/152060136], CountInfo{start=3, active=3, fresh=3}
> [KTABLE-TOSTREAM-07]: [9_9_2018-03-09@152060130/152060136], CountInfo{start=3, active=3, fresh=3}
>
> why are two results printed in one window duration instead of one?
>
> 
> funk...@live.com
>



--
-- Guozhang


kafka steams with TimeWindows ,incorrect result

2018-03-09 Thread
Hi:
I used TimeWindow for aggregate data in kafka.

this is code snippet ;

  view.flatMap(new MultipleKeyValueMapper(client))
    .groupByKey(Serialized.with(Serdes.String(),
        Serdes.serdeFrom(new CountInfoSerializer(), new CountInfoDeserializer())))
    .windowedBy(TimeWindows.of(6))
    .reduce(new Reducer<CountInfo>() {
        @Override
        public CountInfo apply(CountInfo value1, CountInfo value2) {
            return new CountInfo(value1.start + value2.start,
                    value1.active + value2.active, value1.fresh + value2.fresh);
        }
    })
    .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
        @Override
        public String apply(Windowed<String> key, CountInfo value) {
            return key.key();
        }
    })
    .print(Printed.toSysOut());

KafkaStreams streams = new KafkaStreams(builder.build(), KStreamReducer.getConf());
streams.start();

and I tested 3 data records in kafka, and printed the keys and values:

[KTABLE-TOSTREAM-07]: [9_9_2018-03-09_hour_21@152060130/152060136], CountInfo{start=12179, active=12179, fresh=12179}
[KTABLE-TOSTREAM-07]: [9_9_2018-03-09@152060130/152060136], CountInfo{start=12179, active=12179, fresh=12179}
[KTABLE-TOSTREAM-07]: [9_9_2018-03-09_hour_21@152060130/152060136], CountInfo{start=3, active=3, fresh=3}
[KTABLE-TOSTREAM-07]: [9_9_2018-03-09@152060130/152060136], CountInfo{start=3, active=3, fresh=3}

why are two results printed in one window duration instead of one?


funk...@live.com


Re: when use kafka streams to(topic) method sometime throw error?

2018-03-05 Thread
It seems I didn't configure ProducerConfig in the stream application.
Should I configure that?
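
For what it's worth, producer settings can be passed through StreamsConfig with the
producer prefix; a sketch with placeholder values:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");         // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
// give slow brokers more time before a batch expires in the client
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);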




funk...@live.com

From: funk...@live.com
Date: 2018-03-06 11:23
To: users
Subject: when use kafka streams to(topic) method sometime throw error?

hi:
I met a problem today.
When I use Kafka Streams to consume one topic, apply the mapValues() method,
and then write to another topic, it sometimes throws an error.
This is a code sample:
new StreamsBuilder().stream(xxxtopic, Consumed.with(Serdes.String(), 
Serdes.String())).mapValues(method).to(newTopic)
Sometimes it works well, but sometimes it throws this error:

to topic newTopic due to org.apache.kafka.common.errors.TimeoutException: 
Expiring 6 record(s) for newTopic-2: 30030 ms has passed since last attempt 
plus backoff time


funk...@live.com


Re: Re: which Kafka StateStore could I use ?

2018-03-02 Thread
Can you show some tips for this?

--- Original message ---
From: "Guozhang Wang" <wangg...@gmail.com>
Date: March 3, 2018, 01:32:55
To: "users" <users@kafka.apache.org>
Subject: Re: Re: which Kafka StateStore could I use ?


Hello Jie,

By default Kafka Streams uses caching on top of its internal state stores
to de-dup output streams to the final destination (in your case the DB), so
that for a single key, fewer updates will be generated, giving a small
working set. If your aggregation logic follows such a key distribution, you
can try enlarging the cache size (by default it is only 50MB) and see if it
helps reduce the downstream traffic to your DB.
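
A sketch of that tuning with placeholder values; a larger cache plus a longer
commit interval lets more per-key updates collapse before they reach the sink:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 256 * 1024 * 1024L); // enlarge the record cache
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);                 // flush/commit less often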


Guozhang


On Thu, Mar 1, 2018 at 6:33 PM, 杰 杨 <funk...@live.com> wrote:

> Yes, but the DB's concurrency is the limitation.
> Now I can process 600 records/second,
> and I want to enhance that.
>
> Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for Windows 10
>
> From: Guozhang Wang<mailto:wangg...@gmail.com>
> Date: March 2, 2018, 2:59
> To: users@kafka.apache.org<mailto:users@kafka.apache.org>
> Subject: Re: which Kafka StateStore could I use ?
>
> Hello Jie,
>
> Just to understand your problem better: are you referring to "db" as an
> external storage engine outside Kafka Streams, and are you asking how to
> send only one record per aggregation key (assuming you are doing some
> aggregations with a Streams state store) to that end storage engine?
>
>
> Guozhang
>
>
> On Wed, Feb 28, 2018 at 7:53 PM, 杰 杨 <funk...@live.com> wrote:
>
> >
> > HI:
> > I use Kafka Streams for real-time data analysis,
> > and I have met a problem.
> > Now I process a record from Kafka, compute it, and send it to a DB,
> > but the DB's concurrency level does not suit me.
> > So I want this:
> > 1) when there is no data in Kafka, the state store has no results;
> > 2) when there is a lot of data in Kafka, the state store saves the
> > computed results, and I send them to the DB once.
> > Which StateStore can I use to do the above?
> > 
> > funk...@live.com
> >
>
>
>
> --
> -- Guozhang
>
>


--
-- Guozhang



Re: Re: which Kafka StateStore could I use ?

2018-03-02 Thread
I use MongoDB, but I need 10 update operations for one record.
I process a record in 20 ms in one thread.

--- Original message ---
From: "Ted Yu" <yuzhih...@gmail.com>
Date: March 3, 2018, 01:37:13
To: "users" <users@kafka.apache.org>
Subject: Re: Re: which Kafka StateStore could I use ?


Jie:
Which DB are you using?

600 records/second is a very low rate.

Probably your DB needs some tuning.

Cheers

On Fri, Mar 2, 2018 at 9:32 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Jie,
>
> By default Kafka Streams uses caching on top of its internal state stores
> to de-dup output streams to the final destination (in your case the DB), so
> that for a single key, fewer updates will be generated, giving a small
> working set. If your aggregation logic follows such a key distribution, you
> can try enlarging the cache size (by default it is only 50MB) and see if it
> helps reduce the downstream traffic to your DB.
>
>
> Guozhang
>
>
> On Thu, Mar 1, 2018 at 6:33 PM, 杰 杨 <funk...@live.com> wrote:
>
> > Yes, but the DB's concurrency is the limitation.
> > Now I can process 600 records/second,
> > and I want to enhance that.
> >
> > Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for Windows 10
> >
> > From: Guozhang Wang<mailto:wangg...@gmail.com>
> > Date: March 2, 2018, 2:59
> > To: users@kafka.apache.org<mailto:users@kafka.apache.org>
> > Subject: Re: which Kafka StateStore could I use ?
> >
> > Hello Jie,
> >
> > Just to understand your problem better: are you referring to "db" as an
> > external storage engine outside Kafka Streams, and are you asking how to
> > send only one record per aggregation key (assuming you are doing some
> > aggregations with a Streams state store) to that end storage engine?
> >
> >
> > Guozhang
> >
> >
> > On Wed, Feb 28, 2018 at 7:53 PM, 杰 杨 <funk...@live.com> wrote:
> >
> > >
> > > HI:
> > > I use Kafka Streams for real-time data analysis,
> > > and I have met a problem.
> > > Now I process a record from Kafka, compute it, and send it to a DB,
> > > but the DB's concurrency level does not suit me.
> > > So I want this:
> > > 1) when there is no data in Kafka, the state store has no results;
> > > 2) when there is a lot of data in Kafka, the state store saves the
> > > computed results, and I send them to the DB once.
> > > Which StateStore can I use to do the above?
> > > 
> > > funk...@live.com
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
>
>
> --
> -- Guozhang
>



Re: which Kafka StateStore could I use ?

2018-03-01 Thread
Yes, but the DB's concurrency is the limitation.
Now I can process 600 records/second,
and I want to enhance that.

Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for Windows 10

From: Guozhang Wang<mailto:wangg...@gmail.com>
Date: March 2, 2018, 2:59
To: users@kafka.apache.org<mailto:users@kafka.apache.org>
Subject: Re: which Kafka StateStore could I use ?

Hello Jie,

Just to understand your problem better: are you referring to "db" as an
external storage engine outside Kafka Streams, and are you asking how to
send only one record per aggregation key (assuming you are doing some
aggregations with a Streams state store) to that end storage engine?


Guozhang


On Wed, Feb 28, 2018 at 7:53 PM, 杰 杨 <funk...@live.com> wrote:

>
> HI:
> I use Kafka Streams for real-time data analysis,
> and I have met a problem.
> Now I process a record from Kafka, compute it, and send it to a DB,
> but the DB's concurrency level does not suit me.
> So I want this:
> 1) when there is no data in Kafka, the state store has no results;
> 2) when there is a lot of data in Kafka, the state store saves the
> computed results, and I send them to the DB once.
> Which StateStore can I use to do the above?
> 
> funk...@live.com
>



--
-- Guozhang



which Kafka StateStore could I use ?

2018-02-28 Thread

HI:
I use Kafka Streams for real-time data analysis,
and I have met a problem.
Now I process a record from Kafka, compute it, and send it to a DB,
but the DB's concurrency level does not suit me.
So I want this:
1) when there is no data in Kafka, the state store has no results;
2) when there is a lot of data in Kafka, the state store saves the computed 
results, and I send them to the DB once.
Which StateStore can I use to do the above?

funk...@live.com


This server is not the leader for that topic-partition

2018-01-17 Thread
Hi:
 I have used Kafka for days, and it was normal at the beginning.
Yesterday I found that the topic leader often switches from one broker to another,
and kafka server.log throws this error:
This server is not the leader for that topic-partition.
kafka-version:
kafka_2.11-1.0.0

funk...@live.com


kafka streaming process had been done a few minutes later

2017-08-22 Thread
Hi all:
When I found the Kafka Streams API, I used it to consume one topic, filter the 
data, and transform it to another topic.
I found that the process would be done a few minutes later?

Sent from Mail for Windows 10