Hiya Matthias, Alexandre,

Thanks for your detailed responses. Your explanation of why the order of
execution in the `KGroupedTable.aggregate` method doesn't really matter (what
matters is what happens in the `KTable.groupBy` method) makes sense to me.

I have one follow-up question regarding this part of your statement:

> The only guarantee we can provide (given that you configured the
> producer correctly to avoid re-ordering during send()) is that if the
> grouping key does not change, the sends of the old and new values will not
> be re-ordered relative to each other. The order of the send is
> hard-coded in the upstream processor though:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99

What would be an example of an incorrect configuration I could apply that
would result in re-ordering during the `send()` call?
As you said, "the order of the send is hard-coded in the upstream
processor" so I'm struggling to come up with an example here.
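One documented way to get re-ordering during `send()`: the Kafka producer docs warn that if `max.in.flight.requests.per.connection` is greater than 1 and `enable.idempotence` is false, a failed batch that is retried can land after a later batch. That is presumably the kind of misconfiguration meant here. Below is a minimal sketch encoding that rule, assuming Kafka Streams' `producer.` prefix for producer overrides. `ProducerOrderingSettings`, `mayReorder` and `toStreamsOverrides` are hypothetical helpers, not Kafka APIs, and version-specific defaults are deliberately not modeled.

```scala
import java.util.Properties

// Hypothetical helper type: only the three producer settings that affect ordering.
final case class ProducerOrderingSettings(
    enableIdempotence: Boolean,
    retries: Int,
    maxInFlightRequestsPerConnection: Int
)

object ProducerOrdering {
  // Kafka Streams passes "producer."-prefixed keys to the producers it creates
  // (StreamsConfig.producerPrefix(...) builds the same string).
  val ProducerPrefix = "producer."

  // Rule from the producer docs: retries enabled + idempotence disabled +
  // more than one in-flight request per connection => possible re-ordering.
  // (Idempotence itself additionally requires max.in.flight <= 5.)
  def mayReorder(s: ProducerOrderingSettings): Boolean =
    !s.enableIdempotence && s.retries > 0 && s.maxInFlightRequestsPerConnection > 1

  // Render the settings as Kafka Streams producer overrides.
  def toStreamsOverrides(s: ProducerOrderingSettings): Properties = {
    val props = new Properties()
    props.setProperty(ProducerPrefix + "enable.idempotence", s.enableIdempotence.toString)
    props.setProperty(ProducerPrefix + "retries", s.retries.toString)
    props.setProperty(
      ProducerPrefix + "max.in.flight.requests.per.connection",
      s.maxInFlightRequestsPerConnection.toString
    )
    props
  }

  // A misconfiguration that permits re-ordering during send().
  val risky = ProducerOrderingSettings(enableIdempotence = false, retries = Int.MaxValue, maxInFlightRequestsPerConnection = 5)
  // Two ways to avoid it: one in-flight request, or the idempotent producer.
  val singleInFlight = ProducerOrderingSettings(enableIdempotence = false, retries = Int.MaxValue, maxInFlightRequestsPerConnection = 1)
  val idempotent = ProducerOrderingSettings(enableIdempotence = true, retries = Int.MaxValue, maxInFlightRequestsPerConnection = 5)
}
```

The rule is kept as a pure function so it can be checked without a broker; in a real application only the resulting `Properties` would be merged into the Streams config.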

Cheers,
FQ

On Fri, 29 Jan 2021 at 17:11, Matthias J. Sax <[email protected]> wrote:

> What Alex says.
>
> The order is hard-coded (i.e., not a race condition), but there is no
> guarantee that we won't change the order in future releases without
> notice (i.e., it's not a public contract and we don't need to do a KIP to
> change it). I guess there would be a Jira about it... But as a matter of
> fact, it does not really matter (details below).
>
> Of the three scenarios you mentioned, the 3rd one cannot happen, though:
> we execute an aggregator in a single thread (per shard), and thus we
> call either the adder or the subtractor first.
>
>
>
> > 1. Seems like an unusual design choice
>
> Why do you think so?
>
>
>
> > first with a Change<V> value that includes only
> > the old value and then the process function is called again with a
> > Change<V> value that includes only the new value.
>
> In general, both records might be processed by different threads and
> thus we cannot send only one record. It's just that the TTD
> (TopologyTestDriver) simulates single-threaded execution, so both records
> always end up in the same processor.
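Why the two records can end up in different threads: if an update changes the grouping key, the "subtract" record is keyed by the old grouping key and the "add" record by the new one, so they can be routed to different repartition-topic partitions (and thus different tasks and threads). A minimal sketch with a hypothetical `partitionFor` standing in for Kafka's default partitioner (which actually hashes the serialized key with murmur2; any deterministic mapping shows the same effect):

```scala
object GroupByRouting {
  // Hypothetical stand-in for the default partitioner: deterministic
  // key -> partition mapping, not Kafka's actual murmur2-based hash.
  def partitionFor(groupingKey: String, numPartitions: Int): Int =
    Math.floorMod(groupingKey.hashCode, numPartitions)

  // For an update that moves an animal from oldZoo to newZoo, the upstream
  // groupBy emits a "subtract" record keyed by oldZoo and an "add" record
  // keyed by newZoo. Returns (target partition, description) per record.
  def route(oldZoo: String, newZoo: String, numPartitions: Int): List[(Int, String)] =
    List(
      partitionFor(oldZoo, numPartitions) -> s"subtract from $oldZoo",
      partitionFor(newZoo, numPartitions) -> s"add to $newZoo"
    )
}
```

With an unchanged grouping key both records target the same partition; with a changed key they generally do not, which is why a single combined record cannot be sent.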
>
> Cf
>
> https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations
>
> However, the order actually only matters if both records really end up
> in the same processor (if the grouping key did not change during the
> upstream update).
>
> Furthermore, the order actually depends not on the downstream aggregate
> implementation, but on the order of writes into the repartition topic
> of the `groupBy()`, and with multiple parallel upstream processors, those
> writes are interleaved anyway. Thus, in general, you should think of the
> "add" and "subtract" parts as independent entities and not make any
> assumptions about their order (also, even if the key did not change, both
> records might be interleaved with other records...)
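To make the interleaving point concrete: if two upstream tasks each write their own (subtract, add) pair to the same repartition-topic partition, the partition can contain any merge that keeps each task's own order. A small sketch that enumerates those merges; the record labels are hypothetical:

```scala
object Interleavings {
  // All merges of two sequences that preserve each sequence's internal order,
  // i.e. the possible arrival orders in one partition when two upstream tasks
  // each write their records in order.
  def interleave[A](xs: List[A], ys: List[A]): List[List[A]] = (xs, ys) match {
    case (Nil, _) => List(ys)
    case (_, Nil) => List(xs)
    case (x :: xt, y :: yt) =>
      interleave(xt, ys).map(x :: _) ++ interleave(xs, yt).map(y :: _)
  }
}
```

For two pairs this gives six possible orders: each task's subtract still precedes its own add, but the other task's records can land in between.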
>
> The only guarantee we can provide (given that you configured the
> producer correctly to avoid re-ordering during send()) is that if the
> grouping key does not change, the sends of the old and new values will not
> be re-ordered relative to each other. The order of the send is
> hard-coded in the upstream processor though:
>
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
>
> Thus, the order of operations inside the downstream aggregate processor
> is actually meaningless.
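A simplified model of the hard-coded send order in the linked `KTableRepartitionMap`: for one table update, the old value is forwarded first as a "subtract" record and the new value second as an "add" record, each keyed by its grouping key. This sketch models `Change<V>` with `Option` and omits the value mapping and null-key handling of the real processor; `RepartitionMapSketch` and `forward` are illustrative names, not Kafka code.

```scala
object RepartitionMapSketch {
  // Simplified model of Kafka Streams' internal Change<V> (newValue, oldValue).
  final case class Change[V](newValue: Option[V], oldValue: Option[V])

  // Split one update into up to two records: old value first, then new value.
  def forward[K, V, GK](key: K, change: Change[V])(selectKey: (K, V) => GK): List[(GK, Change[V])] = {
    val oldRecord = change.oldValue.map(v => selectKey(key, v) -> Change[V](None, Some(v)))
    val newRecord = change.newValue.map(v => selectKey(key, v) -> Change[V](Some(v), None))
    oldRecord.toList ++ newRecord.toList
  }
}
```

Because each emitted record carries only one side of the change, the downstream aggregate sees them as two separate calls, in the order they arrive.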
>
>
>
> -Matthias
>
> On 1/29/21 6:50 AM, Alexandre Brasil wrote:
> > From the source code in KGroupedTableImpl, the subtractor is always called
> > before the adder. By not guaranteeing the order, I think the devs meant
> > that it might change in future versions of Kafka Streams (although I
> > think that's unlikely).
> >
> > I have use cases similar to your example, and that phrase worries me a
> > bit too. :)
> >
> > On Thu, Jan 28, 2021, 22:31 Fq Public <[email protected]> wrote:
> >
> >> Hi everyone! I posted this same question on Stack Overflow
> >> <https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined>
> >> a few days ago but didn't get any responses. I was hoping someone here
> >> might be able to help clarify this part of the documentation for me :)
> >>
> >> On Thu, 28 Jan 2021 at 19:50, Fq Public <[email protected]> wrote:
> >>
> >>> The Streams DSL documentation
> >>> <https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
> >>> includes a caveat about using the aggregate method to transform a
> >>> KGroupedTable → KTable, as follows (emphasis mine):
> >>>
> >>> When subsequent non-null values are received for a key (e.g., UPDATE),
> >>> then (1) the subtractor is called with the old value as stored in the
> >>> table and (2) the adder is called with the new value of the input record
> >>> that was just received. *The order of execution for the subtractor and
> >>> adder is not defined.*
> >>>
> >>> My interpretation of that last line is that one of three things can
> >>> happen:
> >>>
> >>>    1. subtractor can be called before adder
> >>>    2. adder can be called before subtractor
> >>>    3. adder and subtractor could be called at the same time
> >>>
> >>> Here is the question I'm looking to get answered:
> >>> *Are all 3 scenarios above actually possible when using the aggregate
> >>> method on a KGroupedTable?*
> >>> Or am I misinterpreting the documentation? For my use case (detailed
> >>> below), it would be ideal if the subtractor were always called before
> >>> the adder.
> >>> ------------------------------
> >>>
> >>> *Why is this question important?*
> >>>
> >>> If the adder and subtractor do not commute with each other and the order
> >>> in which they are executed can vary, you can end up with different
> >>> results depending on the order of execution of the adder and
> >>> subtractor. A useful example of such non-commutative operations is
> >>> aggregating records into a Set:
> >>>
> >>> .aggregate[Set[Animal]](Set.empty)(
> >>>   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
> >>>   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
> >>> )
> >>>
> >>> In this example, for duplicated events (an update whose new value equals
> >>> the old value), if the adder is called before the subtractor, you would
> >>> end up removing the value entirely from the set (which would be
> >>> problematic for most use cases, I imagine).
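The duplicated-event case above can be checked directly by applying the same adder and subtractor in both orders to an update where the old and new values are equal. A minimal sketch outside Kafka Streams; `Animal` is a hypothetical case class standing in for the thread's value type:

```scala
object SetAggregateOrder {
  // Hypothetical value type for illustration.
  final case class Animal(name: String)

  // The adder and subtractor from the aggregate above, as plain functions.
  val adder: (String, Animal, Set[Animal]) => Set[Animal] =
    (zooKey, animal, animals) => animals + animal
  val subtractor: (String, Animal, Set[Animal]) => Set[Animal] =
    (zooKey, animal, animals) => animals - animal

  // Apply one update (oldValue -> newValue) for a zoo, subtractor first.
  def subtractThenAdd(zoo: String, oldV: Animal, newV: Animal, agg: Set[Animal]): Set[Animal] =
    adder(zoo, newV, subtractor(zoo, oldV, agg))

  // The same update with the adder applied first.
  def addThenSubtract(zoo: String, oldV: Animal, newV: Animal, agg: Set[Animal]): Set[Animal] =
    subtractor(zoo, oldV, adder(zoo, newV, agg))
}
```

Starting from `Set(lion)` with old = new = `lion`, subtract-then-add yields `Set(lion)`, while add-then-subtract yields the empty set, because adding an element that is already present is a no-op.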
> >>> ------------------------------
> >>>
> >>> *Why am I doubting the documentation (assuming my interpretation of it is
> >>> correct)?*
> >>>
> >>>    1. Seems like an unusual design choice
> >>>    2. When I've run unit tests (using TopologyTestDriver and
> >>>    EmbeddedKafka), I always see that the subtractor is called before the
> >>>    adder. Unfortunately, if there is some kind of race condition
> >>>    involved, it's entirely possible that I would never hit the other
> >>>    scenarios.
> >>>    3. I did try looking into the kafka-streams codebase as well. The
> >>>    KTableProcessorSupplier that calls the user-supplied adder/subtractor
> >>>    functions appears to be this one:
> >>>    https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
> >>>    and on line 92, you can even see a comment saying "first try to
> >>>    remove the old value". Unfortunately, what I saw during my own testing
> >>>    was that the process function itself is called twice: first with a
> >>>    Change<V> value that includes only the old value, and then again with
> >>>    a Change<V> value that includes only the new value. I haven't been
> >>>    able to dig deep enough to find the internal code that generates the
> >>>    old-value record and the new-value record (upon receiving an update)
> >>>    to determine whether it actually produces those records in that order.
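A simplified model of the aggregate step described in point 3: subtract the old value if present, then add the new value if present. It shows why the "first try to remove the old value" order inside one call does not help when the call receives only one side of the change: each call then runs a single branch, so the effective order is the arrival order of the records. This is an illustrative sketch, not the actual `KTableAggregate` code; `Change`, `process` and its parameters are hypothetical, and the grouping key and state store are omitted.

```scala
object AggregateProcessorSketch {
  // Simplified model of Kafka Streams' internal Change<V> (newValue, oldValue).
  final case class Change[V](newValue: Option[V], oldValue: Option[V])

  // One aggregate update for a single grouping key.
  def process[V, VR](
      current: Option[VR],
      change: Change[V],
      initializer: () => VR,
      adder: (V, VR) => VR,
      subtractor: (V, VR) => VR
  ): VR = {
    val start = current.getOrElse(initializer())
    // "first try to remove the old value"
    val afterSubtract = change.oldValue.fold(start)(v => subtractor(v, start))
    // then add the new value
    change.newValue.fold(afterSubtract)(v => adder(v, afterSubtract))
  }
}
```

A combined `Change(Some(new), Some(old))` would be subtract-then-add in one call; the split old-only and new-only records reach the same result only if they arrive in that order.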
> >>>
> >>>
> >>
> >
>
