[
https://issues.apache.org/jira/browse/KAFKA-6599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17298101#comment-17298101
]
Alexey Osipov commented on KAFKA-6599:
--------------------------------------
I recently ran into this bug using Kafka 2.7.0.
The point I want to emphasize is that this bug is not only about oldValue
propagation. What concerns me most is that, when caching is enabled, a
KTable-KTable join produces an incorrect result if both A and B are set to
null within a short time frame.
Here is the expected timeline for join result updates given the A and B values:
||Timestamp||A||B||A join B||
|0|null|null|null|
|1|A| |(no change - still null)|
|2| |B|AB|
|3|null| |null|
Here is what I actually see when caching is enabled:
||Timestamp||A||B||A join B||
|0|null|null|null|
|1|A| |(no change - still null)|
|2| |B|AB|
|3|null| |{color:#FF0000}(no change - still AB){color}|
|4| |null|{color:#FF0000}(no change - still AB){color}|
|5| | |{color:#FF0000}(no change - still AB){color}|
|6| | |{color:#FF0000}...{color}|
The problem is that the "null" updates at timestamps 3 and 4 get cached. When
"commit.interval.ms" elapses, those nulls start propagating downstream. In
"org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin.KTableKTableJoinProcessor#process",
when A's "null" update is processed, the processor tries to get the value of B
for the same key by calling "valueGetter.get(key)". That call returns null,
because B's value for the same key is already "null" (*cached*). Since B is
null, nothing is propagated downstream:
{noformat}
if (valueRight == null) {
    return;
}
{noformat}
The same thing happens when B's "null" update is processed: it looks up A's
value, gets null from the cache, and again nothing is propagated.
*Without any further activity in A and B on that key, "A join B" permanently
remains "AB" (while it should be null).*
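To make the sequence above concrete, here is a minimal single-key Java sketch of it. It is a toy model, not Kafka's code: the class CachedInnerJoinDemo and its methods are hypothetical, and it assumes each table's cache is read-through (a buffered null is already visible to the other side's value getter) while the update itself only reaches the join processor when flush(), standing in for a commit, runs. The processor mirrors the quoted early return.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy single-key model of the cached KTable-KTable inner join; NOT Kafka's code.
// Assumption: caches are read-through (buffered updates are already visible to
// the other side's value getter), but updates only reach the join processor
// when the cache is flushed on commit.
class CachedInnerJoinDemo {
    private String left;    // what A's value getter returns for the key
    private String right;   // what B's value getter returns for the key
    private String joined;  // latest join result seen downstream (null = deleted/absent)
    private final boolean caching;
    // Buffered updates as {side, newValue}, forwarded in order by flush().
    private final Deque<String[]> pending = new ArrayDeque<>();

    CachedInnerJoinDemo(boolean caching) {
        this.caching = caching;
    }

    void update(String side, String newValue) {
        if (side.equals("A")) {
            left = newValue;
        } else {
            right = newValue;
        }
        if (caching) {
            pending.add(new String[] {side, newValue});
        } else {
            process(side, newValue);
        }
    }

    // Simplified inner-join processor: look up the other side and, like the
    // quoted "if (valueRight == null) return;", forward nothing if it is null.
    private void process(String side, String newValue) {
        String other = side.equals("A") ? right : left;
        if (other == null) {
            return;
        }
        if (newValue == null) {
            joined = null;  // delete propagated downstream
        } else {
            joined = side.equals("A") ? newValue + other : other + newValue;
        }
    }

    // Stand-in for a commit (commit.interval.ms elapsing): forward buffered updates.
    void flush() {
        while (!pending.isEmpty()) {
            String[] u = pending.poll();
            process(u[0], u[1]);
        }
    }

    String joined() {
        return joined;
    }

    // Replays the timeline from the tables above and returns the final join result.
    static String replay(boolean caching) {
        CachedInnerJoinDemo demo = new CachedInnerJoinDemo(caching);
        demo.update("A", "A");   // t1
        demo.update("B", "B");   // t2
        demo.flush();            // commit: "AB" reaches downstream
        demo.update("A", null);  // t3
        demo.update("B", null);  // t4, before the next commit
        demo.flush();            // next commit forwards both nulls
        return demo.joined();
    }

    public static void main(String[] args) {
        System.out.println("caching disabled: " + replay(false)); // prints "caching disabled: null"
        System.out.println("caching enabled:  " + replay(true));  // prints "caching enabled:  AB"
    }
}
```

Without caching, A's null is processed while B is still "B", so the delete goes through. With caching, each buffered null has already hidden the other side by the time it is forwarded, so both lookups return null and the stale "AB" stays downstream.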
CC [~damianguy] [~mjsax]: I'm not sure whether you are the right people to ping
about this; I found your names on similar tickets (KAFKA-4609, KAFKA-9916).
Please redirect this information to whoever you think is appropriate. Thanks!
> KTable KTable join semantics violated when caching enabled
> ----------------------------------------------------------
>
> Key: KAFKA-6599
> URL: https://issues.apache.org/jira/browse/KAFKA-6599
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Jan Filipiak
> Priority: Critical
> Labels: bug
>
> Say a tuple A,B got emitted after joining and the delete for A goes into the
> cache. After that, the B record is deleted as well. B's join processor
> looks up A and sees `null` while computing the old and new value (at this
> point we can execute the joiner with A being null and still emit something,
> but it is not going to represent the actual oldValue). Then, when A's cache
> flushes, it doesn't see B, so it also won't put a proper oldValue. The output
> then cannot be used for, say, any aggregate, as a delete would not reliably
> find the old aggregate it needs to be removed from. Filter will also break,
> as it stops (null, null) changes from propagating. So to me it looks pretty
> clear that caching with joins breaks KTable semantics, be it my new join or
> the currently existing ones.
>
> This if branch here
> [https://github.com/apache/kafka/blob/1.0/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L155]
> is not useful. I think it is there because if one delegated the true case to
> the underlying store, one would get proper semantics for Streams, but the
> weirdest cache I've seen.
>
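The quoted description's point about aggregates can be illustrated with a toy Java model. It is a hypothetical sketch, not Kafka's API: OldValueAggregateDemo, Change, apply and countAfterDelete are invented names, and the model only assumes what the description states, namely that a downstream aggregate relies on oldValue to remove a previous join result (like a subtractor) before adding the new one.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not Kafka's API): a KTable-style change carries the new and old
// value for a key, and a downstream aggregate uses oldValue to retract the
// previous contribution (subtractor) before adding the new one (adder).
class OldValueAggregateDemo {
    // Hypothetical minimal stand-in for a change record; null means "absent".
    static final class Change {
        final String newValue;
        final String oldValue;

        Change(String newValue, String oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Number of current join results per value (the "group").
    private final Map<String, Integer> counts = new HashMap<>();

    void apply(Change change) {
        if (change.oldValue != null) {
            counts.merge(change.oldValue, -1, Integer::sum); // subtractor
        }
        if (change.newValue != null) {
            counts.merge(change.newValue, 1, Integer::sum);  // adder
        }
    }

    int count(String group) {
        return counts.getOrDefault(group, 0);
    }

    // Creates join result "AB", applies the given delete, and returns the count for "AB".
    static int countAfterDelete(Change delete) {
        OldValueAggregateDemo agg = new OldValueAggregateDemo();
        agg.apply(new Change("AB", null)); // join result "AB" is created
        agg.apply(delete);                 // join result is deleted
        return agg.count("AB");
    }

    public static void main(String[] args) {
        // Correct delete: the old join result is known, so it is retracted.
        System.out.println(countAfterDelete(new Change(null, "AB"))); // prints 0
        // Delete whose oldValue was lost: the stale contribution stays.
        System.out.println(countAfterDelete(new Change(null, null))); // prints 1
    }
}
```

A delete that carries the old join result brings the count back to 0; a delete whose oldValue was lost, as in the cached-join scenario, leaves the stale "AB" contribution in the aggregate.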
--
This message was sent by Atlassian Jira
(v8.3.4#803005)