Ahh okay, I see (re: producer config). I hadn't thought about that yet. I'm
using StreamsConfig.EXACTLY_ONCE processing for my Kafka Streams app, so I
should be protected against re-ordering of messages in case of retries
(since `enable.idempotence` is automatically set to true by the
StreamsConfig.EXACTLY_ONCE setting).
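
For reference, here is a minimal sketch of the two setups discussed in this
thread. The config keys are written as plain strings (the values behind the
StreamsConfig / ProducerConfig constants) so it compiles without the Kafka
jars; the application id and bootstrap server are placeholders I made up.

```java
import java.util.Properties;

public class OrderingConfigSketch {

    // Kafka Streams app: exactly-once processing, which turns on the
    // idempotent producer internally.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "zoo-aggregator");     // hypothetical name
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical address
        props.put("processing.guarantee", "exactly_once"); // StreamsConfig.EXACTLY_ONCE
        return props;
    }

    // Plain producer without exactly-once: enable idempotent writes ...
    static Properties idempotentProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical address
        props.put("enable.idempotence", "true");
        props.put("acks", "all");                          // idempotence requires acks=all
        return props;
    }

    // ... or allow only one in-flight request per connection.
    static Properties singleInFlightProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical address
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps());
        System.out.println(idempotentProducerProps());
        System.out.println(singleInFlightProducerProps());
    }
}
```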

Thank you Matthias, this has been very helpful!

Cheers,
Fq

On Tue, 2 Feb 2021 at 02:45, Matthias J. Sax <[email protected]> wrote:

> About the producer config:
>
> By default, the producer will retry sending record batches if a send
> fails. Furthermore, the default producer config allows 5 parallel
> in-flight requests per TCP connection. Thus, if one request fails
> while others succeed, and the producer retries the failed request, the
> retried request would effectively get re-ordered.
>
> To avoid re-ordering, you can either set config
> `max.in.flight.requests.per.connection = 1`, or you enable idempotent
> writes. I.e., the default config does _not_ guard against reordering in
> case of retries, but is optimized for increased throughput.
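
To check my understanding of the retry scenario, here is a toy model (not
Kafka code; the class and method names are made up for illustration). It
"sends" up to maxInFlight batches at once, lets selected batches fail on
their first attempt, and retries them afterwards. It only mimics a
non-idempotent producer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class InFlightReorderSketch {

    // Returns the order in which batches 1..batches end up in the log.
    // failOnce holds the ids of batches whose first attempt fails.
    static List<Integer> send(int batches, int maxInFlight, Set<Integer> failOnce) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 1; i <= batches; i++) {
            pending.add(i);
        }
        Set<Integer> alreadyFailed = new HashSet<>();
        List<Integer> log = new ArrayList<>();

        while (!pending.isEmpty()) {
            // Put up to maxInFlight batches on the wire at the same time.
            List<Integer> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.poll());
            }
            List<Integer> retries = new ArrayList<>();
            for (int batch : inFlight) {
                if (failOnce.contains(batch) && alreadyFailed.add(batch)) {
                    retries.add(batch);   // first attempt failed, retry later
                } else {
                    log.add(batch);       // acknowledged and appended
                }
            }
            // Retried batches go back to the head of the queue, in order.
            for (int i = retries.size() - 1; i >= 0; i--) {
                pending.addFirst(retries.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        Set<Integer> failFirst = new HashSet<>();
        failFirst.add(1);
        System.out.println(send(3, 5, failFirst)); // [2, 3, 1]
        System.out.println(send(3, 1, failFirst)); // [1, 2, 3]
    }
}
```

With 5 in-flight requests, batch 1 lands after batches 2 and 3; with a
single in-flight request, batch 2 is not sent until batch 1 has succeeded.
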
>
> >> I take it this means that, if I were joining a KStream with this KTable,
> >> it is entirely possible that a KStream record could join with the KTable
> >> in a state where the subtractor has been already executed but the adder
> >> has not yet been executed for the corresponding KTable-update event?
>
> Yes, that would be possible (even without interleaved records, as we
> always send two records). We could maybe close this gap, but the fix
> would depend on a proper implementation of the .equals() method for the
> key type. Feel free to create a Jira ticket for this case.
>
> We have work in progress to improve DSL operator semantics. In the
> meantime, if the current implementation of the DSL operators is an
> issue, you could fall back to the PAPI and implement custom operators
> that address the corner cases that are important for your use case.
>
>
> -Matthias
>
> On 2/1/21 10:39 AM, Fq Public wrote:
> > The other follow-up question I have is related to this part of your
> > statement:
> > ```
> > also, even if the key did not change, both records might be interleaved
> > by other records...
> > ```
> > I take it this means that, if I were joining a KStream with this KTable,
> > it is entirely possible that a KStream record could join with the KTable
> > in a state where the subtractor has been already executed but the adder
> > has not yet been executed for the corresponding KTable-update event?
> >
> > Cheers,
> > FQ
> >
> > On Mon, 1 Feb 2021 at 11:53, Fq Public <[email protected]> wrote:
> >
> >> Hiya Matthias, Alexandre,
> >>
> >> Thanks for your detailed responses. Your explanation about why the order
> >> of execution in the `KGroupedTable.aggregate` method does not matter (so
> >> much as what's happening in the `KTable.groupBy` method) makes sense to
> >> me.
> >>
> >> I have one follow-up question regarding this part of your statement:
> >>
> >> The only guarantee we can provide is (given that you configured the
> >> producer correctly to avoid re-ordering during send()), that if the
> >> grouping key does not change, the send of the old and new value will not
> >> be re-ordered relative to each other. The order of the send is
> >> hard-coded in the upstream processor though:
> >> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
> >>
> >> What would be an example of an incorrect configuration I could apply
> >> that would result in re-ordering during the `send()` call?
> >> As you said, "the order of the send is hard-coded in the upstream
> >> processor" so I'm struggling to come up with an example here.
> >>
> >> Cheers,
> >> FQ
> >>
> >> On Fri, 29 Jan 2021 at 17:11, Matthias J. Sax <[email protected]> wrote:
> >>
> >>> What Alex says.
> >>>
> >>> The order is hard-coded (i.e., not a race condition), but there is no
> >>> guarantee that we won't change the order in future releases without
> >>> notice (i.e., it's not a public contract and we don't need to do a KIP
> >>> to change it). I guess there would be a Jira about it... But as a matter
> >>> of fact, it does not really matter (details below).
> >>>
> >>> For the three scenarios you mentioned, the 3rd one cannot happen though:
> >>> we execute an aggregator in a single thread (per shard) and thus we
> >>> either call the adder or subtractor first.
> >>>
> >>>
> >>>
> >>>> 1. Seems like an unusual design choice
> >>>
> >>> Why do you think so?
> >>>
> >>>
> >>>
> >>>> first with a Change<V> value that includes only
> >>>> the old value and then the process function is called again with a
> >>>> Change<V> value that includes only the new value.
> >>>
> >>> In general, both records might be processed by different threads and
> >>> thus we cannot send only one record. It's just that the
> >>> TopologyTestDriver (TTD) simulates a single-threaded execution, thus
> >>> both records always end up in the same processor.
> >>>
> >>> Cf
> >>>
> >>>
> https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations
> >>>
> >>> However, the order actually only matters if both records really end up
> >>> in the same processor (if the grouping key did not change during the
> >>> upstream update).
> >>>
> >>> Furthermore, the order actually depends not on the downstream aggregate
> >>> implementation, but on the order of writes into the repartition topic
> >>> of the `groupBy()`, and with multiple parallel upstream processors,
> >>> those writes are interleaved anyway. Thus, in general, you should think
> >>> of the "add" and "subtract" parts as independent entities and not make
> >>> any assumption about their order (also, even if the key did not change,
> >>> both records might be interleaved by other records...)
> >>>
> >>> The only guarantee we can provide is (given that you configured the
> >>> producer correctly to avoid re-ordering during send()), that if the
> >>> grouping key does not change, the send of the old and new value will
> >>> not be re-ordered relative to each other. The order of the send is
> >>> hard-coded in the upstream processor though:
> >>>
> >>>
> >>>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
> >>>
> >>> Thus, the order of the downstream aggregate processor is actually
> >>> meaningless.
> >>>
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 1/29/21 6:50 AM, Alexandre Brasil wrote:
> >>>> From the source code in KGroupedTableImpl, the subtractor is always
> >>>> called before the adder. By not guaranteeing the order, I think the devs
> >>>> meant that it might change in future versions of Kafka Streams (although
> >>>> I'd think it's unlikely to).
> >>>>
> >>>> I have use cases similar to your example, and that phrase worries me a
> >>>> bit too. :)
> >>>>
> >>>> On Thu, Jan 28, 2021, 22:31 Fq Public <[email protected]> wrote:
> >>>>
> >>>>> Hi everyone! I posted this same question on stackoverflow
> >>>>> <https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined>
> >>>>> a few days ago but didn't get any responses. Was hoping someone here
> >>>>> might be able to help clarify this part of the documentation for me :)
> >>>>>
> >>>>> On Thu, 28 Jan 2021 at 19:50, Fq Public <[email protected]> wrote:
> >>>>>
> >>>>>> The Streams DSL documentation
> >>>>>> <https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
> >>>>>> includes a caveat about using the aggregate method to transform a
> >>>>>> KGroupedTable → KTable, as follows (emphasis mine):
> >>>>>>
> >>>>>> When subsequent non-null values are received for a key (e.g., UPDATE),
> >>>>>> then (1) the subtractor is called with the old value as stored in the
> >>>>>> table and (2) the adder is called with the new value of the input
> >>>>>> record that was just received. *The order of execution for the
> >>>>>> subtractor and adder is not defined.*
> >>>>>>
> >>>>>> My interpretation of that last line implies that one of three things
> >>>>>> can happen:
> >>>>>>
> >>>>>>    1. subtractor can be called before adder
> >>>>>>    2. adder can be called before subtractor
> >>>>>>    3. adder and subtractor could be called at the same time
> >>>>>>
> >>>>>> Here is the question I'm looking to get answered:
> >>>>>> *Are all 3 scenarios above actually possible when using the aggregate
> >>>>>> method on a KGroupedTable?*
> >>>>>> Or am I misinterpreting the documentation? For my use-case (detailed
> >>>>>> below), it would be ideal if the subtractor was always called before
> >>>>>> the adder.
> >>>>>> ------------------------------
> >>>>>>
> >>>>>> *Why is this question important?*
> >>>>>>
> >>>>>> If the adder and subtractor are non-commutative operations and the
> >>>>>> order in which they are executed can vary, you can end up with
> >>>>>> different results depending on the order of execution of adder and
> >>>>>> subtractor. An example of a useful non-commutative operation would be
> >>>>>> aggregating records into a Set:
> >>>>>>
> >>>>>> .aggregate[Set[Animal]](Set.empty)(
> >>>>>>   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
> >>>>>>   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
> >>>>>> )
> >>>>>>
> >>>>>> In this example, for duplicated events, if the adder is called before
> >>>>>> the subtractor you would end up removing the value entirely from the
> >>>>>> set (which would be problematic for most use-cases I imagine).
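
To make the duplicated-event case concrete, here is a small plain-Java
sketch (no Kafka involved; class and method names are made up) that applies
the same set-based adder and subtractor in both orders for an update whose
old and new value are identical:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class SetAggregateOrderSketch {

    // Adder: returns a copy of the aggregate with the value included.
    static Set<String> add(Set<String> aggregate, String value) {
        Set<String> result = new HashSet<>(aggregate);
        result.add(value);
        return result;
    }

    // Subtractor: returns a copy of the aggregate with the value removed.
    static Set<String> subtract(Set<String> aggregate, String value) {
        Set<String> result = new HashSet<>(aggregate);
        result.remove(value);
        return result;
    }

    static Set<String> subtractThenAdd(Set<String> aggregate, String oldValue, String newValue) {
        return add(subtract(aggregate, oldValue), newValue);
    }

    static Set<String> addThenSubtract(Set<String> aggregate, String oldValue, String newValue) {
        return subtract(add(aggregate, newValue), oldValue);
    }

    public static void main(String[] args) {
        Set<String> zoo = Collections.singleton("lion");
        // Duplicated event: old value and new value are both "lion".
        System.out.println(subtractThenAdd(zoo, "lion", "lion")); // [lion]
        System.out.println(addThenSubtract(zoo, "lion", "lion")); // []
    }
}
```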
> >>>>>> ------------------------------
> >>>>>>
> >>>>>> *Why am I doubting the documentation (assuming my interpretation of it
> >>>>>> is correct)?*
> >>>>>>
> >>>>>>    1. Seems like an unusual design choice
> >>>>>>    2. When I've run unit tests (using TopologyTestDriver and
> >>>>>>    EmbeddedKafka), I always see the subtractor is called before the
> >>>>>>    adder. Unfortunately, if there is some kind of race condition
> >>>>>>    involved, it's entirely possible that I would never hit the other
> >>>>>>    scenarios.
> >>>>>>    3. I did try looking into the kafka-streams codebase as well. The
> >>>>>>    KTableProcessorSupplier that calls the user-supplied
> >>>>>>    adder/subtractor functions appears to be this one:
> >>>>>>    https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
> >>>>>>    and on line 92, you can even see a comment saying "first try to
> >>>>>>    remove the old value". Unfortunately, during my own testing, I saw
> >>>>>>    that the process function itself is called twice: first with a
> >>>>>>    Change<V> value that includes only the old value, and then again
> >>>>>>    with a Change<V> value that includes only the new value. I haven't
> >>>>>>    been able to dig deep enough to find the internal code that is
> >>>>>>    generating the old value record and the new value record (upon
> >>>>>>    receiving an update) to determine if it actually produces those
> >>>>>>    records in that order.
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
