[ https://issues.apache.org/jira/browse/KAFKA-13309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Chen updated KAFKA-13309:
------------------------------
    Description: 
We added backward iterators for SessionStore in KAFKA-9929. However, fetch/backwardFetch over a key range does not return records in the correct order when there are multiple records in the same session window.

For example, suppose we have a session window with an inactivity gap of 10 ms and the following records:
key: "A", value: "AA", timestamp: 0 --> with SessionWindow(0, 0)
key: "B", value: "BB", timestamp: 0 --> with SessionWindow(0, 0)
key: "C", value: "CC", timestamp: 0 --> with SessionWindow(0, 0)
key: "D", value: "DD", timestamp: 100 --> with SessionWindow(100, 100)

Then fetch("A" /\*key from\*/, "D" /\*key to\*/) should return [A, B, C, D], but it returns [C, B, A, D].

The reason is here:
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java#L276-L295]

{code:java}
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) {
    // The final argument is the "is forward" flag. It should be true for the
    // "fetch" case and false for the "backwardFetch" case.
    return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false);
}
{code}
We pass "false" as the "is forward" parameter in the `fetch` method and "true" in the `backwardFetch` method, which is the wrong way around.

  was:
We added backward iterators for SessionStore in KAFKA-9929. However, fetch/backwardFetch over a key range does not return records in the correct order when there are multiple records in the same session window.

For example, suppose we have a session window with an inactivity gap of 10 ms and the following records:
key: "A", value: "AA", timestamp: 0 --> with SessionWindow(0, 0)
key: "B", value: "BB", timestamp: 0 --> with SessionWindow(0, 0)
key: "C", value: "CC", timestamp: 0 --> with SessionWindow(0, 0)
key: "D", value: "DD", timestamp: 100 --> with SessionWindow(100, 100)

Then fetch("A" /*key from*/, "D" /*key to*/) should return [A, B, C, D\], but it returns [C, B, A, D\].

The reason is here:
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java#L276-L295]

{code:java}
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) {
    // The final argument is the "is forward" flag. It should be true for the
    // "fetch" case and false for the "backwardFetch" case.
    return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false);
}
{code}
We pass "false" as the "is forward" parameter in the `fetch` method and "true" in the `backwardFetch` method, which is the wrong way around.


> InMemorySessionStore#fetch/backwardFetch doesn't return in correct order
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-13309
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13309
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.0.0
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>
> We added backward iterators for SessionStore in KAFKA-9929. However,
> fetch/backwardFetch over a key range does not return records in the correct
> order when there are multiple records in the same session window.
>
> For example, suppose we have a session window with an inactivity gap of
> 10 ms and the following records:
> key: "A", value: "AA", timestamp: 0 --> with SessionWindow(0, 0)
> key: "B", value: "BB", timestamp: 0 --> with SessionWindow(0, 0)
> key: "C", value: "CC", timestamp: 0 --> with SessionWindow(0, 0)
> key: "D", value: "DD", timestamp: 100 --> with SessionWindow(100, 100)
>
> Then fetch("A" /\*key from\*/, "D" /\*key to\*/) should return [A, B, C, D],
> but it returns [C, B, A, D].
>
> The reason is here:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java#L276-L295]
>
> {code:java}
> public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) {
>     // The final argument is the "is forward" flag. It should be true for the
>     // "fetch" case and false for the "backwardFetch" case.
>     return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false);
> }
> {code}
> We pass "false" as the "is forward" parameter in the `fetch` method and
> "true" in the `backwardFetch` method, which is the wrong way around.
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
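
To illustrate the ordering problem reported in this issue, here is a minimal sketch using a simplified model of the store. The class `SessionOrderSketch`, its methods, and the map layout (session end time -> key -> value, with String keys) are assumptions made for illustration only; they are not Kafka's actual implementation. The sketch models only the `fetch` case, where session end times are visited in ascending order and the flag controls the key order within each end time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model: session end time -> (key -> value).
// This is NOT the real InMemorySessionStore; names are illustrative.
class SessionOrderSketch {

    // Builds the four records from the example in this issue.
    static TreeMap<Long, TreeMap<String, String>> sampleStore() {
        TreeMap<Long, TreeMap<String, String>> endTimeMap = new TreeMap<>();
        endTimeMap.computeIfAbsent(0L, t -> new TreeMap<>()).put("A", "AA");
        endTimeMap.computeIfAbsent(0L, t -> new TreeMap<>()).put("B", "BB");
        endTimeMap.computeIfAbsent(0L, t -> new TreeMap<>()).put("C", "CC");
        endTimeMap.computeIfAbsent(100L, t -> new TreeMap<>()).put("D", "DD");
        return endTimeMap;
    }

    // Visits end times in ascending order (as a forward fetch would) and
    // collects the keys in [keyFrom, keyTo] for each end time. The "forward"
    // flag decides whether keys within one end time are ascending or descending.
    static List<String> fetchKeys(final TreeMap<Long, TreeMap<String, String>> endTimeMap,
                                  final String keyFrom,
                                  final String keyTo,
                                  final boolean forward) {
        final List<String> result = new ArrayList<>();
        for (final Map.Entry<Long, TreeMap<String, String>> session : endTimeMap.entrySet()) {
            NavigableMap<String, String> keys =
                session.getValue().subMap(keyFrom, true, keyTo, true);
            if (!forward) {
                keys = keys.descendingMap();
            }
            result.addAll(keys.keySet());
        }
        return result;
    }

    public static void main(final String[] args) {
        final TreeMap<Long, TreeMap<String, String>> store = sampleStore();
        // Flag passed as false, mirroring the reported bug: prints [C, B, A, D]
        System.out.println(fetchKeys(store, "A", "D", false));
        // Flag passed as true, the expected behavior: prints [A, B, C, D]
        System.out.println(fetchKeys(store, "A", "D", true));
    }
}
```

In this model, the three keys that share end time 0 come back reversed when the flag is false, while "D" stays last because the end times themselves are still visited in ascending order. That matches the [C, B, A, D] result described above.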