Hi Matthias,

Thanks for your detailed answer. By the way, I couldn't find "KGroupedStream" in version 0.10.0.1?
Kind Regards,
Furkan KAMACI

On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Hi,
>
> You just need to read your stream and apply a (windowed) aggregation
> on it.
>
> If you use a non-windowed aggregation, you will get counts "since the
> beginning". If you use a windowed aggregation, you can specify the
> window size as 1 hour and get hourly results.
>
> One comment: it seems that you want to count *all* queries. To make
> this work, you need to make sure all records use the same key
> (because Kafka Streams only supports aggregations over keyed streams).
> Keep in mind that this prohibits parallelization of your aggregation!
>
> As a workaround, you could also do two consecutive aggregations:
> parallelize the first one, and do not parallelize the second one
> (i.e., use the first one as a pre-aggregation, similar to a combine
> step).
>
> Without pre-aggregation, and assuming all records use the same key,
> something like this (for current trunk):
>
> > KStreamBuilder builder = new KStreamBuilder();
> > KStream input = builder.stream("yourTopic");
> >
> > KGroupedStream groupedInput = input.groupByKey();
> >
> > groupedInput.count("countStore").to("outputTopicCountFromBeginning");
> >
> > groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore")
> >     .to("outputTopicHourlyCounts");
>
> For more details, please see the docs and examples:
>
> - http://docs.confluent.io/current/streams/index.html
> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
>
> -Matthias
>
> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> > Hi,
> >
> > I could successfully run Kafka in my environment. I want to monitor
> > queries per second in my search application with Kafka. Whenever a
> > search request is made, I create a ProducerRecord which holds the
> > current nano time of the system.
> >
> > I know that I have to use a streaming API for the calculation, i.e.,
> > Kafka Streams or Spark Streams. My choice is to use Kafka Streams.
> >
> > For the last 1 hour, or since the beginning, I have to calculate
> > the queries per second. How can I make such an aggregation in
> > Kafka Streams?
> >
> > Kind Regards,
> > Furkan KAMACI
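The two-stage workaround Matthias describes (a parallelizable pre-aggregation followed by a single global sum, similar to a combine step) can be illustrated outside Kafka with plain Java. This is only a sketch of the idea, not Kafka Streams code; the class and method names (`TwoStageCount`, `preAggregate`, `totalCount`) are hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TwoStageCount {

    // Stage 1 (parallelizable): count records per original key,
    // analogous to the first aggregation that still runs per partition.
    static Map<String, Long> preAggregate(List<String> keys) {
        return keys.stream()
                .collect(Collectors.groupingBy(k -> k, Collectors.counting()));
    }

    // Stage 2 (single task): conceptually re-key every sub-count to one
    // constant key and sum them, yielding the global count.
    static long totalCount(Map<String, Long> subCounts) {
        return subCounts.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<String> queryKeys = Arrays.asList("user-1", "user-2", "user-1", "user-3");
        Map<String, Long> sub = preAggregate(queryKeys);
        System.out.println(sub);
        System.out.println(totalCount(sub)); // prints 4
    }
}
```

Only the second stage is forced onto a single key (and thus a single task), so the expensive per-record counting still scales out.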