Yes, right, that would be because the default serializer is set to bytes. Sorry, I should have spotted that. Your Materialized should look something like:

Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE)
  .withKeySerde(Serdes.String())

Thanks,
Damian

On Wed, 15 Nov 2017 at 10:51 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> It's not working fine .. I get the following exception during runtime ..
>
>> Exception in thread "kstream-weblog-processing-c37a3bc1-31cc-4ccc-8427-d51314802f64-StreamThread-1" java.lang.ClassCastException: java.lang.String cannot be cast to [B
>>   at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
>>   at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:168)
>>   at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerKey(MeteredKeyValueBytesStore.java:60)
>>   at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerKey(MeteredKeyValueBytesStore.java:57)
>>   at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>>   at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>>   at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:70)
>>   at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>>   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>>   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>>   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>>   at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>>   at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
>
> Only when I change the key of the first stream to Array[Byte], things
> work ok .. like this ..
>
> val hosts: KStream[Array[Byte], Array[Byte]] =
>   logRecords.mapValues(record => record.host.getBytes("UTF-8"))
>
> regards.
>
> On Wed, Nov 15, 2017 at 4:07 PM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi,
>>
>> That shouldn't be a problem, the inner most store is of type
>> `KeyValueStore<Bytes, byte[]>`, however the outer store will be
>> `KeyValueStore<String, Long>`.
>> It should work fine.
>>
>> Thanks,
>> Damian
>>
>> On Wed, 15 Nov 2017 at 08:37 Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>>> Hello -
>>>
>>> In my Kafka Streams 0.11 application I have the following transformation ..
>>>
>>> val hosts: KStream[Array[Byte], String] =
>>>   logRecords.mapValues(record => record.host)
>>>
>>> // we are changing the key here so that we can do a groupByKey later
>>> val hostPairs: KStream[String, String] =
>>>   hosts.map ((_, value) => new KeyValue(value, value))
>>>
>>> // keys have changed - hence need new serdes
>>> val groupedStream: KGroupedStream[String, String] =
>>>   hostPairs.groupByKey(stringSerde, stringSerde)
>>>
>>> val counts: KTable[String, java.lang.Long] =
>>>   groupedStream.count(ACCESS_COUNT_PER_HOST_STORE)
>>>
>>> Now in 1.0.0, this variant of count on KGroupedStream has been deprecated
>>> and the one that is introduced takes only KeyValueStore of Array[Byte] ..
>>>
>>> > KTable<K,java.lang.Long>
>>> > count(Materialized<K,java.lang.Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
>>> > Count the number of records in this stream by the grouped key.
>>>
>>> I **cannot** do the following since I have String in my KeyValueStore ..
>>>
>>> val counts: KTable[String, java.lang.Long] =
>>>   groupedStream.count(Materialized.as[String, java.lang.Long,
>>>     KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE))
>>>
>>> Any suggestions as to how I can move the above code to 1.0.0 ?
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
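
A note on why the exception in this thread surfaces at runtime rather than at compile time: the stack trace shows a byte-array serializer being handed a String key. The sketch below reproduces that kind of mismatch with plain Scala and no Kafka dependency. The `Serializer` trait, the two serializer objects, and `rawKey` are hypothetical stand-ins written for illustration; they are not Kafka's classes, and the real store code may differ in detail.

```scala
import java.nio.charset.StandardCharsets

object SerdeMismatchDemo {
  // Hypothetical stand-in for a serializer interface (not Kafka's API).
  trait Serializer[T] {
    def serialize(data: T): Array[Byte]
  }

  // Stand-in for a byte-array serializer: passes the bytes through unchanged.
  object BytesSerializer extends Serializer[Array[Byte]] {
    def serialize(data: Array[Byte]): Array[Byte] = data
  }

  // Stand-in for a string serializer.
  object StringSerializer extends Serializer[String] {
    def serialize(data: String): Array[Byte] =
      data.getBytes(StandardCharsets.UTF_8)
  }

  // The caller only sees Serializer[K]. Type parameters are erased on the JVM,
  // so a serializer configured for the wrong type is not caught here; the cast
  // fails only inside serialize, when the key is actually used.
  def rawKey[K](configured: Serializer[_], key: K): Array[Byte] =
    configured.asInstanceOf[Serializer[K]].serialize(key)

  def main(args: Array[String]): Unit = {
    val key = "example-host" // hypothetical key value

    // Default-like setup: byte-array serializer with a String key.
    val rejected =
      try { rawKey(BytesSerializer, key); false }
      catch { case _: ClassCastException => true }
    println(s"bytes serializer rejected the String key: $rejected")

    // Matching setup: String serializer with a String key.
    val bytes = rawKey(StringSerializer, key)
    println(s"string serializer produced ${bytes.length} bytes")
  }
}
```

In this sketch, passing `StringSerializer` plays the role that `.withKeySerde(Serdes.String())` plays in the reply at the top of the thread: it makes the configured key serializer agree with the key type the store is declared with.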