Hi, That shouldn't be a problem, the inner most store is of type `KeyValueStore<Bytes, byte[]>`, however the outer store will be `KeyValueStore<String, Long>`. It should work fine.
Thanks, Damian On Wed, 15 Nov 2017 at 08:37 Debasish Ghosh <ghosh.debas...@gmail.com> wrote: > Hello - > > In my Kafka Streams 0.11 application I have the following transformation .. > > val hosts: KStream[Array[Byte], String] = logRecords.mapValues(record > => record.host) > > // we are changing the key here so that we can do a groupByKey later > val hostPairs: KStream[String, String] = hosts.map ((_, value) => new > KeyValue(value, value)) > > // keys have changed - hence need new serdes > val groupedStream: KGroupedStream[String, String] = > hostPairs.groupByKey(stringSerde, > stringSerde) > > val counts: KTable[String, java.lang.Long] = > groupedStream.count(ACCESS_COUNT_PER_HOST_STORE) > > Now in 1.0.0, this variant of count on KGroupedStream has been deprecated > and the one that is introduced takes only KeyValueStore of Array[Byte] .. > > KTable<K,java.lang.Long> > > > count(Materialized<K,java.lang.Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> > > materialized) > > Count the number of records in this stream by the grouped key. > > > I **cannot** do the following since I have String in my KeyValueStore .. > > val counts: KTable[String, java.lang.Long] = > groupedStream.count(Materialized.as[String, java.lang.Long, > KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE)) > > Any suggestions as to how I can move the above code to 1.0.0 ? > > regards. > > -- > Debasish Ghosh > http://manning.com/ghosh2 > http://manning.com/ghosh > > Twttr: @debasishg > Blog: http://debasishg.blogspot.com > Code: http://github.com/debasishg >