It's not working. I get the following exception at runtime:

> Exception in thread "kstream-weblog-processing-c37a3bc1-31cc-4ccc-8427-d51314802f64-StreamThread-1" java.lang.ClassCastException: java.lang.String cannot be cast to [B
>     at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
>     at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:168)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerKey(MeteredKeyValueBytesStore.java:60)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerKey(MeteredKeyValueBytesStore.java:57)
>     at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>     at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:70)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)


Things work only when I change the first stream to carry Array[Byte]
values (which later become the key), like this:

val hosts: KStream[Array[Byte], Array[Byte]] =
  logRecords.mapValues(record => record.host.getBytes("UTF-8"))
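
The trace shows `ByteArraySerializer` being handed a `String` key inside `StateSerdes.rawKey`, which suggests the store is using a byte-array key serde rather than the String one. Below is a minimal sketch of one thing that may be worth trying, assuming the serdes can be set explicitly on `Materialized` through `withKeySerde` and `withValueSerde`. The object name `CountWithSerdes`, the helper `countPerHost`, and the store name value are hypothetical, and I have not verified that this resolves the exception:

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{KGroupedStream, KTable, Materialized}
import org.apache.kafka.streams.state.KeyValueStore

object CountWithSerdes {
  // Hypothetical store name; the original code only refers to the constant.
  val ACCESS_COUNT_PER_HOST_STORE = "access-count-per-host"

  // Counts records per String key, telling the store how to serialize
  // String keys and Long values instead of relying on the default serdes.
  def countPerHost(grouped: KGroupedStream[String, String]): KTable[String, java.lang.Long] =
    grouped.count(
      Materialized
        .as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE)
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()))
}
```

With this in place the first stream could presumably keep `String` values, and the call site would become `val counts = CountWithSerdes.countPerHost(groupedStream)`.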

regards.

On Wed, Nov 15, 2017 at 4:07 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi,
>
> That shouldn't be a problem. The innermost store is of type
> `KeyValueStore<Bytes, byte[]>`, but the outer store will be
> `KeyValueStore<String, Long>`, so it should work fine.
>
> Thanks,
> Damian
>
> On Wed, 15 Nov 2017 at 08:37 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Hello -
>>
>> In my Kafka Streams 0.11 application I have the following transformation:
>>
>>     val hosts: KStream[Array[Byte], String] =
>>       logRecords.mapValues(record => record.host)
>>
>>     // we are changing the key here so that we can do a groupByKey later
>>     val hostPairs: KStream[String, String] =
>>       hosts.map((_, value) => new KeyValue(value, value))
>>
>>     // keys have changed - hence need new serdes
>>     val groupedStream: KGroupedStream[String, String] =
>>       hostPairs.groupByKey(stringSerde, stringSerde)
>>
>>     val counts: KTable[String, java.lang.Long] =
>>       groupedStream.count(ACCESS_COUNT_PER_HOST_STORE)
>>
>> In 1.0.0 this variant of count on KGroupedStream has been deprecated,
>> and the one that replaces it takes a Materialized whose store type is
>> KeyValueStore of Bytes and Array[Byte]:
>>
>> > KTable<K,java.lang.Long> count(Materialized<K,java.lang.Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
>> > Count the number of records in this stream by the grouped key.
>>
>>
>> I **cannot** do the following since I have String in my KeyValueStore:
>>
>>     val counts: KTable[String, java.lang.Long] =
>>       groupedStream.count(
>>         Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE))
>>
>> Any suggestions as to how I can move the above code to 1.0.0?
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
