Sorry for the concurrent questions. I tried the code below; it didn't
produce any errors, but the output topic never got created:

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<>(
                        "input-topic",
                        String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}",
                                System.nanoTime() * 1e-9, i)));
        }
        producer.flush(); // make sure buffered records actually reach the broker

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, Long> qps =
                builder.stream(Serdes.String(), Serdes.Long(), "input-topic");
        qps.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
                .mapValues(Object::toString)
                .to("output-topic");

        // streamsConfiguration is set up as in my previous message
        final KafkaStreams streams =
                new KafkaStreams(builder, streamsConfiguration);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

On Wed, Oct 19, 2016 at 12:14 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

>
> Two things:
>
> 1) you should not apply the window to the first count, but to the base
> stream to get correct results.
>
> 2) your windowed aggregation does not just return String type, but
> Windowed<K> type. Thus, you either need to insert a .map() to
> transform your data into String type, or you provide a custom
> serializer when writing data to the output topic (the method .to(...)
> has multiple overloads).
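>
> For example, combining both points (a rough, untested sketch; the
> store and topic names are just placeholders):
>
>     KTable<Windowed<String>, Long> counts =
>             input.countByKey(TimeWindows.of("hourlyStore", 3600 * 1000));
>
>     counts.toStream()
>             // turn the Windowed<String> key into a plain String key
>             .map((win, cnt) -> new KeyValue<>(
>                     win.key() + "@" + win.window().start(),
>                     cnt.toString()))
>             .to("output-topic");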
>
> By default, each topic read/write operation uses the Serdes from the
> streams config. If your data has a different type, you need to
> provide appropriate Serdes for those operators.
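>
> For example, one of the overloads of .to(...) takes explicit Serdes
> (sketch, assuming the 0.10.0.x API; "stream" stands for your result
> stream):
>
>     stream.to(Serdes.String(), Serdes.Long(), "output-topic");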
>
>
> -Matthias
>
> On 10/18/16 2:01 PM, Furkan KAMACI wrote:
> > Hi Matthias,
> >
> > I've tried this code:
> >
> >         final Properties streamsConfiguration = new Properties();
> >         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >                 "myapp");
> >         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >                 "localhost:9092");
> >         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> >                 "localhost:2181");
> >         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >                 Serdes.String().getClass().getName());
> >         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >                 Serdes.String().getClass().getName());
> >
> >         final KStreamBuilder builder = new KStreamBuilder();
> >         final KStream input = builder.stream("myapp-test");
> >
> >         final KStream<String, Long> searchCounts =
> >                 input.countByKey("SearchRequests").toStream();
> >         searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * 1000))
> >                 .to("outputTopicHourlyCounts");
> >
> >         final KafkaStreams streams =
> >                 new KafkaStreams(builder, streamsConfiguration);
> >         streams.start();
> >
> >         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> >
> > However I get an error:
> >
> >
> >         Exception in thread "StreamThread-1"
> >         java.lang.ClassCastException:
> >         org.apache.kafka.streams.kstream.Windowed cannot be cast to
> >         java.lang.String
> >
> > On the other hand when I try this code:
> >
> > https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a
> >
> >  I get an error too, which indicates that:
> >
> >         Exception in thread "StreamThread-1"
> >         org.apache.kafka.common.errors.SerializationException:
> >         Size of data received by LongDeserializer is not 8
> >
> > Here is the generated topic:
> >
> >         kafka-console-consumer --zookeeper localhost:2181 \
> >                 --topic myapp-test --from-beginning
> >         28952314828122
> >         28988681653726
> >         29080089383233
> >
> > I know I'm missing something, but I couldn't find it.
> >
> > Kind Regards, Furkan KAMACI
> >
> > On Tue, Oct 18, 2016 at 10:34 PM, Matthias J. Sax
> > <matth...@confluent.io> wrote:
> >
> > I see. KGroupedStream will be part of 0.10.1.0 (should be released
> > in the next weeks).
> >
> > So, instead of
> >
> >>>> .groupByKey().count()
> >
> > you need to do
> >
> >>>> .countByKey()
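> >
> > e.g., on 0.10.0.1 the windowed count looks like this (rough sketch;
> > "Hourly" is the window/store name, as in your code):
> >
> >>>> input.countByKey(TimeWindows.of("Hourly", 3600 * 1000))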
> >
> >
> >
> > -Matthias
> >
> > On 10/18/16 12:05 PM, Furkan KAMACI wrote:
> >>>> Hi Matthias,
> >>>>
> >>>> Thanks for your detailed answer. By the way, I couldn't find
> >>>> "KGroupedStream" in version 0.10.0.1?
> >>>>
> >>>> Kind Regards, Furkan KAMACI
> >>>>
> >>>> On Tue, Oct 18, 2016 at 8:41 PM, Matthias J. Sax
> >>>> <matth...@confluent.io> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> You just need to read your stream and apply a (windowed)
> >>>> aggregation to it.
> >>>>
> >>>> If you use non-windowed aggregation you will get "since the
> >>>> beginning". If you use windowed aggregation you can specify
> >>>> the window size as 1 hour and get those results.
> >>>>
> >>>> One comment: it seems that you want to count *all* queries.
> >>>> To make this work, you need to make sure all records are
> >>>> using the same key (because Kafka Streams only supports
> >>>> aggregation over keyed streams). Keep in mind that this
> >>>> prohibits parallelization of your aggregation!
> >>>>
> >>>> As a workaround, you could also do two consecutive aggregations:
> >>>> parallelize the first one, but not the second one (i.e., use the
> >>>> first one as a pre-aggregation, similar to a combine step), as
> >>>> sketched below.
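> >>>>
> >>>> A rough, untested sketch of that workaround (assumes String keys,
> >>>> Long counts, the 0.10.0.x API, and hypothetical store/topic names):
> >>>>
> >>>>     // stage 1 (parallel): hourly counts per original key
> >>>>     KTable<Windowed<String>, Long> pre =
> >>>>             input.countByKey(TimeWindows.of("preAggStore", 3600 * 1000));
> >>>>
> >>>>     // stage 2 (single key): re-key each partial count to one
> >>>>     // constant key per window and sum the partial counts;
> >>>>     // through() forces the repartitioning the new key needs
> >>>>     pre.toStream()
> >>>>             .map((win, cnt) -> new KeyValue<>(
> >>>>                     "total@" + win.window().start(), cnt))
> >>>>             .through(Serdes.String(), Serdes.Long(), "partial-counts")
> >>>>             .reduceByKey((a, b) -> a + b, Serdes.String(),
> >>>>                     Serdes.Long(), "totalCountStore");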
> >>>>
> >>>> Without pre-aggregation, and assuming all records use the same
> >>>> key, it would look something like this (for current trunk):
> >>>>
> >>>>
> >>>>>>> KStreamBuilder builder = new KStreamBuilder();
> >>>>>>> KStream input = builder.stream("yourTopic");
> >>>>>>>
> >>>>>>> KGroupedStream groupedInput = input.groupByKey();
> >>>>>>>
> >>>>>>> groupedInput.count("countStore")
> >>>>>>>         .to("outputTopicCountFromBeginning");
> >>>>>>>
> >>>>>>> groupedInput.count(TimeWindows.of(3600 * 1000), "windowedCountStore")
> >>>>>>>         .to("outputTopicHourlyCounts");
> >>>>
> >>>> For more details, please see the docs and examples:
> >>>>
> >>>> - http://docs.confluent.io/current/streams/index.html
> >>>> - https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 10/18/16 5:00 AM, Furkan KAMACI wrote:
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I could successfully run Kafka in my environment. I
> >>>>>>> want to monitor Queries Per Second at my search
> >>>>>>> application with Kafka. Whenever a search request is
> >>>>>>> made, I create a ProducerRecord which holds the current
> >>>>>>> nano time of the system.
> >>>>>>>
> >>>>>>> I know that I have to use a streaming API for the
> >>>>>>> calculation, e.g. Kafka Streams or Spark Streaming. My
> >>>>>>> choice is to use Kafka Streams.
> >>>>>>>
> >>>>>>> For the last 1 hour, or since the beginning, I have to
> >>>>>>> calculate the queries per second. How can I do such
> >>>>>>> an aggregation in Kafka Streams?
> >>>>>>>
> >>>>>>> Kind Regards, Furkan KAMACI
> >>>>>>>
> >>>>>
> >>>>
> >>
> >
