This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 844c125  MINOR: Optimize the OrderedBytes#upperRange for not all query 
cases (#11181)
844c125 is described below

commit 844c1259a9e2ea9caf2635213c4c0a144f6b758f
Author: Luke Chen <show...@gmail.com>
AuthorDate: Fri Aug 27 05:37:34 2021 +0800

    MINOR: Optimize the OrderedBytes#upperRange for not all query cases (#11181)
    
    Currently in OrderedBytes#upperRange method, we'll check key bytes 1 by 1, 
to see if there's a byte value >= first timestamp byte value, so that we can 
skip the following key bytes, because we know compareTo will always return 0 or 
1. However, in most cases, the first timestamp byte is always 0, more 
specifically the upperRange is called for both window store and session store. 
For former, the suffix is in timestamp, Long.MAX_VALUE and for latter the 
suffix is in Long.MAX_VALUE, times [...]
    
    This PR optimizes the not all query cases by not checking the key byte 1 by 
1 (because we know the unsigned integer will always be >= 0), instead, put all 
bytes and timestamp directly. So, we won't have byte array copy in the end 
either.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../streams/state/internals/OrderedBytes.java      | 36 ++++++++++++++--------
 .../state/internals/WindowKeySchemaTest.java       |  2 +-
 2 files changed, 25 insertions(+), 13 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
index cd6b6ad..561f24c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
@@ -32,21 +32,33 @@ class OrderedBytes {
     static Bytes upperRange(final Bytes key, final byte[] maxSuffix) {
         final byte[] bytes = key.get();
         final ByteBuffer rangeEnd = ByteBuffer.allocate(bytes.length + 
maxSuffix.length);
+        final int firstTimestampByte = maxSuffix[0] & 0xFF;
 
-        int i = 0;
-        while (i < bytes.length && (
-            i < MIN_KEY_LENGTH // assumes keys are at least one byte long
-            || (bytes[i] & 0xFF) >= (maxSuffix[0] & 0xFF)
-            )) {
-            rangeEnd.put(bytes[i++]);
-        }
+        // if firstTimestampByte is 0, we'll put all key bytes into range 
result because `(bytes[i] & 0xFF) >= firstTimestampByte`
+        // will always be true (this is a byte to unsigned int conversion 
comparison)
+        if (firstTimestampByte == 0) {
+            return Bytes.wrap(
+                rangeEnd
+                    .put(bytes)
+                    .put(maxSuffix)
+                    .array()
+            );
+        } else {
+            int i = 0;
+            while (i < bytes.length && (
+                i < MIN_KEY_LENGTH // assumes keys are at least one byte long
+                || (bytes[i] & 0xFF) >= firstTimestampByte
+                )) {
+                rangeEnd.put(bytes[i++]);
+            }
 
-        rangeEnd.put(maxSuffix);
-        rangeEnd.flip();
+            rangeEnd.put(maxSuffix);
+            rangeEnd.flip();
 
-        final byte[] res = new byte[rangeEnd.remaining()];
-        ByteBuffer.wrap(res).put(rangeEnd);
-        return Bytes.wrap(res);
+            final byte[] res = new byte[rangeEnd.remaining()];
+            ByteBuffer.wrap(res).put(rangeEnd);
+            return Bytes.wrap(res);
+        }
     }
 
     static Bytes lowerRange(final Bytes key, final byte[] minSuffix) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index 8f0ca83..dc88410 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -128,7 +128,7 @@ public class WindowKeySchemaTest {
         final Bytes upper = windowKeySchema.upperRange(Bytes.wrap(new byte[] 
{0xC, 0xC, 0x9}), 0x0AffffffffffffffL);
 
         assertThat(
-            "shorter key with max timestamp should be in range",
+            "shorter key with customized timestamp should be in range",
             upper.compareTo(
                 WindowKeySchema.toStoreKeyBinary(
                     new byte[] {0xC, 0xC},

Reply via email to