Hi everyone! I posted this same question on Stack Overflow <https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined> a few days ago but didn't get any responses. I was hoping someone here might be able to help clarify this part of the documentation for me :)
On Thu, 28 Jan 2021 at 19:50, Fq Public <[email protected]> wrote:

> The Streams DSL documentation
> <https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
> includes a caveat about using the aggregate method to transform a
> KGroupedTable → KTable, as follows (emphasis mine):
>
> When subsequent non-null values are received for a key (e.g., UPDATE),
> then (1) the subtractor is called with the old value as stored in the table
> and (2) the adder is called with the new value of the input record that was
> just received. *The order of execution for the subtractor and adder is
> not defined.*
>
> My interpretation of that last line is that one of three things can
> happen:
>
> 1. subtractor can be called before adder
> 2. adder can be called before subtractor
> 3. adder and subtractor could be called at the same time
>
> Here is the question I'm looking to get answered:
> *Are all 3 scenarios above actually possible when using the aggregate
> method on a KGroupedTable?*
> Or am I misinterpreting the documentation? For my use case (detailed
> below), it would be ideal if the subtractor were always called before the
> adder.
> ------------------------------
>
> *Why is this question important?*
>
> If the adder and subtractor are non-commutative operations and the order
> in which they are executed can vary, you can end up with different results
> depending on the order of execution of adder and subtractor. An example of
> a useful non-commutative operation would be aggregating records into a Set:
>
> .aggregate[Set[Animal]](Set.empty)(
>   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
>   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
> )
>
> In this example, for duplicated events (the same animal arriving as both the
> old and the new value), if the adder is called before the subtractor you
> would end up removing the value entirely from the set (which would be
> problematic for most use cases, I imagine).
> ------------------------------
>
> *Why am I doubting the documentation (assuming my interpretation of it is
> correct)?*
>
> 1. It seems like an unusual design choice.
> 2. When I've run unit tests (using TopologyTestDriver and
>    EmbeddedKafka), I always see the subtractor called before the adder.
>    Unfortunately, if there is some kind of race condition involved, it's
>    entirely possible that I would never hit the other scenarios.
> 3. I did try looking into the kafka-streams codebase as well. The
>    KTableProcessorSupplier that calls the user-supplied adder/subtractor
>    functions appears to be this one:
>    https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
>    and on line 92, you can even see a comment saying "first try to remove
>    the old value". Unfortunately, during my own testing, what I saw was that
>    the process function itself is called twice: first with a Change<V> value
>    that includes only the old value, and then again with a Change<V> value
>    that includes only the new value. I haven't been able to dig deep enough
>    to find the internal code that generates the old-value record and the
>    new-value record (upon receiving an update) to determine whether it
>    actually produces those records in that order.
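To make the duplicated-event case from the question concrete, here is a minimal Scala sketch that applies a single UPDATE (old value → new value) to an aggregate in both possible orders, with no Kafka involved. ZooKey, Animal, Op, subtractThenAdd, addThenSubtract, addToCounts and removeFromCounts are hypothetical names for illustration only; they are not part of the Kafka Streams API. The count-based pair is included purely as a contrast: since it only adds and subtracts integers per animal, both orders should end in the same state.

```scala
object AdderSubtractorOrder {
  // Hypothetical stand-ins for the key/value types used in the question.
  type ZooKey = String
  final case class Animal(name: String)

  // Same shape as the question's lambdas: (key, value, aggregate) => aggregate.
  type Op[S] = (ZooKey, Animal, S) => S

  // The Set-based adder/subtractor from the question; they do not commute.
  val addToSet: Op[Set[Animal]] = (zooKey, animal, set) => set + animal
  val removeFromSet: Op[Set[Animal]] = (zooKey, animal, set) => set - animal

  // Hypothetical count-based alternative: each call only shifts an integer
  // count, and an entry is dropped once its count returns to exactly zero.
  private def bump(animal: Animal, counts: Map[Animal, Int], delta: Int): Map[Animal, Int] = {
    val n = counts.getOrElse(animal, 0) + delta
    if (n == 0) counts - animal else counts.updated(animal, n)
  }
  val addToCounts: Op[Map[Animal, Int]] = (zooKey, animal, counts) => bump(animal, counts, 1)
  val removeFromCounts: Op[Map[Animal, Int]] = (zooKey, animal, counts) => bump(animal, counts, -1)

  // One UPDATE for `key` (oldValue -> newValue): subtractor first, then adder.
  def subtractThenAdd[S](state: S, key: ZooKey, oldValue: Animal, newValue: Animal)(
      adder: Op[S], subtractor: Op[S]): S =
    adder(key, newValue, subtractor(key, oldValue, state))

  // The same UPDATE with the opposite order: adder first, then subtractor.
  def addThenSubtract[S](state: S, key: ZooKey, oldValue: Animal, newValue: Animal)(
      adder: Op[S], subtractor: Op[S]): S =
    subtractor(key, oldValue, adder(key, newValue, state))

  def main(args: Array[String]): Unit = {
    val lion = Animal("lion")
    // Duplicated event: the new value equals the old value already in the table.
    println(subtractThenAdd(Set(lion), "zoo-1", lion, lion)(addToSet, removeFromSet)) // Set(Animal(lion))
    println(addThenSubtract(Set(lion), "zoo-1", lion, lion)(addToSet, removeFromSet)) // Set()
    println(subtractThenAdd(Map(lion -> 1), "zoo-1", lion, lion)(addToCounts, removeFromCounts)) // Map(Animal(lion) -> 1)
    println(addThenSubtract(Map(lion -> 1), "zoo-1", lion, lion)(addToCounts, removeFromCounts)) // Map(Animal(lion) -> 1)
  }
}
```

Under these assumptions, the Set-based pair keeps the animal only when the subtractor runs first, which is exactly the ordering concern raised above. With distinct old and new values (say lion → tiger), both orders yield Set(tiger), so only the duplicated case is order-sensitive. The count-based pair gives the same map either way.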
