Hiya Matthias, Alexandre,

Thanks for your detailed responses. Your explanation about why the order of execution in the `KGroupedTable.aggregate` method does not matter (so much as what's happening in the `KTable.groupBy` method) makes sense to me.

I have one follow-up question regarding this part of your statement:

> The only guarantee we can provide is (given that you configured the
> producer correctly to avoid re-ordering during send()), that if the
> grouping key does not change, the send of the old and new value will not
> be re-ordered relative to each other. The order of the send is
> hard-coded in the upstream processor though:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99

What would be an example of an incorrect configuration I could apply that would result in re-ordering during the `send()` call? As you said, "the order of the send is hard-coded in the upstream processor" so I'm struggling to come up with an example here.

Cheers,
FQ

On Fri, 29 Jan 2021 at 17:11, Matthias J. Sax wrote:

> What Alex says.
>
> The order is hard-coded (ie not race condition), but there is no
> guarantee that we won't change the order in future releases without
> notice (ie, it's not a public contract and we don't need to do a KIP to
> change it). I guess there would be a Jira about it... But as a matter of
> fact, it does not really matter (detail below).
>
> For the three scenarios you mentioned, the 3rd one cannot happen though:
> We execute an aggregator in a single thread (per shard) and thus we
> either call the adder or subtractor first.
>
> > 1. Seems like an unusual design choice
>
> Why do you think so?
>
> > first with a Change<V> value that includes only
> > the old value and then the process function is called again with a
> > Change<V> value that includes only the new value.
>
> In general, both records might be processed by different threads and
> thus we cannot only send one record. It's just that the TTD simulates a
> single threaded execution thus both records always end up in the same
> processor.
>
> Cf
>
> https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations
>
> However, the order actually only matters if both records really end up
> in the same processor (if the grouping key did not change during the
> upstream update).
>
> Furthermore, the order actually depends not on the downstream aggregate
> implementation, but on the order of writes into the repartition topic
> of the `groupBy()`, and with multiple parallel upstream processors, those
> writes are interleaved anyway. Thus, in general, you should think of the
> "add" and "subtract" part as independent entities and not make any
> assumption about their order (also, even if the key did not change, both
> records might be interleaved by other records...)
>
> The only guarantee we can provide is (given that you configured the
> producer correctly to avoid re-ordering during send()), that if the
> grouping key does not change, the send of the old and new value will not
> be re-ordered relative to each other. The order of the send is
> hard-coded in the upstream processor though:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
>
> Thus, the order of the downstream aggregate processor is actually
> meaningless.
>
> -Matthias
>
> On 1/29/21 6:50 AM, Alexandre Brasil wrote:
> > From the source code in KGroupedTableImpl, the subtractor is always
> > called before the adder. By not guaranteeing the order, I think the devs
> > meant that it might change on future versions of Kafka Streams (although
> > I'd think it's unlikely to).
> >
> > I have use cases similar to your example, and that phrase worries me a
> > bit too. :)
> >
> > On Thu, Jan 28, 2021, 22:31 Fq Public wrote:
> >
> >> Hi everyone!
> >> I posted this same question on stackoverflow
> >> <https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined>
> >> a few days ago but didn't get any responses. Was hoping someone here
> >> might be able to help clarify this part of the documentation for me :)
> >>
> >> On Thu, 28 Jan 2021 at 19:50, Fq Public wrote:
> >>
> >>> The Streams DSL documentation
> >>> <https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
> >>> includes a caveat about using the aggregate method to transform a
> >>> KGroupedTable → KTable, as follows (emphasis mine):
> >>>
> >>> When subsequent non-null values are received for a key (e.g., UPDATE),
> >>> then (1) the subtractor is called with the old value as stored in the
> >>> table and (2) the adder is called with the new value of the input record
> >>> that was just received. *The order of execution for the subtractor and
> >>> adder is not defined.*
> >>>
> >>> My interpretation of that last line implies that one of three things
> >>> can happen:
> >>>
> >>> 1. subtractor can be called before adder
> >>> 2. adder can be called before subtractor
> >>> 3. adder and subtractor could be called at the same time
> >>>
> >>> Here is the question I'm looking to get answered:
> >>> *Are all 3 scenarios above actually possible when using the aggregate
> >>> method on a KGroupedTable?*
> >>> Or am I misinterpreting the documentation? For my use-case (detailed
> >>> below), it would be ideal if the subtractor was always called before
> >>> the adder.
> >>> ------------------------------
> >>>
> >>> *Why is this question important?*
> >>>
> >>> If the adder and subtractor are non-commutative operations and the
> >>> order in which they are executed can vary, you can end up with different
> >>> results depending on the order of execution of adder and subtractor.
> >>> An example of a useful non-commutative operation would be something
> >>> like if we’re aggregating records into a Set:
> >>>
> >>> .aggregate[Set[Animal]](Set.empty)(
> >>>   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
> >>>   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
> >>> )
> >>>
> >>> In this example, for duplicated events, if the adder is called before
> >>> the subtractor you would end up removing the value entirely from the
> >>> set (which would be problematic for most use-cases I imagine).
> >>> ------------------------------
> >>>
> >>> *Why am I doubting the documentation (assuming my interpretation of it
> >>> is correct)?*
> >>>
> >>> 1. Seems like an unusual design choice
> >>> 2. When I've run unit tests (using TopologyTestDriver and
> >>>    EmbeddedKafka), I always see the subtractor is called before the
> >>>    adder. Unfortunately, if there is some kind of race condition
> >>>    involved, it's entirely possible that I would never hit the other
> >>>    scenarios.
> >>> 3. I did try looking into the kafka-streams codebase as well. The
> >>>    KTableProcessorSupplier that calls the user-supplied
> >>>    adder/subtractor functions appears to be this one:
> >>>    https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
> >>>    and on line 92, you can even see a comment saying "first try to
> >>>    remove the old value". Unfortunately, during my own testing, what I
> >>>    saw was that the process function itself is called twice; first
> >>>    with a Change<V> value that includes only the old value and then
> >>>    the process function is called again with a Change<V> value that
> >>>    includes only the new value.
> >>>    I haven't been able to dig deep enough to find the internal code
> >>>    that is generating the old value record and the new value record
> >>>    (upon receiving an update) to determine if it actually produces
> >>>    those records in that order.
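
For reference, the Set example quoted above can be reproduced without Kafka Streams at all. The following is only a minimal sketch in plain Scala: `AggregationOrderSketch`, `Animal`, `subtractThenAdd`, and `addThenSubtract` are hypothetical names made up for illustration, and the `adder`/`subtractor` here drop the key argument for brevity, so this is not the Kafka Streams API itself. It applies a duplicated event (old value equal to new value) in both orders.

```scala
object AggregationOrderSketch {
  // Hypothetical value type, standing in for the Animal in the quoted example.
  final case class Animal(name: String)

  // Simplified adder/subtractor: same Set logic as the quoted snippet,
  // but without the key argument (an assumption made to keep this short).
  def adder(animal: Animal, animals: Set[Animal]): Set[Animal] = animals + animal
  def subtractor(animal: Animal, animals: Set[Animal]): Set[Animal] = animals - animal

  // Duplicated event: the aggregate already holds `value` and receives it again,
  // so the old value and the new value are the same.
  def subtractThenAdd(value: Animal, current: Set[Animal]): Set[Animal] =
    adder(value, subtractor(value, current))

  def addThenSubtract(value: Animal, current: Set[Animal]): Set[Animal] =
    subtractor(value, adder(value, current))

  def main(args: Array[String]): Unit = {
    val lion = Animal("lion")
    val current = Set(lion)
    println(subtractThenAdd(lion, current)) // the lion is still in the set
    println(addThenSubtract(lion, current)) // the set is now empty
  }
}
```

With subtractor-then-adder the duplicate is effectively a no-op, while adder-then-subtractor drops the value from the set, which is the concern raised in the original question.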
