Hi Raffaele,

Change is an internal class in Streams and also its SerDes are
internal. To consume the repartition topic you mention outside of
Streams you would need to use those internal classes (note: I've never
tried this). Those classes can change at any time. So consuming from
repartition topics for other than educational purposes is not a good
idea.

toStream() only emits the new value of the Change.

Regarding docs, since these are internals, the code is the best
source. For example:

The Change class:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java

Here the Change class is used to first remove the old value from the
aggregate and then to add the new value to the aggregate:
https://github.com/apache/kafka/blob/873e9446ef8426061e2f1b6cd21815b270e27f03/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L90

Best,
Bruno

On Thu, May 14, 2020 at 8:50 PM Raffaele Esposito
<rafaelral...@gmail.com> wrote:
>
> Hi Bruno,
> Also when you mention:
>
> The record structure key, (oldValue, newValue) is called Change in
> Kafka Streams and it is used where updates are emitted downstream
>
> Does it also mean the same happen when we convert a KTable to a KStream ?
> Do you know any docs or article about this topics?
>
> Thanks again,
> Raffaele
>
>
>
> On Thu, May 14, 2020 at 8:39 PM Raffaele Esposito <rafaelral...@gmail.com>
> wrote:
>
> > Hi Bruno,
> > Thanks,
> > One more thing, As I told you I was consuming the repartitioning topic
> > created by group by
> > and I just saw the old and new value, as you are telling me now they are
> > indeed marked as old and new,
> > is this mark visible somehow consuming the repartitioning topic ?
> > Raffaele
> >
> > On Thu, May 14, 2020 at 7:48 PM Bruno Cadonna <br...@confluent.io> wrote:
> >
> >> Hi Raffaele,
> >>
> >> In your example, Kafka Streams would send the new and the old value
> >> downstream. More specifically, the groupBy() would send (as you also
> >> observed)
> >>
> >> London, (old value: London, new value: null)
> >> Berlin, (old value: null, new value: Berlin)
> >>
> >> At the count() record London, (old value: London, new value: null)
> >> would detract 1 from key London and record Berlin, (old value: null,
> >> new value: Berlin) would add 1 to Berlin.
> >>
> >> The record structure key, (oldValue, newValue) is called Change in
> >> Kafka Streams and it is used where updates are emitted downstream.
> >>
> >> Best,
> >> Bruno
> >>
> >> On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito
> >> <rafaelral...@gmail.com> wrote:
> >> >
> >> > I m trying to better understand KTable and I have encountered a
> >> behaviour I
> >> > cannot wrap my mind around it.
> >> >
> >> > So* groupByKey()* can only be applied to KStream and not to KTable,
> >> that's
> >> > because of the nature of KTable that and its UPSERT logic.
> >> > What I don't understand correctly and therefore ask your help for that
> >> is
> >> > how *groupBy()* can actually be applied on KTable, the documentation
> >> says
> >> > that:
> >> >
> >> > groupBy() is a shorthand for selectKey(...).groupByKey()
> >> >
> >> > But both these operations can only be applied to KStreams.
> >> >
> >> > The documentation also says:
> >> >
> >> > Because a new key is selected, an internal repartitioning topic will be
> >> > created in Kafka ... All data of this KTable will be redistributed
> >> through
> >> > the repartitioning topic by writing all update records to and rereading
> >> all
> >> > update records from it, such that the resulting KGroupedTable is
> >> > partitioned on the new key.
> >> >
> >> > Now assume we want to count the favourite cities of users:
> >> >
> >> > We have a key table like:
> >> >
> >> > Mike,LondonAlice,Paris Fred,RomeMike,Berlin (changed his mind)
> >> >
> >> > I would use:
> >> >
> >> > KTable<String, String> usersAndCitiesTable =
> >> > builder.table("user-keys-and-cities");
> >> >
> >> >  KTable<String, Long> favouriteCities =
> >> > usersAndCitiesTable.groupBy((user,city)->new KeyValue<>(city, city))
> >> >      .count(Materialized.<String, Long, KeyValueStore<Bytes,
> >> > byte[]>>as("CountsByCity")
> >> >
> >> > I took a look at the repartitioning topic created because of the
> >> groupBy,
> >> > and can see the record mapped using the KeyValueMapper provided
> >> >
> >> > I noticed that table generates two entries when Mike changes his mind,
> >> one
> >> > for London (the old city) and one for Berlin (the new city)
> >> >
> >> > Are this entries marked somehow?  if yes, how ?
> >> >
> >> > How does Kafka make sure that on London count is applied a -1 and the
> >> > Berlin count a +1 when the new record with Mike's new favorite city
> >> arrives.
> >> >
> >> >
> >> > Any help or suggestion is highly appreciated !
> >> >
> >> > Thanks
> >>
> >

Reply via email to