I'm trying to better understand KTable, and I have encountered a behaviour I cannot wrap my mind around.
So *groupByKey()* can only be applied to a KStream and not to a KTable, because of the nature of KTable and its UPSERT logic. What I don't understand, and would like your help with, is how *groupBy()* can actually be applied to a KTable.

The documentation says that groupBy() is a shorthand for selectKey(...).groupByKey(). But both of these operations can only be applied to KStreams.

The documentation also says: "Because a new key is selected, an internal repartitioning topic will be created in Kafka ... All data of this KTable will be redistributed through the repartitioning topic by writing all update records to and rereading all update records from it, such that the resulting KGroupedTable is partitioned on the new key."

Now assume we want to count the favourite cities of users. We have a keyed table like:

    Mike,London
    Alice,Paris
    Fred,Rome
    Mike,Berlin (changed his mind)

I would use:

    KTable<String, String> usersAndCitiesTable = builder.table("user-keys-and-cities");

    KTable<String, Long> favouriteCities = usersAndCitiesTable
        .groupBy((user, city) -> new KeyValue<>(city, city))
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByCity"));

I took a look at the repartitioning topic created because of the groupBy, and I can see the records mapped using the KeyValueMapper I provided. I noticed that the table generates two entries when Mike changes his mind: one for London (the old city) and one for Berlin (the new city).

Are these entries marked somehow? If yes, how? How does Kafka make sure that a -1 is applied to the London count and a +1 to the Berlin count when the new record with Mike's new favourite city arrives?

Any help or suggestion is highly appreciated! Thanks
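To make the question concrete, here is a minimal plain-Java sketch (no Kafka dependency) of the behaviour I think I am observing. It is only my mental model, under the assumption that each table update is forwarded downstream as an (old value, new value) pair and that the count subtracts for the old value and adds for the new one. The class `GroupByCountSketch` and its methods are hypothetical names for illustration, not part of the Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "KTable -> groupBy(city) -> count"; not Kafka code.
public class GroupByCountSketch {
    // Stands in for the source KTable: user -> current favourite city.
    private final Map<String, String> table = new HashMap<>();
    // Stands in for the "CountsByCity" store: city -> number of users.
    private final Map<String, Long> counts = new HashMap<>();

    // Applies one upsert; a null city is treated as a delete (assumption).
    public void upsert(String user, String city) {
        String oldCity = (city == null) ? table.remove(user) : table.put(user, city);
        if (oldCity != null) {
            // Assumed "subtract" step: retract the previous value's contribution.
            counts.merge(oldCity, -1L, Long::sum);
        }
        if (city != null) {
            // Assumed "add" step: count the new value.
            counts.merge(city, 1L, Long::sum);
        }
    }

    // Returns the current count for a city, or 0 if it was never seen.
    public long countFor(String city) {
        return counts.getOrDefault(city, 0L);
    }

    public static void main(String[] args) {
        GroupByCountSketch sketch = new GroupByCountSketch();
        sketch.upsert("Mike", "London");
        sketch.upsert("Alice", "Paris");
        sketch.upsert("Fred", "Rome");
        sketch.upsert("Mike", "Berlin"); // Mike changes his mind

        System.out.println("London=" + sketch.countFor("London")); // prints London=0
        System.out.println("Berlin=" + sketch.countFor("Berlin")); // prints Berlin=1
    }
}
```

If this model is right, the two entries I see in the repartitioning topic would correspond to the subtract step for London and the add step for Berlin, which is why I am asking how they are told apart.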