-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

I see. KGroupedStream will be part of 0.10.1.0 (which should be released
in the next few weeks).

So, with 0.10.0.1, instead of

> .groupByKey().count()

you need to do

> .countByKey()



- -Matthias

On 10/18/16 12:05 PM, Furkan KAMACI wrote:
> Hi Matthias,
> 
> Thanks for your detailed answer. By the way, I couldn't find
> "KGroupedStream" in version 0.10.0.1?
> 
> Kind Regards, Furkan KAMACI
> 
> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax
> <matth...@confluent.io> wrote:
> 
> Hi,
> 
> You just need to read your stream and apply a (windowed)
> aggregation on it.
> 
> If you use a non-windowed aggregation, you will get the count "since
> the beginning". If you use a windowed aggregation, you can specify a
> window size of 1 hour and get one count per hour.
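
To make the difference between the two kinds of aggregation concrete, here is a minimal plain-Java sketch (it does not use the Kafka Streams API). It assumes each record carries a timestamp in milliseconds; the class and method names are hypothetical and only illustrate how a one-hour tumbling window buckets records, and how a count turns into queries per second.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: models windowed vs. non-windowed counting in plain Java.
class WindowedCountSketch {
    // One hour in milliseconds, the same 3600 * 1000 used in the thread.
    static final long WINDOW_MS = 3600L * 1000L;

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Non-windowed aggregation: a single total "since the beginning".
    static long countAll(long[] timestampsMs) {
        return timestampsMs.length;
    }

    // Windowed aggregation: one total per one-hour window, keyed by window start.
    static Map<Long, Long> countPerWindow(long[] timestampsMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts), 1L, Long::sum);
        }
        return counts;
    }

    // Average queries per second over one full window.
    static double queriesPerSecond(long countInWindow) {
        return countInWindow / (WINDOW_MS / 1000.0);
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 1000L, 3_599_999L, 3_600_000L, 7_200_001L};
        System.out.println("total: " + countAll(timestamps));
        System.out.println("per window: " + countPerWindow(timestamps));
    }
}
```

Dividing a window's count by the window length in seconds gives the average rate for that hour; the count "since the beginning" would need to be divided by the total elapsed time instead.
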
> 
> One comment: it seems that you want to count *all* queries. To
> make this work, you need to make sure all records use the
> same key (because Kafka Streams only supports aggregation over
> keyed streams). Keep in mind that this prohibits parallelization
> of your aggregation!
> 
> As a workaround, you could also do two consecutive aggregations:
> parallelize the first one, and do not parallelize the second one
> (i.e., use the first one as a pre-aggregation, similar to a combine
> step).
> 
> Without pre-aggregation, and assuming all records use the same key,
> it would look something like this (for current trunk):
> 
> 
> KStreamBuilder builder = new KStreamBuilder();
> KStream input = builder.stream("yourTopic");
>
> KGroupedStream groupedInput = input.groupByKey();
>
> // non-windowed: count since the beginning
> groupedInput.count("countStore").to("outputTopicCountFromBeginning");
>
> // windowed: one count per 1-hour window
> groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore")
>     .to("outputTopicHourlyCounts");
> 
> 
> For more details, please see the docs and examples:
> 
> - http://docs.confluent.io/current/streams/index.html
> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
> 
> 
> -Matthias
> 
> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
>>>> Hi,
>>>> 
>>>> I could successfully run Kafka in my environment. I want to
>>>> monitor queries per second in my search application with
>>>> Kafka. Whenever a search request is made, I create a
>>>> ProducerRecord which holds the current nano time of the system.
>>>> 
>>>> I know that I have to use a streaming API for the calculation,
>>>> i.e., Kafka Streams or Spark Streaming. My choice is to use
>>>> Kafka Streams.
>>>> 
>>>> For the last hour, or since the beginning, I have to calculate
>>>> the queries per second. How can I do such an aggregation in
>>>> Kafka Streams?
>>>> 
>>>> Kind Regards, Furkan KAMACI
>>>> 
>> 
> 
-----BEGIN PGP SIGNATURE-----
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJYBnlRAAoJECnhiMLycopPF+cQAKWt58HvcEebqXC+KlSc5M8c
rcxqTbkH3YT9SEm0zLinoXWaJyd/EHUkaWSStiNekZgRe9BsXBHjFnhy/Pg20D0A
JYKBA0IK4DTBy6sJvu1Wyd08iQ85HTmFlMZDg38EkTJOkp8SnPhQ4O2/IKyudWFD
kLBBJBLSEEdFcP+HWnP469rcfVBcr7kE+bgPxAPTLH0/v0G7+RAwwxi/wfV+c/TB
kvGkn+sYgRtyduUS62wVUTC4tOYAuooqn6/Aiwdu+e/a4+S0DsSoQQi0Oyts+gd9
6/aDLPnGrHT1kUMNbGIqOqLLw2rxs3NtQXFB3odjgt+rHtEuItqohgkV5SCjut3Y
Uv89xQOKrx9TgtTUTcra3ckwffVFNsFa+DGuZbMvm2P2hC1k/7yCZGa+0l6vRauk
wQ5dw0Ug/DGWHYFSIBuDz81mDsmgmpLh/QXIcqIJ3rQ1VgDbfopwQhuuaQiaEPDF
p9S524sy3EYMVGqzdWOFC2+7MVYrnWK6CEkxpAvOGqJw951eAObM9OFmiN1o0wJ4
Kkif20adZRY6HANFyurEkPHs2id/JVh/LVkV6DO/DAtqun4rFesuC3m8bUyOlBjq
UbHmDnq40X6uohvfiurO4NGmOfLBEm6GQPxTyNFgEUCBrORjsgXaY7bpzsxUNvvc
u+554Ztge1RtJCjbbtR1
=z4/M
-----END PGP SIGNATURE-----
