Thanks .. It works ..

On Wed, Nov 15, 2017 at 4:44 PM, Damian Guy <damian....@gmail.com> wrote:

> Yes, right, that would be because the default serializer is set to bytes.
> Sorry i should have spotted that. Your Materialized should look something
> like:
>
> Materialized.as[String, java.lang.Long, KeyValueStore[Bytes,
> Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE)
>    .withKeySerde(Serdes.String())
>
> Thanks,
> Damian
>
>
> On Wed, 15 Nov 2017 at 10:51 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
> > It's not working fine .. I get the following exception during runtime ..
> >
> > Exception in thread
> >> "kstream-weblog-processing-c37a3bc1-31cc-4ccc-8427-
> d51314802f64-StreamThread-1"
> >> java.lang.ClassCastException: java.lang.String cannot be cast to [B
> >> at
> >> org.apache.kafka.common.serialization.ByteArraySerializer.serialize(
> ByteArraySerializer.java:21)
> >> at org.apache.kafka.streams.state.StateSerdes.rawKey(
> StateSerdes.java:168)
> >> at
> >> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.
> innerKey(MeteredKeyValueBytesStore.java:60)
> >> at
> >> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.
> innerKey(MeteredKeyValueBytesStore.java:57)
> >> at
> >> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(
> InnerMeteredKeyValueStore.java:184)
> >> at
> >> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(
> MeteredKeyValueBytesStore.java:116)
> >> at
> >> org.apache.kafka.streams.kstream.internals.KStreamAggregate$
> KStreamAggregateProcessor.process(KStreamAggregate.java:70)
> >> at
> >> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> ProcessorNode.java:46)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:208)
> >> at
> >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:124)
> >> at
> >> org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
> >> at
> >> org.apache.kafka.streams.processor.internals.
> SourceNode.process(SourceNode.java:80)
> >> at
> >> org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:216)
> >> at
> >> org.apache.kafka.streams.processor.internals.AssignedTasks.process(
> AssignedTasks.java:403)
> >> at
> >> org.apache.kafka.streams.processor.internals.TaskManager.process(
> TaskManager.java:317)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread.
> processAndMaybeCommit(StreamThread.java:942)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:822)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:774)
> >> at
> >> org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:744)
> >
> >
> > Only when I change the key of the first stream to Array[Byte], things
> > work ok .. like this ..
> >
> > val hosts: KStream[Array[Byte], Array[Byte]] =
> logRecords.mapValues(record
> > => record.host.getBytes("UTF-8"))
> >
> > regards.
> >
> > On Wed, Nov 15, 2017 at 4:07 PM, Damian Guy <damian....@gmail.com>
> wrote:
> >
> >> Hi,
> >>
> >> That shouldn't be a problem, the inner most store is of type
> >> `KeyValueStore<Bytes, byte[]>`, however the outer store will be
> >> `KeyValueStore<String, Long>`.
> >> It should work fine.
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Wed, 15 Nov 2017 at 08:37 Debasish Ghosh <ghosh.debas...@gmail.com>
> >> wrote:
> >>
> >>> Hello -
> >>>
> >>> In my Kafka Streams 0.11 application I have the following
> transformation
> >>> ..
> >>>
> >>>     val hosts: KStream[Array[Byte], String] =
> logRecords.mapValues(record
> >>> => record.host)
> >>>
> >>>     // we are changing the key here so that we can do a groupByKey
> later
> >>>     val hostPairs: KStream[String, String] = hosts.map ((_, value) =>
> new
> >>> KeyValue(value, value))
> >>>
> >>>     // keys have changed - hence need new serdes
> >>>     val groupedStream: KGroupedStream[String, String] =
> >>> hostPairs.groupByKey(stringSerde,
> >>> stringSerde)
> >>>
> >>>     val counts: KTable[String, java.lang.Long] =
> >>> groupedStream.count(ACCESS_COUNT_PER_HOST_STORE)
> >>>
> >>> Now in 1.0.0, this variant of count on KGroupedStream has been
> deprecated
> >>> and the one that is introduced takes only KeyValueStore of Array[Byte]
> ..
> >>>
> >>> KTable<K,java.lang.Long>
> >>> >
> >>> count(Materialized<K,java.lang.Long,KeyValueStore<org.
> apache.kafka.common.utils.Bytes,byte[]>>
> >>> > materialized)
> >>> > Count the number of records in this stream by the grouped key.
> >>>
> >>>
> >>> I **cannot** do the following since I have String in my KeyValueStore
> ..
> >>>
> >>>     val counts: KTable[String, java.lang.Long] =
> >>>       groupedStream.count(Materialized.as[String, java.lang.Long,
> >>> KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE))
> >>>
> >>> Any suggestions as to how I can move the above code to 1.0.0 ?
> >>>
> >>> regards.
> >>>
> >>> --
> >>> Debasish Ghosh
> >>> http://manning.com/ghosh2
> >>> http://manning.com/ghosh
> >>>
> >>> Twttr: @debasishg
> >>> Blog: http://debasishg.blogspot.com
> >>> Code: http://github.com/debasishg
> >>>
> >>
> >
> >
> > --
> > Debasish Ghosh
> > http://manning.com/ghosh2
> > http://manning.com/ghosh
> >
> > Twttr: @debasishg
> > Blog: http://debasishg.blogspot.com
> > Code: http://github.com/debasishg
> >
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Reply via email to