[ https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567072#comment-16567072 ]

Christian Henry commented on KAFKA-7158:
----------------------------------------

Sorry for the delay; I finally got some time to put together a minimal example. I 
found that the easiest way to set up and share a minimal example was to run 
everything through the Confluent Platform, 
[https://www.confluent.io/download/]. I'm using v5.0.0 of that, which ships with 
Kafka 2.0.0.

So my steps are:

1) start platform with defaults
{code:java}
./bin/confluent start{code}
2) create input topic
{code:java}
bin/kafka-topics --create --topic duplicate-examples-input --zookeeper localhost:2181 --partitions 1 --replication-factor 1{code}
3) Run [https://gist.github.com/cah6/adc2c52514f5386597a4bba6c429ff63] to 
create the application and monitor store values (a rough sketch of what that 
application and the step 4 driver look like follows these steps). I put my 
example in a local clone of [https://github.com/confluentinc/kafka-streams-examples] 
so that I didn't have to worry about project setup at all. Wait for "In init..." 
to be printed out.

4) Run [https://gist.github.com/cah6/04c09cc9747394d38182078d32b2a2d0] to push 
some values. Do this a few times, spaced apart by 5-10 seconds.
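
Since the gists are external, here is a rough sketch of the kind of topology the step 3 gist sets up. This is not the actual gist contents: the class name, store name, retention/window sizes, and serdes below are assumptions, but the shape matches what is described in this ticket (a persistent windowed store with caching enabled, a transformer that puts each record into the store, and a count of entries obtained by iterating fetchAll()).
{code:java}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

public class DuplicateExampleSketch {

    private static final String STORE_NAME = "duplicate-store";            // assumed store name
    private static final String INPUT_TOPIC = "duplicate-examples-input";  // topic from step 2

    // Transformer that records every key it sees in the windowed store and prints
    // how many entries fetchAll() currently returns.
    static class CountingTransformer implements Transformer<String, String, KeyValue<String, String>> {
        private WindowStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (WindowStore<String, Long>) context.getStateStore(STORE_NAME);
            System.out.println("In init: number of items in store is: " + countAll());
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            store.put(key, 1L);
            System.out.println("Number of items in store is: " + countAll());
            return KeyValue.pair(key, value);
        }

        // Count every entry that fetchAll() returns over the whole time range.
        private long countAll() {
            long count = 0;
            try (KeyValueIterator<Windowed<String>, Long> it =
                         store.fetchAll(0L, System.currentTimeMillis())) {
                while (it.hasNext()) {
                    it.next();
                    count++;
                }
            }
            return count;
        }

        @Override
        public void close() { }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "duplicate-examples");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Persistent windowed store with caching enabled -- caching is what triggers
        // the duplicate entries seen from all()/fetchAll().
        StoreBuilder<WindowStore<String, Long>> storeBuilder = Stores.windowStoreBuilder(
                Stores.persistentWindowStore(STORE_NAME,
                        TimeUnit.HOURS.toMillis(1),    // retention (assumed)
                        3,                             // number of segments (assumed)
                        TimeUnit.MINUTES.toMillis(5),  // window size (assumed)
                        false),
                Serdes.String(), Serdes.Long())
                .withCachingEnabled();

        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(storeBuilder);
        KStream<String, String> input = builder.stream(INPUT_TOPIC);
        // The transformed stream isn't forwarded anywhere; we only care about the printed counts.
        KStream<String, String> counted = input.transform(CountingTransformer::new, STORE_NAME);

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}
And a similarly hedged sketch of the step 4 driver: something that writes a few fresh GUID keys to duplicate-examples-input on each run, so each run should add the same number of new entries to the store.
{code:java}
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PushSomeValues {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // Each run writes a handful of fresh GUID keys (count is illustrative) to the
        // input topic created in step 2.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 5; i++) {
                String guid = UUID.randomUUID().toString();
                producer.send(new ProducerRecord<>("duplicate-examples-input", guid, guid));
            }
        }
    }
}
{code}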

Running the driver 3 times on my machine resulted in the following. The final 
value SHOULD be 14:
{code:java}
In init: number of items in store is: 0
Number of items in store is: 0
Number of items in store is: 1
Number of items in store is: 2
Number of items in store is: 3
Number of items in store is: 4
Number of items in store is: 6
Number of items in store is: 7
Number of items in store is: 8
Number of items in store is: 9
Number of items in store is: 10
Number of items in store is: 17
Number of items in store is: 18
Number of items in store is: 19
Number of items in store is: 20
Number of items in store is: 21
{code}
Note that the incremental values within each run were fine, but somehow the number 
of objects in the store jumped between runs.

 

> Duplicates when searching kafka stream state store with caching
> ---------------------------------------------------------------
>
>                 Key: KAFKA-7158
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7158
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Christian Henry
>            Priority: Major
>
> See the mailing list email with the same name for the initial discussion; I'm 
> reposting my initial email here for convenience:
> {noformat}
> We have a kafka stream application, and one of our transform steps keeps a 
> state store to filter out messages with a previously seen GUID. That is, our 
> transform looks like:
> public KeyValue<byte[], String> transform(byte[] key, String guid) {
>     try (WindowStoreIterator<DuplicateMessageMetadata> iterator = 
> duplicateStore.fetch(guid, start, now)) {
>         if (iterator.hasNext()) {
>             return null;
>         } else {
>             duplicateStore.put(guid, some metadata);
>             return new KeyValue<>(key, guid);
>         }
>     }
> }
> where the duplicateStore is a persistent windowed store with caching enabled. 
> I was debugging some tests and found that sometimes when calling all() or 
> fetchAll() on the duplicate store and stepping through the iterator, it would 
> return the same guid more than once, even if it was only inserted into the 
> store once. More specifically, if I had the following guids sent to the 
> stream: [11111, 22222, ... 99999] (for 9 values total), sometimes it would 
> return 10 values, with one (or more) of the values being returned twice by 
> the iterator. However, this would not show up with a fetch(guid) on that 
> specific guid. For instance, if 11111 was being returned twice by fetchAll(), 
> calling duplicateStore.fetch("11111", start, end) will still return an 
> iterator with size of 1. 
> I dug into this a bit more by setting a breakpoint in 
> SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey) and watching 
> the two input values as I looped through the iterator using 
> "while(iterator.hasNext()) { print(iterator.next()) }". In one test, the 
> duplicate value was 66666, and I saw the following behavior (trimming off the 
> segment values from the byte input):
> -- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
> -- next() returns 66666
> and 
> -- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
> -- next() returns 66666
> Besides those, the input values are the same and the output is as expected. 
> Additionally, a coworker noted that the number of duplicates always matches 
> the number of times Long.compare(cacheSegmentId, storeSegmentId) returns a 
> non-zero value, indicating that duplicates are likely arising due to the 
> segment comparison. {noformat}
>  
> Basically, what we're seeing is that if you have a persistent store with 
> caching enabled, you will sometimes get duplicate keys when querying for all 
> keys (using all() or fetchAll()) even though fetch(key) will only return one 
> result. That is, if you had a fresh store with nothing in it and did 
> something like:
> {code:java}
> IntStream.rangeClosed(1, 100).forEach(i -> store.put("key" + i, "value" + i));
> {code}
> then calling
> {code:java}
> store.fetchAll(start, end)
> {code}
> would return an iterator with MORE than 100 items, whereas if you explicitly 
> did
> {code:java}
> store.fetch("key" + i)
> {code}
> for i = 1 to 100, each fetch would only return a single item in the iterator. 
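
To make that last check concrete, here is a minimal, hedged sketch of it as a helper that could be called from inside a processor with access to such a store (a cached, persistent WindowStore). The class and method names, key format, and time bounds are illustrative, not actual project code:
{code:java}
import java.util.stream.IntStream;

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class FetchAllConsistencyCheck {

    // Put 100 keys into the windowed store, then compare how many entries fetchAll()
    // reports against the sum of per-key fetch() results.
    public static void check(WindowStore<String, String> store, long start, long end) {
        IntStream.rangeClosed(1, 100).forEach(i -> store.put("key" + i, "value" + i));

        long fetchAllCount = 0;
        try (KeyValueIterator<Windowed<String>, String> it = store.fetchAll(start, end)) {
            while (it.hasNext()) {
                it.next();
                fetchAllCount++;
            }
        }

        long perKeyCount = 0;
        for (int i = 1; i <= 100; i++) {
            try (WindowStoreIterator<String> it = store.fetch("key" + i, start, end)) {
                while (it.hasNext()) {
                    it.next();
                    perKeyCount++;
                }
            }
        }

        // With caching enabled, fetchAllCount can come back greater than 100 even though
        // perKeyCount stays at 100 -- that mismatch is the bug reported here.
        System.out.println("fetchAll() saw " + fetchAllCount
                + " entries, per-key fetch() saw " + perKeyCount);
    }
}
{code}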



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
