mjsax commented on code in PR #21706:
URL: https://github.com/apache/kafka/pull/21706#discussion_r2944599096
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java:
##########
@@ -98,17 +104,27 @@ static Headers headers(final byte[] rawAggregationWithHeaders) {
* Extract the raw aggregation bytes from serialized AggregationWithHeaders,
* stripping the headers prefix.
*/
- static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
+ public static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
if (aggregationWithHeaders == null) {
return null;
}
+ // If the header is empty, then copy the value bytes directly
+ if (aggregationWithHeaders.length > 0 && aggregationWithHeaders[0] == 0x00) {
+ // Strip header size's varint byte, and empty headers consume no bytes
+ final byte[] res = new byte[aggregationWithHeaders.length - 1];
Review Comment:
```suggestion
final byte[] aggregation = new byte[aggregationWithHeaders.length - 1];
```
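With the suggested name, the empty-headers fast path reads roughly as follows. This is a minimal sketch, assuming the serialized layout `[headersSize(varint)][headers][aggregation]` implied by the diff. The class name `AggregationFastPathSketch` is hypothetical, and the general (non-empty headers) path is deliberately left out.

```java
// Hypothetical sketch of the empty-headers fast path; not the PR's code.
// Assumed layout: [headersSize(varint)][headers][aggregation].
final class AggregationFastPathSketch {

    static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
        if (aggregationWithHeaders == null) {
            return null;
        }
        // A headers size of 0 is written as the single varint byte 0x00, and
        // empty headers take no bytes, so the aggregation starts at index 1.
        if (aggregationWithHeaders.length > 0 && aggregationWithHeaders[0] == 0x00) {
            final byte[] aggregation = new byte[aggregationWithHeaders.length - 1];
            System.arraycopy(aggregationWithHeaders, 1, aggregation, 0, aggregation.length);
            return aggregation;
        }
        // The general path (decode the varint, skip the headers) is out of scope here.
        throw new UnsupportedOperationException("non-empty headers are not handled in this sketch");
    }
}
```

A single `System.arraycopy` into a right-sized array avoids wrapping the input in a `ByteBuffer` just to skip one byte.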
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java:
##########
@@ -129,27 +131,19 @@ static long timestamp(final byte[] rawValueTimestampHeaders) {
/**
* Extract headers from serialized ValueTimestampHeaders.
*/
- static Headers headers(final byte[] rawValueTimestampHeaders) {
+ public static Headers headers(final byte[] rawValueTimestampHeaders) {
if (rawValueTimestampHeaders == null) {
return null;
}
- final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
- final int headersSize = ByteUtils.readVarint(buffer);
- final byte[] rawHeaders = readBytes(buffer, headersSize);
- return HeadersDeserializer.deserialize(rawHeaders);
- }
- /**
- * Extract raw value from serialized ValueTimestampHeaders.
- */
- static byte[] rawValue(final byte[] rawValueTimestampHeaders) {
- if (rawValueTimestampHeaders == null) {
- return null;
+ // If the header is empty, simply return it
+ if (rawValueTimestampHeaders.length > 0 && rawValueTimestampHeaders[0] == 0x00) {
+ return new RecordHeaders();
}
final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
final int headersSize = ByteUtils.readVarint(buffer);
- buffer.position(buffer.position() + headersSize + Long.BYTES);
- return readBytes(buffer, buffer.remaining());
+ final byte[] rawHeaders = readBytes(buffer, headersSize);
Review Comment:
Same question: why not use `arraycopy` here, too?
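Even when the headers are not empty, `arraycopy` can copy the header bytes once the varint is decoded, because after reading the varint, the buffer position is the offset of the headers in the backing array. This is a minimal sketch, assuming the zigzag varint format that Kafka's `ByteUtils.readVarint` decodes. `HeaderBytesSketch`, its local `readVarint`, and `rawHeaders` are hypothetical stand-ins, and the `readBytes` helper from the diff is not reproduced here.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch; not the PR's code.
// Assumed layout: [headersSize(varint)][headers][timestamp(8)][value].
final class HeaderBytesSketch {

    // Minimal zigzag varint decoder (assumed to match the headersSize encoding).
    static int readVarint(final ByteBuffer buffer) {
        int value = 0;
        int shift = 0;
        byte b;
        do {
            if (shift > 28) {
                throw new IllegalArgumentException("varint is longer than 5 bytes");
            }
            b = buffer.get();
            value |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        // Undo the zigzag mapping (0 -> 0, 1 -> -1, 2 -> 1, ...).
        return (value >>> 1) ^ -(value & 1);
    }

    // Copies the header bytes straight out of the input array with System.arraycopy.
    static byte[] rawHeaders(final byte[] rawValueTimestampHeaders) {
        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
        final int headersSize = readVarint(buffer);
        final byte[] rawHeaders = new byte[headersSize];
        // ByteBuffer.wrap uses array offset 0, so position() is also the array index.
        System.arraycopy(rawValueTimestampHeaders, buffer.position(), rawHeaders, 0, headersSize);
        return rawHeaders;
    }
}
```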
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java:
##########
@@ -35,11 +35,19 @@ public class Utils {
* Input: [headersSize(varint)][headers][timestamp(8)][value]
* Output: [value]
*/
- static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {
+ public static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {
if (rawValueTimestampHeaders == null) {
return null;
}
+ // If the header is empty, then copy the value bytes directly
+ if (hasEmptyHeadersAndTimestamp(rawValueTimestampHeaders)) {
+ // Strip header size (varint 1 byte), empty headers (no bytes), and timestamp
+ final byte[] res = new byte[rawValueTimestampHeaders.length - 1 - StateSerdes.TIMESTAMP_SIZE];
Review Comment:
```suggestion
final byte[] rawValue = new byte[rawValueTimestampHeaders.length - 1 - StateSerdes.TIMESTAMP_SIZE];
```
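For `rawPlainValue`, the fast path skips the single `0x00` varint byte and the 8-byte timestamp, then copies the rest. This is a minimal sketch, assuming the layout from the Javadoc (`[headersSize(varint)][headers][timestamp(8)][value]`) and that the caller has already verified the leading `0x00`. `PlainValueFastPathSketch` is hypothetical, and its `TIMESTAMP_SIZE` is a local stand-in for `StateSerdes.TIMESTAMP_SIZE`.

```java
// Hypothetical sketch; assumes the caller already checked rawValueTimestampHeaders[0] == 0x00.
// Assumed layout: [headersSize(varint)][headers][timestamp(8)][value].
final class PlainValueFastPathSketch {

    // Local stand-in for StateSerdes.TIMESTAMP_SIZE (8 bytes, per the layout comment).
    static final int TIMESTAMP_SIZE = 8;

    static byte[] rawPlainValueWithEmptyHeaders(final byte[] rawValueTimestampHeaders) {
        // Skip the one-byte varint (0x00), the zero-length headers, and the timestamp.
        final int valueOffset = 1 + TIMESTAMP_SIZE;
        final byte[] rawValue = new byte[rawValueTimestampHeaders.length - valueOffset];
        System.arraycopy(rawValueTimestampHeaders, valueOffset, rawValue, 0, rawValue.length);
        return rawValue;
    }
}
```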
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java:
##########
Review Comment:
Why can't we apply the same optimization here and use `arraycopy` instead of `readBytes`?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java:
##########
@@ -50,6 +58,21 @@ static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {
return result;
}
+ public static boolean hasEmptyHeadersAndTimestamp(final byte[] rawValueTimestampHeaders) {
Review Comment:
The method checks whether the headers are empty, so including the timestamp in the name is confusing.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java:
##########
@@ -50,6 +58,21 @@ static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {
return result;
}
+ public static boolean hasEmptyHeadersAndTimestamp(final byte[] rawValueTimestampHeaders) {
+ if (rawValueTimestampHeaders.length > 0 && rawValueTimestampHeaders[0] == 0x00) {
+ // Header size (varint 1 byte), empty headers (no bytes), and timestamp
+ if (rawValueTimestampHeaders.length - 1 - StateSerdes.TIMESTAMP_SIZE < 0) {
Review Comment:
Why do we need this check? It seems unnecessary to me.
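Judging from the visible fragment, the length guard only matters for arrays that start with `0x00` but are shorter than the varint byte plus the 8-byte timestamp. For such input, the fast-path allocation `new byte[rawValueTimestampHeaders.length - 1 - StateSerdes.TIMESTAMP_SIZE]` would get a negative size and throw `NegativeArraySizeException`; whether such input can reach these methods at all is what the comment questions. The sketch below uses hypothetical names and contrasts a first-byte-only check (named `hasEmptyHeaders`, as suggested in another comment on this hunk) with a guarded variant. The local `TIMESTAMP_SIZE` is an assumed stand-in.

```java
// Hypothetical helpers contrasting the two checks; not the PR's code.
final class EmptyHeadersCheckSketch {

    // Assumed stand-in for StateSerdes.TIMESTAMP_SIZE (8 bytes).
    static final int TIMESTAMP_SIZE = 8;

    // First-byte-only check: a zero headers size is the single varint byte 0x00.
    static boolean hasEmptyHeaders(final byte[] rawValueTimestampHeaders) {
        return rawValueTimestampHeaders.length > 0 && rawValueTimestampHeaders[0] == 0x00;
    }

    // Guarded variant: also requires room for the varint byte and the timestamp,
    // so a fast path relying on it never allocates an array with a negative length.
    static boolean hasEmptyHeadersAndFullTimestamp(final byte[] rawValueTimestampHeaders) {
        return hasEmptyHeaders(rawValueTimestampHeaders)
            && rawValueTimestampHeaders.length - 1 - TIMESTAMP_SIZE >= 0;
    }
}
```

For well-formed serialized values the two checks agree; they differ only on truncated input such as `{0x00, 1, 2}`.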
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java:
##########
@@ -58,11 +81,19 @@ static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {
* Input: [headersSize(varint)][headers][timestamp(8)][value]
* Output: [timestamp(8)][value]
*/
- static byte[] rawTimestampedValue(final byte[] rawValueTimestampHeaders) {
+ public static byte[] rawTimestampedValue(final byte[] rawValueTimestampHeaders) {
if (rawValueTimestampHeaders == null) {
return null;
}
+ // If the header is empty, then copy the value and timestamp bytes directly
+ if (hasEmptyHeadersAndTimestamp(rawValueTimestampHeaders)) {
+ // Strip header size (varint 1 byte), empty headers (no bytes)
+ final byte[] res = new byte[rawValueTimestampHeaders.length - 1];
Review Comment:
```suggestion
final byte[] rawValueAndTimestamp = new byte[rawValueTimestampHeaders.length - 1];
```
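For `rawTimestampedValue`, only the one-byte `0x00` varint is stripped and the timestamp stays at the front of the result, matching the documented output `[timestamp(8)][value]`. This is a minimal sketch, assuming the caller has already verified the leading `0x00` and that the timestamp is stored big-endian (the `ByteBuffer` default). `TimestampedValueFastPathSketch` and `timestampOf` are hypothetical names.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch; assumes the caller already checked rawValueTimestampHeaders[0] == 0x00.
// Assumed output layout: [timestamp(8)][value].
final class TimestampedValueFastPathSketch {

    static byte[] rawTimestampedValueWithEmptyHeaders(final byte[] rawValueTimestampHeaders) {
        // Only the one-byte headersSize varint is stripped; the timestamp is kept.
        final byte[] rawValueAndTimestamp = new byte[rawValueTimestampHeaders.length - 1];
        System.arraycopy(rawValueTimestampHeaders, 1, rawValueAndTimestamp, 0, rawValueAndTimestamp.length);
        return rawValueAndTimestamp;
    }

    // Reads the leading 8-byte timestamp back (big-endian, ByteBuffer's default order).
    static long timestampOf(final byte[] rawValueAndTimestamp) {
        return ByteBuffer.wrap(rawValueAndTimestamp).getLong();
    }
}
```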
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java:
##########
@@ -50,6 +58,21 @@ static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {
return result;
}
+ public static boolean hasEmptyHeadersAndTimestamp(final byte[] rawValueTimestampHeaders) {
Review Comment:
```suggestion
public static boolean hasEmptyHeaders(final byte[] rawValueTimestampHeaders) {
```