Sorry about the concurrent questions. I tried the code below; I didn't get any error, but the output topic was never created:
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  Producer<String, String> producer = new KafkaProducer<>(props);
  for (int i = 0; i < 1000; i++) {
      producer.send(new ProducerRecord<>("input-topic",
          String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}",
              System.nanoTime() * 1e-9, i)));
  }

  final KStreamBuilder builder = new KStreamBuilder();
  final KStream<String, Long> qps =
      builder.stream(Serdes.String(), Serdes.Long(), "input-topic");
  qps.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
     .mapValues(Object::toString)
     .to("output-topic");

  final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
  streams.start();
  Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

On Wed, Oct 19, 2016 at 12:14 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> Two things:
>
> 1) You should not apply the window to the first count, but to the base
> stream, to get correct results.
>
> 2) Your windowed aggregation does not just return String type, but
> Windowed<K> type. Thus, you need to either insert a .map() to transform
> your data into String type, or provide a custom serializer when writing
> data to the output topic (the .to(...) method has multiple overloads).
>
> By default, each topic read/write operation uses the Serdes from the
> streams config. If your data has a different type, you need to provide
> appropriate Serdes for those operators.
>
> -Matthias
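
For reference, a minimal sketch of the two fixes described above, assuming the 0.10.0.x API used elsewhere in this thread (topic names as in the code at the top; the "@"-joined key format is only an illustration, and the stream is read with <String, String> Serdes to match what the producer actually writes):

  final KStreamBuilder builder = new KStreamBuilder();

  // Read key and value as Strings, matching the producer above.
  final KStream<String, String> input =
      builder.stream(Serdes.String(), Serdes.String(), "input-topic");

  // Window the base stream directly, then flatten the Windowed<String>
  // key into a plain String before writing, so the default String
  // Serdes from the streams config can serialize the output topic.
  input.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
       .toStream()
       .map((windowedKey, count) -> new KeyValue<>(
           windowedKey.key() + "@" + windowedKey.window().start(),
           count.toString()))
       .to("output-topic");

Mapping the key back to a String keeps the default Serdes usable for the output topic, which is what avoids the ClassCastException on Windowed mentioned below.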
>
> On 10/18/16 2:01 PM, Furkan KAMACI wrote:
> > Hi Matthias,
> >
> > I've tried this code:
> >
> >   final Properties streamsConfiguration = new Properties();
> >   streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
> >   streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >   streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
> >   streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> >   streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> >
> >   final KStreamBuilder builder = new KStreamBuilder();
> >   final KStream input = builder.stream("myapp-test");
> >
> >   final KStream<String, Long> searchCounts = input.countByKey("SearchRequests").toStream();
> >   searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000)).to("outputTopicHourlyCounts");
> >
> >   final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
> >   streams.start();
> >
> >   Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> >
> > However, I get an error:
> >
> >   Exception in thread "StreamThread-1" java.lang.ClassCastException:
> >   org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
> >
> > On the other hand, when I try this code:
> >
> >   https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
> >
> > I get an error too, which indicates:
> >
> >   Exception in thread "StreamThread-1"
> >   org.apache.kafka.common.errors.SerializationException: Size of data
> >   received by LongDeserializer is not 8
> >
> > Here is the generated topic:
> >
> >   kafka-console-consumer --zookeeper localhost:2181 --topic myapp-test --from-beginning
> >   28952314828122
> >   28988681653726
> >   29080089383233
> >
> > I know that I am missing something, but I couldn't find it.
> >
> > Kind Regards,
> > Furkan KAMACI
> >
> > On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >
> > > I see. KGroupedStream will be part of 0.10.1.0 (should be released in
> > > the next weeks).
> > >
> > > So, instead of
> > >
> > >   .groupByKey().count()
> > >
> > > you need to do
> > >
> > >   .countByKey()
> > >
> > > -Matthias
> > >
> > > On 10/18/16 12:05 PM, Furkan KAMACI wrote:
> > > > Hi Matthias,
> > > >
> > > > Thanks for your detailed answer. By the way, I couldn't find
> > > > "KGroupedStream" in version 0.10.0.1.
> > > >
> > > > Kind Regards,
> > > > Furkan KAMACI
> > > >
> > > > On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > You just need to read your stream and apply a (windowed)
> > > > > aggregation to it.
> > > > >
> > > > > If you use a non-windowed aggregation, you will get counts "since
> > > > > the beginning". If you use a windowed aggregation, you can specify
> > > > > the window size as 1 hour and get those results.
> > > > >
> > > > > One comment: it seems that you want to count *all* queries. To
> > > > > make this work, you need to make sure all records use the same key
> > > > > (because Kafka Streams only supports aggregation over keyed
> > > > > streams). Keep in mind that this prohibits parallelization of your
> > > > > aggregation!
> > > > >
> > > > > As a workaround, you could also do two consecutive aggregations:
> > > > > parallelize the first one, and do not parallelize the second one
> > > > > (i.e., use the first one as a pre-aggregation, similar to a
> > > > > combine step).
> > > > >
> > > > > Without pre-aggregation, and assuming all records use the same
> > > > > key, something like this (for current trunk):
> > > > >
> > > > >   KStreamBuilder builder = new KStreamBuilder();
> > > > >   KStream input = builder.stream("yourTopic");
> > > > >
> > > > >   KGroupedStream groupedInput = input.groupByKey();
> > > > >
> > > > >   groupedInput.count("countStore").to("outputTopicCountFromBeginning");
> > > > >   groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore").to("outputTopicHourlyCounts");
> > > > >
> > > > > For more details, please see the docs and examples:
> > > > >
> > > > > - http://docs.confluent.io/current/streams/index.html
> > > > > - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> > > > > > Hi,
> > > > > >
> > > > > > I could successfully run Kafka in my environment. I want to
> > > > > > monitor queries per second in my search application with Kafka.
> > > > > > Whenever a search request is made, I create a ProducerRecord
> > > > > > which holds the current nano time of the system.
> > > > > >
> > > > > > I know that I have to use a streaming API for the calculation,
> > > > > > i.e. Kafka Streams or Spark Streaming. My choice is to use
> > > > > > Kafka Streams.
> > > > > >
> > > > > > For the last 1 hour, or since the beginning, I have to
> > > > > > calculate the queries per second. How can I do such an
> > > > > > aggregation in Kafka Streams?
> > > > > >
> > > > > > Kind Regards,
> > > > > > Furkan KAMACI
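
One more note on the "same key" comment above: the producer loop at the top of this thread sends records without any key, so a keyed count has nothing to group on, which may also explain the missing output topic. A sketch of the same producer loop with a fixed key (the key "all" is only an illustration):

  // Same producer setup as above, but every record now carries the same
  // fixed key so the windowed count aggregates across all queries. Note
  // that a single key means a single partition, i.e. no parallelism, as
  // discussed above.
  for (int i = 0; i < 1000; i++) {
      producer.send(new ProducerRecord<>("input-topic", "all",
          String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}",
              System.nanoTime() * 1e-9, i)));
  }

With String Serdes on both the producer and the streams side (as in the sketch after the first reply above), the "Size of data received by LongDeserializer is not 8" error from the gist should also go away, since the plain-text nano-time payload is then never read as a binary long.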