Christian Henry created KAFKA-7158:
--------------------------------------
Summary: Duplicates when searching kafka stream state store with caching
Key: KAFKA-7158
URL: https://issues.apache.org/jira/browse/KAFKA-7158
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 1.1.0
Reporter: Christian Henry
See the mailing list email with the same name for the initial discussion; I'm
reposting my initial email here for convenience:
{noformat}
We have a Kafka Streams application, and one of our transform steps keeps a
state store to filter out messages with a previously seen GUID. That is, our
transform looks like:
public KeyValue<byte[], String> transform(byte[] key, String guid) {
    // start/now bound the window we consider "recent"; metadata is whatever
    // per-guid bookkeeping we store
    try (WindowStoreIterator<DuplicateMessageMetadata> iterator =
             duplicateStore.fetch(guid, start, now)) {
        if (iterator.hasNext()) {
            return null;  // guid already seen recently, drop the message
        } else {
            duplicateStore.put(guid, metadata);
            return new KeyValue<>(key, guid);
        }
    }
}
where the duplicateStore is a persistent windowed store with caching enabled.
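For context, the store is set up roughly like this (the store name, retention,
segment count, and serde below are illustrative, not our exact configuration):

StoreBuilder<WindowStore<String, DuplicateMessageMetadata>> storeBuilder =
    Stores.windowStoreBuilder(
            Stores.persistentWindowStore("duplicate-store",
                    TimeUnit.HOURS.toMillis(1),  // retention period
                    3,                           // number of segments
                    TimeUnit.HOURS.toMillis(1),  // window size
                    false),                      // don't retain duplicates
            Serdes.String(),
            duplicateMetadataSerde)              // placeholder for our custom serde
        .withCachingEnabled();                   // caching is the ingredient that triggers this
builder.addStateStore(storeBuilder);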
I was debugging some tests and found that when calling all() or fetchAll() on
the duplicate store and stepping through the iterator, it would sometimes
return the same guid more than once, even though it was only inserted into the
store once. More specifically, if I sent the guids [11111, 22222, ... 99999]
(9 values total) to the stream, it would sometimes return 10 values, with one
(or more) of them returned twice by the iterator.
However, this would not show up with a fetch(guid) on that specific guid. For
instance, if 11111 was being returned twice by fetchAll(), calling
duplicateStore.fetch("11111", start, end) would still return an iterator of
size 1.
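Put differently, counting the entries shows the mismatch directly (sketching
from our test; start and end span all the inserts):

int total = 0;
try (KeyValueIterator<Windowed<String>, DuplicateMessageMetadata> all =
         duplicateStore.fetchAll(start, end)) {
    while (all.hasNext()) {
        all.next();
        total++;  // ends up at 10 for the 9 distinct guids above
    }
}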
I dug into this a bit more by setting a breakpoint in
SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey) and watching
the two input values as I looped through the iterator using
"while(iterator.hasNext()) { print(iterator.next()) }". In one test, the
duplicate value was 66666, and saw the following behavior (trimming off the
segment values from the byte input):
-- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
-- next() returns 66666
and
-- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
-- next() returns 66666
In every other comparison the two input values were equal, and the output was
as expected.
Additionally, a coworker noted that the number of duplicates always matches the
number of times Long.compare(cacheSegmentId, storeSegmentId) returns a non-zero
value, indicating that duplicates are likely arising due to the segment
comparison.
{noformat}
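To make that suspicion concrete: our reading is that the merged cache/store
iterator orders entries with a segment-aware comparator, and when the segment
ids differ the actual key bytes are never compared, so the same logical key can
be emitted once from the cache and once from the store. The sketch below is a
simplified paraphrase of SegmentedCacheFunction#compareSegmentedKeys, not the
exact source; segmentId, extractTimestamp, and compareKeyBytes are stand-in
helpers:
{code:java}
int compareSegmentedKeys(final Bytes cacheKey, final Bytes storeKey) {
    // Cache keys carry a segment-id prefix; store keys encode a timestamp
    // from which the segment id is derived.
    final long cacheSegmentId = ByteBuffer.wrap(cacheKey.get()).getLong();
    final long storeSegmentId = segmentId(extractTimestamp(storeKey));
    final int segmentCompare = Long.compare(cacheSegmentId, storeSegmentId);
    if (segmentCompare != 0) {
        // Segments differ: the key bytes are never compared, so the merge can
        // emit the same logical key from both the cache and the store,
        // matching the duplicate counts we observed.
        return segmentCompare;
    }
    // Same segment: fall back to comparing the underlying key bytes.
    return compareKeyBytes(cacheKey, storeKey);
}
{code}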
Basically, what we're seeing is that if you have a persistent windowed store
with caching enabled, you will sometimes get duplicate keys when querying for
all keys (using all() or fetchAll()), even though fetch(key) only returns one
result. That is, if you had a fresh store with nothing in it and did something
like:
{code:java}
IntStream.rangeClosed(1, 100).forEach(i -> store.put("key" + i, "value" + i));
{code}
then calling
{code:java}
store.fetchAll(start, end)
{code}
would return an iterator with MORE than 100 items, whereas if you explicitly did
{code:java}
store.fetch("key" + i)
{code}
for i = 1 to 100, each fetch would only return a single item in the iterator.
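A quick way to see the discrepancy end to end (a sketch; start and end are
assumed to span all the puts above):
{code:java}
int count = 0;
try (KeyValueIterator<Windowed<String>, String> iterator = store.fetchAll(start, end)) {
    while (iterator.hasNext()) {
        iterator.next();
        count++;
    }
}
// With caching enabled, count sometimes exceeds 100 here, even though
// store.fetch("key" + i, start, end) returns exactly one entry for each i.
{code}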