[ 
https://issues.apache.org/jira/browse/KAFKA-13309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13309:
------------------------------
    Description: 
We supported backward iterator for SessionStore in KAFKA-9929. But we cannot 
return the correct order when fetch/backwardFetch the key range when there are 
multiple records in the same session window.

For example:

We have a session window inactivity gap with 10 ms, and the records:

key: "A", value: "AA", timestamp: 0 --> with SessionWindow(0, 0)

key: "B", value: "BB", timestamp: 0 --> with SessionWindow(0, 0)

key: "C", value: "CC", timestamp: 0 --> with SessionWindow(0, 0)

key: "D" value: "DD", timestamp: 100 --> with SessionWindow(100, 100)

 

So, when fetch("A" /\*key from\*/, "D" /\*key to\*/), we expected to have [A, 
B, C, D], but we'll have [C, B A, D ]

 

And the reason is here:

[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java#L276-L295]

 
{code:java}
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, 
final Bytes keyTo) {
                return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, 
endTimeMap.entrySet().iterator(), false); // <-- the final param is 
"isFarwarded", which should be true for "fetch" case, and false for 
"backwardFetch" case
            }
{code}
We pass "false" in the "is forward" parameter for `fetch` method, and "true" 
for "backwardFetch" method, which obviously is wrong.

 

  was:
We supported backward iterator for SessionStore in KAFKA-9929. But we cannot 
return the correct order when fetch/backwardFetch the key range when there are 
multiple records in the same session window.

For example:

We have a session window inactivity gap with 10 ms, and the records:

key: "A", value: "AA", timestamp: 0 --> with SessionWindow(0, 0)

key: "B", value: "BB", timestamp: 0 --> with SessionWindow(0, 0)

key: "C", value: "CC", timestamp: 0 --> with SessionWindow(0, 0)

key: "D" value: "DD", timestamp: 100 --> with SessionWindow(100, 100)

 

So, when fetch("A" /*key from*/, "D" /*key to*/), we expected to have [A, B, C, 
D\], but we'll have [C, B A, D \]

 

And the reason is here:

[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java#L276-L295]

 
{code:java}
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, 
final Bytes keyTo) {
                return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, 
endTimeMap.entrySet().iterator(), false); // <-- the final param is 
"isFarwarded", which should be true for "fetch" case, and false for 
"backwardFetch" case
            }
{code}
We pass "false" in the "is forward" parameter for `fetch` method, and "true" 
for "backwardFetch" method, which obviously is wrong.

 


> InMemorySessionStore#fetch/backwardFetch doesn't return in correct order
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-13309
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13309
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.0.0
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>
> We supported backward iterator for SessionStore in KAFKA-9929. But we cannot 
> return the correct order when fetch/backwardFetch the key range when there 
> are multiple records in the same session window.
> For example:
> We have a session window inactivity gap with 10 ms, and the records:
> key: "A", value: "AA", timestamp: 0 --> with SessionWindow(0, 0)
> key: "B", value: "BB", timestamp: 0 --> with SessionWindow(0, 0)
> key: "C", value: "CC", timestamp: 0 --> with SessionWindow(0, 0)
> key: "D" value: "DD", timestamp: 100 --> with SessionWindow(100, 100)
>  
> So, when fetch("A" /\*key from\*/, "D" /\*key to\*/), we expected to have [A, 
> B, C, D], but we'll have [C, B A, D ]
>  
> And the reason is here:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java#L276-L295]
>  
> {code:java}
> public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, 
> final Bytes keyTo) {
>                 return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, 
> endTimeMap.entrySet().iterator(), false); // <-- the final param is 
> "isFarwarded", which should be true for "fetch" case, and false for 
> "backwardFetch" case
>             }
> {code}
> We pass "false" in the "is forward" parameter for `fetch` method, and "true" 
> for "backwardFetch" method, which obviously is wrong.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to