[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094176#comment-17094176 ]

Georgi Petkov commented on KAFKA-9921:
--------------------------------------

[~ableegoldman] WindowStateStores don't really offer updates (or deletes, for 
that matter), at least when using _retainDuplicates_, so `idempotent updates` 
sounds inappropriate to me. For 2 puts I would expect 2 entries, regardless of 
whether they accidentally match.
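
To make that concrete, here is a minimal sketch (the processor, store name and 
serdes are made up; the store is assumed to be built with _retainDuplicates = 
true_):

{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// Hypothetical processor; "events-store" is a window store built with
// retainDuplicates = true.
class DuplicatePutsDemo extends AbstractProcessor<String, String> {
    private WindowStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        store = (WindowStore<String, String>) context.getStateStore("events-store");
    }

    @Override
    public void process(final String key, final String value) {
        final long ts = context().timestamp();
        store.put(key, value, ts);
        store.put(key, value, ts); // identical put: appended, not overwritten
        try (final WindowStoreIterator<String> it = store.fetch(key, ts, ts)) {
            // the iterator yields BOTH entries, even though key, value and
            // timestamp are all identical
        }
    }
}
{code}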

*I was unable to determine from the documentation the expected behavior when 
putting _null_ values in a _WindowStateStore_.* It turns out it behaves like 
the _KeyValueStore_ - it just deletes the existing entry - unless you use 
_retainDuplicates_ - then *nothing happens: neither is the null persisted nor 
is any entry deleted*. I've debugged the code and it goes all the way to 
calling delete in RocksDB, so I'm not sure this is intended (or it could at 
least be skipped in this case). What do you think? Should I create a separate 
bug for that? What should the expected behavior be?
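
In code, what I observe is roughly this (continuing the hypothetical sketch 
above):

{code:java}
store.put(key, "v", ts);
store.put(key, null, ts);
// retainDuplicates = false: the entry for (key, ts) is deleted, like in a KeyValueStore
// retainDuplicates = true:  nothing happens - "v" stays, no null is stored and
//                           nothing is deleted (yet the RocksDB delete is still reached)
{code}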

Is there some other efficient approach to keeping a list per key? In my case, 
the store key is not the partition key but a relation between events, and I 
would like to avoid repartitioning. To be honest, I had a really hard time 
finding the appropriate tools for the job. The API is very limited in 
operations, or at least no matter how I turn this around, it feels like this 
is not the most efficient way to do things. I have already partitioned the 
data by a key that serves as a correlation ID (so data within a partition is 
self-contained). The problem could be summarized as "I need a stream-stream 
join while avoiding repartitioning".
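
For reference, the DSL route would look roughly like this (topic names and 
_extractJoinKey_ are made up) - each _selectKey()_ forces a repartition topic 
before the join, which is exactly the overhead I want to avoid:

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

class JoinSketch {
    static void buildTopology(final StreamsBuilder builder) {
        final KStream<String, String> left = builder.stream("left-topic");
        final KStream<String, String> right = builder.stream("right-topic");

        left.selectKey((k, v) -> extractJoinKey(v))              // repartition #1
            .join(right.selectKey((k, v) -> extractJoinKey(v)),  // repartition #2
                  (lv, rv) -> lv + "|" + rv,
                  JoinWindows.of(Duration.ofMinutes(5)));
    }

    static String extractJoinKey(final String value) {
        return value; // placeholder for the actual relation between events
    }
}
{code}
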
 * If I go with the PAPI, then I need an efficient retention policy - that 
means _WindowStateStore_ (and its not-so-pleasant API when all you need is the 
retention policy). Then I need efficient persisting of values by key - 
retaining duplicates (so you only append new values), but then no caching 
optimizations are possible. So far this seems like the best approach and it is 
what I'm doing (see the sketch after this list).
 * If I go for a stream-stream join (as in the sketch above), this means 
repartitioning both streams first, since I have different keys to join by. 
That means 2 extra topics that won't be reused for the internally used 
_WindowStateStores_ (and I know that my data is already partitioned well 
enough). It would have been nice to have the option to avoid repartitioning 
with a "my data is already properly partitioned / I know what I'm doing" flag.
 * If I go with the Clients API, then I'm basically starting from scratch with 
an API that is hard to use right, and there are no state stores available.
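
Concretely, the first option - what I currently do - looks roughly like this 
(names and figures are made up; note the absence of _withCachingEnabled()_ 
because of this very bug):

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.Stores;

class PapiSketch {
    static void addStore(final StreamsBuilder builder) {
        // The retention period doubles as the window size: I only ever fetch
        // the whole retained range, treating the store as a "list per key".
        final Duration retention = Duration.ofHours(1);
        builder.addStateStore(Stores.windowStoreBuilder(
            Stores.persistentWindowStore("events-store", retention, retention, true), // retainDuplicates
            Serdes.String(), Serdes.String()));
        // A transformValues()/Processor then appends with store.put(key, value, ts)
        // and reads the whole "list" back with store.fetch(key, from, to).
    }
}
{code}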

Any advice? Am I missing something?

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-9921
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9921
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Georgi Petkov
>            Assignee: Sophie Blee-Goldman
>            Priority: Major
>             Fix For: 2.6.0
>
>
> I'm using the current latest version, 2.5.0, but this is not something new.
> I have a _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
> {code:java}
> builder.addStateStore(windowStoreBuilder(
>     persistentWindowStore(name, retentionPeriod, windowSize, true), // retainDuplicates
>     keySerde, valueSerde
> ).withCachingEnabled());
> {code}
> If I put 4 key-value pairs with the same key and the values *1, 2, 3, 4*, in 
> that order, then reading them through the iterator yields the values *4, 2, 
> 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior is due to the cache holding the last value (it can 
> hold only one value per key since it's not aware of the retain-duplicates 
> option), which is read first (while skipping the first entry from the 
> RocksDB iterator even though the values are different). This can be observed 
> (for version 2.5.0) in _AbstractMergedSortedCacheStoreIterator#next()_, 
> lines 95-97. The next 3 values are then read from the RocksDB iterator, so 
> they are as expected.
> As I said, the whole feature does not take the _retainDuplicates_ option 
> into account, so there are other examples of incorrect behavior, like 
> _AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - each call would 
> skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> the processing of a single event becoming linear in the list size (which 
> would be the case if I always read the current list, appended one value, and 
> wrote it back). So I go for _List<KeyValuePair<K, V>>_ instead of 
> _KeyValuePair<K, List<V>>_. The whole use case is more complex than that, so 
> I use _#transformValues_ and state stores.
> So, as an impact, I can't use caching on my state stores. Others will get 
> incorrect behavior that may take a lot of time to discover and even more 
> time to fix the consequences.


