[ https://issues.apache.org/jira/browse/KAFKA-9916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094777#comment-17094777 ]

Matthias J. Sax commented on KAFKA-9916:
----------------------------------------

The original example was slightly different:
{code:java}
KStream stream = ...
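// Both filters consume `stream` directly (siblings, not chained),
// so every record of `stream` is forwarded to both of them.
stream.filter((k,v) -> { v.setA("a"); return true; }); // mutates the record value
stream.filter((k,v) -> ...);{code}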
For this case, the filters are not chained but executed in parallel, which is
basically a broadcast pattern, i.e., each record of `stream` is piped into
both filters. Conceptually, we would need to duplicate the input record;
however, as an optimization, we don't copy it but only pass the same object twice.
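
The aliasing effect of that optimization can be reproduced without Kafka Streams. The plain-Java sketch below is only a stand-in, not the Kafka Streams forwarding code: {{MutableValue}} and {{BroadcastDemo}} are hypothetical names, each branch is modeled as a {{java.util.function.Predicate}}, and the branches are assumed to run in list order. {{shared}} hands the same object to every branch (the optimization), while {{copied}} gives each branch its own duplicate (the conceptual semantics).

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical mutable record value, standing in for the value type in the example.
class MutableValue {
    private String a;

    MutableValue(String a) { this.a = a; }

    void setA(String a) { this.a = a; }

    String getA() { return a; }
}

class BroadcastDemo {
    // Hands the SAME object to every branch, like the "no copy" optimization.
    // Returns the value of `a` each branch observed when it was invoked.
    static List<String> shared(MutableValue value, List<Predicate<MutableValue>> branches) {
        List<String> observed = new ArrayList<>();
        for (Predicate<MutableValue> branch : branches) {
            observed.add(value.getA());
            branch.test(value);
        }
        return observed;
    }

    // Hands each branch its own copy, i.e., the conceptual duplication of the record.
    static List<String> copied(MutableValue value, List<Predicate<MutableValue>> branches,
                               UnaryOperator<MutableValue> copier) {
        List<String> observed = new ArrayList<>();
        for (Predicate<MutableValue> branch : branches) {
            MutableValue copy = copier.apply(value);
            observed.add(copy.getA());
            branch.test(copy);
        }
        return observed;
    }

    public static void main(String[] args) {
        Predicate<MutableValue> first = v -> { v.setA("a"); return true; };
        Predicate<MutableValue> second = v -> true;
        List<Predicate<MutableValue>> branches = List.of(first, second);

        // prints [x, a]: the second branch sees the first branch's mutation
        System.out.println(shared(new MutableValue("x"), branches));
        // prints [x, x]: each branch sees the original value
        System.out.println(copied(new MutableValue("x"), branches,
                v -> new MutableValue(v.getA())));
    }
}
{code}

Passing one object is cheaper than copying, but it is only safe as long as no branch mutates the value.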

> Materialize Table-Table Join Result to Avoid Performing Same Join Twice
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-9916
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9916
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Bruno Cadonna
>            Priority: Major
>
> If a table-table join processor performs a join and needs to forward the old
> join result downstream (e.g., because of a downstream aggregation), it
> performs the same join (i.e., calls the {{ValueJoiner}}) twice.
> Given a left value {{L1}}, a right value {{R1}}, and a new right value {{R2}}
> with the same key that are input into the join in this order, the join
> processor at some point joins {{L1}} with {{R1}}. When the new right value
> {{R2}} triggers the join, the processor joins {{L1}} with {{R2}} for the new
> result and, again, {{L1}} with {{R1}} for the old result.
> We could avoid calling the {{ValueJoiner}} twice by materializing the join
> result. We would trade a call to the {{ValueJoiner}} for a lookup into a
> state store. Depending on the logic in the {{ValueJoiner}}, this may or may
> not improve performance. In addition, calling the {{ValueJoiner}} only once
> means its input values are accessed only once, which avoids the need to copy
> the input values each time the {{ValueJoiner}} is called. For example,
> consider the following {{ValueJoiner}}:
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
>         // Mutates the left input in place and returns that same object.
>         leftValue.setSomeValue(rightValue);
>         return leftValue;
> }
> {code}
> With this {{ValueJoiner}}, {{setSomeValue(rightValue)}} will be called twice
> when {{R2}} triggers the join, the first time with {{R2}} and the second time
> with {{R1}}. Because both calls modify the same {{leftValue}} object, {{R2}}
> will be overwritten by {{R1}}, which is probably not what users want. To get
> the correct result, the {{ValueJoiner}} should be implemented as follows:
>
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
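>         // copy() is assumed to be a user-defined deep copy of ComplexValue;
>         // mutating the copy leaves the shared leftValue untouched.
>         ComplexValue copy = copy(leftValue);
>         copy.setSomeValue(rightValue);
>         return copy;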
> }
> {code}
> Copying values during joins could be avoided if the join result were 
> materialized. 
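
The double call described in the issue can be made concrete with a small plain-Java model of a right-side update when the old result must be forwarded: the joiner is applied to ({{L1}}, {{R2}}) for the new result and again to ({{L1}}, {{R1}}) for the old one, with the same {{L1}} object both times. This is a sketch under stated assumptions, not the Kafka Streams processor: {{RecomputingJoin}}, {{ResultChange}}, and the reduced {{ComplexValue}} are hypothetical, and {{java.util.function.BiFunction}} stands in for {{ValueJoiner}}. It runs both joiners from the description.

{code:java}
import java.util.function.BiFunction;

// Hypothetical value type from the issue, reduced to one field plus a copy helper.
class ComplexValue {
    Long someValue;

    ComplexValue(Long someValue) { this.someValue = someValue; }

    void setSomeValue(Long someValue) { this.someValue = someValue; }

    static ComplexValue copy(ComplexValue other) { return new ComplexValue(other.someValue); }
}

// Hypothetical (new, old) pair of join results forwarded downstream.
class ResultChange<V> {
    final V newValue;
    final V oldValue;

    ResultChange(V newValue, V oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

class RecomputingJoin {
    // Right-side update without a materialized result: the joiner runs twice,
    // first for the new result, then to rebuild the old one, with the same left object.
    static ResultChange<ComplexValue> onRightUpdate(ComplexValue left, Long oldRight, Long newRight,
            BiFunction<ComplexValue, Long, ComplexValue> joiner) {
        ComplexValue newResult = joiner.apply(left, newRight);
        ComplexValue oldResult = joiner.apply(left, oldRight);
        return new ResultChange<>(newResult, oldResult);
    }

    // The mutating joiner from the issue.
    static ComplexValue mutatingJoin(ComplexValue leftValue, Long rightValue) {
        leftValue.setSomeValue(rightValue);
        return leftValue;
    }

    // The copying joiner from the issue.
    static ComplexValue copyingJoin(ComplexValue leftValue, Long rightValue) {
        ComplexValue copy = ComplexValue.copy(leftValue);
        copy.setSomeValue(rightValue);
        return copy;
    }

    public static void main(String[] args) {
        long r1 = 1L, r2 = 2L;
        ResultChange<ComplexValue> bad = onRightUpdate(new ComplexValue(null), r1, r2,
                RecomputingJoin::mutatingJoin);
        ResultChange<ComplexValue> good = onRightUpdate(new ComplexValue(null), r1, r2,
                RecomputingJoin::copyingJoin);
        // prints "mutating: new=1 old=1" (R2 overwritten by R1)
        System.out.println("mutating: new=" + bad.newValue.someValue + " old=" + bad.oldValue.someValue);
        // prints "copying: new=2 old=1"
        System.out.println("copying: new=" + good.newValue.someValue + " old=" + good.oldValue.someValue);
    }
}
{code}

With the mutating joiner, both results are the same object, so the second call's {{R1}} wins; the copying joiner keeps the two results independent at the cost of one copy per call.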

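The trade proposed in the issue, a state-store lookup instead of the second {{ValueJoiner}} call, can be sketched in the same plain-Java style. This is a hypothetical model, not Kafka Streams code: a {{HashMap}} plays the state store, and since Kafka Streams stores typically keep serialized bytes (so every lookup deserializes a fresh object), the sketch imitates that by copying on {{put}} and {{get}}. {{MaterializedJoin}}, {{CopyingStore}}, {{JoinValue}}, {{JoinResult}}, and the call counter are made-up names; left-side updates, deletes, and timestamps are left out.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical value type, reduced to the one field the joiner sets.
class JoinValue {
    Long someValue;

    JoinValue(Long someValue) { this.someValue = someValue; }

    void setSomeValue(Long someValue) { this.someValue = someValue; }

    JoinValue copy() { return new JoinValue(someValue); }
}

// Hypothetical (new, old) pair forwarded downstream.
class JoinResult {
    final JoinValue newValue;
    final JoinValue oldValue;

    JoinResult(JoinValue newValue, JoinValue oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

// Stand-in for a state store: copies on put and get, imitating the fresh
// object that deserializing stored bytes would produce on each lookup.
class CopyingStore {
    private final Map<String, JoinValue> data = new HashMap<>();

    void put(String key, JoinValue value) { data.put(key, value.copy()); }

    JoinValue get(String key) {
        JoinValue stored = data.get(key);
        return stored == null ? null : stored.copy();
    }
}

class MaterializedJoin {
    private final CopyingStore leftStore = new CopyingStore();
    private final CopyingStore resultStore = new CopyingStore(); // materialized join results
    private final BiFunction<JoinValue, Long, JoinValue> joiner;
    int joinerCalls = 0;

    MaterializedJoin(BiFunction<JoinValue, Long, JoinValue> joiner) { this.joiner = joiner; }

    void putLeft(String key, JoinValue left) { leftStore.put(key, left); }

    // Right-side update: the old result is looked up, only the new one is computed.
    // Returns null when there is no left value (inner-join semantics, simplified).
    JoinResult onRightUpdate(String key, Long newRight) {
        JoinValue left = leftStore.get(key);
        if (left == null) {
            return null;
        }
        JoinValue oldResult = resultStore.get(key); // null on the first join for this key
        joinerCalls++;
        JoinValue newResult = joiner.apply(left, newRight);
        resultStore.put(key, newResult);
        return new JoinResult(newResult, oldResult);
    }

    public static void main(String[] args) {
        // The mutating joiner from the issue, which is unsafe when results are recomputed.
        MaterializedJoin join = new MaterializedJoin((leftValue, rightValue) -> {
            leftValue.setSomeValue(rightValue);
            return leftValue;
        });
        join.putLeft("k", new JoinValue(null));
        join.onRightUpdate("k", 1L);                     // R1 arrives
        JoinResult change = join.onRightUpdate("k", 2L); // R2 arrives
        // prints "new=2 old=1 joinerCalls=2"
        System.out.println("new=" + change.newValue.someValue + " old=" + change.oldValue.someValue
                + " joinerCalls=" + join.joinerCalls);
    }
}
{code}

Here the joiner runs once per update, and even the mutating joiner yields {{R2}} as the new result and {{R1}} as the old one, because the old result comes from the store rather than from a second call on the shared left object.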


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
