The other follow-up question I have is related to this part of your statement:

```
also, even if the key did not change, both records might be interleaved by other records...
```

I take it this means that, if I were joining a KStream with this KTable, it is entirely possible that a KStream record could join with the KTable in a state where the subtractor has been already executed but the adder has not yet been executed for the corresponding KTable-update event?

Cheers,
FQ

On Mon, 1 Feb 2021 at 11:53, Fq Public wrote:

> Hiya Matthias, Alexandre,
>
> Thanks for your detailed responses. Your explanation about why the order
> of execution in the `KGroupedTable.aggregate` method does not matter (so
> much as what's happening in the `KTable.groupBy` method) makes sense to me.
>
> I have one follow-up question regarding this part of your statement:
>
> > The only guarantee we can provide is (given that you configured the
> > producer correctly to avoid re-ordering during send()), that if the
> > grouping key does not change, the send of the old and new value will not
> > be re-ordered relative to each other. The order of the send is hard-coded in
> > the upstream processor though:
> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
>
> What would be an example of an incorrect configuration I could apply that
> would result in re-ordering during the `send()` call?
> As you said, "the order of the send is hard-coded in the upstream
> processor" so I'm struggling to come up with an example here.
>
> Cheers,
> FQ
>
> On Fri, 29 Jan 2021 at 17:11, Matthias J. Sax wrote:
>
>> What Alex says.
>>
>> The order is hard-coded (i.e., not a race condition), but there is no
>> guarantee that we won't change the order in future releases without
>> notice (i.e., it's not a public contract and we don't need to do a KIP to
>> change it). I guess there would be a Jira about it... But as a matter of
>> fact, it does not really matter (detail below).
>>
>> For the three scenarios you mentioned, the 3rd one cannot happen though:
>> We execute an aggregator in a single thread (per shard) and thus we
>> either call the adder or subtractor first.
>>
>> > 1. Seems like an unusual design choice
>>
>> Why do you think so?
>>
>> > first with a Change<V> value that includes only
>> > the old value and then the process function is called again with a
>> > Change<V> value that includes only the new value.
>>
>> In general, both records might be processed by different threads and
>> thus we cannot send only one record. It's just that the TTD simulates a
>> single-threaded execution, thus both records always end up in the same
>> processor.
>>
>> Cf
>> https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations
>>
>> However, the order actually only matters if both records really end up
>> in the same processor (if the grouping key did not change during the
>> upstream update).
>>
>> Furthermore, the order actually depends not on the downstream aggregate
>> implementation, but on the order of writes into the repartition topic
>> of the `groupBy()`, and with multiple parallel upstream processors, those
>> writes are interleaved anyway. Thus, in general, you should think of the
>> "add" and "subtract" part as independent entities and not make any
>> assumption about their order (also, even if the key did not change, both
>> records might be interleaved by other records...)
>>
>> The only guarantee we can provide is (given that you configured the
>> producer correctly to avoid re-ordering during send()), that if the
>> grouping key does not change, the send of the old and new value will not
>> be re-ordered relative to each other. The order of the send is
>> hard-coded in the upstream processor though:
>>
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
>>
>> Thus, the order of the downstream aggregate processor is actually
>> meaningless.
>>
>> -Matthias
>>
>> On 1/29/21 6:50 AM, Alexandre Brasil wrote:
>> > From the source code in KGroupedTableImpl, the subtractor is always called
>> > before the adder.
>> > By not guaranteeing the order, I think the devs meant
>> > that it might change in future versions of Kafka Streams (although I'd
>> > think it's unlikely to).
>> >
>> > I have use cases similar to your example, and that phrase worries me a
>> > bit too. :)
>> >
>> > On Thu, Jan 28, 2021, 22:31 Fq Public wrote:
>> >
>> >> Hi everyone! I posted this same question on stackoverflow
>> >> <https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined>
>> >> a few days ago but didn't get any responses. Was hoping someone here might
>> >> be able to help clarify this part of the documentation for me :)
>> >>
>> >> On Thu, 28 Jan 2021 at 19:50, Fq Public wrote:
>> >>
>> >>> The Streams DSL documentation
>> >>> <https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
>> >>> includes a caveat about using the aggregate method to transform a
>> >>> KGroupedTable → KTable, as follows (emphasis mine):
>> >>>
>> >>> When subsequent non-null values are received for a key (e.g., UPDATE),
>> >>> then (1) the subtractor is called with the old value as stored in the table
>> >>> and (2) the adder is called with the new value of the input record that was
>> >>> just received. *The order of execution for the subtractor and adder is
>> >>> not defined.*
>> >>>
>> >>> My interpretation of that last line implies that one of three things can
>> >>> happen:
>> >>>
>> >>> 1. subtractor can be called before adder
>> >>> 2. adder can be called before subtractor
>> >>> 3. adder and subtractor could be called at the same time
>> >>>
>> >>> Here is the question I'm looking to get answered:
>> >>> *Are all 3 scenarios above actually possible when using the aggregate
>> >>> method on a KGroupedTable?*
>> >>> Or am I misinterpreting the documentation?
>> >>> For my use-case (detailed
>> >>> below), it would be ideal if the subtractor was always called before the
>> >>> adder.
>> >>> ------------------------------
>> >>>
>> >>> *Why is this question important?*
>> >>>
>> >>> If the adder and subtractor are non-commutative operations and the order
>> >>> in which they are executed can vary, you can end up with different results
>> >>> depending on the order of execution of adder and subtractor. An example of
>> >>> a useful non-commutative operation would be something like if we’re
>> >>> aggregating records into a Set:
>> >>>
>> >>> .aggregate[Set[Animal]](Set.empty)(
>> >>>   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
>> >>>   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
>> >>> )
>> >>>
>> >>> In this example, for duplicated events, if the adder is called before the
>> >>> subtractor you would end up removing the value entirely from the set (which
>> >>> would be problematic for most use-cases I imagine).
>> >>> ------------------------------
>> >>>
>> >>> *Why am I doubting the documentation (assuming my interpretation of it is
>> >>> correct)?*
>> >>>
>> >>> 1. Seems like an unusual design choice
>> >>> 2. When I've run unit tests (using TopologyTestDriver and
>> >>>    EmbeddedKafka), I always see the subtractor is called before the adder.
>> >>>    Unfortunately, if there is some kind of race condition involved, it's
>> >>>    entirely possible that I would never hit the other scenarios.
>> >>> 3. I did try looking into the kafka-streams codebase as well.
>> >>>    The KTableProcessorSupplier that calls the user-supplied adder/subtractor
>> >>>    functions appears to be this one:
>> >>>    https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
>> >>>    and on line 92, you can even see a comment saying "first try to remove the
>> >>>    old value". Unfortunately, during my own testing, I saw that the
>> >>>    process function itself is called twice; first with a Change<V> value that
>> >>>    includes only the old value and then the process function is called
>> >>>    again with a Change<V> value that includes only the new value. I
>> >>>    haven't been able to dig deep enough to find the internal code that is
>> >>>    generating the old value record and the new value record (upon receiving an
>> >>>    update) to determine if it actually produces those records in that order.
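
The ordering concern from the original question can be illustrated without Kafka at all. The following is only a minimal sketch: the class `AggregateOrderSketch` and its methods are hypothetical stand-ins for the set-based adder and subtractor quoted above, not Kafka Streams API, and the animals are plain strings. It applies a duplicated update (old value equal to new value) in both orders.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; none of these names come from Kafka Streams.
class AggregateOrderSketch {

    // Stand-in for the adder: returns a copy of the aggregate with the value added.
    static Set<String> add(Set<String> aggregate, String value) {
        Set<String> next = new HashSet<>(aggregate);
        next.add(value);
        return next;
    }

    // Stand-in for the subtractor: returns a copy of the aggregate with the value removed.
    static Set<String> subtract(Set<String> aggregate, String value) {
        Set<String> next = new HashSet<>(aggregate);
        next.remove(value);
        return next;
    }

    // Update oldValue -> newValue with the subtractor applied first.
    static Set<String> subtractThenAdd(Set<String> aggregate, String oldValue, String newValue) {
        return add(subtract(aggregate, oldValue), newValue);
    }

    // Update oldValue -> newValue with the adder applied first.
    static Set<String> addThenSubtract(Set<String> aggregate, String oldValue, String newValue) {
        return subtract(add(aggregate, newValue), oldValue);
    }

    public static void main(String[] args) {
        Set<String> zoo = Set.of("lion", "zebra");

        // Duplicated event: the old and new values are identical.
        System.out.println(subtractThenAdd(zoo, "lion", "lion").contains("lion")); // true
        System.out.println(addThenSubtract(zoo, "lion", "lion").contains("lion")); // false
    }
}
```

With the subtractor first, the duplicated value survives; with the adder first, it is removed from the set, which is the situation the original question wanted to rule out.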
