This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 844c125 MINOR: Optimize the OrderedBytes#upperRange for not all query cases (#11181) 844c125 is described below commit 844c1259a9e2ea9caf2635213c4c0a144f6b758f Author: Luke Chen <show...@gmail.com> AuthorDate: Fri Aug 27 05:37:34 2021 +0800 MINOR: Optimize the OrderedBytes#upperRange for not all query cases (#11181) Currently, the OrderedBytes#upperRange method checks the key bytes one by one to see whether each byte value is >= the first timestamp byte value, so that the remaining key bytes can be skipped, because we know compareTo will always return 0 or 1. However, in most cases the first timestamp byte is 0. More specifically, upperRange is called for both the window store and the session store: for the former, the suffix is in timestamp, Long.MAX_VALUE, and for the latter the suffix is in Long.MAX_VALUE, times [...] This PR optimizes the case where the first timestamp byte is 0 by not checking the key bytes one by one (a byte converted to an unsigned integer is always >= 0). Instead, it puts all key bytes and the timestamp suffix directly into the buffer. As a result, the byte array copy at the end is also avoided. 
Reviewers: Guozhang Wang <wangg...@gmail.com> --- .../streams/state/internals/OrderedBytes.java | 36 ++++++++++++++-------- .../state/internals/WindowKeySchemaTest.java | 2 +- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java index cd6b6ad..561f24c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java @@ -32,21 +32,33 @@ class OrderedBytes { static Bytes upperRange(final Bytes key, final byte[] maxSuffix) { final byte[] bytes = key.get(); final ByteBuffer rangeEnd = ByteBuffer.allocate(bytes.length + maxSuffix.length); + final int firstTimestampByte = maxSuffix[0] & 0xFF; - int i = 0; - while (i < bytes.length && ( - i < MIN_KEY_LENGTH // assumes keys are at least one byte long - || (bytes[i] & 0xFF) >= (maxSuffix[0] & 0xFF) - )) { - rangeEnd.put(bytes[i++]); - } + // if firstTimestampByte is 0, we'll put all key bytes into range result because `(bytes[i] & 0xFF) >= firstTimestampByte` + // will always be true (this is a byte to unsigned int conversion comparison) + if (firstTimestampByte == 0) { + return Bytes.wrap( + rangeEnd + .put(bytes) + .put(maxSuffix) + .array() + ); + } else { + int i = 0; + while (i < bytes.length && ( + i < MIN_KEY_LENGTH // assumes keys are at least one byte long + || (bytes[i] & 0xFF) >= firstTimestampByte + )) { + rangeEnd.put(bytes[i++]); + } - rangeEnd.put(maxSuffix); - rangeEnd.flip(); + rangeEnd.put(maxSuffix); + rangeEnd.flip(); - final byte[] res = new byte[rangeEnd.remaining()]; - ByteBuffer.wrap(res).put(rangeEnd); - return Bytes.wrap(res); + final byte[] res = new byte[rangeEnd.remaining()]; + ByteBuffer.wrap(res).put(rangeEnd); + return Bytes.wrap(res); + } } static Bytes lowerRange(final Bytes key, final byte[] minSuffix) { 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java index 8f0ca83..dc88410 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java @@ -128,7 +128,7 @@ public class WindowKeySchemaTest { final Bytes upper = windowKeySchema.upperRange(Bytes.wrap(new byte[] {0xC, 0xC, 0x9}), 0x0AffffffffffffffL); assertThat( - "shorter key with max timestamp should be in range", + "shorter key with customized timestamp should be in range", upper.compareTo( WindowKeySchema.toStoreKeyBinary( new byte[] {0xC, 0xC},