Hi,

That shouldn't be a problem. The innermost store is of type
`KeyValueStore<Bytes, byte[]>`, but the outer store that wraps it will be
`KeyValueStore<String, Long>`, so the code you posted should work fine.
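
For illustration, here is a minimal sketch of how that call might look in Scala against 1.0.0. The object name `HostCounts`, the helper `countPerHost`, and the store name value are hypothetical placeholders, not taken from your application. Setting the serdes explicitly with `withKeySerde` and `withValueSerde` is optional; if you leave them out, the default serdes from the streams config are used.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{KGroupedStream, KTable, Materialized}
import org.apache.kafka.streams.state.KeyValueStore

object HostCounts {
  // Placeholder value (assumption); the original post does not show the actual name.
  val ACCESS_COUNT_PER_HOST_STORE = "access-count-per-host"

  // K and V are the types seen by the application (String, Long).
  // The third type parameter is only the inner byte store that backs the state.
  def countPerHost(grouped: KGroupedStream[String, String]): KTable[String, java.lang.Long] =
    grouped.count(
      Materialized
        .as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE)
        .withKeySerde(Serdes.String()) // serializes the String key to bytes
        .withValueSerde(Serdes.Long()) // serializes the java.lang.Long count to bytes
    )
}
```

The serdes are what bridge the two layers: the typed outer store converts keys and values to bytes before they reach the inner store.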

Thanks,
Damian

On Wed, 15 Nov 2017 at 08:37 Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Hello -
>
> In my Kafka Streams 0.11 application I have the following transformation ..
>
>     val hosts: KStream[Array[Byte], String] = logRecords.mapValues(record => record.host)
>
>     // we are changing the key here so that we can do a groupByKey later
>     val hostPairs: KStream[String, String] = hosts.map((_, value) => new KeyValue(value, value))
>
>     // keys have changed - hence need new serdes
>     val groupedStream: KGroupedStream[String, String] =
>       hostPairs.groupByKey(stringSerde, stringSerde)
>
>     val counts: KTable[String, java.lang.Long] =
>       groupedStream.count(ACCESS_COUNT_PER_HOST_STORE)
>
> Now in 1.0.0, this variant of count on KGroupedStream has been deprecated
> and the one that replaces it takes a Materialized whose store type is
> KeyValueStore[Bytes, Array[Byte]] ..
>
>     KTable<K,java.lang.Long>
>     count(Materialized<K,java.lang.Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
>     Count the number of records in this stream by the grouped key.
>
>
> I **cannot** do the following since I have String in my KeyValueStore ..
>
>     val counts: KTable[String, java.lang.Long] =
>       groupedStream.count(
>         Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE))
>
> Any suggestions as to how I can move the above code to 1.0.0 ?
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
