[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16370505#comment-16370505 ]

Guozhang Wang commented on KAFKA-5285:

[~davispw] [~xvrl] Thanks for your input. I think there has been some miscommunication about my proposal. I understand the issue that you raised, and hence I am proposing to always use only keyFrom for the lower range. For the upper range I think Xavier's point is valid, and I'd like to slightly modify my proposal accordingly. This time I'll try to present it more clearly:

0. Bullet points 1) and 2) below apply ONLY to window stores, NOT session stores.

1. Note that while iterating over [lowerRange, upperRange], we still check the validity of each key within the range to determine whether it should be returned. So the goal of setting lowerRange and upperRange is to make sure they are "safe", i.e. that they include all possible keys. From that point of view, setting lowerRange to keyFrom should be safe, since lexicographically every key larger than keyFrom has a [key, timestamp, sequence] byte representation larger than the [keyFrom] byte representation alone. Our current lowerRange, [keyFrom + ], is also safe, but it is not "more efficient" than [keyFrom], since we know there will be no records at all between these two lower bounds. On the other hand, [keyFrom + timestampFrom] is not safe, as you pointed out:

{code}
The smallest byte sequence for that key will be keyFrom + S + minSuffix which is greater than keyFrom, but still smaller than keyFrom + minSuffix
{code}

As you mentioned, this key is still larger than the {{keyFrom}} bytes. *I think {{keyFrom}} should actually be safe under all circumstances: any key larger than keyFrom has a complete [key, timestamp, sequence] representation larger than the keyFrom bytes alone, and hence all such keys would be included in the range for validation. Please let me know if this is wrong.*

2. For the upper range, again the goal is to be "safe" first, and then to see whether we can do better on "efficiency". What I was proposing is MAX({{keyFrom + timestampFrom}}, {{keyTo + timestampTo}}); note that it is NOT MAX({{keyFrom + timeEndEarliest}}, {{keyTo + timeStartLatest}}), as {{timeStartLatest}} and {{timeEndEarliest}} only apply to session windows. Going back to your example, {{keyFrom + timestampFrom = A1000}} and {{keyTo + timestampTo = AABDFFF}}, so the larger value, {{AABDFFF}}, would be selected as the upper range. But Xavier's example showed that {{AABDFFF}} is not a safe upper range either, since some records like {{AADFFF}} should be considered but their bytes are larger than {{AABDFFF}}. *So I'm proposing to consider the following combinations and pick the largest*:

a) keyFrom + timestampTo
b) keyTo + timestampTo

Please let me know if you think this works.

3. This bullet point is for session stores: for a session store the current byte representation is {{key, session-start-time, session-end-time}}, where {{session-end-time >= session-start-time}}. Similarly to the window store, for the lower range I'm proposing to use only the {{key}} bytes, which is safe. For the upper range, similarly to my modifications for window stores above, *we can consider the following combinations and choose the largest one*:

a) keyFrom + timeStartLatest + MAX_LONG
b) keyTo + timeStartLatest + MAX_LONG

> Optimize upper / lower byte range for key range scan on windowed stores
> ---
>
> Key: KAFKA-5285
> URL: https://issues.apache.org/jira/browse/KAFKA-5285
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Xavier Léauté
> Assignee: Guozhang Wang
> Priority: Major
> Labels: performance
>
> The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}}
> {{upperRange}} and {{lowerRange}} does not make any assumptions with respect
> to the other key bound (e.g. the upper byte bound does not depend on the lower
> key bound).
> It should be possible to optimize the byte range somewhat further using the
> information provided by the lower bound.
> More specifically, by incorporating that information, we should be able to
> eliminate the corresponding {{upperRangeFixedSize}} and
> {{lowerRangeFixedSize}}, since the result should be the same if we implement
> that optimization.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
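The lower-bound claim in point 1 of the comment above lends itself to a quick mechanical check. The following is a minimal Python sketch, not Kafka code: the `composite` helper and its layout (key bytes, then an 8-byte big-endian timestamp, then a 4-byte sequence number) are assumptions made for illustration, and Python's `bytes` comparison stands in for the unsigned lexicographic ordering of store keys.

```python
import random
import struct


def composite(key: bytes, timestamp: int, seq: int) -> bytes:
    """Hypothetical stored-key layout: key | 8-byte timestamp | 4-byte sequence."""
    return key + struct.pack(">q", timestamp) + struct.pack(">i", seq)


def lower_bound_is_safe(key_from: bytes, trials: int = 10_000, seed: int = 0) -> bool:
    """Randomly search for a stored key that sorts below the bare keyFrom bytes.

    Returns False as soon as a key >= key_from produces a composite that sorts
    before key_from; returns True if no such counterexample is found.
    """
    rng = random.Random(seed)
    for _ in range(trials):
        key = bytes(rng.randrange(256) for _ in range(rng.randrange(1, 6)))
        if key < key_from:
            continue  # this key is outside the queried key range
        stored = composite(key, rng.randrange(2**62), rng.randrange(2**31))
        if stored < key_from:
            return False
    return True


print(lower_bound_is_safe(b"A1"))
```

No counterexample can turn up here, because a composite always has its own key as a prefix and therefore never sorts before that key. The sketch says nothing about the upper range, which is the part still under discussion.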
[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16369654#comment-16369654 ]

Peter Davis commented on KAFKA-5285:

Noting that after upgrading from 0.11.0.1 to 1.0.1 today, I'm seeing severely degraded performance of `(ReadOnly)SessionStore.fetch(key)` as well. Before, we were only seeing the problem with `fetch(from,to)`. I browsed the source code and didn't immediately see what changed between 0.11 and 1.0 there. (Another guess is that it's a subtle side effect of some other change, perhaps https://issues.apache.org/jira/browse/KAFKA-4868, somehow resulting in different compacted DB levels?)

Anyway, the workaround for me is to use `findSessions(key, 0, System.currentTimeMillis() + )`, since the 0x00 bytes in a timestamp < Long.MAX_VALUE yield a few extra usable bytes of maxKey prefix. Both `ReadOnlySessionStore.fetch(...)` variants are entirely unusable for me at this time.

> Without any additional information about the key length or the lower
> bound, we can only assume that keys are at least 1 byte, and that byte has to
> be smaller or equal to the first byte of keyTo (i.e. our upper bound has to
> start with the first byte of keyTo), so our best guess for an upper bound in
> that case is ADFFF.

Doing a range query with *one byte* of prefix will never give acceptable performance for any database with more than 8 keys(!), or in use cases where key prefixes are not randomly distributed (common in business applications).

May I suggest a few options, not mutually exclusive, but in order of preference:

1. Optimize the case where fromKey and toKey are the same or have a common prefix. (Isn't that your minimum key length right there? I'm not really sure I understand why it's not just this simple. Note, this is the only case I personally care about.)

2. Deprecate the `fetch` variants in favor of `findSessions`, and document that using max=Long.MAX_VALUE is not recommended. Promote findSessions to ReadOnlySessionStore. (This at least gives a few more bytes of usable key prefix.)

3. Configuration for default timeStartLatest = currentTimeMillis() + . (Same benefit as #2.)

4. Configure a minimum key length. I don't like this because if natural keys are used (user names, human-readable business object references like "file number", etc.) then there isn't necessarily a good minimum key length that can be enforced by the application. And if there were, it would likely vary by store, raising the question of how to easily configure per-store settings.
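To illustrate why a timestamp below Long.MAX_VALUE helps in the workaround described above, here is a small Python sketch (an illustration, not Kafka code). It assumes timestamps are serialized as 8-byte big-endian longs; the helper names `encode_long` and `leading_zero_bytes` and the sample timestamp are hypothetical.

```python
import struct

LONG_MAX = 2**63 - 1  # numeric value of Java's Long.MAX_VALUE


def encode_long(value: int) -> bytes:
    """Serialize a signed 64-bit value as 8 big-endian bytes (assumed layout)."""
    return struct.pack(">q", value)


def leading_zero_bytes(value: int) -> int:
    """Count the 0x00 bytes at the front of the encoded value."""
    encoded = encode_long(value)
    return len(encoded) - len(encoded.lstrip(b"\x00"))


sample_millis = 1_518_000_000_000  # an epoch-millisecond value from early 2018

print(encode_long(LONG_MAX).hex())        # starts with byte 0x7f
print(encode_long(sample_millis).hex())   # starts with 0x00 bytes
print(leading_zero_bytes(sample_millis))
```

A suffix that begins with 0x00 can never be larger than a key byte in the same position, which is consistent with the observation that such timestamps leave more of the real key usable in the upper bound.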
[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367892#comment-16367892 ]

Xavier Léauté commented on KAFKA-5285:

[~davispw] yes, I understand your concern, and that's part of the reason I filed this JIRA in the first place. Currently our internal APIs assume that knowing keyTo and timeTo is sufficient to determine the upper bound. However, without information about the lower key or timeFrom, it is not possible to determine an optimal upper bound for the byte range.

Let me illustrate the problem that [~guozhang] is trying to solve. Let's say we query the key range {{A1}} through {{AAB}}, and timestamps {{}} through {{DFFF}} (let's assume timestamps consist of 4 bytes for this example). We could have the following order:

{code}
A1 (A1 | )
AA (AA | )
AAA (AAA | )
AAB1234 (AAB | 1234)
AAC321 (AA | C321)
AADFFF (AA | DFFF)
{code}

That is, it is possible for keys smaller than keyTo to show up after the bytes for (keyTo+maxSuffix). This happens only if:

# Keys within the range can be shorter than keyTo.
# The suffix (a.k.a. timestamp) bytes are greater than the bytes in the same position for keyTo.

Without any additional information about the key length or the lower bound, we can only assume that keys are at least 1 byte, and that byte has to be smaller or equal to the first byte of keyTo (i.e. our upper bound has to start with the first byte of keyTo), so our best guess for an upper bound in that case is ADFFF.

NB: [~guozhang] in this particular case MAX(keyFrom + timeEndEarliest, keyTo + timeStartLatest) = max(A1DFFF, AAB) = AAB would not work, since it does not include the full time range for the AAB keys, or maybe I did not understand your suggestion.

Any key smaller than keyTo that appears after (keyTo+maxSuffix) is necessarily a prefix of keyTo, so knowing the lower bound is only helpful if it shares a prefix with keyTo (e.g. if in this case keyFrom was {{7F}}, then the lower bound would not be helpful). However, because the lower bound is {{A1}}, we now know that keys starting with A have at least two bytes, and that the second byte has to be an A based on the previous statement. This means we can reduce our upper bound to {{AAD}}.

Also, if we have a smaller timeTo we can include any prefix of keyTo that does not contain any byte smaller than the first byte of timeTo, but it gets trickier once you have matching bytes in the timestamp and the key.

Let me think this through a bit more and I'll try to come up with something that will work optimally in most cases, but my hunch is that unless we introduce optimizations for stores with fixed-size keys, or have some information about the minimum key length, we will end up scanning a large portion of the table for long time ranges.
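The ordering problem in the example above can be reproduced with plain byte strings. This is a minimal Python sketch under the example's own simplification (one character per byte, 4-byte timestamps); it uses only the three listing entries whose timestamps are fully shown, and the variable names are illustrative.

```python
# (key, timestamp) pairs taken from the fully specified rows of the example.
entries = [(b"AAB", b"1234"), (b"AA", b"C321"), (b"AA", b"DFFF")]

key_to, time_to = b"AAB", b"DFFF"
naive_upper = key_to + time_to      # keyTo + maxSuffix
conservative_upper = b"ADFFF"       # first byte of keyTo + maxSuffix

# Stored keys that satisfy the query but sort after keyTo + maxSuffix.
escaped = [
    key + ts
    for key, ts in entries
    if key <= key_to and ts <= time_to and key + ts > naive_upper
]

print(sorted(key + ts for key, ts in entries))
print(escaped)
print(all(key + ts <= conservative_upper for key, ts in entries))
```

Both escaping entries belong to the key `AA`, a proper prefix of keyTo, whose timestamp bytes compare greater than the `B` that keyTo has in the same position. That matches the two conditions listed in the comment.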
[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367673#comment-16367673 ]

Peter Davis commented on KAFKA-5285:

Thank you, Guozhang, for looking into this. Good point, Xavier; for my part, the serious issue was when using `ReadOnlySessionStore.fetch(from,to)`, which I believe implies a minSuffix of 0.

Anything would be better than the current behavior of querying the range [keyFrom, infinity], which loads not just a few extra sessions outside the timestamp range, but *half the entire database* :-)
[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367553#comment-16367553 ]

Xavier Léauté commented on KAFKA-5285:

{quote}# For lower range, we just use the {{keyFrom bytes}}, instead of {{keyFrom + minSuffix (s)}}. Since we know there would be no data in between these two keys at all, we can save some overhead of bytes alloc and puts.{quote}

This assumption only holds if {{minSuffix}} consists only of zeros. If that is not the case, then there exists a suffix S that is smaller in order than minSuffix and is not a prefix of minSuffix. In that case I can construct a new key by appending this suffix to {{keyFrom}}, i.e. my new key is {{keyFrom + S}}. The smallest byte sequence for that key will be {{keyFrom + S + minSuffix}}, which is greater than {{keyFrom}} but still smaller than {{keyFrom + minSuffix}}.
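The counterexample in the comment above can be made concrete with a few bytes. This is a minimal Python sketch; the specific values of `key_from`, `min_suffix`, and `s` are made up for illustration, with a 4-byte suffix chosen only to keep the example short.

```python
key_from = b"A"
min_suffix = bytes([0x10, 0x00, 0x00, 0x00])  # a non-zero minimum suffix
s = bytes([0x01])                             # sorts below min_suffix, not a prefix of it

new_key = key_from + s                  # a legitimate key inside the key range
smallest_stored = new_key + min_suffix  # keyFrom + S + minSuffix
suffixed_lower = key_from + min_suffix  # the lower bound keyFrom + minSuffix

# The stored entry lies strictly between keyFrom and keyFrom + minSuffix,
# so there can be data between the two candidate lower bounds.
print(key_from < smallest_stored < suffixed_lower)
```

With an all-zero `min_suffix`, no such `s` exists, since nothing sorts below a run of zero bytes except a prefix of it, which is the special case the comment singles out.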
[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366331#comment-16366331 ]

ASF GitHub Bot commented on KAFKA-5285:

guozhangwang opened a new pull request #4576: KAFKA-5285: Use optimized upper and lower bound for window key schema [WIP]
URL: https://github.com/apache/kafka/pull/4576

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Optimize upper / lower byte range for key range scan on windowed stores
> ---
>
> Key: KAFKA-5285
> URL: https://issues.apache.org/jira/browse/KAFKA-5285
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Xavier Léauté
> Assignee: Xavier Léauté
> Priority: Major
> Labels: performance
>
> The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}}
> {{upperRange}} and {{lowerRange}} does not make any assumptions with respect
> to the other key bound (e.g. the upper byte bound does not depends on lower
> key bound).
> It should be possible to optimize the byte range somewhat further using the
> information provided by the lower bound.
> More specifically, by incorporating that information, we should be able to
> eliminate the corresponding {{upperRangeFixedSize}} and
> {{lowerRangeFixedSize}}, since the result should be the same if we implement
> that optimization.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366286#comment-16366286 ]

Guozhang Wang commented on KAFKA-5285:

[~davispw] [~xvrl] Thinking about this a bit more, I think we can indeed provide a better heuristic to define the upper and lower bound for window store {{fetch(keyFrom, keyTo, timeFrom, timeTo)}}, which is the following:

0. First of all, documentation-wise we require users to make sure that the serialized bytes of {{keyFrom}} are lexicographically smaller than the serialized bytes of {{keyTo}}. Inside this call, we first check the bytes for this condition.

1. For the lower range, we just use the {{keyFrom bytes}}, instead of {{keyFrom + minSuffix (s)}}. Since we know there would be no data in between these two keys at all, we can save some overhead of byte allocation and puts.

2. For the upper range, we just consider the following two combinations:

a) keyFrom + minSuffix
b) keyTo + maxSuffix

where minSuffix = timeFrom + seq0 and maxSuffix = timeTo + seqMAX. Then we just pick the larger of these two.

In all, this would replace the current {{OrderedBytes}} implementations.

Similarly, for session store {{fetch}}, we do the same thing, except that we handle single key fetch in a more optimized way:

0. For single key fetch, lower range = key + timeFrom + timeFrom, upper range = key + timeTo + timeTo (this is because timeTo <= timeFrom, so they can be the bound of each other).

1. For range key fetch, lower range = keyFrom.

2. For range key fetch, upper range = MAX(keyFrom + timeFrom + timeFrom, keyTo + timeTo + timeTo).

WDYT?
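The heuristic in the comment above can be written down directly. The following Python sketch is an illustration of the proposal as stated, not the code from the pull request: the function names are hypothetical, suffixes are treated as opaque byte strings for the window case, and session timestamps are assumed to be 8-byte big-endian longs.

```python
import struct


def window_bounds(key_from: bytes, key_to: bytes,
                  min_suffix: bytes, max_suffix: bytes) -> tuple[bytes, bytes]:
    """Proposed window-store bounds: lower = keyFrom, upper = larger of two candidates."""
    if key_from > key_to:
        raise ValueError("keyFrom must not sort after keyTo")
    lower = key_from
    upper = max(key_from + min_suffix, key_to + max_suffix)
    return lower, upper


def session_bounds(key_from: bytes, key_to: bytes,
                   time_from: int, time_to: int) -> tuple[bytes, bytes]:
    """Proposed session-store bounds, with the single-key special case."""
    start = struct.pack(">q", time_from) * 2   # timeFrom + timeFrom
    end = struct.pack(">q", time_to) * 2       # timeTo + timeTo
    if key_from == key_to:
        return key_from + start, key_from + end
    return key_from, max(key_from + start, key_to + end)


# Values borrowed from the example used elsewhere in this thread.
lower, upper = window_bounds(b"A1", b"AAB", b"1000", b"DFFF")
print(lower, upper)
print(b"AA" + b"DFFF" > upper)
```

The last line shows the case that the newest comment in this thread acknowledges: an entry for the shorter key `AA` sorts after the upper bound chosen by this version of the heuristic.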
[jira] [Commented] (KAFKA-5285) Optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366165#comment-16366165 ]

Guozhang Wang commented on KAFKA-5285:

[~davispw] [~xvrl] Thinking about this a bit more, I think we can indeed provide a better heuristic to define the upper and lower bound for window store {{fetch(keyFrom, keyTo, timeFrom, timeTo)}}, which is to just consider the following four combinations:

1. keyFrom + minSuffix
2. keyTo + minSuffix
3. keyFrom + maxSuffix
4. keyTo + maxSuffix

where minSuffix = timeFrom + seq0 and maxSuffix = timeTo + seqMAX. Then we just pick the smallest and largest among these four as the lower and upper range, to replace the current {{OrderedBytes}} implementations.

WDYT?
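As a rough illustration of the four-combination heuristic above, the sketch below builds all four candidates and takes their minimum and maximum. It is written in Python with a hypothetical function name and made-up inputs, and it only shows how the selection works; it makes no claim about whether the resulting range is safe.

```python
def four_candidate_bounds(key_from: bytes, key_to: bytes,
                          min_suffix: bytes, max_suffix: bytes) -> tuple[bytes, bytes]:
    """Return (smallest, largest) of the four key/suffix concatenations."""
    candidates = [
        key_from + min_suffix,
        key_to + min_suffix,
        key_from + max_suffix,
        key_to + max_suffix,
    ]
    return min(candidates), max(candidates)


# keyFrom is a proper prefix of keyTo, so suffix bytes get compared
# against key bytes when the candidates are ordered.
lower, upper = four_candidate_bounds(b"A", b"AB", b"C000", b"DFFF")
print(lower, upper)
```

In this input the smallest candidate is keyTo + minSuffix and the largest is keyFrom + maxSuffix, which shows why looking only at keyFrom + minSuffix and keyTo + maxSuffix would not always yield the extremes.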
[jira] [Commented] (KAFKA-5285) optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295649#comment-16295649 ]

Guozhang Wang commented on KAFKA-5285:

I'll let [~damianguy] chime in more on this issue. I think the issue reported by [~xvrl] and the one reported by [~davispw] are correlated, though not exactly the same.

My understanding is that for session windows, where a session's ending timestamp cannot be determined until the window is closed / merged, we have to be conservative in determining the range that we need to search for applicable sessions, because of the way we lay out the data in the underlying key-value store.

I do not have a concrete idea on how to improve the observed issues, but if you have some ideas on how to improve them, let's discuss them here.

> optimize upper / lower byte range for key range scan on windowed stores
> ---
>
> Key: KAFKA-5285
> URL: https://issues.apache.org/jira/browse/KAFKA-5285
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Xavier Léauté
> Assignee: Xavier Léauté
> Labels: performance
>
> The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}}
> {{upperRange}} and {{lowerRange}} does not make any assumptions with respect
> to the other key bound (e.g. the upper byte bound does not depends on lower
> key bound).
> It should be possible to optimize the byte range somewhat further using the
> information provided by the lower bound.
> More specifically, by incorporating that information, we should be able to
> eliminate the corresponding {{upperRangeFixedSize}} and
> {{lowerRangeFixedSize}}, since the result should be the same if we implement
> that optimization.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5285) optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291962#comment-16291962 ]

Peter Davis commented on KAFKA-5285:

In debugging a recent performance blocker in an app of mine, I suspect that when calling `ReadOnlySessionStore.fetch(from,to)`, which uses from (Timestamp) = Long.MAX_VALUE, it calls `upperRange` with a `maxSuffix` filled with mostly 0xFF's. The resulting upperRange is therefore also mostly 0xFF, and the resulting RocksDB iterator effectively iterates over (binaryKeyFrom...infinity).

With a large number of keys, this is much worse than a mere performance issue (though the result appears "correct", since SessionKeySchema.hasNextCondition filters out the bogus results). It iterates over thousands of unnecessary records and is slow as molasses.

It looks like the issue dates to KIP-155. In [`SessionKeySchema#upperRange`](https://github.com/apache/kafka/commit/e28752357705568219315375c666f8e500db9c12#diff-52e7d2701ecab21b32621d9b13b7f33bR57), why is `putLong(to)` (timestamp) repeated twice, and why does it not use `put(key)` to build the `maxRange`?

When using a timestamp less than `Long.MAX_VALUE`, the issue is avoided because `OrderedBytes.upperRange` copies more of the real key. But `ReadOnlySessionStore.fetch` does not let one specify a different timestamp.
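The effect described in the comment above can be sketched as follows. This Python snippet is an approximation written for illustration, not a copy of `OrderedBytes.upperRange`: the copy rule (keep key bytes while they are not smaller than the first suffix byte, with a minimum of one key byte) is an assumption inferred from this thread, and the 16-byte suffix mirrors the doubled `putLong(to)` mentioned there.

```python
import struct


def upper_range(key: bytes, max_suffix: bytes, min_key_length: int = 1) -> bytes:
    """Assumed rule: copy key bytes while they are >= max_suffix[0], then append the suffix."""
    i = 0
    while i < len(key) and (i < min_key_length or key[i] >= max_suffix[0]):
        i += 1
    return key[:i] + max_suffix


def session_suffix(timestamp: int) -> bytes:
    """Hypothetical session suffix: the 8-byte big-endian timestamp written twice."""
    return struct.pack(">q", timestamp) * 2


unbounded = session_suffix(2**63 - 1)           # Long.MAX_VALUE, first byte 0x7f
bounded = session_suffix(1_518_000_000_000)     # a 2018 epoch-millis value, first byte 0x00

print(upper_range(b"user-42", unbounded)[:1])   # only one key byte survives
print(upper_range(b"user-42", bounded)[:7])     # the whole key survives
```

Under these assumptions an ASCII key paired with a Long.MAX_VALUE suffix keeps a single byte of prefix, so the scan covers every key that shares that first byte or sorts after `keyFrom` below it. The same rule applied to the example key `AAB` and suffix `DFFF` gives `ADFFF`, the bound quoted elsewhere in the thread.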