From the source code in KGroupedTableImpl, the subtractor is always called before the adder. By not guaranteeing the order, I think the devs meant that it might change in future versions of Kafka Streams (although I think that's unlikely).
I have use cases similar to your example, and that phrase worries me a bit too. :)

On Thu, Jan 28, 2021, 22:31 Fq Public <[email protected]> wrote:

> Hi everyone! I posted this same question on stackoverflow
> <https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined>
> a few days ago but didn't get any responses. Was hoping someone here might
> be able to help clarify this part of the documentation for me :)
>
> On Thu, 28 Jan 2021 at 19:50, Fq Public <[email protected]> wrote:
>
> > The Streams DSL documentation
> > <https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
> > includes a caveat about using the aggregate method to transform a
> > KGroupedTable → KTable, as follows (emphasis mine):
> >
> > When subsequent non-null values are received for a key (e.g., UPDATE),
> > then (1) the subtractor is called with the old value as stored in the
> > table and (2) the adder is called with the new value of the input record
> > that was just received. *The order of execution for the subtractor and
> > adder is not defined.*
> >
> > My interpretation of that last line implies that one of three things can
> > happen:
> >
> > 1. subtractor can be called before adder
> > 2. adder can be called before subtractor
> > 3. adder and subtractor could be called at the same time
> >
> > Here is the question I'm looking to get answered:
> > *Are all 3 scenarios above actually possible when using the aggregate
> > method on a KGroupedTable?*
> > Or am I misinterpreting the documentation? For my use-case (detailed
> > below), it would be ideal if the subtractor was always called before the
> > adder.
> > ------------------------------
> >
> > *Why is this question important?*
> >
> > If the adder and subtractor are non-commutative operations and the order
> > in which they are executed can vary, you can end up with different
> > results depending on the order of execution of adder and subtractor. An
> > example of a useful non-commutative operation would be something like if
> > we’re aggregating records into a Set:
> >
> > .aggregate[Set[Animal]](Set.empty)(
> >   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
> >   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
> > )
> >
> > In this example, for duplicated events, if the adder is called before the
> > subtractor you would end up removing the value entirely from the set
> > (which would be problematic for most use-cases I imagine).
> > ------------------------------
> >
> > *Why am I doubting the documentation (assuming my interpretation of it is
> > correct)?*
> >
> > 1. Seems like an unusual design choice
> > 2. When I've run unit tests (using TopologyTestDriver and
> >    EmbeddedKafka), I always see the subtractor is called before the
> >    adder. Unfortunately, if there is some kind of race condition
> >    involved, it's entirely possible that I would never hit the other
> >    scenarios.
> > 3. I did try looking into the kafka-streams codebase as well. The
> >    KTableProcessorSupplier that calls the user-supplied adder/subtractor
> >    functions appears to be this one:
> >    https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
> >    and on line 92, you can even see a comment saying "first try to remove
> >    the old value". Unfortunately, during my own testing, I saw that the
> >    process function itself is called twice: first with a Change<V> value
> >    that includes only the old value, and then again with a Change<V>
> >    value that includes only the new value. I haven't been able to dig
> >    deep enough to find the internal code that generates the old value
> >    record and the new value record (upon receiving an update) to
> >    determine if it actually produces those records in that order.
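
To make the concern in the quoted Set example concrete, here is a minimal standalone sketch in plain Scala. It does not use Kafka Streams at all: the object name AggregationOrderDemo, the helper names, and the use of String in place of Animal are assumptions made purely for illustration. It only simulates the two possible call orders for a duplicated update, where the old and the new value are the same.

```scala
object AggregationOrderDemo {
  // Hypothetical stand-ins for the adder and subtractor from the quoted example.
  // String replaces the Animal type, and the key argument is omitted.
  def adder(animal: String, animals: Set[String]): Set[String] = animals + animal
  def subtractor(animal: String, animals: Set[String]): Set[String] = animals - animal

  // Apply an update (oldValue -> newValue) with the subtractor called first.
  def subtractThenAdd(oldValue: String, newValue: String, agg: Set[String]): Set[String] =
    adder(newValue, subtractor(oldValue, agg))

  // Apply the same update with the adder called first.
  def addThenSubtract(oldValue: String, newValue: String, agg: Set[String]): Set[String] =
    subtractor(oldValue, adder(newValue, agg))

  def main(args: Array[String]): Unit = {
    val current = Set("lion") // the aggregate already contains the value

    // Duplicated event: old value and new value are both "lion".
    println(subtractThenAdd("lion", "lion", current)) // Set(lion)
    println(addThenSubtract("lion", "lion", current)) // Set()
  }
}
```

With a non-duplicated update (say old value "lion", new value "tiger"), both orders give Set(tiger) in this sketch, so the order only matters when the old and new values coincide.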
