Yes, right, that would be because the default serializer is set to bytes.
Sorry, I should have spotted that. Your Materialized should look something
like:

Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE)
   .withKeySerde(Serdes.String())
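
For context, here is a minimal sketch of the complete call in Scala, assuming
kafka-streams 1.0.0 is on the classpath. The HostCounts object, the
countByHost helper and the store name string are placeholders of mine, not
names from your application; groupedStream stands for the
KGroupedStream[String, String] from your original snippet.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{KGroupedStream, KTable, Materialized}
import org.apache.kafka.streams.state.KeyValueStore

object HostCounts {
  // Hypothetical store name; use whatever your application already defines.
  val ACCESS_COUNT_PER_HOST_STORE = "access-count-per-host"

  // Counts records per String key. The key serde is set explicitly on the
  // Materialized so the store does not fall back to the configured default
  // (byte array) serde when it serializes the String key.
  def countByHost(groupedStream: KGroupedStream[String, String]): KTable[String, java.lang.Long] =
    groupedStream.count(
      Materialized
        .as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE)
        .withKeySerde(Serdes.String())
    )
}
```

If the value side also needs an explicit serde, withValueSerde(Serdes.Long())
can be chained in the same way.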

Thanks,
Damian


On Wed, 15 Nov 2017 at 10:51 Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> It's not working fine .. I get the following exception during runtime ..
>
>> Exception in thread "kstream-weblog-processing-c37a3bc1-31cc-4ccc-8427-d51314802f64-StreamThread-1" java.lang.ClassCastException: java.lang.String cannot be cast to [B
>>     at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
>>     at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:168)
>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerKey(MeteredKeyValueBytesStore.java:60)
>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerKey(MeteredKeyValueBytesStore.java:57)
>>     at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>>     at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:70)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>>     at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
>
>
> Only when I change the key of the first stream to Array[Byte], things
> work ok .. like this ..
>
> val hosts: KStream[Array[Byte], Array[Byte]] = logRecords.mapValues(record
> => record.host.getBytes("UTF-8"))
>
> regards.
>
> On Wed, Nov 15, 2017 at 4:07 PM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi,
>>
>> That shouldn't be a problem: the innermost store is of type
>> `KeyValueStore<Bytes, byte[]>`, but the outer store will be
>> `KeyValueStore<String, Long>`.
>> It should work fine.
>>
>> Thanks,
>> Damian
>>
>> On Wed, 15 Nov 2017 at 08:37 Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> Hello -
>>>
>>> In my Kafka Streams 0.11 application I have the following transformation
>>> ..
>>>
>>>     val hosts: KStream[Array[Byte], String] = logRecords.mapValues(record
>>> => record.host)
>>>
>>>     // we are changing the key here so that we can do a groupByKey later
>>>     val hostPairs: KStream[String, String] = hosts.map ((_, value) => new
>>> KeyValue(value, value))
>>>
>>>     // keys have changed - hence need new serdes
>>>     val groupedStream: KGroupedStream[String, String] =
>>> hostPairs.groupByKey(stringSerde,
>>> stringSerde)
>>>
>>>     val counts: KTable[String, java.lang.Long] =
>>> groupedStream.count(ACCESS_COUNT_PER_HOST_STORE)
>>>
>>> Now in 1.0.0, this variant of count on KGroupedStream has been deprecated
>>> and the one that is introduced takes only KeyValueStore of Array[Byte] ..
>>>
>>> > KTable<K,java.lang.Long>
>>> > count(Materialized<K,java.lang.Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>>
>>> > materialized)
>>> > Count the number of records in this stream by the grouped key.
>>>
>>>
>>> I **cannot** do the following since I have String in my KeyValueStore ..
>>>
>>>     val counts: KTable[String, java.lang.Long] =
>>>       groupedStream.count(Materialized.as[String, java.lang.Long,
>>> KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE))
>>>
>>> Any suggestions as to how I can move the above code to 1.0.0 ?
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
