Hello -

In my Kafka Streams 0.11 application I have the following transformation:

    val hosts: KStream[Array[Byte], String] = logRecords.mapValues(record => record.host)

    // we are changing the key here so that we can do a groupByKey later
    val hostPairs: KStream[String, String] = hosts.map((_, value) => new KeyValue(value, value))

    // keys have changed - hence need new serdes
    val groupedStream: KGroupedStream[String, String] =
      hostPairs.groupByKey(stringSerde, stringSerde)

    val counts: KTable[String, java.lang.Long] =
      groupedStream.count(ACCESS_COUNT_PER_HOST_STORE)

Now in 1.0.0, this variant of count on KGroupedStream has been deprecated,
and the one introduced in its place takes only a Materialized whose store
type is KeyValueStore[Bytes, Array[Byte]]:

> KTable<K,java.lang.Long> count(Materialized<K,java.lang.Long,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
> Count the number of records in this stream by the grouped key.


I **cannot** do the following, since I have String in my KeyValueStore:

    val counts: KTable[String, java.lang.Long] =
      groupedStream.count(Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](ACCESS_COUNT_PER_HOST_STORE))

Any suggestions as to how I can move the above code to 1.0.0?

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
