[
https://issues.apache.org/jira/browse/KAFKA-9916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17091963#comment-17091963
]
Guozhang Wang edited comment on KAFKA-9916 at 4/24/20, 11:15 PM:
-----------------------------------------------------------------
Thanks [~cadonna] for filing the JIRA! I'd like to leave some more thoughts on
this:
I think the key here is not to always materialize the join results, but to
materialize only when a downstream operator requires its parent to
"sendOldValues". At a high level, today we may propagate "sendOldValues" a
long way up through parent and ancestor operators until materialization is
introduced. For example, if there's a topology:
{{A -> B -> C -> D}}
where D requires its parent C to sendOldValues, this request may be traced back
to B and A, eventually requiring A to "materialize" and then sendOldValues
to B, then to C, then to D. Instead of doing that, we should consider either
materializing {{C}} directly and cutting the trace from {{A -> B -> ..}}, or
even just letting D materialize itself so that its direct parent {{C}} does not
need to sendOldValues.
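The difference between the two strategies can be sketched with a toy model of the {{A -> B -> C -> D}} chain. This is only an illustration in plain Java: the {{Node}} class and both method names are hypothetical and are not the Kafka Streams internals. It records which operators are touched when D asks C for old values.

```java
import java.util.ArrayList;
import java.util.List;

final class SendOldValuesSketch {

    /** Hypothetical stand-in for a table operator in the topology. */
    static final class Node {
        final String name;
        final Node parent;        // null for the first operator in the chain
        boolean materialized;
        boolean sendOldValues;

        Node(String name, Node parent) {
            this.name = name;
            this.parent = parent;
        }
    }

    /** Behavior described above: walk upstream until a materialized operator is found. */
    static List<String> propagateToAncestors(Node node) {
        List<String> touched = new ArrayList<>();
        for (Node n = node; n != null; n = n.parent) {
            touched.add(n.name);
            n.sendOldValues = true;
            if (n.materialized) {
                break;                      // an existing store ends the trace
            }
            if (n.parent == null) {
                n.materialized = true;      // the first operator is forced to materialize
            }
        }
        return touched;
    }

    /** Proposed alternative: materialize the direct parent and stop there. */
    static List<String> materializeDirectParent(Node node) {
        node.materialized = true;
        node.sendOldValues = true;
        return List.of(node.name);
    }

    /** Builds A -> B -> C and returns C, the parent that D would ask. */
    static Node chainEndingAtC() {
        Node a = new Node("A", null);
        Node b = new Node("B", a);
        return new Node("C", b);
    }

    public static void main(String[] args) {
        System.out.println(propagateToAncestors(chainEndingAtC()));    // [C, B, A]
        System.out.println(materializeDirectParent(chainEndingAtC())); // [C]
    }
}
```

In the first case all three upstream operators end up forwarding old values; in the second only {{C}} is affected, at the cost of one extra store.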
Today there are only the following scenarios that would require parents to send
old values:
1. KTable-KTable left / outer joins: we want to avoid sending unnecessary
tombstones and hence would need to know the old values. This is explained in
KIP-77.
2. KTable aggregation: we would want to require old values to be sent so that
we can subtract the old values.
3. KTable foreign-key join: we need both parents to send their old values for
optimization / correctness purposes.
---------------------------
The idea is that:
For 1, instead of requiring its parent to sendOldValues, we just materialize
the joined result and hence avoid the second `joiner.apply(... oldValue)` (which
also helps fix the issue described in this JIRA ticket). Of course, with
chained joins this means we may materialize intermediate results and hence
incur a larger IO cost; I think this can be further addressed by supporting
N-way joins more naturally rather than in a chained manner.
For 2, the direct parent of the aggregation would then just materialize and not
propagate this `sendOldValues` to ancestors. I think maybe we already do this
today, but if not, then we should fix it.
For 3, the left-hand-side sendOldValues is required for correctness, and we
can consider doing the same as in 2; the right-hand-side sendOldValues is for
optimization and is no longer necessary after KIP-557, hence we can remove
that requirement later.
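To make the idea for 1 concrete, here is a minimal sketch in plain Java, not Kafka Streams code. It assumes a joiner that mutates and returns its left input, as in the ticket description; the {{ComplexValue}} class, the {{HashMap}} used as a state store, and the copy that stands in for serialization are all simplifications made up for this example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

final class JoinResultSketch {

    /** Hypothetical mutable value type, named after the one in the ticket. */
    static final class ComplexValue {
        long someValue;

        ComplexValue(long someValue) {
            this.someValue = someValue;
        }
    }

    /** A joiner that mutates and returns its left input. */
    static final BiFunction<ComplexValue, Long, ComplexValue> MUTATING_JOINER =
        (left, right) -> {
            left.someValue = right;
            return left;
        };

    /** Old result obtained by calling the joiner a second time. Returns {new, old}. */
    static long[] recomputeOldResult() {
        ComplexValue l1 = new ComplexValue(0L);
        MUTATING_JOINER.apply(l1, 1L);                           // initial join of L1 with R1
        ComplexValue newResult = MUTATING_JOINER.apply(l1, 2L);  // R2 arrives
        ComplexValue oldResult = MUTATING_JOINER.apply(l1, 1L);  // second call for the old value
        return new long[] {newResult.someValue, oldResult.someValue};
    }

    /** Old result read from a materialized join result. Returns {new, old}. */
    static long[] lookUpOldResult() {
        Map<String, ComplexValue> store = new HashMap<>();       // stand-in for a state store
        ComplexValue l1 = new ComplexValue(0L);
        ComplexValue first = MUTATING_JOINER.apply(l1, 1L);      // initial join of L1 with R1
        store.put("key", new ComplexValue(first.someValue));     // copy stands in for serialization

        ComplexValue oldResult = store.get("key");               // lookup instead of a second join
        ComplexValue newResult = MUTATING_JOINER.apply(l1, 2L);  // R2 arrives
        store.put("key", new ComplexValue(newResult.someValue));
        return new long[] {newResult.someValue, oldResult.someValue};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(recomputeOldResult())); // [1, 1]
        System.out.println(Arrays.toString(lookUpOldResult()));    // [2, 1]
    }
}
```

With the second joiner call the new result is silently overwritten by the old one; with the lookup the joiner runs once per update and the new and old results stay distinct.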
--------------------------------------
Also [~mjsax] pointed out to me that this issue of "modifying the field
directly" may still exist in other operators. For example:
{{stream.filter((k,v) -> { v.setA("a"); return true; }).filter((k,v) -> …)}}
The first filter would modify the object, so the second filter would not see
the original object but the "modified" one, which could then break
correctness.
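The same effect can be reproduced outside Kafka Streams. The sketch below uses {{java.util.stream.Stream}} purely as a stand-in for a chain of filters; the {{Event}} class and the predicate in the second filter are hypothetical and chosen only to make the side effect visible.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FilterMutationSketch {

    /** Hypothetical mutable record value with a single field. */
    static final class Event {
        private String a;

        Event(String a) {
            this.a = a;
        }

        String getA() {
            return a;
        }

        void setA(String a) {
            this.a = a;
        }
    }

    /** First filter mutates the value; second filter expects the original field. */
    static int survivorsWithMutatingFilter() {
        List<Event> out = Stream.of(new Event("original"))
            .filter(v -> { v.setA("a"); return true; })   // side effect inside a predicate
            .filter(v -> "original".equals(v.getA()))     // sees "a", not "original"
            .collect(Collectors.toList());
        return out.size();
    }

    /** Same chain, but the first filter leaves the value untouched. */
    static int survivorsWithReadOnlyFilter() {
        List<Event> out = Stream.of(new Event("original"))
            .filter(v -> true)
            .filter(v -> "original".equals(v.getA()))
            .collect(Collectors.toList());
        return out.size();
    }

    public static void main(String[] args) {
        System.out.println(survivorsWithMutatingFilter()); // 0
        System.out.println(survivorsWithReadOnlyFilter()); // 1
    }
}
```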
was (Author: guozhang):
Thanks [~cadonna] for filing the JIRA! I'd like to leave some more thoughts on
this:
I think the key here is not to always materialize the join results, but to
materialize only when a downstream operator requires its parent to
"sendOldValues". At a high level, today we may propagate "sendOldValues" a
long way up through parent and ancestor operators until materialization is
introduced. For example, if there's a topology:
{{A -> B -> C -> D}}
where D requires its parent C to sendOldValues, this request may be traced back
to B and A, eventually requiring A to "materialize" and then sendOldValues
to B, then to C, then to D. Instead of doing that, we should consider either
materializing {{C}} directly and cutting the trace from {{A -> B -> ..}}, or
even just letting D materialize itself so that its direct parent {{C}} does not
need to sendOldValues.
Today there are only the following scenarios that would require parents to send
old values:
1. KTable-KTable left / outer joins: we want to avoid sending unnecessary
tombstones and hence would need to know the old values. This is explained in
KIP-77.
2. KTable aggregation: we would want to require old values to be sent so that
we can subtract the old values.
3. KTable foreign-key join: we need both parents to send their old values for
optimization / correctness purposes.
---------------------------
The idea is that:
For 1, instead of requiring its parent to sendOldValues, we just materialize
the joined result and hence avoid the second `joiner.apply(... oldValue)` (which
also helps fix the issue described in this JIRA ticket). Of course, with
chained joins this means we may materialize intermediate results and hence
incur a larger IO cost; I think this can be further addressed by supporting
N-way joins more naturally rather than in a chained manner.
For 2, the direct parent of the aggregation would then just materialize and not
propagate this `sendOldValues` to ancestors. I think maybe we already do this
today, but if not, then we should fix it.
For 3, the left-hand-side sendOldValues is required for correctness, and we
can consider doing the same as in 2; the right-hand-side sendOldValues is for
optimization (and it seems we did not do this yet), and we should reconsider
whether this is a good trade-off to make.
--------------------------------------
Also [~mjsax] pointed out to me that this issue of "modifying the field
directly" may still exist in other operators. For example:
{{stream.filter((k,v) -> { v.setA("a"); return true; }).filter((k,v) -> …)}}
The first filter would modify the object, so the second filter would not see
the original object but the "modified" one, which could then break
correctness.
> Materialize Table-Table Join Result to Avoid Performing Same Join Twice
> -----------------------------------------------------------------------
>
> Key: KAFKA-9916
> URL: https://issues.apache.org/jira/browse/KAFKA-9916
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.5.0
> Reporter: Bruno Cadonna
> Priority: Major
>
> If a table-table join processor performs a join and the join needs to forward
> downstream the old join result (e.g. due to an aggregation operation
> downstream), it performs the same join (i.e. calls the {{ValueJoiner}}) twice.
> Given a left value {{L1}}, a right value {{R1}}, and a new right value {{R2}}
> with the same keys and input into the join operation in this order, the join
> processor at some point will join {{L1}} with {{R1}}. When the new right
> value {{R2}} triggers the join, it will join {{L1}} with {{R2}} and again
> {{L1}} with {{R1}}.
> We could avoid calling the {{ValueJoiner}} twice by materializing the join
> result. We would trade a call to the {{ValueJoiner}} with a lookup into a
> state store. Depending on the logic in the {{ValueJoiner}} this may or may
> not improve the performance. However, calling the {{ValueJoiner}} once will
> only access the input values of the {{ValueJoiner}} once, which avoids the
> need to copy the input values each time the {{ValueJoiner}} is called. For
> example, consider the following {{ValueJoiner}}:
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
>     leftValue.setSomeValue(rightValue);
>     return leftValue;
> }
> {code}
> With this {{ValueJoiner}}, {{setSomeValue(rightValue)}} will be called twice
> when {{R2}} triggers the join, the first time with {{R2}} and the second time
> with {{R1}}. That means {{R2}} will be overwritten by {{R1}}, which is
> probably not what users want. To get the correct result, the
> {{ValueJoiner}} should be implemented as follows:
>
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
>     ComplexValue copy = copy(leftValue);
>     copy.setSomeValue(rightValue);
>     return copy;
> }
> {code}
> Copying values during joins could be avoided if the join result were
> materialized.