I m trying to better understand KTable and I have encountered a behaviour I
cannot wrap my mind around it.

So* groupByKey()* can only be applied to KStream and not to KTable, that's
because of the nature of KTable that and its UPSERT logic.
What I don't understand correctly and therefore ask your help for that is
how *groupBy()* can actually be applied on KTable, the documentation says
that:

groupBy() is a shorthand for selectKey(...).groupByKey()

But both these operations can only be applied to KStreams.

The documentation also says:

Because a new key is selected, an internal repartitioning topic will be
created in Kafka ... All data of this KTable will be redistributed through
the repartitioning topic by writing all update records to and rereading all
update records from it, such that the resulting KGroupedTable is
partitioned on the new key.

Now assume we want to count the favourite cities of users:

We have a key table like:

Mike,LondonAlice,Paris Fred,RomeMike,Berlin (changed his mind)

I would use:

KTable<String, String> usersAndCitiesTable =
builder.table("user-keys-and-cities");

 KTable<String, Long> favouriteCities =
usersAndCitiesTable.groupBy((user,city)->new KeyValue<>(city, city))
     .count(Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>as("CountsByCity")

I took a look at the repartitioning topic created because of the groupBy,
and can see the record mapped using the KeyValueMapper provided

I noticed that table generates two entries when Mike changes his mind, one
for London (the old city) and one for Berlin (the new city)

Are this entries marked somehow?  if yes, how ?

How does Kafka make sure that on London count is applied a -1 and the
Berlin count a +1 when the new record with Mike's new favorite city arrives.


Any help or suggestion is highly appreciated !

Thanks

Reply via email to