About the producer config: By default, the producer retries sending a record batch if the send fails. Furthermore, the producer's default config allows up to 5 in-flight requests per connection (`max.in.flight.requests.per.connection=5`). Thus, if one request fails while others succeed and the producer retries the failed request, the retried request effectively gets re-ordered.
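A toy simulation makes the retry-reordering scenario concrete. The sketch below is a hypothetical model, not Kafka client code. `InFlightRetrySim` and `simulate` are invented names. It assumes that every request in a round goes out before any response arrives, and that a batch listed in `failOnce` fails exactly once with a retriable error. It does not model idempotent writes, where the broker uses producer sequence numbers to reject out-of-order batches.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of a producer sending batches to ONE partition over ONE
// connection. It is not the Kafka client's implementation; all names are invented.
class InFlightRetrySim {

    /**
     * @param batches     batches in the order send() was called
     * @param failOnce    batches whose first attempt fails with a retriable error
     * @param maxInFlight max unacknowledged requests per connection (>= 1)
     * @return the order in which the broker appends the batches to the log
     */
    static List<String> simulate(List<String> batches, Set<String> failOnce, int maxInFlight) {
        if (maxInFlight < 1) throw new IllegalArgumentException("maxInFlight must be >= 1");
        Deque<String> queue = new ArrayDeque<>(batches); // waiting to be (re)sent
        Set<String> failedBefore = new HashSet<>();
        List<String> log = new ArrayList<>();
        while (!queue.isEmpty()) {
            // Put up to maxInFlight requests on the wire before any response comes back.
            List<String> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !queue.isEmpty()) {
                inFlight.add(queue.pollFirst());
            }
            // Handle the responses in send order.
            List<String> retries = new ArrayList<>();
            for (String batch : inFlight) {
                if (failOnce.contains(batch) && failedBefore.add(batch)) {
                    retries.add(batch); // first attempt failed: retry later
                } else {
                    log.add(batch);     // broker accepted and appended the batch
                }
            }
            // Retried batches go back to the front of the queue, keeping their relative order.
            for (int i = retries.size() - 1; i >= 0; i--) {
                queue.addFirst(retries.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> batches = List.of("B1", "B2", "B3");
        System.out.println(simulate(batches, Set.of("B1"), 5)); // [B2, B3, B1]
        System.out.println(simulate(batches, Set.of("B1"), 1)); // [B1, B2, B3]
    }
}
```

With `maxInFlight = 5` (the default of `max.in.flight.requests.per.connection`), the failed batch `B1` is appended after `B2` and `B3`. With `maxInFlight = 1`, nothing can overtake it. In a Kafka Streams application, producer settings like these go into the Streams config with the `producer.` prefix (see `StreamsConfig.producerPrefix`).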
To avoid re-ordering, you can either set `max.in.flight.requests.per.connection=1` or enable idempotent writes (`enable.idempotence=true`). I.e., the default config does _not_ guard against re-ordering in case of retries; it is optimized for throughput instead.

>> I take it this means that, if I were joining a KStream with this KTable, it is entirely possible that a KStream record could join with the KTable in a state where the subtractor has already been executed but the adder has not yet been executed for the corresponding KTable-update event?

Yes, that would be possible (even without interleaved records, as we always send two records). We could maybe close this gap, but the fix would depend on a proper implementation of the .equals() method for the key type. Feel free to create a Jira ticket for this case.

We have work in progress to improve the DSL operator semantics. In the meantime, if the current implementation of the DSL operators is an issue, you could fall back to the Processor API (PAPI) and implement custom operators that address the corner cases that are important for your use case.

-Matthias

On 2/1/21 10:39 AM, Fq Public wrote:
> The other follow-up question I have is related to this part of your statement:
> ```
> also, even if the key did not change, both records might be interleaved by other records...
> ```
> I take it this means that, if I were joining a KStream with this KTable, it is entirely possible that a KStream record could join with the KTable in a state where the subtractor has already been executed but the adder has not yet been executed for the corresponding KTable-update event?
>
> Cheers,
> FQ
>
> On Mon, 1 Feb 2021 at 11:53, Fq Public wrote:
>
>> Hiya Matthias, Alexandre,
>>
>> Thanks for your detailed responses. Your explanation about why the order of execution in the `KGroupedTable.aggregate` method does not matter (so much as what's happening in the `KTable.groupBy` method) makes sense to me.
>>
>> I have one follow-up question regarding this part of your statement:
>>
>> The only guarantee we can provide is (given that you configured the producer correctly to avoid re-ordering during send()), that if the grouping key does not change, the send of the old and new value will not be re-ordered relative to each other. The order of the send is hard-coded in the upstream processor though:
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
>>
>> What would be an example of an incorrect configuration I could apply that would result in re-ordering during the `send()` call? As you said, "the order of the send is hard-coded in the upstream processor", so I'm struggling to come up with an example here.
>>
>> Cheers,
>> FQ
>>
>> On Fri, 29 Jan 2021 at 17:11, Matthias J. Sax wrote:
>>
>>> What Alex says.
>>>
>>> The order is hard-coded (i.e., not a race condition), but there is no guarantee that we won't change the order in future releases without notice (i.e., it's not a public contract and we don't need to do a KIP to change it). I guess there would be a Jira about it... But as a matter of fact, it does not really matter (details below).
>>>
>>> For the three scenarios you mentioned, the 3rd one cannot happen though: we execute an aggregator in a single thread (per shard), and thus we call either the adder or the subtractor first.
>>>
>>>> 1. Seems like an unusual design choice
>>>
>>> Why do you think so?
>>>
>>>> first with a Change<V> value that includes only the old value and then the process function is called again with a Change<V> value that includes only the new value.
>>>
>>> In general, both records might be processed by different threads, and thus we cannot only send one record. It's just that the TopologyTestDriver (TTD) simulates a single-threaded execution, thus both records always end up in the same processor.
>>>
>>> Cf. https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations
>>>
>>> However, the order actually only matters if both records really end up in the same processor (i.e., if the grouping key did not change during the upstream update).
>>>
>>> Furthermore, the order actually depends not on the downstream aggregate implementation, but on the order of writes into the repartition topic of the `groupBy()`, and with multiple parallel upstream processors, those writes are interleaved anyway. Thus, in general, you should think of the "add" and "subtract" parts as independent entities and not make any assumption about their order (also, even if the key did not change, both records might be interleaved by other records...).
>>>
>>> The only guarantee we can provide is (given that you configured the producer correctly to avoid re-ordering during send()), that if the grouping key does not change, the send of the old and new value will not be re-ordered relative to each other. The order of the send is hard-coded in the upstream processor though:
>>>
>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
>>>
>>> Thus, the order of the downstream aggregate processor is actually meaningless.
>>>
>>> -Matthias
>>>
>>> On 1/29/21 6:50 AM, Alexandre Brasil wrote:
>>>> From the source code in KGroupedTableImpl, the subtractor is always called before the adder. By not guaranteeing the order, I think the devs meant that it might change in future versions of Kafka Streams (although I'd think that's unlikely).
>>>>
>>>> I have use cases similar to your example, and that phrase worries me a bit too. :)
>>>>
>>>> On Thu, Jan 28, 2021, 22:31 Fq Public wrote:
>>>>
>>>>> Hi everyone! I posted this same question on Stack Overflow <https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined> a few days ago but didn't get any responses. I was hoping someone here might be able to help clarify this part of the documentation for me :)
>>>>>
>>>>> On Thu, 28 Jan 2021 at 19:50, Fq Public wrote:
>>>>>
>>>>>> The Streams DSL documentation <https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating> includes a caveat about using the aggregate method to transform a KGroupedTable → KTable, as follows (emphasis mine):
>>>>>>
>>>>>> When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called with the old value as stored in the table and (2) the adder is called with the new value of the input record that was just received. *The order of execution for the subtractor and adder is not defined.*
>>>>>>
>>>>>> My interpretation of that last line is that one of three things can happen:
>>>>>>
>>>>>> 1. subtractor can be called before adder
>>>>>> 2. adder can be called before subtractor
>>>>>> 3. adder and subtractor could be called at the same time
>>>>>>
>>>>>> Here is the question I'm looking to get answered:
>>>>>> *Are all 3 scenarios above actually possible when using the aggregate method on a KGroupedTable?*
>>>>>> Or am I misinterpreting the documentation? For my use case (detailed below), it would be ideal if the subtractor were always called before the adder.
>>>>>> ------------------------------
>>>>>>
>>>>>> *Why is this question important?*
>>>>>>
>>>>>> If the adder and subtractor are non-commutative operations and the order in which they are executed can vary, you can end up with different results depending on the order of execution of adder and subtractor. An example of a useful non-commutative operation would be aggregating records into a Set:
>>>>>>
>>>>>>   .aggregate[Set[Animal]](Set.empty)(
>>>>>>     adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
>>>>>>     subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
>>>>>>   )
>>>>>>
>>>>>> In this example, for duplicated events (an update whose new value equals the old value), if the adder is called before the subtractor, you would end up removing the value entirely from the set (which would be problematic for most use cases, I imagine).
>>>>>> ------------------------------
>>>>>>
>>>>>> *Why am I doubting the documentation (assuming my interpretation of it is correct)?*
>>>>>>
>>>>>> 1. Seems like an unusual design choice
>>>>>> 2. When I've run unit tests (using TopologyTestDriver and EmbeddedKafka), I always see the subtractor called before the adder. Unfortunately, if there is some kind of race condition involved, it's entirely possible that I would never hit the other scenarios.
>>>>>> 3. I did try looking into the kafka-streams codebase as well. The KTableProcessorSupplier that calls the user-supplied adder/subtractor functions appears to be this one: https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81 and on line 92, you can even see a comment saying "first try to remove the old value". Unfortunately, what I saw during my own testing was that the process function itself is called twice; first with a Change<V> value that includes only the old value and then the process function is called again with a Change<V> value that includes only the new value. I haven't been able to dig deep enough to find the internal code that generates the old-value record and the new-value record (upon receiving an update) to determine whether it actually produces those records in that order.
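To see why the order matters for the Set aggregation in FQ's question, and why a join could observe the state between the two steps, here is a hypothetical single-threaded model in Java. It is not Kafka Streams code: `SetAggregateOrderSim`, `Change`, `update`, `process`, and `duplicateUpdate` are invented names. It assumes the grouping key is unchanged, so the old-value (subtract) record and the new-value (add) record land in the same partition and are processed one at a time, as Matthias describes. `subtractFirst = true` mirrors the old-then-new send order the thread attributes to `KTableRepartitionMap`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical toy model of a KGroupedTable Set aggregate (not Kafka Streams code).
// All names are invented for illustration.
class SetAggregateOrderSim {

    // One repartition record: the old value (handled by the subtractor)
    // or the new value (handled by the adder).
    static final class Change {
        final String groupKey;
        final String value;
        final boolean isNew;

        Change(String groupKey, String value, boolean isNew) {
            this.groupKey = groupKey;
            this.value = value;
            this.isNew = isNew;
        }
    }

    // The two records an upstream update old -> new produces when the grouping key is unchanged.
    static List<Change> update(String groupKey, String oldValue, String newValue, boolean subtractFirst) {
        Change subtract = new Change(groupKey, oldValue, false);
        Change add = new Change(groupKey, newValue, true);
        return subtractFirst ? List.of(subtract, add) : List.of(add, subtract);
    }

    // Applies records one at a time (single-threaded, like one task) and returns
    // a snapshot of the aggregate after each record.
    static List<Set<String>> process(List<Change> records) {
        Map<String, Set<String>> store = new HashMap<>();
        List<Set<String>> snapshots = new ArrayList<>();
        for (Change c : records) {
            Set<String> agg = new TreeSet<>(store.getOrDefault(c.groupKey, new TreeSet<>()));
            if (c.isNew) {
                agg.add(c.value);    // adder: setOfAnimals + animalValue
            } else {
                agg.remove(c.value); // subtractor: setOfAnimals - animalValue
            }
            store.put(c.groupKey, agg);
            snapshots.add(new TreeSet<>(agg));
        }
        return snapshots;
    }

    // Insert "lion" for zoo1, then receive a duplicate update "lion" -> "lion".
    static List<Set<String>> duplicateUpdate(boolean subtractFirst) {
        List<Change> records = new ArrayList<>();
        records.add(new Change("zoo1", "lion", true)); // initial insert: adder only
        records.addAll(update("zoo1", "lion", "lion", subtractFirst));
        return process(records);
    }

    public static void main(String[] args) {
        System.out.println("subtract first: " + duplicateUpdate(true));  // [[lion], [], [lion]]
        System.out.println("add first:      " + duplicateUpdate(false)); // [[lion], [lion], []]
    }
}
```

Each snapshot is the aggregate after one record. With subtract-first, the final state is `[lion]`, but the middle snapshot `[]` is the intermediate state that a KStream record processed between the two steps could join against. With add-first, the duplicate update removes `lion` for good, which is the outcome FQ is worried about.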
