Ahh okay, I see (re: producer config); I hadn't thought about that yet. I'm using StreamsConfig.EXACTLY_ONCE processing for my Kafka Streams app, so I should be protected against re-ordering of messages in case of retries (since `enable.idempotence` is automatically set to true under the StreamsConfig.EXACTLY_ONCE setting).
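
For reference, here is a rough sketch of the settings being discussed, written with plain `java.util.Properties` and literal config keys so that it runs without the Kafka jars on the classpath. The class name, application id and bootstrap address are placeholders I made up. The two producer variants only matter for a plain producer (or a Streams app that is not using exactly-once); this is my understanding of the configs, not an authoritative recipe.

```java
import java.util.Properties;

final class OrderingSafeConfigs {

    // Kafka Streams config with exactly-once processing. With this guarantee,
    // Streams enables idempotence on its internal producer.
    static Properties streamsProps() {
        Properties p = new Properties();
        p.put("application.id", "zoo-aggregator");      // placeholder name
        p.put("bootstrap.servers", "localhost:9092");   // placeholder address
        p.put("processing.guarantee", "exactly_once");  // value of StreamsConfig.EXACTLY_ONCE
        return p;
    }

    // Option 1 for a plain producer: idempotent writes. Retried batches keep
    // their order while several requests are in flight.
    static Properties idempotentProducerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");   // placeholder address
        p.put("enable.idempotence", "true");
        p.put("acks", "all");                           // required by idempotence
        return p;
    }

    // Option 2 for a plain producer: a single in-flight request per connection.
    // A retry can no longer overtake a later request, at the cost of throughput.
    static Properties singleInFlightProducerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");   // placeholder address
        p.put("max.in.flight.requests.per.connection", "1");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps());
        System.out.println(idempotentProducerProps());
        System.out.println(singleInFlightProducerProps());
    }
}
```
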
Thank you Matthias, this has been very helpful!

Cheers,
Fq

On Tue, 2 Feb 2021 at 02:45, Matthias J. Sax <[email protected]> wrote:

> About the producer config:
>
> By default, the producer will retry sending record batches in case a
> send failed. Furthermore, the producer default configs allow for 5
> parallel in-flight requests per TCP connection. Thus, if one request fails
> while others succeed, and the producer retries the send request, the
> retried request would effectively get re-ordered.
>
> To avoid re-ordering, you can either set config
> `max.in.flight.requests.per.connection = 1`, or you enable idempotent
> writes. I.e., the default config does _not_ guard against reordering in
> case of retries, but is optimized for increased throughput.
>
> >> I take it this means that, if I were joining a KStream with this KTable, it
> >> is entirely possible that a KStream record could join with the KTable in a
> >> state where the subtractor has been already executed but the adder has not
> >> yet been executed for the corresponding KTable-update event?
>
> Yes, that would be possible (even without interleaved records, as we
> always send two records). We could maybe close this gap, but the fix
> would depend on a proper implementation of the .equals() method for the key
> type. Feel free to create a Jira ticket for this case.
>
> We have work in progress to improve DSL operator semantics. In the
> meantime, if the current implementation of the DSL operators is an
> issue, you could fall back to the PAPI and implement custom operators
> that address the corner cases that are important for your use case.
>
> -Matthias
>
> On 2/1/21 10:39 AM, Fq Public wrote:
> > The other follow-up question I have is related to this part of your
> > statement:
> > ```
> > also, even if the key did not change, both records might be interleaved by
> > other records...
> > ```
> > I take it this means that, if I were joining a KStream with this KTable, it
> > is entirely possible that a KStream record could join with the KTable in a
> > state where the subtractor has been already executed but the adder has not
> > yet been executed for the corresponding KTable-update event?
> >
> > Cheers,
> > FQ
> >
> > On Mon, 1 Feb 2021 at 11:53, Fq Public <[email protected]> wrote:
> >
> >> Hiya Matthias, Alexandre,
> >>
> >> Thanks for your detailed responses. Your explanation about why the order
> >> of execution in the `KGroupedTable.aggregate` method does not matter (so
> >> much as what's happening in the `KTable.groupBy` method) makes sense to me.
> >>
> >> I have one follow-up question regarding this part of your statement:
> >>
> >> The only guarantee we can provide is (given that you configured the
> >> producer correctly to avoid re-ordering during send()), that if the
> >> grouping key does not change, the send of the old and new value will not
> >> be re-ordered relative to each other. The order of the send is
> >> hard-coded in the upstream processor though:
> >> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
> >>
> >> What would be an example of an incorrect configuration I could apply that
> >> would result in re-ordering during the `send()` call?
> >> As you said, "the order of the send is hard-coded in the upstream
> >> processor" so I'm struggling to come up with an example here.
> >>
> >> Cheers,
> >> FQ
> >>
> >> On Fri, 29 Jan 2021 at 17:11, Matthias J. Sax <[email protected]> wrote:
> >>
> >>> What Alex says.
> >>>
> >>> The order is hard-coded (i.e., not a race condition), but there is no
> >>> guarantee that we won't change the order in future releases without
> >>> notice (i.e., it's not a public contract and we don't need to do a KIP to
> >>> change it). I guess there would be a Jira about it...
> >>> But as a matter of fact, it does not really matter (details below).
> >>>
> >>> For the three scenarios you mentioned, the 3rd one cannot happen though:
> >>> We execute an aggregator in a single thread (per shard) and thus we
> >>> either call the adder or subtractor first.
> >>>
> >>>> 1. Seems like an unusual design choice
> >>>
> >>> Why do you think so?
> >>>
> >>>> first with a Change<V> value that includes only
> >>>> the old value and then the process function is called again with a Change<V>
> >>>> value that includes only the new value.
> >>>
> >>> In general, both records might be processed by different threads and
> >>> thus we cannot send only one record. It's just that the TTD
> >>> (TopologyTestDriver) simulates a single-threaded execution, thus both
> >>> records always end up in the same processor.
> >>>
> >>> Cf
> >>> https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations
> >>>
> >>> However, the order actually only matters if both records really end up
> >>> in the same processor (if the grouping key did not change during the
> >>> upstream update).
> >>>
> >>> Furthermore, the order actually depends not on the downstream aggregate
> >>> implementation, but on the order of writes into the repartition topic
> >>> of the `groupBy()`, and with multiple parallel upstream processors, those
> >>> writes are interleaved anyway. Thus, in general, you should think of the
> >>> "add" and "subtract" part as independent entities and not make any
> >>> assumption about their order (also, even if the key did not change, both
> >>> records might be interleaved by other records...)
> >>>
> >>> The only guarantee we can provide is (given that you configured the
> >>> producer correctly to avoid re-ordering during send()), that if the
> >>> grouping key does not change, the send of the old and new value will not
> >>> be re-ordered relative to each other.
> >>> The order of the send is
> >>> hard-coded in the upstream processor though:
> >>>
> >>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
> >>>
> >>> Thus, the order of the downstream aggregate processor is actually
> >>> meaningless.
> >>>
> >>> -Matthias
> >>>
> >>> On 1/29/21 6:50 AM, Alexandre Brasil wrote:
> >>>> From the source code in KGroupedTableImpl, the subtractor is always called
> >>>> before the adder. By not guaranteeing the order, I think the devs meant
> >>>> that it might change in future versions of Kafka Streams (although I'd
> >>>> think it's unlikely to).
> >>>>
> >>>> I have use cases similar to your example, and that phrase worries me a
> >>>> bit too. :)
> >>>>
> >>>> On Thu, Jan 28, 2021, 22:31 Fq Public <[email protected]> wrote:
> >>>>
> >>>>> Hi everyone! I posted this same question on stackoverflow
> >>>>> <https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined>
> >>>>> a few days ago but didn't get any responses.
> >>>>> Was hoping someone here might
> >>>>> be able to help clarify this part of the documentation for me :)
> >>>>>
> >>>>> On Thu, 28 Jan 2021 at 19:50, Fq Public <[email protected]> wrote:
> >>>>>
> >>>>>> The Streams DSL documentation
> >>>>>> <https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
> >>>>>> includes a caveat about using the aggregate method to transform a
> >>>>>> KGroupedTable → KTable, as follows (emphasis mine):
> >>>>>>
> >>>>>> When subsequent non-null values are received for a key (e.g., UPDATE),
> >>>>>> then (1) the subtractor is called with the old value as stored in the table
> >>>>>> and (2) the adder is called with the new value of the input record that was
> >>>>>> just received. *The order of execution for the subtractor and adder is
> >>>>>> not defined.*
> >>>>>>
> >>>>>> My interpretation of that last line implies that one of three things can
> >>>>>> happen:
> >>>>>>
> >>>>>> 1. subtractor can be called before adder
> >>>>>> 2. adder can be called before subtractor
> >>>>>> 3. adder and subtractor could be called at the same time
> >>>>>>
> >>>>>> Here is the question I'm looking to get answered:
> >>>>>> *Are all 3 scenarios above actually possible when using the aggregate
> >>>>>> method on a KGroupedTable?*
> >>>>>> Or am I misinterpreting the documentation? For my use-case (detailed
> >>>>>> below), it would be ideal if the subtractor was always called before the
> >>>>>> adder.
> >>>>>> ------------------------------
> >>>>>>
> >>>>>> *Why is this question important?*
> >>>>>>
> >>>>>> If the adder and subtractor are non-commutative operations and the order
> >>>>>> in which they are executed can vary, you can end up with different results
> >>>>>> depending on the order of execution of adder and subtractor.
> >>>>>> An example of
> >>>>>> a useful non-commutative operation would be something like if we’re
> >>>>>> aggregating records into a Set:
> >>>>>>
> >>>>>> .aggregate[Set[Animal]](Set.empty)(
> >>>>>>   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
> >>>>>>   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
> >>>>>> )
> >>>>>>
> >>>>>> In this example, for duplicated events, if the adder is called before the
> >>>>>> subtractor you would end up removing the value entirely from the set (which
> >>>>>> would be problematic for most use-cases I imagine).
> >>>>>> ------------------------------
> >>>>>>
> >>>>>> *Why am I doubting the documentation (assuming my interpretation of it is
> >>>>>> correct)?*
> >>>>>>
> >>>>>> 1. Seems like an unusual design choice
> >>>>>> 2. When I've run unit tests (using TopologyTestDriver and
> >>>>>>    EmbeddedKafka), I always see the subtractor is called before the adder.
> >>>>>>    Unfortunately, if there is some kind of race condition involved, it's
> >>>>>>    entirely possible that I would never hit the other scenarios.
> >>>>>> 3. I did try looking into the kafka-streams codebase as well. The
> >>>>>>    KTableProcessorSupplier that calls the user-supplied adder/subtracter
> >>>>>>    functions appears to be this one:
> >>>>>>    https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
> >>>>>>    and on line 92, you can even see a comment saying "first try to remove
> >>>>>>    the old value".
> >>>>>>    Unfortunately, during my own testing, I saw that the
> >>>>>>    process function itself is called twice; first with a Change<V> value that
> >>>>>>    includes only the old value and then the process function is called
> >>>>>>    again with a Change<V> value that includes only the new value. I
> >>>>>>    haven't been able to dig deep enough to find the internal code that is
> >>>>>>    generating the old value record and the new value record (upon receiving an
> >>>>>>    update) to determine if it actually produces those records in that order.
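
PS: to make the Set example from the original question concrete, here is a small stand-alone sketch using only plain Java collections (no Kafka Streams API involved). The class and method names are hypothetical. It only illustrates why the two orders give different results when the old and new value are equal, i.e. the duplicated-event case described in the question.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class AdderSubtractorOrder {

    // Adder: returns a copy of the aggregate with the value added.
    static Set<String> add(Set<String> agg, String value) {
        Set<String> out = new HashSet<>(agg);
        out.add(value);
        return out;
    }

    // Subtractor: returns a copy of the aggregate with the value removed.
    static Set<String> subtract(Set<String> agg, String value) {
        Set<String> out = new HashSet<>(agg);
        out.remove(value);
        return out;
    }

    // Subtractor first, then adder.
    static Set<String> subtractThenAdd(Set<String> agg, String oldValue, String newValue) {
        return add(subtract(agg, oldValue), newValue);
    }

    // Adder first, then subtractor.
    static Set<String> addThenSubtract(Set<String> agg, String oldValue, String newValue) {
        return subtract(add(agg, newValue), oldValue);
    }

    public static void main(String[] args) {
        Set<String> zoo = Collections.singleton("lion");
        // Update where old and new value are the same element.
        System.out.println(subtractThenAdd(zoo, "lion", "lion")); // [lion]
        System.out.println(addThenSubtract(zoo, "lion", "lion")); // []
    }
}
```

With the subtractor first the element survives the update; with the adder first it is dropped from the set.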
