Thanks .. It works .. On Wed, Nov 15, 2017 at 4:44 PM, Damian Guy <damian....@gmail.com> wrote:
> Yes, right, that would be because the default serializer is set to bytes. > Sorry i should have spotted that. Your Materialized should look something > like: > > Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, > Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE) > .withKeySerde(Serdes.String()) > > Thanks, > Damian > > > On Wed, 15 Nov 2017 at 10:51 Debasish Ghosh <ghosh.debas...@gmail.com> > wrote: > > > It's not working fine .. I get the following exception during runtime .. > > > > Exception in thread > >> "kstream-weblog-processing-c37a3bc1-31cc-4ccc-8427- > d51314802f64-StreamThread-1" > >> java.lang.ClassCastException: java.lang.String cannot be cast to [B > >> at > >> org.apache.kafka.common.serialization.ByteArraySerializer.serialize( > ByteArraySerializer.java:21) > >> at org.apache.kafka.streams.state.StateSerdes.rawKey( > StateSerdes.java:168) > >> at > >> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1. > innerKey(MeteredKeyValueBytesStore.java:60) > >> at > >> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1. > innerKey(MeteredKeyValueBytesStore.java:57) > >> at > >> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get( > InnerMeteredKeyValueStore.java:184) > >> at > >> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get( > MeteredKeyValueBytesStore.java:116) > >> at > >> org.apache.kafka.streams.kstream.internals.KStreamAggregate$ > KStreamAggregateProcessor.process(KStreamAggregate.java:70) > >> at > >> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run( > ProcessorNode.java:46) > >> at > >> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > measureLatencyNs(StreamsMetricsImpl.java:208) > >> at > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process( > ProcessorNode.java:124) > >> at > >> org.apache.kafka.streams.processor.internals. > ProcessorContextImpl.forward(ProcessorContextImpl.java:85) > >> at > >> org.apache.kafka.streams.processor.internals. > SourceNode.process(SourceNode.java:80) > >> at > >> org.apache.kafka.streams.processor.internals. > StreamTask.process(StreamTask.java:216) > >> at > >> org.apache.kafka.streams.processor.internals.AssignedTasks.process( > AssignedTasks.java:403) > >> at > >> org.apache.kafka.streams.processor.internals.TaskManager.process( > TaskManager.java:317) > >> at > >> org.apache.kafka.streams.processor.internals.StreamThread. > processAndMaybeCommit(StreamThread.java:942) > >> at > >> org.apache.kafka.streams.processor.internals.StreamThread.runOnce( > StreamThread.java:822) > >> at > >> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > StreamThread.java:774) > >> at > >> org.apache.kafka.streams.processor.internals. > StreamThread.run(StreamThread.java:744) > > > > > > Only when I change the key of the first stream to Array[Byte], things > > work ok .. like this .. > > > > val hosts: KStream[Array[Byte], Array[Byte]] = > logRecords.mapValues(record > > => record.host.getBytes("UTF-8")) > > > > regards. > > > > On Wed, Nov 15, 2017 at 4:07 PM, Damian Guy <damian....@gmail.com> > wrote: > > > >> Hi, > >> > >> That shouldn't be a problem, the inner most store is of type > >> `KeyValueStore<Bytes, byte[]>`, however the outer store will be > >> `KeyValueStore<String, Long>`. > >> It should work fine. > >> > >> Thanks, > >> Damian > >> > >> On Wed, 15 Nov 2017 at 08:37 Debasish Ghosh <ghosh.debas...@gmail.com> > >> wrote: > >> > >>> Hello - > >>> > >>> In my Kafka Streams 0.11 application I have the following > transformation > >>> .. > >>> > >>> val hosts: KStream[Array[Byte], String] = > logRecords.mapValues(record > >>> => record.host) > >>> > >>> // we are changing the key here so that we can do a groupByKey > later > >>> val hostPairs: KStream[String, String] = hosts.map ((_, value) => > new > >>> KeyValue(value, value)) > >>> > >>> // keys have changed - hence need new serdes > >>> val groupedStream: KGroupedStream[String, String] = > >>> hostPairs.groupByKey(stringSerde, > >>> stringSerde) > >>> > >>> val counts: KTable[String, java.lang.Long] = > >>> groupedStream.count(ACCESS_COUNT_PER_HOST_STORE) > >>> > >>> Now in 1.0.0, this variant of count on KGroupedStream has been > deprecated > >>> and the one that is introduced takes only KeyValueStore of Array[Byte] > .. > >>> > >>> KTable<K,java.lang.Long> > >>> > > >>> count(Materialized<K,java.lang.Long,KeyValueStore<org. > apache.kafka.common.utils.Bytes,byte[]>> > >>> > materialized) > >>> > Count the number of records in this stream by the grouped key. > >>> > >>> > >>> I **cannot** do the following since I have String in my KeyValueStore > .. > >>> > >>> val counts: KTable[String, java.lang.Long] = > >>> groupedStream.count(Materialized.as[String, java.lang.Long, > >>> KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE)) > >>> > >>> Any suggestions as to how I can move the above code to 1.0.0 ? > >>> > >>> regards. > >>> > >>> -- > >>> Debasish Ghosh > >>> http://manning.com/ghosh2 > >>> http://manning.com/ghosh > >>> > >>> Twttr: @debasishg > >>> Blog: http://debasishg.blogspot.com > >>> Code: http://github.com/debasishg > >>> > >> > > > > > > -- > > Debasish Ghosh > > http://manning.com/ghosh2 > > http://manning.com/ghosh > > > > Twttr: @debasishg > > Blog: http://debasishg.blogspot.com > > Code: http://github.com/debasishg > > > -- Debasish Ghosh http://manning.com/ghosh2 http://manning.com/ghosh Twttr: @debasishg Blog: http://debasishg.blogspot.com Code: http://github.com/debasishg