Hi Raffaele,

In your example, Kafka Streams would send both the old and the new value downstream. More specifically, when Mike changes his mind, the groupBy() would send two records (as you also observed):

London, (old value: London, new value: null)
Berlin, (old value: null, new value: Berlin)

At the count(), the record London, (old value: London, new value: null) would subtract 1 from the count for key London, and the record Berlin, (old value: null, new value: Berlin) would add 1 to the count for key Berlin.

The record structure key, (oldValue, newValue) is called Change in Kafka Streams, and it is used wherever updates are emitted downstream.

Best,
Bruno

On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito <rafaelral...@gmail.com> wrote:
>
> I'm trying to better understand KTable, and I have encountered a behaviour I
> cannot wrap my mind around.
>
> So *groupByKey()* can only be applied to KStream and not to KTable; that's
> because of the nature of KTable and its UPSERT logic.
> What I don't understand, and therefore ask for your help with, is
> how *groupBy()* can actually be applied on KTable. The documentation says
> that:
>
> groupBy() is a shorthand for selectKey(...).groupByKey()
>
> But both these operations can only be applied to KStreams.
>
> The documentation also says:
>
> Because a new key is selected, an internal repartitioning topic will be
> created in Kafka ... All data of this KTable will be redistributed through
> the repartitioning topic by writing all update records to and rereading all
> update records from it, such that the resulting KGroupedTable is
> partitioned on the new key.
>
> Now assume we want to count the favourite cities of users.
>
> We have a table keyed by user, like:
>
> Mike,London
> Alice,Paris
> Fred,Rome
> Mike,Berlin (changed his mind)
>
> I would use:
>
> KTable<String, String> usersAndCitiesTable =
>     builder.table("user-keys-and-cities");
>
> KTable<String, Long> favouriteCities =
>     usersAndCitiesTable.groupBy((user, city) -> new KeyValue<>(city, city))
>         .count(Materialized.<String, Long, KeyValueStore<Bytes,
>             byte[]>>as("CountsByCity"));
>
> I took a look at the repartitioning topic created by the groupBy() and
> can see the records mapped using the provided KeyValueMapper.
>
> I noticed that the table generates two entries when Mike changes his mind,
> one for London (the old city) and one for Berlin (the new city).
>
> Are these entries marked somehow? If yes, how?
>
> How does Kafka make sure that a -1 is applied to the London count and a +1
> to the Berlin count when the new record with Mike's new favourite city
> arrives?
>
> Any help or suggestions are highly appreciated!
>
> Thanks
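The old/new mechanism described in the reply can be modelled in plain Java, without a Kafka dependency. This is a minimal sketch under simplifying assumptions: SimpleChange, Rekeyed, groupByCity, count and run are hypothetical names, not Kafka Streams classes, and the model has a single partition, no serialization and an in-memory map standing in for the KTable (Java 16+ is assumed for records). It shows groupBy() forwarding the old value and the new value as two separate records, each under its own new key, and count() applying -1 for an old value and +1 for a new value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of KTable groupBy().count() handling an update.
 * All names here are hypothetical stand-ins, not Kafka Streams internals.
 */
public class FavouriteCityCount {

    /** Mirrors the "key, (oldValue, newValue)" shape described in the reply. */
    public record SimpleChange(String oldValue, String newValue) {}

    /** A record after re-keying by city, as it would sit in the repartition topic. */
    public record Rekeyed(String key, SimpleChange change) {}

    /**
     * groupBy() step: one upsert on the source table becomes up to two records,
     * the old value under the old city and the new value under the new city.
     * The old one is emitted first.
     */
    public static List<Rekeyed> groupByCity(String oldCity, String newCity) {
        List<Rekeyed> out = new ArrayList<>();
        if (oldCity != null) {
            out.add(new Rekeyed(oldCity, new SimpleChange(oldCity, null)));
        }
        if (newCity != null) {
            out.add(new Rekeyed(newCity, new SimpleChange(null, newCity)));
        }
        return out;
    }

    /** count() step: an old value subtracts 1, a new value adds 1. */
    public static void count(Rekeyed rec, Map<String, Long> counts) {
        if (rec.change().oldValue() != null) {
            counts.merge(rec.key(), -1L, Long::sum);
        }
        if (rec.change().newValue() != null) {
            counts.merge(rec.key(), 1L, Long::sum);
        }
    }

    /** Replays (user, city) upserts and returns the per-city counts. */
    public static Map<String, Long> run(List<String[]> upserts) {
        Map<String, String> table = new HashMap<>();       // user -> city (the "KTable")
        Map<String, Long> counts = new LinkedHashMap<>();  // city -> count
        for (String[] u : upserts) {
            String oldCity = table.put(u[0], u[1]);        // previous city, or null
            for (Rekeyed rec : groupByCity(oldCity, u[1])) {
                count(rec, counts);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> upserts = List.of(
                new String[] {"Mike", "London"},
                new String[] {"Alice", "Paris"},
                new String[] {"Fred", "Rome"},
                new String[] {"Mike", "Berlin"});
        // prints {London=0, Paris=1, Rome=1, Berlin=1}
        System.out.println(run(upserts));
    }
}
```

Replaying the four updates from the question leaves London back at 0 and Berlin at 1. Emitting the update as two separately keyed records, rather than one record carrying both cities, lets each count be updated on whichever partition owns its key. In the public API the same idea surfaces as the separate adder and subtractor arguments of KGroupedTable.aggregate() and KGroupedTable.reduce().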