[ https://issues.apache.org/jira/browse/KAFKA-18326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17909114#comment-17909114 ]

Stanislav Savulchik commented on KAFKA-18326:
---------------------------------------------

[~guozhang], thanks for reviewing the PR.

Let me answer your [question|https://github.com/apache/kafka/pull/18287#pullrequestreview-2526664288] here:
{quote}For the original reported scenario where if deleting the second key "1" 
this bug gets exposed but if deleting the first key "0" this bug will not be
exposed, I'm a bit unclear how it would be the case. The only case I can think 
of is that the cache is small enough such that both "0" and "1" are flushed to 
store while "1" is in the cache.
{quote}
Indeed, my original example didn't explain how I created the state store 
records in the first place before deleting one of them to reproduce the bug.

There is a topology with two custom processors and a shared key-value state 
store that is connected to both of them (a rough wiring sketch follows below).
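
For concreteness, here is a minimal wiring sketch of that setup using the Processor API. All names (the "shared-store" store, the topic/processor names, and the FirstProcessor/SecondProcessor classes) are hypothetical placeholders rather than the real ones from my application, and String serdes stand in for my actual types; the only relevant point is that a single caching-enabled key-value store is connected to both processors.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class SharedStoreTopology {

    public static Topology build() {
        // one persistent key-value store with caching enabled, shared by both processors
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("shared-store"),
                    Serdes.String(),
                    Serdes.String())
                .withCachingEnabled();

        final Topology topology = new Topology();
        // low-rate input feeding the first processor
        topology.addSource("low-rate-source", "low-rate-topic");
        topology.addProcessor("first-processor", FirstProcessor::new, "low-rate-source");
        // high-rate input feeding the second processor
        topology.addSource("high-rate-source", "high-rate-topic");
        topology.addProcessor("second-processor", SecondProcessor::new, "high-rate-source");
        // the same store instance is attached to both processors
        topology.addStateStore(storeBuilder, "first-processor", "second-processor");
        return topology;
    }
}
{code}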

The first processor handles a very small rate of input records and does (in 
order):
 # store.put(key, value or null)
 # store.prefixScan(keyPrefix)

The second processor handles a very large rate of input records (various 
keyPrefixes) and does (in order):
 # store.prefixScan(keyPrefix)
 # store.putAll(keyValues)

I believe that by the time the first processor performed the delete via 
store.put(key, null) followed by store.prefixScan(keyPrefix), that key (and the 
other keys sharing its keyPrefix) was no longer in the cache, because it had 
already been evicted by the heavy read and write traffic from the second 
processor. A rough sketch of the first processor is below.
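
To make that pattern concrete, here is a simplified sketch of the first processor (hypothetical class and store names, plain String keys instead of my (Int, String) composite keys, assuming the wiring sketched above). The second processor is analogous but performs store.prefixScan(keyPrefix) followed by store.putAll(keyValues) at a much higher rate.
{code:java}
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// hypothetical illustration of the low-rate processor
public class FirstProcessor implements Processor<String, String, Void, Void> {

    private KeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        store = context.getStateStore("shared-store");
    }

    @Override
    public void process(final Record<String, String> record) {
        // 1. delete via put(key, null) so the previous value is not read back
        store.put(record.key(), null);

        // 2. scan the same prefix right away; by this point the second processor's
        //    traffic may have evicted this prefix's entries from the cache, so the
        //    underlying store still holds the value that was just deleted
        try (final KeyValueIterator<String, String> iter =
                 store.prefixScan(record.key(), new StringSerializer())) {
            while (iter.hasNext()) {
                // the just-deleted key can still show up here (KAFKA-18326)
                iter.next();
            }
        }
    }
}
{code}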

I hope that clarifies the scenario I ran into.

> Cached stores may return deleted values
> ---------------------------------------
>
>                 Key: KAFKA-18326
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18326
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Almog Gavra
>            Assignee: Almog Gavra
>            Priority: Critical
>         Attachments: range-scan-fix.patch
>
>
> Reported in community Slack by Stanislav Savulchik. I've attached a patch with 
> the fix, and am waiting for the reporter to submit a PR if they want to, since 
> they were the first to identify it!
> It affects basically every version of Kafka Streams out there...
> -----
> Hi everyone,
> I’m investigating unexpected behavior of the KeyValueStore.prefixScan method 
> that sometimes returns previously deleted keys when caching is enabled. 
> Example pseudocode:
> {code:java}
> val keyPrefixSerializer: Serializer[Int] = ??? // 4 bytes big endian
> val store: KeyValueStore[(Int, String), String] = ???
>
> // store contents
> // (1, "A") -> "A"
> // (1, "B") -> "B"
>
> // using put instead of delete to avoid reading previous value
> store.put((1, "B"), null)
>
> // reading all key value pairs using key prefix
> val result: List[KeyValue[(Int, String), String]] =
>   store.prefixScan(1, keyPrefixSerializer).asScala.toList
>
> // expected result
> // (1, "A") -> "A"
>
> // actual result
> // (1, "A") -> "A"
> // (1, "B") -> "B" (was previously deleted, but returned by the iterator)
> {code}
> I tried to come up with a unit test for 
> MergedSortedCacheKeyValueBytesStoreIterator (the iterator returned by 
> KeyValueStore.prefixScan and other methods like range and all) in order to 
> reproduce the behavior. It also showed that the iterator returns more items 
> than expected if I delete the larger key:
> {code:java}
> @Test
> public void shouldSkipAllDeletedFromCache1() {
>     final byte[][] bytes = {{0}, {1}};
>     for (final byte[] aByte : bytes) {
>         store.put(Bytes.wrap(aByte), aByte);
>     }
>     // simulate key deletion from store that is cached
>     cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null));
>     try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
>         assertArrayEquals(bytes[0], iterator.next().key.get());
>         // org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
>         assertFalse(iterator.hasNext());
>     }
> }
> {code}
>   
> But if I delete the smaller key, the test passes:
> {code:java}
> @Test
> public void shouldSkipAllDeletedFromCache0() {
>     final byte[][] bytes = {{0}, {1}};
>     for (final byte[] aByte : bytes) {
>         store.put(Bytes.wrap(aByte), aByte);
>     }
>     cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null));
>     try (final MergedSortedCacheKeyValueBytesStoreIterator iterator = createIterator()) {
>         assertArrayEquals(bytes[1], iterator.next().key.get());
>         assertFalse(iterator.hasNext());
>     }
> }
> {code}
> Could someone help me verify whether this is a bug, or am I missing something?
> Thank you.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
