Hi Bruno,

Thanks. One more thing: as I told you, I was consuming the repartitioning topic created by groupBy(), and I just saw the old and the new value. You are now telling me that they are indeed marked as old and new. Is this mark somehow visible when consuming the repartitioning topic?

Raffaele

On Thu, May 14, 2020 at 7:48 PM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Raffaele,
>
> In your example, Kafka Streams would send the new and the old value
> downstream. More specifically, the groupBy() would send (as you also
> observed)
>
> London, (old value: London, new value: null)
> Berlin, (old value: null, new value: Berlin)
>
> At the count(), record London, (old value: London, new value: null)
> would subtract 1 from key London and record Berlin, (old value: null,
> new value: Berlin) would add 1 to Berlin.
>
> The record structure key, (oldValue, newValue) is called Change in
> Kafka Streams and it is used where updates are emitted downstream.
>
> Best,
> Bruno
>
> On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito
> <rafaelral...@gmail.com> wrote:
> >
> > I'm trying to better understand KTable and I have encountered a
> > behaviour I cannot wrap my mind around.
> >
> > So *groupByKey()* can only be applied to a KStream and not to a KTable,
> > because of the nature of KTable and its UPSERT logic.
> > What I don't understand, and therefore ask your help with, is how
> > *groupBy()* can actually be applied to a KTable. The documentation says:
> >
> > groupBy() is a shorthand for selectKey(...).groupByKey()
> >
> > But both these operations can only be applied to KStreams.
> >
> > The documentation also says:
> >
> > Because a new key is selected, an internal repartitioning topic will be
> > created in Kafka ... All data of this KTable will be redistributed
> > through the repartitioning topic by writing all update records to and
> > rereading all update records from it, such that the resulting
> > KGroupedTable is partitioned on the new key.
> >
> > Now assume we want to count the favourite cities of users.
> >
> > We have a key table like:
> >
> > Mike,London
> > Alice,Paris
> > Fred,Rome
> > Mike,Berlin (changed his mind)
> >
> > I would use:
> >
> > KTable<String, String> usersAndCitiesTable =
> >     builder.table("user-keys-and-cities");
> >
> > KTable<String, Long> favouriteCities =
> >     usersAndCitiesTable.groupBy((user, city) -> new KeyValue<>(city, city))
> >         .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByCity"));
> >
> > I took a look at the repartitioning topic created because of the
> > groupBy, and I can see the records mapped using the KeyValueMapper
> > provided.
> >
> > I noticed that the table generates two entries when Mike changes his
> > mind, one for London (the old city) and one for Berlin (the new city).
> >
> > Are these entries marked somehow? If yes, how?
> >
> > How does Kafka make sure that a -1 is applied to the London count and
> > a +1 to the Berlin count when the new record with Mike's new favourite
> > city arrives?
> >
> > Any help or suggestion is highly appreciated!
> >
> > Thanks
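
For reference, the subtract/add behaviour Bruno describes can be sketched in plain Java, without any Kafka dependency. This is only a simulation under stated assumptions: the class ChangeCountSketch, its nested Change class, and the methods groupBy(), count() and process() are hypothetical names made up for illustration, not Kafka Streams classes, and nothing here shows how records are actually serialized in the repartitioning topic.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation; every name here is hypothetical, not Kafka Streams code.
class ChangeCountSketch {

    // Stand-in for the (oldValue, newValue) pair described in the thread.
    static final class Change {
        final String oldValue;
        final String newValue;

        Change(String oldValue, String newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    private final Map<String, String> table = new HashMap<>(); // user -> city (upsert)
    private final Map<String, Long> counts = new TreeMap<>();  // city -> count

    // groupBy step: re-key by city and emit one record for the old city
    // (if the user already had one) and one record for the new city.
    List<Map.Entry<String, Change>> groupBy(String user, String city) {
        String oldCity = table.put(user, city); // returns the previous value, or null
        List<Map.Entry<String, Change>> out = new ArrayList<>();
        if (oldCity != null) {
            out.add(new SimpleEntry<>(oldCity, new Change(oldCity, null)));
        }
        out.add(new SimpleEntry<>(city, new Change(null, city)));
        return out;
    }

    // count step: an old value subtracts 1 from the key, a new value adds 1.
    void count(Map.Entry<String, Change> record) {
        Change change = record.getValue();
        if (change.oldValue != null) {
            counts.merge(record.getKey(), -1L, Long::sum);
        }
        if (change.newValue != null) {
            counts.merge(record.getKey(), 1L, Long::sum);
        }
    }

    // Applies one table update end to end.
    void process(String user, String city) {
        for (Map.Entry<String, Change> record : groupBy(user, city)) {
            count(record);
        }
    }

    Map<String, Long> counts() {
        return counts;
    }

    public static void main(String[] args) {
        ChangeCountSketch sketch = new ChangeCountSketch();
        sketch.process("Mike", "London");
        sketch.process("Alice", "Paris");
        sketch.process("Fred", "Rome");
        sketch.process("Mike", "Berlin"); // Mike changed his mind
        System.out.println(sketch.counts()); // {Berlin=1, London=0, Paris=1, Rome=1}
    }
}
```

In this sketch the London key stays in the result with a count of 0 after Mike switches to Berlin, because the subtraction only decrements the existing count and never removes the key.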