[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094900#comment-17094900 ]

Georgi Petkov commented on KAFKA-9921:
--------------------------------------

[~ableegoldman]

Yeah, I agree that probably not much can be done in terms of caching (compared 
to the options without _retainDuplicates_).

I totally agree that many of the features, like the null value behavior, are 
correct and make perfect sense from the point of view of the features 
implemented with them. Still, it's strange from the perspective of standalone 
usage. *1-2 sentences clarifying the null value behavior in the 
_WindowStateStore_ documentation could definitely help.* In addition, as I 
said, if this is the desired behavior, *you can easily skip calling RocksDB for 
null values (when using _retainDuplicates_). This would make both the intention 
clearer and obviously avoid unnecessary calls.*
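A minimal sketch of that short-circuit idea (hypothetical code, not Kafka's actual implementation; the class and method names here are made up for illustration): with _retainDuplicates_, every write gets its own sequence-numbered key, so a null value cannot act as a tombstone for anything and the store call can simply be dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: when duplicates are retained, a null value has
// nothing to delete, so the RocksDB round trip can be skipped entirely.
public class DuplicateRetainingStoreSketch {
    private final List<byte[]> backing = new ArrayList<>(); // stand-in for RocksDB
    private final boolean retainDuplicates;

    public DuplicateRetainingStoreSketch(boolean retainDuplicates) {
        this.retainDuplicates = retainDuplicates;
    }

    public void put(byte[] value) {
        if (retainDuplicates && value == null) {
            return; // intention is explicit and the unnecessary call is avoided
        }
        backing.add(value);
    }

    public int size() {
        return backing.size();
    }

    public static void main(String[] args) {
        DuplicateRetainingStoreSketch store = new DuplicateRetainingStoreSketch(true);
        store.put(null);          // silently skipped
        store.put(new byte[]{1}); // retained
        System.out.println(store.size()); // prints 1
    }
}
```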

I do need exactly a stream-stream join, but without the repartitioning part. I 
want to get matches when there are new events in either stream, and I also use 
the _WindowStateStore_ only for its retention policy. In fact, due to the lack 
of examples, I was looking at the stream-stream join implementation to find out 
how to use _WindowStateStores_ correctly. I'm building a library for some 
common yet far-from-trivial stream operations, like topological sorting. 
Therefore I don't know whether users will provide null values or not. I was 
curious about the behavior with null values so I'd know what I'm providing to 
the user; I tested it, and that's how I found out the exact behavior.

*I'm not sure that an in-memory or any custom state store will solve it.* Yes, 
an in-memory store helps with efficient appends because it avoids the expensive 
calls and serializations/deserializations. Nevertheless, *you will always have 
the serializations/deserializations somewhere, namely the changelog topic, and 
there you also pay in bandwidth* (not just precious processing time). Even if 
the list is capped at, say, only 5 items, you would still record 15 (1 + 2 + 3 
+ 4 + 5) events instead of 5, because the whole list is rewritten on every 
append. Obviously the total size grows fast - O(n^2). Combined with the fact 
that I want to provide a library to many different users (and the duplicate 
count may vary a lot between usages), *to me it's best to implement it just as 
in the stream-stream join - with duplicates*. Still, it was a great discussion 
and made me more confident in my decisions. Thank you for your assistance.
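The changelog-amplification arithmetic above can be checked with a short sketch (illustrative only; no Kafka APIs involved): rewriting a list that has grown to i items on the i-th append costs i changelog records, so n appends cost 1 + 2 + ... + n = n(n+1)/2 records, versus exactly n when each duplicate is its own record.

```java
// Compares the two changelog footprints from the discussion above:
// re-serializing the whole list on every append vs. one record per duplicate.
public class ChangelogGrowth {
    static long recordsForListApproach(int n) {
        long total = 0;
        for (int i = 1; i <= n; i++) {
            total += i; // the whole list of i items is re-serialized on append i
        }
        return total;
    }

    static long recordsForDuplicatesApproach(int n) {
        return n; // one changelog record per appended duplicate
    }

    public static void main(String[] args) {
        System.out.println(recordsForListApproach(5));       // prints 15
        System.out.println(recordsForDuplicatesApproach(5)); // prints 5
    }
}
```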

*Regarding the PR - it adds the same code to both _WindowStoreBuilder.java_ and 
_TimestampedWindowStoreBuilder.java_ but adds a test for only one of them.*

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-9921
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9921
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Georgi Petkov
>            Assignee: Sophie Blee-Goldman
>            Priority: Major
>             Fix For: 2.6.0, 2.5.1
>
>
> I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order, then when reading them through the iterator I'll get the values *4, 2, 
> 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior is due to the cache holding the last value (and it can 
> hold only one, since it's not aware of the retain-duplicates option), which 
> is read first while the first value from the RocksDB iterator is skipped, 
> even though the values are different. This can be observed (for version 
> 2.5.0) in _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then 
> the next 3 values are read from the RocksDB iterator, so they are as expected.
> As I said, the whole feature does not consider the _retainDuplicates_ option, 
> so there are other examples of incorrect behavior, like in 
> _AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - each call skips one 
> duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I always read the current list, appended one value, and wrote it 
> back). So I go for _List<KeyValuePair<K, V>>_ instead of _KeyValuePair<K, 
> List<V>>_. The whole use case is more complex than that, so I use 
> _#transformValues_ and state stores.
> So as an impact, I can't use caching on my state stores. For others - they'll 
> have incorrect behavior that may take a lot of time to be discovered and even 
> more time to fix the results.
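The skip behavior described in the quoted issue can be sketched in a simplified, non-Kafka simulation (the class and method names here are invented for illustration): the cache holds only the latest value per key, and when the cache key matches the persistent iterator's head key, the merged iterator serves the cached value and skips the persistent head. That is correct without duplicates, but with _retainDuplicates_ the skipped head may be a different value, which is how 1, 2, 3, 4 comes back as 4, 2, 3, 4.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified model of the merged cache/store iteration: one cached value
// per key, and a "same key" match causes the persistent head to be skipped.
public class MergedIteratorSketch {
    static List<Integer> merged(Integer cachedLatest, List<Integer> persistent) {
        List<Integer> out = new ArrayList<>();
        Iterator<Integer> it = persistent.iterator();
        if (cachedLatest != null && it.hasNext()) {
            it.next();             // skip the persistent head ("same key" as cache)
            out.add(cachedLatest); // serve the cached (latest) value first
        }
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        // Puts 1, 2, 3, 4 under one key: cache holds 4, the store holds all four.
        System.out.println(merged(4, List.of(1, 2, 3, 4))); // prints [4, 2, 3, 4]
    }
}
```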



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
