Re: Problem with KGroupedStream.count in 1.0.0

2017-11-15 Thread Debasish Ghosh
Thanks .. It works .. On Wed, Nov 15, 2017 at 4:44 PM, Damian Guy wrote: > Yes, right, that would be because the default serializer is set to bytes. > Sorry i should have spotted that. Your Materialized should look something > like: > > Materialized.as[String, java.lang.Long, KeyValueStore[Bytes

Re: Problem with KGroupedStream.count in 1.0.0

2017-11-15 Thread Damian Guy
Yes, right, that would be because the default serializer is set to bytes. Sorry i should have spotted that. Your Materialized should look something like: Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE) .withKeySerde(Serdes.String()) Thank

Re: Problem with KGroupedStream.count in 1.0.0

2017-11-15 Thread Debasish Ghosh
It's not working fine .. I get the following exception during runtime .. Exception in thread > "kstream-weblog-processing-c37a3bc1-31cc-4ccc-8427-d51314802f64-StreamThread-1" > java.lang.ClassCastException: java.lang.String cannot be cast to [B > at > org.apache.kafka.common.serialization.ByteArra

Re: Problem with KGroupedStream.count in 1.0.0

2017-11-15 Thread Damian Guy
Hi, That shouldn't be a problem, the inner most store is of type `KeyValueStore`, however the outer store will be `KeyValueStore`. It should work fine. Thanks, Damian On Wed, 15 Nov 2017 at 08:37 Debasish Ghosh wrote: > Hello - > > In my Kafka Streams 0.11 application I have the following tran

Problem with KGroupedStream.count in 1.0.0

2017-11-15 Thread Debasish Ghosh
Hello - In my Kafka Streams 0.11 application I have the following transformation .. val hosts: KStream[Array[Byte], String] = logRecords.mapValues(record => record.host) // we are changing the key here so that we can do a groupByKey later val hostPairs: KStream[String, String] = host