It's not working fine. I get the following exception during runtime:

Exception in thread "kstream-weblog-processing-c37a3bc1-31cc-4ccc-8427-d51314802f64-StreamThread-1" java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:168)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerKey(MeteredKeyValueBytesStore.java:60)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerKey(MeteredKeyValueBytesStore.java:57)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:70)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
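
A possible reading of the trace: StateSerdes.rawKey hands a String key to ByteArraySerializer, which suggests the state store is using a byte-array key serde rather than the String serde. If that is the cause, one thing to try is setting the serdes explicitly on the Materialized instance via withKeySerde and withValueSerde. The following is only a sketch under that assumption, not a confirmed fix; the object name, the method name and the store name string are placeholders, since the value of ACCESS_COUNT_PER_HOST_STORE is not shown in this thread.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{KGroupedStream, KTable, Materialized}
import org.apache.kafka.streams.state.KeyValueStore

object HostCounts {
  // Placeholder store name (assumption); use the real ACCESS_COUNT_PER_HOST_STORE value.
  val AccessCountPerHostStore = "access-count-per-host"

  // Counts records per key, stating the store's serdes explicitly so the
  // String key is not passed to a byte-array serializer.
  def countPerHost(grouped: KGroupedStream[String, String]): KTable[String, java.lang.Long] = {
    val materialized =
      Materialized
        .as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](AccessCountPerHostStore)
        .withKeySerde(Serdes.String())  // keys are host names (String)
        .withValueSerde(Serdes.Long())  // values are counts (java.lang.Long)

    grouped.count(materialized)
  }
}
```

With this in place the call site would be something like HostCounts.countPerHost(groupedStream), leaving the key of the upstream stream as String.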
Only when I change the key of the first stream to Array[Byte], things work ok, like this:

val hosts: KStream[Array[Byte], Array[Byte]] = logRecords.mapValues(record => record.host.getBytes("UTF-8"))

regards.

On Wed, Nov 15, 2017 at 4:07 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi,
>
> That shouldn't be a problem, the inner most store is of type
> `KeyValueStore<Bytes, byte[]>`, however the outer store will be
> `KeyValueStore<String, Long>`.
> It should work fine.
>
> Thanks,
> Damian
>
> On Wed, 15 Nov 2017 at 08:37 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Hello -
>>
>> In my Kafka Streams 0.11 application I have the following transformation:
>>
>> val hosts: KStream[Array[Byte], String] = logRecords.mapValues(record => record.host)
>>
>> // we are changing the key here so that we can do a groupByKey later
>> val hostPairs: KStream[String, String] = hosts.map ((_, value) => new KeyValue(value, value))
>>
>> // keys have changed - hence need new serdes
>> val groupedStream: KGroupedStream[String, String] = hostPairs.groupByKey(stringSerde, stringSerde)
>>
>> val counts: KTable[String, java.lang.Long] = groupedStream.count(ACCESS_COUNT_PER_HOST_STORE)
>>
>> Now in 1.0.0, this variant of count on KGroupedStream has been deprecated
>> and the one that is introduced takes only KeyValueStore of Array[Byte]:
>>
>> > KTable<K,java.lang.Long>
>> > count(Materialized<K,java.lang.Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
>> > Count the number of records in this stream by the grouped key.
>>
>> I **cannot** do the following since I have String in my KeyValueStore:
>>
>> val counts: KTable[String, java.lang.Long] =
>>   groupedStream.count(Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE))
>>
>> Any suggestions as to how I can move the above code to 1.0.0?
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg