Hi Raffaele,

In your example, Kafka Streams would send the new and the old value
downstream. More specifically, the groupBy() would send (as you also
observed)

London, (old value: London, new value: null)
Berlin, (old value: null, new value: Berlin)

At the count() record London, (old value: London, new value: null)
would detract 1 from key London and record Berlin, (old value: null,
new value: Berlin) would add 1 to Berlin.

The record structure key, (oldValue, newValue) is called Change in
Kafka Streams and it is used where updates are emitted downstream.

Best,
Bruno

On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito
<rafaelral...@gmail.com> wrote:
>
> I m trying to better understand KTable and I have encountered a behaviour I
> cannot wrap my mind around it.
>
> So* groupByKey()* can only be applied to KStream and not to KTable, that's
> because of the nature of KTable that and its UPSERT logic.
> What I don't understand correctly and therefore ask your help for that is
> how *groupBy()* can actually be applied on KTable, the documentation says
> that:
>
> groupBy() is a shorthand for selectKey(...).groupByKey()
>
> But both these operations can only be applied to KStreams.
>
> The documentation also says:
>
> Because a new key is selected, an internal repartitioning topic will be
> created in Kafka ... All data of this KTable will be redistributed through
> the repartitioning topic by writing all update records to and rereading all
> update records from it, such that the resulting KGroupedTable is
> partitioned on the new key.
>
> Now assume we want to count the favourite cities of users:
>
> We have a key table like:
>
> Mike,LondonAlice,Paris Fred,RomeMike,Berlin (changed his mind)
>
> I would use:
>
> KTable<String, String> usersAndCitiesTable =
> builder.table("user-keys-and-cities");
>
>  KTable<String, Long> favouriteCities =
> usersAndCitiesTable.groupBy((user,city)->new KeyValue<>(city, city))
>      .count(Materialized.<String, Long, KeyValueStore<Bytes,
> byte[]>>as("CountsByCity")
>
> I took a look at the repartitioning topic created because of the groupBy,
> and can see the record mapped using the KeyValueMapper provided
>
> I noticed that table generates two entries when Mike changes his mind, one
> for London (the old city) and one for Berlin (the new city)
>
> Are this entries marked somehow?  if yes, how ?
>
> How does Kafka make sure that on London count is applied a -1 and the
> Berlin count a +1 when the new record with Mike's new favorite city arrives.
>
>
> Any help or suggestion is highly appreciated !
>
> Thanks

Reply via email to