Bruno Cadonna created KAFKA-9916:
------------------------------------

             Summary: Materialize Table-Table Join Result to Avoid Performing Same Join Twice
                 Key: KAFKA-9916
                 URL: https://issues.apache.org/jira/browse/KAFKA-9916
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 2.5.0
            Reporter: Bruno Cadonna


If a table-table join processor needs to forward the old join result 
downstream in addition to the new one (e.g., because of a downstream 
aggregation), it recomputes the old join result, i.e., it calls the 
{{ValueJoiner}} a second time with inputs it has already joined before.

Given a left value {{L1}}, a right value {{R1}}, and a new right value {{R2}}, 
all with the same key and input into the join operation in this order, the join 
processor joins {{L1}} with {{R1}} when {{R1}} arrives. When the new right value 
{{R2}} triggers the join, the processor joins {{L1}} with {{R2}} to compute the 
new result and then joins {{L1}} with {{R1}} again to compute the old result.
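To make the double call concrete, here is a minimal Java sketch of an inner table-table join that, like the processor described above, recomputes the old join result when a right value is updated. It is a simplified model under stated assumptions, not Kafka's actual processor code: a {{java.util.function.BiFunction}} stands in for the {{ValueJoiner}}, two {{HashMap}}s stand in for the input tables, and the names {{RecomputingJoinSketch}} and {{onRightUpdate}} are hypothetical.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, simplified model of an inner table-table join processor that
// recomputes the old join result; this is not Kafka's implementation.
class RecomputingJoinSketch {

    // Counts how often the joiner is invoked.
    static int joinerCalls = 0;

    // Stand-in for a ValueJoiner.
    static final BiFunction<String, String, String> JOINER = (left, right) -> {
        joinerCalls++;
        return left + "+" + right;
    };

    // Stand-ins for the left and right input tables.
    static final Map<String, String> leftTable = new HashMap<>();
    static final Map<String, String> rightTable = new HashMap<>();

    // Processes a new right value and returns {newResult, oldResult}.
    static String[] onRightUpdate(String key, String newRight) {
        String oldRight = rightTable.put(key, newRight);
        String left = leftTable.get(key);
        if (left == null) {
            return new String[] {null, null};
        }
        String newResult = JOINER.apply(left, newRight);
        // Without a materialized join result, the old result is recomputed
        // by calling the joiner a second time with the previous right value.
        String oldResult = oldRight == null ? null : JOINER.apply(left, oldRight);
        return new String[] {newResult, oldResult};
    }

    public static void main(String[] args) {
        leftTable.put("k", "L1");
        onRightUpdate("k", "R1");                   // joins L1 with R1
        int before = joinerCalls;
        String[] change = onRightUpdate("k", "R2"); // joins L1 with R2, then L1 with R1 again
        System.out.println(change[0] + " / " + change[1]);
        System.out.println("joiner calls for R2: " + (joinerCalls - before));
    }
}
{code}

Processing {{R2}} prints {{L1+R2 / L1+R1}} and reports two joiner calls: one for the new result and one to recompute the old result.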

We could avoid calling the {{ValueJoiner}} twice by materializing the join 
result, trading the second call to the {{ValueJoiner}} for a lookup into a state 
store. Depending on the logic in the {{ValueJoiner}}, this may or may not 
improve performance. However, calling the {{ValueJoiner}} only once per update 
means the input values are passed to it only once, which avoids the need to 
copy the input values each time the {{ValueJoiner}} is called. For example, 
consider the following {{ValueJoiner}}:

{code:java}
private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
    // Modifies the left input value in place and returns the same object.
    leftValue.setSomeValue(rightValue);
    return leftValue;
}
{code}

With this {{ValueJoiner}}, {{setSomeValue(rightValue)}} is called twice on the 
same {{leftValue}} object when {{R2}} triggers the join, the first time with 
{{R2}} and the second time with {{R1}}. As a result, {{R2}} is overwritten by 
{{R1}}, and the new join result forwarded downstream contains {{R1}} instead of 
{{R2}}, which is probably not what users want. To get the correct result, the 
{{ValueJoiner}} should be implemented as follows:
  
{code:java}
private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
    // Works on a copy, so the left input value is never modified.
    ComplexValue copy = copy(leftValue);
    copy.setSomeValue(rightValue);
    return copy;
}
{code}
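The following self-contained sketch contrasts the two {{ValueJoiner}} implementations above by calling each one the way the join processor does when {{R2}} arrives: first with {{R2}} for the new result, then with {{R1}} for the old result. {{ComplexValue}}, its {{copy}} helper, and the names {{JoinerMutationDemo}} and {{joinNewThenOld}} are hypothetical stand-ins, and a {{BiFunction}} replaces the real {{ValueJoiner}} interface.

{code:java}
import java.util.function.BiFunction;

class JoinerMutationDemo {

    // Mutates and returns the left value (first version above).
    static final BiFunction<ComplexValue, Long, ComplexValue> MUTATING = (left, right) -> {
        left.setSomeValue(right);
        return left;
    };

    // Copies the left value before modifying it (second version above).
    static final BiFunction<ComplexValue, Long, ComplexValue> COPYING = (left, right) -> {
        ComplexValue copy = ComplexValue.copy(left);
        copy.setSomeValue(right);
        return copy;
    };

    // Mimics the processor when R2 arrives: join with R2 (new), then again with R1 (old).
    static ComplexValue[] joinNewThenOld(BiFunction<ComplexValue, Long, ComplexValue> joiner,
                                         ComplexValue left, Long oldRight, Long newRight) {
        ComplexValue newResult = joiner.apply(left, newRight);
        ComplexValue oldResult = joiner.apply(left, oldRight);
        return new ComplexValue[] {newResult, oldResult};
    }

    public static void main(String[] args) {
        Long r1 = 1L;
        Long r2 = 2L;
        ComplexValue[] mutated = joinNewThenOld(MUTATING, new ComplexValue(0L), r1, r2);
        ComplexValue[] copied = joinNewThenOld(COPYING, new ComplexValue(0L), r1, r2);
        // With the mutating joiner, both results are the same object, now holding R1.
        System.out.println("mutating: new=" + mutated[0].getSomeValue() + ", old=" + mutated[1].getSomeValue());
        System.out.println("copying: new=" + copied[0].getSomeValue() + ", old=" + copied[1].getSomeValue());
    }
}

// Hypothetical mutable value type; the issue only names ComplexValue and setSomeValue.
class ComplexValue {
    private Long someValue;

    ComplexValue(Long someValue) {
        this.someValue = someValue;
    }

    void setSomeValue(Long someValue) {
        this.someValue = someValue;
    }

    Long getSomeValue() {
        return someValue;
    }

    // Hypothetical helper corresponding to copy(leftValue) in the issue.
    static ComplexValue copy(ComplexValue other) {
        return new ComplexValue(other.someValue);
    }
}
{code}

With the mutating joiner, the new and old results are the same object and both report {{R1}}; with the copying joiner, the new result keeps {{R2}} and the old result holds {{R1}}.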

If the join result were materialized, the old join result could be read from 
the state store instead of being recomputed, so copying values during joins 
could be avoided.
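A minimal sketch of the proposed materialization, under simplifying assumptions: a {{HashMap}} stands in for the state store that holds the join results, a {{BiFunction}} stands in for the {{ValueJoiner}}, only right-side updates are modeled (a real processor would also have to update the stored result on left-side updates), and the names {{MaterializedJoinSketch}}, {{onRightUpdate}}, and {{joinResultStore}} are hypothetical. The old join result is taken from the store when the new result is written, so each update calls the joiner only once.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch of the proposed improvement, not Kafka's implementation.
class MaterializedJoinSketch {

    // Counts how often the joiner is invoked.
    static int joinerCalls = 0;

    // Stand-in for a ValueJoiner.
    static final BiFunction<String, String, String> JOINER = (left, right) -> {
        joinerCalls++;
        return left + "+" + right;
    };

    static final Map<String, String> leftTable = new HashMap<>();
    static final Map<String, String> rightTable = new HashMap<>();
    // Stand-in for a state store holding the materialized join results.
    static final Map<String, String> joinResultStore = new HashMap<>();

    // Processes a new right value and returns {newResult, oldResult}.
    static String[] onRightUpdate(String key, String newRight) {
        rightTable.put(key, newRight);
        String left = leftTable.get(key);
        if (left == null) {
            return new String[] {null, null};
        }
        // The only joiner call for this update.
        String newResult = JOINER.apply(left, newRight);
        // Map.put returns the previous join result, replacing the second joiner call.
        String oldResult = joinResultStore.put(key, newResult);
        return new String[] {newResult, oldResult};
    }

    public static void main(String[] args) {
        leftTable.put("k", "L1");
        onRightUpdate("k", "R1");
        int before = joinerCalls;
        String[] change = onRightUpdate("k", "R2");
        System.out.println(change[0] + " / " + change[1]);
        System.out.println("joiner calls for R2: " + (joinerCalls - before));
    }
}
{code}

The forwarded change is the same as before ({{L1+R2}} as the new result, {{L1+R1}} as the old one), but processing {{R2}} now takes one joiner call plus one store access. Whether that is faster depends on the cost of the {{ValueJoiner}} compared with a state store lookup.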



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
