The other follow-up question I have is related to this part of your
statement:
```
also, even if the key did not change, both records might be interleaved by
other records...
```
I take it this means that, if I were joining a KStream with this KTable, it
is entirely possible that a KStream record could join with the KTable in a
state where the subtractor has been already executed but the adder has not
yet been executed for the corresponding KTable-update event?
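
To make the scenario I'm asking about concrete, here is a minimal sketch in
plain Java. It does not use Kafka Streams at all: the `table` map, `subtract`,
`add`, and `joinLookup` are hypothetical stand-ins for the aggregated KTable,
the user-supplied subtractor and adder, and a KStream-KTable join lookup. It
only models the interleaving I have in mind (a stream record looked up between
the subtract and the add of a single update). Whether Kafka Streams can
actually produce this interleaving is exactly what I'm asking.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class InterleavedJoinSketch {

    // Hypothetical stand-in for the user-supplied subtractor.
    static void subtract(Map<String, Set<String>> table, String zoo, String animal) {
        table.computeIfAbsent(zoo, k -> new HashSet<>()).remove(animal);
    }

    // Hypothetical stand-in for the user-supplied adder.
    static void add(Map<String, Set<String>> table, String zoo, String animal) {
        table.computeIfAbsent(zoo, k -> new HashSet<>()).add(animal);
    }

    // Hypothetical stand-in for a KStream-KTable join: snapshot the current table value.
    static Set<String> joinLookup(Map<String, Set<String>> table, String zoo) {
        return new HashSet<>(table.getOrDefault(zoo, new HashSet<>()));
    }

    // Replays one insert followed by one update whose grouping key is unchanged,
    // with a join lookup after every step.
    static List<Set<String>> run() {
        Map<String, Set<String>> table = new HashMap<>(); // zoo key -> set of animals
        List<Set<String>> observed = new ArrayList<>();

        add(table, "zoo-1", "lion");                 // initial insert
        observed.add(joinLookup(table, "zoo-1"));

        subtract(table, "zoo-1", "lion");            // update, part 1: old value removed
        observed.add(joinLookup(table, "zoo-1"));    // stream record arriving in between

        add(table, "zoo-1", "lion");                 // update, part 2: new value added
        observed.add(joinLookup(table, "zoo-1"));

        return observed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [[lion], [], [lion]]
    }
}
```

If this interleaving is possible, the middle lookup would observe an empty set
for `zoo-1` even though the lion never left the zoo.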

Cheers,
FQ

On Mon, 1 Feb 2021 at 11:53, Fq Public wrote:

> Hiya Matthias, Alexandre,
>
> Thanks for your detailed responses. Your explanation about why the order
> of execution in the `KGroupedTable.aggregate` method does not matter (so
> much as what's happening in the `KTable.groupBy` method) makes sense to me.
>
> I have one follow-up question regarding this part of your statement:
>
> The only guarantee we can provide is (given that you configured the
> producer correctly to avoid re-ordering during send()), that if the
> grouping key does not change, the send of the old and new value will not
> be re-ordered relative to each other. The order of the send is hard-coded in
> the upstream processor though:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
>
> What would be an example of an incorrect configuration I could apply that
> would result in re-ordering during the `send()` call?
> As you said, "the order of the send is hard-coded in the upstream
> processor" so I'm struggling to come up with an example here.
>
> Cheers,
> FQ
>
> On Fri, 29 Jan 2021 at 17:11, Matthias J. Sax wrote:
>
>> What Alex says.
>>
>> The order is hard-coded (i.e., there is no race condition), but there is no
>> guarantee that we won't change the order in future releases without
>> notice (i.e., it's not a public contract and we don't need to do a KIP to
>> change it). I guess there would be a Jira about it... But as a matter of
>> fact, it does not really matter (details below).
>>
>> For the three scenarios you mentioned, the 3rd one cannot happen though:
>> We execute an aggregator in a single thread (per shard) and thus we
>> either call the adder or subtractor first.
>>
>>
>>
>> > 1. Seems like an unusual design choice
>>
>> Why do you think so?
>>
>>
>>
>> > first with a Change<V> value that includes only
>> > the old value and then the process function is called again with a
>> > Change<V> value that includes only the new value.
>>
>> In general, both records might be processed by different threads, and
>> thus we cannot send only one record. It's just that the TTD
>> (TopologyTestDriver) simulates a single-threaded execution, so both
>> records always end up in the same processor.
>>
>> Cf
>>
>> https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations
>>
>> However, the order actually only matters if both records really end up
>> in the same processor (if the grouping key did not change during the
>> upstream update).
>>
>> Furthermore, the order actually depends not on the downstream aggregate
>> implementation, but on the order of writes into the repartition topic
>> of the `groupBy()`, and with multiple parallel upstream processors, those
>> writes are interleaved anyway. Thus, in general, you should think of the
>> "add" and "subtract" parts as independent entities and not make any
>> assumptions about their order (also, even if the key did not change, both
>> records might be interleaved by other records...)
>>
>> The only guarantee we can provide is (given that you configured the
>> producer correctly to avoid re-ordering during send()), that if the
>> grouping key does not change, the send of the old and new value will not
>> be re-ordered relative to each other. The order of the send is
>> hard-coded in the upstream processor though:
>>
>>
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
>>
>> Thus, the order of the downstream aggregate processor is actually
>> meaningless.
>>
>>
>>
>> -Matthias
>>
>> On 1/29/21 6:50 AM, Alexandre Brasil wrote:
>> > From the source code in KGroupedTableImpl, the subtractor is always called
>> > before the adder. By not guaranteeing the order, I think the devs meant
>> > that it might change in future versions of Kafka Streams (although I'd
>> > think it's unlikely to).
>> >
>> > I have use cases similar to your example, and that phrase worries me a
>> > bit too. :)
>> >
>> > On Thu, Jan 28, 2021, 22:31 Fq Public wrote:
>> >
>> >> Hi everyone! I posted this same question on stackoverflow
>> >> <https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined>
>> >> a few days ago but didn't get any responses. Was hoping someone here might
>> >> be able to help clarify this part of the documentation for me :)
>> >>
>> >> On Thu, 28 Jan 2021 at 19:50, Fq Public wrote:
>> >>
>> >>> The Streams DSL documentation
>> >>> <https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
>> >>> includes a caveat about using the aggregate method to transform a
>> >>> KGroupedTable → KTable, as follows (emphasis mine):
>> >>>
>> >>> When subsequent non-null values are received for a key (e.g., UPDATE),
>> >>> then (1) the subtractor is called with the old value as stored in the
>> >>> table and (2) the adder is called with the new value of the input record
>> >>> that was just received. *The order of execution for the subtractor and
>> >>> adder is not defined.*
>> >>>
>> >>> My interpretation of that last line is that one of three things can
>> >>> happen:
>> >>>
>> >>>    1. subtractor can be called before adder
>> >>>    2. adder can be called before subtractor
>> >>>    3. adder and subtractor could be called at the same time
>> >>>
>> >>> Here is the question I'm looking to get answered:
>> >>> *Are all 3 scenarios above actually possible when using the aggregate
>> >>> method on a KGroupedTable?*
>> >>> Or am I misinterpreting the documentation? For my use-case (detailed
>> >>> below), it would be ideal if the subtractor was always called before the
>> >>> adder.
>> >>> ------------------------------
>> >>>
>> >>> *Why is this question important?*
>> >>>
>> >>> If the adder and subtractor are non-commutative operations and the order
>> >>> in which they are executed can vary, you can end up with different results
>> >>> depending on the order of execution of adder and subtractor. An example of
>> >>> a useful non-commutative operation would be aggregating records into a
>> >>> Set:
>> >>>
>> >>> .aggregate[Set[Animal]](Set.empty)(
>> >>>   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
>> >>>   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
>> >>> )
>> >>>
>> >>> In this example, for duplicated events, if the adder is called before the
>> >>> subtractor you would end up removing the value entirely from the set (which
>> >>> would be problematic for most use-cases I imagine).
>> >>> ------------------------------
>> >>>
>> >>> *Why am I doubting the documentation (assuming my interpretation of it is
>> >>> correct)?*
>> >>>
>> >>>    1. Seems like an unusual design choice
>> >>>    2. When I've run unit tests (using TopologyTestDriver and
>> >>>    EmbeddedKafka), I always see the subtractor is called before the adder.
>> >>>    Unfortunately, if there is some kind of race condition involved, it's
>> >>>    entirely possible that I would never hit the other scenarios.
>> >>>    3. I did try looking into the kafka-streams codebase as well. The
>> >>>    KTableProcessorSupplier that calls the user-supplied adder/subtractor
>> >>>    functions appears to be this one:
>> >>>    https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
>> >>>    and on line 92, you can even see a comment saying "first try to remove
>> >>>    the old value". Unfortunately, during my own testing, I saw that the
>> >>>    process function itself is called twice; first with a Change<V> value
>> >>>    that includes only the old value and then the process function is called
>> >>>    again with a Change<V> value that includes only the new value. I
>> >>>    haven't been able to dig deep enough to find the internal code that is
>> >>>    generating the old value record and the new value record (upon receiving
>> >>>    an update) to determine if it actually produces those records in that
>> >>>    order.
>> >>>
>> >>>
>> >>
>> >
>>
>
