[ https://issues.apache.org/jira/browse/KAFKA-9916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094777#comment-17094777 ]
Matthias J. Sax commented on KAFKA-9916:
----------------------------------------

The original example was slightly different:
{code:java}
KStream stream = ...
stream.filter((k,v) -> { v.setA("a"); return true; });
stream.filter((k,v) -> ...);
{code}
In this case, the filters are not chained but executed in parallel. This is basically a broadcast pattern, i.e., each record of `stream` is piped into both filters. Conceptually, we would need to duplicate the input record. However, as an optimization, we don't copy it but only pass the same object twice. Hence, the mutation made by `v.setA("a")` in the first filter is also seen by the second filter.

> Materialize Table-Table Join Result to Avoid Performing Same Join Twice
> -----------------------------------------------------------------------
>
> Key: KAFKA-9916
> URL: https://issues.apache.org/jira/browse/KAFKA-9916
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.5.0
> Reporter: Bruno Cadonna
> Priority: Major
>
> If a table-table join processor performs a join and the join needs to forward
> the old join result downstream (e.g. due to an aggregation operation
> downstream), it performs the same join (i.e. calls the {{ValueJoiner}}) twice.
> Given a left value {{L1}}, a right value {{R1}}, and a new right value {{R2}}
> with the same key, input into the join operation in this order, the join
> processor at some point will join {{L1}} with {{R1}}. When the new right
> value {{R2}} triggers the join, it will join {{L1}} with {{R2}} and again
> {{L1}} with {{R1}}.
> We could avoid calling the {{ValueJoiner}} twice by materializing the join
> result. We would trade a call to the {{ValueJoiner}} for a lookup into a
> state store. Depending on the logic in the {{ValueJoiner}}, this may or may
> not improve performance. However, calling the {{ValueJoiner}} only once means
> its input values are accessed only once, which avoids the
> need to copy the input values each time the {{ValueJoiner}} is called.
> For example, consider the following {{ValueJoiner}}:
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
>     leftValue.setSomeValue(rightValue);
>     return leftValue;
> }
> {code}
> With this {{ValueJoiner}}, {{setSomeValue(rightValue)}} will be called twice
> when {{R2}} triggers the join, the first time with {{R2}} and the second time
> with {{R1}}. Because both calls mutate and return the same {{leftValue}}
> object, {{R2}} will be overwritten by {{R1}}, which is probably not what
> users want. To get the correct result, the {{ValueJoiner}} should be
> implemented as follows:
>
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
>     ComplexValue copy = copy(leftValue);
>     copy.setSomeValue(rightValue);
>     return copy;
> }
> {code}
> Copying values during joins could be avoided if the join result were
> materialized.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
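To make the call count in the issue concrete, here is a minimal sketch in plain Java (no Kafka dependency) of a toy table-table inner join fed with the {{L1}}, {{R1}}, {{R2}} sequence. The class {{ToyTableTableJoin}} and its methods are hypothetical, the {{HashMap}}s merely stand in for Kafka's state stores, and this is not how Kafka Streams actually implements the join. Without materialization, the old result is recomputed by calling the joiner again. With materialization, the old result is read back from a result store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical toy model of a table-table inner join; NOT Kafka Streams code.
// Plain HashMaps stand in for the left/right tables and the result store.
class ToyTableTableJoin {
    private final Map<String, String> leftTable = new HashMap<>();
    private final Map<String, String> rightTable = new HashMap<>();
    private final Map<String, String> resultStore = new HashMap<>(); // used only when materialized
    private final BiFunction<String, String, String> joiner;
    private final boolean materialized;

    int joinerCalls = 0;   // how often the joiner was invoked
    String lastNew;        // new result of the most recent update
    String lastOld;        // old result of the most recent update

    ToyTableTableJoin(BiFunction<String, String, String> joiner, boolean materialized) {
        this.joiner = joiner;
        this.materialized = materialized;
    }

    void putLeft(String key, String value) {
        String oldLeft = leftTable.put(key, value);
        process(key, oldLeft, rightTable.get(key));
    }

    void putRight(String key, String value) {
        String oldRight = rightTable.put(key, value);
        process(key, leftTable.get(key), oldRight);
    }

    // Computes the (new, old) result pair that would be forwarded downstream.
    private void process(String key, String oldLeft, String oldRight) {
        String newResult = joinIfBoth(leftTable.get(key), rightTable.get(key));
        String oldResult;
        if (materialized) {
            // Old result comes from a store lookup instead of a second joiner call.
            oldResult = resultStore.get(key);
            if (newResult == null) {
                resultStore.remove(key);
            } else {
                resultStore.put(key, newResult);
            }
        } else {
            // Old result is recomputed from the previous input values.
            oldResult = joinIfBoth(oldLeft, oldRight);
        }
        lastNew = newResult;
        lastOld = oldResult;
    }

    // Inner-join semantics: no result unless both sides are present.
    private String joinIfBoth(String left, String right) {
        if (left == null || right == null) {
            return null;
        }
        joinerCalls++;
        return joiner.apply(left, right);
    }
}
```

Feeding {{L1}}, {{R1}}, {{R2}} for a single key gives three joiner calls without materialization and two with it. In this toy, the price of the saved call is one extra store write per update, which is the trade-off the issue describes.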
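The aliasing problem with the first {{eventFeesJoin}} can also be reproduced without Kafka. The sketch below assumes a minimal hypothetical {{ComplexValue}} with a single {{Long}} field and a {{copy}} helper, neither of which is defined in the issue. It models the right-side update as two joiner calls on the same {{L1}} object, first the new result ({{L1}} with {{R2}}) and then the old result ({{L1}} with {{R1}}), following the order described in the issue.

```java
import java.util.function.BiFunction;

// Hypothetical stand-in for the ComplexValue type from the issue.
class ComplexValue {
    private Long someValue;

    Long getSomeValue() { return someValue; }

    void setSomeValue(Long someValue) { this.someValue = someValue; }

    // Hypothetical helper playing the role of copy(leftValue) in the issue.
    static ComplexValue copy(ComplexValue other) {
        ComplexValue c = new ComplexValue();
        c.someValue = other.someValue;
        return c;
    }
}

class JoinerAliasingDemo {
    // Mutates and returns the left value (the problematic joiner from the issue).
    static ComplexValue mutatingJoin(ComplexValue leftValue, Long rightValue) {
        leftValue.setSomeValue(rightValue);
        return leftValue;
    }

    // Copies the left value first, so each call yields an independent result.
    static ComplexValue copyingJoin(ComplexValue leftValue, Long rightValue) {
        ComplexValue copy = ComplexValue.copy(leftValue);
        copy.setSomeValue(rightValue);
        return copy;
    }

    // Simplified model of R2 replacing R1 when the old result is recomputed:
    // new result first (L1 with R2), then old result (L1 with R1), both on the
    // same L1 object. Returns {newResult, oldResult}.
    static ComplexValue[] onRightUpdate(BiFunction<ComplexValue, Long, ComplexValue> joiner,
                                        ComplexValue l1, Long r1, Long r2) {
        ComplexValue newResult = joiner.apply(l1, r2);
        ComplexValue oldResult = joiner.apply(l1, r1);
        return new ComplexValue[] {newResult, oldResult};
    }
}
```

With {{mutatingJoin}}, both returned references point to the {{L1}} object, so the "new" result ends up holding {{R1}}. With {{copyingJoin}}, the new and old results are independent objects holding {{R2}} and {{R1}} respectively, and {{L1}} itself is left untouched.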