Hi Bruno,
Thanks,
One more thing, As I told you I was consuming the repartitioning topic
created by group by
and I just saw the old and new value, as you are telling me now they are
indeed marked as old and new,
is this mark visible somehow consuming the repartitioning topic ?
Raffaele

On Thu, May 14, 2020 at 7:48 PM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Raffaele,
>
> In your example, Kafka Streams would send the new and the old value
> downstream. More specifically, the groupBy() would send (as you also
> observed)
>
> London, (old value: London, new value: null)
> Berlin, (old value: null, new value: Berlin)
>
> At the count() record London, (old value: London, new value: null)
> would detract 1 from key London and record Berlin, (old value: null,
> new value: Berlin) would add 1 to Berlin.
>
> The record structure key, (oldValue, newValue) is called Change in
> Kafka Streams and it is used where updates are emitted downstream.
>
> Best,
> Bruno
>
> On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito
> <rafaelral...@gmail.com> wrote:
> >
> > I m trying to better understand KTable and I have encountered a
> behaviour I
> > cannot wrap my mind around it.
> >
> > So* groupByKey()* can only be applied to KStream and not to KTable,
> that's
> > because of the nature of KTable that and its UPSERT logic.
> > What I don't understand correctly and therefore ask your help for that is
> > how *groupBy()* can actually be applied on KTable, the documentation says
> > that:
> >
> > groupBy() is a shorthand for selectKey(...).groupByKey()
> >
> > But both these operations can only be applied to KStreams.
> >
> > The documentation also says:
> >
> > Because a new key is selected, an internal repartitioning topic will be
> > created in Kafka ... All data of this KTable will be redistributed
> through
> > the repartitioning topic by writing all update records to and rereading
> all
> > update records from it, such that the resulting KGroupedTable is
> > partitioned on the new key.
> >
> > Now assume we want to count the favourite cities of users:
> >
> > We have a key table like:
> >
> > Mike,LondonAlice,Paris Fred,RomeMike,Berlin (changed his mind)
> >
> > I would use:
> >
> > KTable<String, String> usersAndCitiesTable =
> > builder.table("user-keys-and-cities");
> >
> >  KTable<String, Long> favouriteCities =
> > usersAndCitiesTable.groupBy((user,city)->new KeyValue<>(city, city))
> >      .count(Materialized.<String, Long, KeyValueStore<Bytes,
> > byte[]>>as("CountsByCity")
> >
> > I took a look at the repartitioning topic created because of the groupBy,
> > and can see the record mapped using the KeyValueMapper provided
> >
> > I noticed that table generates two entries when Mike changes his mind,
> one
> > for London (the old city) and one for Berlin (the new city)
> >
> > Are this entries marked somehow?  if yes, how ?
> >
> > How does Kafka make sure that on London count is applied a -1 and the
> > Berlin count a +1 when the new record with Mike's new favorite city
> arrives.
> >
> >
> > Any help or suggestion is highly appreciated !
> >
> > Thanks
>

Reply via email to