The Streams DSL documentation <https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating> includes a caveat about using the aggregate method to transform a KGroupedTable → KTable, as follows (emphasis mine):
> When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called with the old value as stored in the table and (2) the adder is called with the new value of the input record that was just received. *The order of execution for the subtractor and adder is not defined.*

My interpretation of that last line is that one of three things can happen:

1. the subtractor can be called before the adder
2. the adder can be called before the subtractor
3. the adder and subtractor can be called at the same time

Here is the question I'm looking to get answered: *Are all 3 scenarios above actually possible when using the aggregate method on a KGroupedTable?* Or am I misinterpreting the documentation?

For my use case (detailed below), it would be ideal if the subtractor were always called before the adder.

------------------------------

*Why is this question important?*

If the adder and subtractor are non-commutative operations and their order of execution can vary, you can end up with different results depending on which one runs first. A useful example of such non-commutative operations is aggregating records into a Set:

    .aggregate[Set[Animal]](Set.empty)(
      // adder: add the new value to the set
      adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
      // subtractor: remove the old value from the set
      subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
    )

In this example, for a duplicate event (an update whose new value equals the old value), calling the adder before the subtractor would remove the value from the set entirely, which I imagine would be a problem for most use cases.

------------------------------

*Why am I doubting the documentation (assuming my interpretation of it is correct)?*

1. It seems like an unusual design choice.
2. When I run unit tests (using TopologyTestDriver and EmbeddedKafka), I always see the subtractor called before the adder. Unfortunately, if some kind of race condition is involved, it's entirely possible that my tests would never hit the other scenarios.
3. I also tried looking into the kafka-streams codebase. The KTableProcessorSupplier that calls the user-supplied adder/subtractor functions appears to be this one: https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81, and on line 92 there is even a comment saying "first try to remove the old value". However, in my own testing I saw that the process function itself is called twice: first with a Change<V> value that contains only the old value, and then again with a Change<V> value that contains only the new value. I haven't been able to dig deep enough to find the internal code that generates the old-value record and the new-value record (upon receiving an update), so I can't confirm whether it always produces those records in that order.
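To make the concern concrete without running Kafka at all, here is a minimal Scala sketch. It is not Kafka Streams code: `Animal`, `Change`, `process` and `applyUpdate` are hypothetical stand-ins, and `process` only loosely mirrors the behavior described above (subtract the old value if present, then add the new value if present). The sketch splits one duplicate update into an old-value-only record and a new-value-only record, as observed in testing, and processes them in both orders. For contrast, it also includes a hypothetical counting aggregate (a multiset stored as `Map[Animal, Int]`) whose adder and subtractor commute.

```scala
// Hypothetical, Kafka-free model; none of these names are Kafka Streams APIs.
final case class Animal(name: String)

// Simplified stand-in for the two observed records: one carries only the
// old value, the other only the new value.
final case class Change[V](newValue: Option[V], oldValue: Option[V])

object AggregateOrderDemo {
  type Fn[A] = (String, Animal, A) => A

  // The Set-based adder/subtractor from the question (non-commutative).
  val setAdder: Fn[Set[Animal]] = (_, animal, acc) => acc + animal
  val setSubtractor: Fn[Set[Animal]] = (_, animal, acc) => acc - animal

  // Contrast: a counting (multiset) aggregate whose adder and subtractor commute.
  val countAdder: Fn[Map[Animal, Int]] =
    (_, animal, acc) => acc.updated(animal, acc.getOrElse(animal, 0) + 1)
  val countSubtractor: Fn[Map[Animal, Int]] = (_, animal, acc) =>
    acc.getOrElse(animal, 0) match {
      case n if n <= 1 => acc - animal // drop the entry once its count reaches zero
      case n           => acc.updated(animal, n - 1)
    }

  // Per record: subtract the old value if present, then add the new value if present.
  def process[A](key: String, change: Change[Animal], agg: A, adder: Fn[A], subtractor: Fn[A]): A = {
    val afterRemove = change.oldValue.fold(agg)(old => subtractor(key, old, agg))
    change.newValue.fold(afterRemove)(newVal => adder(key, newVal, afterRemove))
  }

  // Splits one UPDATE into two single-value records and processes them in the given order.
  def applyUpdate[A](key: String, oldAnimal: Animal, newAnimal: Animal, agg: A,
                     adder: Fn[A], subtractor: Fn[A], oldRecordFirst: Boolean): A = {
    val oldOnly = Change[Animal](newValue = None, oldValue = Some(oldAnimal))
    val newOnly = Change[Animal](newValue = Some(newAnimal), oldValue = None)
    val ordered = if (oldRecordFirst) List(oldOnly, newOnly) else List(newOnly, oldOnly)
    ordered.foldLeft(agg)((acc, change) => process(key, change, acc, adder, subtractor))
  }

  def main(args: Array[String]): Unit = {
    val lion = Animal("lion")
    // A duplicate update: the new value equals the old value already stored.
    for (oldFirst <- List(true, false)) {
      val set = applyUpdate("zoo-1", lion, lion, Set(lion), setAdder, setSubtractor, oldFirst)
      val counts = applyUpdate("zoo-1", lion, lion, Map(lion -> 1), countAdder, countSubtractor, oldFirst)
      println(s"old record first = $oldFirst: set = $set, counts = $counts")
    }
  }
}
```

Running `main` should print `set = Set(Animal(lion))` when the old-value record is processed first and `set = Set()` when the new-value record is processed first, while `counts` is `Map(Animal(lion) -> 1)` in both cases. The counting variant is just one way to make the two operations order-independent; the sketch says nothing about which order Kafka Streams actually uses.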
