wcarlson5 commented on code in PR #13533:
URL: https://github.com/apache/kafka/pull/13533#discussion_r1170320445


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -104,33 +104,40 @@ public byte[] serialize(final String topic, final Headers 
headers, final Change<
         final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
 
         // The serialization format is:
-        // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
-        // {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY 
oldValue}{BYTE newOldFlag=2}
-        final ByteBuffer buf;
+        // {BYTE_ARRAY oldValue}{BYTE encodingFlag=0}
+        // {BYTE_ARRAY newValue}{BYTE encodingFlag=1}
+        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY 
oldValue}{BYTE encodingFlag=2}

Review Comment:
   UINT32 or VARINT?



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/ChangedSerdeTest.java:
##########
@@ -141,15 +141,15 @@ private static byte[] serializeVersions3Through5(final 
String topic, final Chang
         final int oldDataLength = oldValueIsNotNull ? oldData.length : 0;
 
         // The serialization format is:
-        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
-        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY 
oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
+        // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=3}
+        // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE encodingFlag=4}
+        // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY 
oldValue}{BYTE isLatest}{BYTE encodingFlag=5}
         final ByteBuffer buf;
         final byte isLatest = data.isLatest ? (byte) 1 : (byte) 0;
         if (newValueIsNotNull && oldValueIsNotNull) {
-            final int capacity = UINT32_SIZE + newDataLength + oldDataLength + 
IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;
+            final int capacity = MAX_VARINT_LENGTH + newDataLength + 
oldDataLength + IS_LATEST_FLAG_SIZE + NEW_OLD_FLAG_SIZE;

Review Comment:
   You changed the `NEW_OLD_FLAG_SIZE` elsewhere to `ENCODING_FLAG_SIZE` can we 
change that here too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to