Hi Matthias,

Thanks for your detailed answer. By the way, I couldn't find "KGroupedStream"
in version 0.10.0.1. Is it available in that release?

Kind Regards,
Furkan KAMACI

On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

>
> Hi,
>
> You just need to read your stream and apply a (windowed) aggregation
> on it.
>
> If you use a non-windowed aggregation, you will get counts "since the
> beginning". If you use a windowed aggregation, you can specify a window
> size of 1 hour and get hourly results.
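> To illustrate the windowing idea outside of Kafka Streams, here is a
> minimal plain-Java sketch of tumbling-window bucketing (the sample
> timestamps are hypothetical):

```java
import java.util.Map;
import java.util.TreeMap;

public class WindowedCount {
    public static void main(String[] args) {
        // Hypothetical event timestamps in milliseconds.
        long[] timestamps = {100L, 1_800_000L, 3_600_000L, 3_600_001L, 7_200_000L};
        long windowSizeMs = 3600 * 1000L; // 1-hour tumbling windows

        // Each timestamp falls into the window starting at
        // floor(ts / windowSize) * windowSize; count per window.
        Map<Long, Long> countsPerWindow = new TreeMap<>();
        for (long ts : timestamps) {
            long windowStart = (ts / windowSizeMs) * windowSizeMs;
            countsPerWindow.merge(windowStart, 1L, Long::sum);
        }
        System.out.println(countsPerWindow);
        // {0=2, 3600000=2, 7200000=1}
    }
}
```

> This is conceptually what a tumbling `TimeWindows.of(3600 * 1000)`
> does: each record is assigned to exactly one 1-hour bucket.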
>
> One comment: it seems that you want to count *all* queries. To make
> this work, you need to make sure all records use the same key
> (because Kafka Streams only supports aggregations over keyed streams).
> Keep in mind that this prohibits parallelization of your aggregation!
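> A plain-Java sketch (no Kafka dependency) of why the key matters:
> keyed counting produces one count per key, so a global total requires
> routing every record to one constant key (key and record values below
> are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class GlobalCount {
    public static void main(String[] args) {
        // Hypothetical query records, originally keyed by user.
        String[][] records = {{"alice", "q1"}, {"bob", "q2"}, {"alice", "q3"}};

        // Keyed aggregation counts per key; with mixed keys there
        // is no single "total queries" entry.
        Map<String, Long> perKey = new HashMap<>();
        for (String[] r : records) {
            perKey.merge(r[0], 1L, Long::sum);
        }
        // perKey = {alice=2, bob=1}

        // Re-keying every record to one constant key makes the
        // aggregation global -- at the cost of parallelism, since
        // one key means one partition does all the counting.
        Map<String, Long> global = new HashMap<>();
        for (String[] r : records) {
            global.merge("all", 1L, Long::sum);
        }
        // global = {all=3}
        System.out.println(perKey + " " + global);
    }
}
```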
>
> As a workaround, you could also do two consecutive aggregations:
> parallelize the first one, and do not parallelize the second one
> (i.e., use the first one as a pre-aggregation, similar to a combine
> step).
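> The two-step idea in a minimal plain-Java sketch (partition contents
> are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class PreAggregate {
    public static void main(String[] args) {
        // Step 1 (parallelizable): each partition counts its own
        // records, keyed by partition -- here two toy partitions.
        long[] partition0 = {1, 1, 1}; // 3 queries
        long[] partition1 = {1, 1};    // 2 queries

        Map<Integer, Long> partialCounts = new HashMap<>();
        partialCounts.put(0, (long) partition0.length);
        partialCounts.put(1, (long) partition1.length);

        // Step 2 (single key, not parallelized): sum the small
        // per-partition partials into one global total. Only the
        // pre-aggregated counts pass through the bottleneck.
        long total = partialCounts.values().stream()
                .mapToLong(Long::longValue).sum();
        System.out.println(total);
        // 5
    }
}
```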
>
> Without pre-aggregation, and assuming all records use the same key,
> it would look something like this (for current trunk):
>
>
> > KStreamBuilder builder = new KStreamBuilder();
> > KStream input = builder.stream("yourTopic");
> >
> > KGroupedStream groupedInput = input.groupByKey();
> >
> > groupedInput.count("countStore").to("outputTopicCountFromBeginning");
> >
> > groupedInput.count(TimeWindows.of(3600 * 1000),
> >     "windowedCountStore").to("outputTopicHourlyCounts");
>
>
> For more details, please see the docs and examples:
>
>  - http://docs.confluent.io/current/streams/index.html
>  -
> https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/ka
> fka-streams
>
>
> - -Matthias
>
> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> > Hi,
> >
> > I could successfully run Kafka in my environment. I want to monitor
> > queries per second for my search application with Kafka. Whenever a
> > search request is made, I create a ProducerRecord which holds the
> > current nano time of the system.
> >
> > I know that I have to use a streaming API for the calculation, i.e.
> > Kafka Streams or Spark Streaming. My choice is Kafka Streams.
> >
> > For the last 1 hour, or since the beginning, I have to calculate the
> > queries per second. How can I do such an aggregation with Kafka
> > Streams?
> >
> > Kind Regards, Furkan KAMACI
> >
>
