[ 
https://issues.apache.org/jira/browse/KAFKA-6599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17299960#comment-17299960
 ] 

Guozhang Wang commented on KAFKA-6599:
--------------------------------------

Thanks [~aosipov] for your analysis! Although decoupling caching from flushing
would help resolve this, I think the more general issue here is that the joined
table is not materialized, and hence the join processor cannot determine which
of these cases applies:

* the old joined value == the new joined value, so we do not need to emit
anything;
* the old joined value != the new joined value (including the case where the new
value is null), so we should emit.
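To make the two cases above concrete, here is a minimal Java sketch. It assumes the joined result is kept in a local store; a plain `HashMap` stands in for a state store, and the class `EmitOnChangeSketch` and its `Change` type are hypothetical illustrations, not Kafka Streams APIs. With the previous joined value at hand, the processor can suppress unchanged updates and still forward a delete with the correct old value:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch: the join result is materialized in a local map so the
// processor can compare the previously emitted joined value with the new one.
public class EmitOnChangeSketch {

    // Old/new pair forwarded downstream (illustrative, not the Streams class).
    public static final class Change<V> {
        private final V oldValue;
        private final V newValue;

        Change(V oldValue, V newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        public V oldValue() { return oldValue; }
        public V newValue() { return newValue; }

        @Override
        public String toString() { return "(" + oldValue + " -> " + newValue + ")"; }
    }

    // Stand-in for the materialized join-result state store (assumption).
    private final Map<String, String> joinedStore = new HashMap<>();

    // Returns the change to emit, or empty if the joined value did not change.
    public Optional<Change<String>> update(String key, String newJoined) {
        String oldJoined = joinedStore.get(key);
        if (Objects.equals(oldJoined, newJoined)) {
            // old joined value == new joined value: nothing to emit
            return Optional.empty();
        }
        // old != new, including the case where the new value is null (a delete)
        if (newJoined == null) {
            joinedStore.remove(key);
        } else {
            joinedStore.put(key, newJoined);
        }
        return Optional.of(new Change<>(oldJoined, newJoined));
    }

    public static void main(String[] args) {
        EmitOnChangeSketch join = new EmitOnChangeSketch();
        System.out.println(join.update("k", "A+B")); // emits null -> A+B
        System.out.println(join.update("k", "A+B")); // same value: suppressed
        System.out.println(join.update("k", null));  // delete: emits A+B -> null
        System.out.println(join.update("k", null));  // null -> null: suppressed
    }
}
```

Note that the last call is exactly the null,null change that should never reach downstream operators such as a filter or an aggregate.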

We have also discussed this in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
and noted that today, for stateless and table-table join operators, the result
table is not materialized, so we cannot easily tackle this issue.

I think we could consider, as an opt-in, materializing the result of any operator
that generates a table (including stateless operators and table-table joins) in
another state store. In that case we could resolve this issue, PLUS we would not
need to execute the operators again to generate old values to send downstream
when needed. cc @Chittaranjan Prasad, who is currently thinking along these lines.
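As a rough illustration of this opt-in, the following toy Java simulation (not Kafka Streams code; `MaterializedJoinSketch` and its methods are hypothetical) replays the scenario from the issue description: the delete for A is applied to the left table but held back, as if it were sitting in the cache, while the delete for B is processed. Because the join output is materialized, the old value is read from the result store instead of being recomputed from the input tables, so the `A+B -> null` retraction is emitted once and the late flush of A produces no null,null change. It does not model the record cache itself, only the order in which the join sees the updates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: a KTable-KTable inner join whose output is materialized,
// so old values come from the result store, not from the (possibly cached) inputs.
public class MaterializedJoinSketch {

    private final Map<String, String> left = new HashMap<>();
    private final Map<String, String> right = new HashMap<>();
    // Stand-in for the opt-in result state store (assumption).
    private final Map<String, String> result = new HashMap<>();
    // Records forwarded downstream, formatted as "key: old -> new".
    public final List<String> emitted = new ArrayList<>();

    // If forward is false, the update is held back "in the cache" and the join
    // is not triggered until flushLeft is called (a simplification).
    public void putLeft(String key, String value, boolean forward) {
        upsert(left, key, value);
        if (forward) {
            recompute(key);
        }
    }

    public void flushLeft(String key) {
        recompute(key);
    }

    public void putRight(String key, String value) {
        upsert(right, key, value);
        recompute(key);
    }

    private static void upsert(Map<String, String> table, String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    private void recompute(String key) {
        String l = left.get(key);
        String r = right.get(key);
        // Inner-join semantics: a result exists only if both sides are present.
        String newJoined = (l != null && r != null) ? l + "+" + r : null;
        // The old value is what was last emitted, read from the result store.
        String oldJoined = result.get(key);
        if (Objects.equals(oldJoined, newJoined)) {
            return; // unchanged (including null -> null): emit nothing
        }
        upsert(result, key, newJoined);
        emitted.add(key + ": " + oldJoined + " -> " + newJoined);
    }

    public static void main(String[] args) {
        MaterializedJoinSketch join = new MaterializedJoinSketch();
        join.putLeft("k", "A", true);
        join.putRight("k", "B");        // emits k: null -> A+B
        join.putLeft("k", null, false); // delete of A held back "in the cache"
        join.putRight("k", null);       // delete of B: old value A+B from the store
        join.flushLeft("k");            // late flush of A: null -> null, suppressed
        join.emitted.forEach(System.out::println);
    }
}
```

The trade-off is one extra store per table-producing operator in exchange for old values that no longer depend on when input caches are flushed.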

> KTable KTable join semantics violated when caching enabled
> ----------------------------------------------------------
>
>                 Key: KAFKA-6599
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6599
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Jan Filipiak
>            Priority: Critical
>              Labels: bug
>
> Say a tuple A,B got emitted after joining and the delete for A goes into the
> cache. After that, the B record is deleted as well. B's join processor
> would look up A and see `null` while computing the old and new value (at this
> point we can execute the joiner with A being null and still emit something, but
> it's not going to represent the actual oldValue). Then, when A's cache flushes,
> it doesn't see B, so it's also not going to put a proper oldValue. The output
> then cannot be used for, say, any aggregate, as a delete would not reliably find
> the old aggregate it needs to be removed from. Filter will also break, as it
> stops null,null changes from propagating. So to me it looks pretty clear
> that caching with joins breaks KTable semantics, be it my new join or the
> currently existing ones.
>  
> This if branch here
> [https://github.com/apache/kafka/blob/1.0/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L155]
> is not useful. I think it's there because, if one delegated the true
> case to the underlying store, one would get proper semantics for Streams, but
> the weirdest cache I've seen.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
