I see. KGroupedStream will be part of 0.10.1.0 (which should be released
in the next few weeks).

So, instead of

> .groupByKey().count()

you need to do

> .countByKey()
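For example, on 0.10.0.1 this might look like the sketch below. (Untested;
the exact method signatures vary slightly across 0.10.0.x releases, so
check the javadocs of your version. It assumes String keys/values and
default serdes configured via StreamsConfig, plus the usual
KafkaStreams/StreamsConfig boilerplate around it.)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> input = builder.stream("yourTopic");

// 0.10.0.x has no groupByKey()/KGroupedStream; count directly on the stream.
KTable<String, Long> counts = input.countByKey("countStore");

// Write the continuously updated count out with explicit serdes.
counts.to(Serdes.String(), Serdes.Long(), "outputTopicCountFromBeginning");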
-Matthias

On 10/18/16 12:05 PM, Furkan KAMACI wrote:
> Hi Matthias,
>
> Thanks for your detailed answer. By the way, I couldn't find
> "KGroupedStream" in version 0.10.0.1?
>
> Kind Regards,
> Furkan KAMACI
>
> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax
> <matth...@confluent.io> wrote:
> > Hi,
> >
> > You just need to read your stream and apply a (windowed)
> > aggregation to it.
> >
> > If you use a non-windowed aggregation you will get counts "since
> > the beginning". If you use a windowed aggregation you can specify
> > the window size as 1 hour and get hourly results.
> >
> > One comment: it seems that you want to count *all* queries. To
> > make this work, you need to make sure all records use the same
> > key (because Kafka Streams only supports aggregation over keyed
> > streams). Keep in mind that this prohibits parallelization of
> > your aggregation!
> >
> > As a workaround, you could also do two consecutive aggregations:
> > parallelize the first one, and do not parallelize the second one
> > (i.e., use the first one as a pre-aggregation, similar to a
> > combine step).
> >
> > Without pre-aggregation, and assuming all records use the same
> > key, it looks something like this (for current trunk):
> >
> > KStreamBuilder builder = new KStreamBuilder();
> > KStream input = builder.stream("yourTopic");
> >
> > KGroupedStream groupedInput = input.groupByKey();
> >
> > groupedInput.count("countStore")
> >     .to("outputTopicCountFromBeginning");
> >
> > groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore")
> >     .to("outputTopicHourlyCounts");
> >
> > For more details, please see the docs and examples:
> >
> > - http://docs.confluent.io/current/streams/index.html
> > - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
> >
> > -Matthias
> >
> > On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> > > Hi,
> > >
> > > I could successfully run Kafka in my environment. I want to
> > > monitor queries per second in my search application with Kafka.
> > > Whenever a search request is made, I create a ProducerRecord
> > > which holds the current nano time of the system.
> > >
> > > I know that I have to use a streaming API for the calculation,
> > > i.e., Kafka Streams or Spark Streaming. My choice is to use
> > > Kafka Streams.
> > >
> > > For the last 1 hour, or since the beginning, I have to
> > > calculate the queries per second. How can I do such an
> > > aggregation in Kafka Streams?
> > >
> > > Kind Regards,
> > > Furkan KAMACI
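For reference, the producer side described in the original question might
look like the minimal sketch below. The topic name "yourTopic", the
constant key "query", and the localhost broker address are placeholders,
not anything from the thread. Using one constant key for every record is
what makes the global count work, per the keyed-aggregation note above
(at the cost of parallelism).

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// One record per search request; the constant key "query" routes all
// events into a single keyed aggregation.
producer.send(new ProducerRecord<>("yourTopic", "query",
    Long.toString(System.nanoTime())));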