[ https://issues.apache.org/jira/browse/KAFKA-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567072#comment-16567072 ]
Christian Henry commented on KAFKA-7158:
----------------------------------------

Sorry for the delay, I finally got some time to put together a minimal example. I found that the easiest way to set up and share one was to run everything through the Confluent Platform, [https://www.confluent.io/download/]. I'm using v5.0.0 of that, which ships with Kafka 2.0.0. My steps are:

1) Start the platform with defaults:
{code:bash}
./bin/confluent start{code}
2) Create the input topic:
{code:bash}
bin/kafka-topics --create --topic duplicate-examples-input --zookeeper localhost:2181 --partitions 1 --replication-factor 1{code}
3) Run [https://gist.github.com/cah6/adc2c52514f5386597a4bba6c429ff63] to create the application and monitor the store values. I put my example in a local clone of [https://github.com/confluentinc/kafka-streams-examples] so that I didn't have to worry about project setup at all. Wait for "In init..." to be printed.

4) Run [https://gist.github.com/cah6/04c09cc9747394d38182078d32b2a2d0] to push some values. Do this a few times, spaced 5-10 seconds apart.

Running the driver 3 times on my machine resulted in the following. The final value SHOULD be 14:
{code}
In init: number of items in store is: 0
Number of items in store is: 0
Number of items in store is: 1
Number of items in store is: 2
Number of items in store is: 3
Number of items in store is: 4
Number of items in store is: 6
Number of items in store is: 7
Number of items in store is: 8
Number of items in store is: 9
Number of items in store is: 10
Number of items in store is: 17
Number of items in store is: 18
Number of items in store is: 19
Number of items in store is: 20
Number of items in store is: 21
{code}
Note that the incremental values within each run were fine, but somehow the number of objects in the store jumps between runs.
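The application in step 3 boils down to something like the sketch below. This is a trimmed, illustrative version (the class names, store name, window sizes, and serde choices here are mine, not necessarily the gist's): a persistent windowed store with caching enabled, a transformer that drops previously seen GUIDs, and a wall-clock punctuation that counts the store entries via all().
{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class DuplicateExampleApp {

    static final String STORE_NAME = "duplicate-store";
    static final long WINDOW_MS = 60 * 60 * 1000L;

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Persistent windowed store WITH caching: the combination that triggers the bug.
        StoreBuilder<WindowStore<String, Long>> dedupStore = Stores
                .windowStoreBuilder(
                        Stores.persistentWindowStore(STORE_NAME, WINDOW_MS, 3, WINDOW_MS, false),
                        Serdes.String(), Serdes.Long())
                .withCachingEnabled();
        builder.addStateStore(dedupStore);

        builder.<byte[], String>stream("duplicate-examples-input")
               .transform(DedupTransformer::new, STORE_NAME);

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "duplicate-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        new KafkaStreams(builder.build(), props).start();
    }

    static class DedupTransformer implements Transformer<byte[], String, KeyValue<byte[], String>> {

        private WindowStore<String, Long> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
            store = (WindowStore<String, Long>) context.getStateStore(STORE_NAME);
            System.out.println("In init: number of items in store is: " + countAll());
            // Periodically report the store size; these are the counts that jump between runs.
            context.schedule(5_000L, PunctuationType.WALL_CLOCK_TIME,
                    timestamp -> System.out.println("Number of items in store is: " + countAll()));
        }

        @Override
        public KeyValue<byte[], String> transform(byte[] key, String guid) {
            long now = System.currentTimeMillis();
            try (WindowStoreIterator<Long> iterator = store.fetch(guid, now - WINDOW_MS, now)) {
                if (iterator.hasNext()) {
                    return null;          // previously seen GUID: drop it
                }
            }
            store.put(guid, now);         // first sighting: remember it and pass it on
            return KeyValue.pair(key, guid);
        }

        // Count entries by walking all(); with caching enabled this is where duplicates show up.
        private long countAll() {
            long n = 0;
            try (KeyValueIterator<Windowed<String>, Long> all = store.all()) {
                while (all.hasNext()) {
                    all.next();
                    n++;
                }
            }
            return n;
        }

        @Override
        public void close() { }
    }
}
{code}
The jumps between runs would then come from all() yielding duplicate entries, which is the behavior described in the original report below.
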
> Duplicates when searching kafka stream state store with caching
> ---------------------------------------------------------------
>
>                 Key: KAFKA-7158
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7158
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Christian Henry
>            Priority: Major
>
> See the mailing list email with the same name for the initial discussion; reposting my initial email here for convenience:
> {noformat}
> We have a Kafka Streams application, and one of our transform steps keeps a state store to filter out messages with a previously seen GUID. That is, our transform looks like:
> public KeyValue<byte[], String> transform(byte[] key, String guid) {
>     // look for an earlier occurrence of this GUID in the window
>     try (WindowStoreIterator<DuplicateMessageMetadata> iterator = duplicateStore.fetch(guid, start, now)) {
>         if (iterator.hasNext()) {
>             return null;  // duplicate: drop the message
>         } else {
>             duplicateStore.put(guid, /* some metadata */);
>             return new KeyValue<>(key, guid);  // first sighting: pass it through
>         }
>     }
> }
> where duplicateStore is a persistent windowed store with caching enabled.
> I was debugging some tests and found that sometimes, when calling all() or fetchAll() on the duplicate store and stepping through the iterator, it would return the same GUID more than once, even if it was only inserted into the store once. More specifically, if I had the following GUIDs sent to the stream: [11111, 22222, ..., 99999] (9 values total), sometimes it would return 10 values, with one (or more) of the values being returned twice by the iterator. However, this would not show up with a fetch(guid) on that specific GUID. For instance, if 11111 was being returned twice by fetchAll(), calling duplicateStore.fetch("11111", start, end) would still return an iterator of size 1.
> I dug into this a bit more by setting a breakpoint in SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey) and watching the two input values as I looped through the iterator using "while (iterator.hasNext()) { print(iterator.next()); }". In one test, the duplicate value was 66666, and I saw the following behavior (trimming the segment values off the byte input):
> -- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
> -- next() returns 66666
> and
> -- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
> -- next() returns 66666
> Besides those calls, the input values are the same and the output is as expected. Additionally, a coworker noted that the number of duplicates always matches the number of times Long.compare(cacheSegmentId, storeSegmentId) returns a non-zero value, indicating that the duplicates likely arise from the segment comparison.
> {noformat}
>
> Basically, what we're seeing is that if you have a persistent store with caching enabled, you will sometimes get duplicate keys when querying for all keys (using all() or fetchAll()), even though fetch(key) will only return one result. That is, if you had a fresh store with nothing in it and did something like:
> {code:java}
> IntStream.rangeClosed(1, 100).forEach(i -> store.put("key" + i, "value" + i));
> {code}
> then calling
> {code:java}
> store.fetchAll(start, end)
> {code}
> would return an iterator with MORE than 100 items, whereas if you explicitly did
> {code:java}
> store.fetch("key" + i)
> {code}
> for i = 1 to 100, each fetch would only return a single item in the iterator.
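To make that last comparison concrete, here is a condensed, self-contained harness. It is not from the original report (the topology wiring, store settings, and names are illustrative), and it drives the store through TopologyTestDriver from kafka-streams-test-utils rather than a live broker: it pipes in 100 distinct keys, then compares the count that fetchAll() sees against the sum of per-key fetch() counts over the same time range.
{code:java}
import java.util.Properties;
import java.util.stream.IntStream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class FetchAllVsFetchHarness {

    static final String STORE_NAME = "dup-store";
    static final long WINDOW_MS = 60 * 60 * 1000L;

    public static void main(String[] args) {
        Topology topology = new Topology();
        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input-topic");
        topology.addProcessor("counter", CountingProcessor::new, "source");
        // Persistent windowed store with caching enabled, attached to the processor.
        topology.addStateStore(
                Stores.windowStoreBuilder(
                        Stores.persistentWindowStore(STORE_NAME, WINDOW_MS, 3, WINDOW_MS, false),
                        Serdes.String(), Serdes.String())
                      .withCachingEnabled(),
                "counter");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fetchall-vs-fetch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        TopologyTestDriver driver = new TopologyTestDriver(topology, props);
        ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());

        // 100 distinct puts, then a sentinel record that triggers the comparison.
        IntStream.rangeClosed(1, 100)
                 .forEach(i -> driver.pipeInput(factory.create("key" + i, "value" + i)));
        driver.pipeInput(factory.create("count", ""));
        driver.close();
    }

    static class CountingProcessor extends AbstractProcessor<String, String> {

        private WindowStore<String, String> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
            super.init(context);
            store = (WindowStore<String, String>) context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(String key, String value) {
            if (!"count".equals(key)) {
                store.put(key, value);
                return;
            }
            // Walk fetchAll() over the whole time range and count every entry it yields.
            long viaFetchAll = 0;
            try (KeyValueIterator<Windowed<String>, String> all = store.fetchAll(0L, Long.MAX_VALUE)) {
                while (all.hasNext()) {
                    all.next();
                    viaFetchAll++;
                }
            }
            // Sum per-key fetch() counts for the same keys over the same range.
            long viaFetch = 0;
            for (int i = 1; i <= 100; i++) {
                try (WindowStoreIterator<String> it = store.fetch("key" + i, 0L, Long.MAX_VALUE)) {
                    while (it.hasNext()) {
                        it.next();
                        viaFetch++;
                    }
                }
            }
            System.out.println("fetchAll() saw " + viaFetchAll + " entries; per-key fetch() saw " + viaFetch);
        }
    }
}
{code}
Because the test driver flushes the cache more aggressively than a live application, the over-count may or may not appear in this setup; the broker-based steps in the comment above remain the reliable reproduction. The query pattern, though, is exactly the fetchAll()-versus-fetch() comparison the report describes.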