aliehsaeedii commented on code in PR #21706:
URL: https://github.com/apache/kafka/pull/21706#discussion_r2932783518
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java:
##########
@@ -44,7 +44,7 @@
* <p>
* This is used by KIP-1271 to deserialize aggregations with headers from
session state stores.
*/
-class AggregationWithHeadersDeserializer<AGG> implements
WrappingNullableDeserializer<AggregationWithHeaders<AGG>, Void, AGG> {
+public class AggregationWithHeadersDeserializer<AGG> implements
WrappingNullableDeserializer<AggregationWithHeaders<AGG>, Void, AGG> {
Review Comment:
I assume you made these `public` so they can be used in the JMH benchmarks?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java:
##########
@@ -98,17 +98,27 @@ static Headers headers(final byte[]
rawAggregationWithHeaders) {
* Extract the raw aggregation bytes from serialized
AggregationWithHeaders,
* stripping the headers prefix.
*/
- static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
+ public static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
if (aggregationWithHeaders == null) {
return null;
}
+ // If the header is empty, then copy the value bytes directly
+ if (aggregationWithHeaders.length > 0 && aggregationWithHeaders[0] ==
0x00) {
+ // Strip header size's varint byte, and empty headers consume no
bytes
+ final byte[] res = new byte[aggregationWithHeaders.length - 1];
+ System.arraycopy(aggregationWithHeaders, 1, res, 0, res.length);
+ return res;
+ }
+
final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
- readHeaders(buffer);
+ // Skip the headers bytes without deserizization or copying
+ final int headersSize = ByteUtils.readVarint(buffer);
+ buffer.position(buffer.position() + headersSize);
return readBytes(buffer, buffer.remaining());
}
- private static Headers readHeaders(final ByteBuffer buffer) {
+ public static Headers readHeaders(final ByteBuffer buffer) {
final int headersSize = ByteUtils.readVarint(buffer);
final byte[] rawHeaders = readBytes(buffer, headersSize);
return HeadersDeserializer.deserialize(rawHeaders);
Review Comment:
I'm wondering whether it makes sense to add the same `if
(rawAggregationWithHeaders.length > 0 && rawAggregationWithHeaders[0] == 0x00)
{` fast path to `static Headers headers(final byte[] rawAggregationWithHeaders)`
as well.
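
A rough, self-contained sketch of the idea, not the actual Kafka code. The class `HeadersFastPathSketch`, the method `rawHeaders`, and the local `readVarint` are hypothetical stand-ins; the varint reader is assumed to mirror the zigzag decoding done by `ByteUtils.readVarint`. The sketch returns the raw header bytes instead of a `Headers` object so that it runs without Kafka on the classpath.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the relevant part of the deserializer.
class HeadersFastPathSketch {

    private static final byte[] NO_HEADERS = new byte[0];

    // Returns the serialized headers section, skipping all buffer work
    // when the leading size byte says the headers are empty.
    static byte[] rawHeaders(final byte[] rawAggregationWithHeaders) {
        if (rawAggregationWithHeaders == null) {
            return null;
        }
        // Fast path: a single 0x00 byte is the varint encoding of size 0.
        if (rawAggregationWithHeaders.length > 0 && rawAggregationWithHeaders[0] == 0x00) {
            return NO_HEADERS;
        }
        final ByteBuffer buffer = ByteBuffer.wrap(rawAggregationWithHeaders);
        final int headersSize = readVarint(buffer);
        final byte[] rawHeaders = new byte[headersSize];
        buffer.get(rawHeaders);
        return rawHeaders;
    }

    // Assumption: same wire format as ByteUtils.readVarint
    // (unsigned varint followed by zigzag decoding).
    static int readVarint(final ByteBuffer buffer) {
        int value = 0;
        int shift = 0;
        byte b;
        do {
            b = buffer.get();
            value |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (value >>> 1) ^ -(value & 1);
    }
}
```

In the real `headers()` method the fast path would presumably return an empty `Headers` instance rather than an empty byte array. Whether that instance can be shared or must be freshly allocated depends on whether callers mutate the result, which I have not checked.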
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java:
##########
@@ -50,6 +58,21 @@ static byte[] rawPlainValue(final byte[]
rawValueTimestampHeaders) {
return result;
}
+ private static boolean hasEmptyHeadersAndTimestamp(final byte[]
rawValueTimestampHeaders) {
Review Comment:
nit: Should this be just `hasEmptyHeaders`? We don't check for an empty timestamp.
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java:
##########
@@ -78,6 +78,18 @@ public void shouldReturnNullForNullRawTimestampedValue() {
assertNull(rawPlainValue(null));
}
+ @Test
+ public void shouldExtractRawValueWithEmptyHeaders() {
+ final byte[] data = new byte[1 + StateSerdes.TIMESTAMP_SIZE +
VALUE.length];
+ final ByteBuffer buf = ByteBuffer.wrap(data);
+ buf.put((byte) 0x00); // header size
+ buf.putLong(TIMESTAMP);
+ buf.put(VALUE); // non-header payload
Review Comment:
```suggestion
buf.put(VALUE); // plain value
```
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java:
##########
@@ -139,17 +139,4 @@ static Headers headers(final byte[]
rawValueTimestampHeaders) {
final byte[] rawHeaders = readBytes(buffer, headersSize);
return HeadersDeserializer.deserialize(rawHeaders);
}
- /**
- * Extract raw value from serialized ValueTimestampHeaders.
- */
- static byte[] rawValue(final byte[] rawValueTimestampHeaders) {
- if (rawValueTimestampHeaders == null) {
- return null;
- }
-
- final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
- final int headersSize = ByteUtils.readVarint(buffer);
- buffer.position(buffer.position() + headersSize + Long.BYTES);
- return readBytes(buffer, buffer.remaining());
- }
Review Comment:
Does it make sense to apply the same optimization to the other methods of this
class, such as `headers()`, `value()`, and `deserialize()`?
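
One possible shape for this, as a hedged sketch rather than a proposal for the exact code: put the empty-headers check into a single offset helper and let every accessor reuse it. The layout `[headersSize varint][headers][8-byte timestamp][value]` is taken from the removed `rawValue` method and the new test above. The names `HeadersOffsetSketch`, `headersEndOffset`, `timestamp`, and the local `readVarint` are hypothetical and only serve the illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper class; not part of the PR.
class HeadersOffsetSketch {

    // Index of the first byte after the headers section.
    static int headersEndOffset(final byte[] raw) {
        // Fast path: empty headers occupy only the one-byte size prefix.
        if (raw.length > 0 && raw[0] == 0x00) {
            return 1;
        }
        final ByteBuffer buffer = ByteBuffer.wrap(raw);
        final int headersSize = readVarint(buffer);
        return buffer.position() + headersSize;
    }

    // Reads the timestamp without touching the header bytes.
    static long timestamp(final byte[] raw) {
        return ByteBuffer.wrap(raw).getLong(headersEndOffset(raw));
    }

    // Copies the plain value that follows the headers and the timestamp.
    static byte[] rawValue(final byte[] raw) {
        if (raw == null) {
            return null;
        }
        return Arrays.copyOfRange(raw, headersEndOffset(raw) + Long.BYTES, raw.length);
    }

    // Assumption: same wire format as ByteUtils.readVarint
    // (unsigned varint followed by zigzag decoding).
    static int readVarint(final ByteBuffer buffer) {
        int value = 0;
        int shift = 0;
        byte b;
        do {
            b = buffer.get();
            value |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (value >>> 1) ^ -(value & 1);
    }
}
```

With a helper like this, the fast path lives in one place, so any method that needs to step over the headers gets it for free. Whether the extra branch pays off in `deserialize()` would need to be confirmed by the benchmark.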
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java:
##########
@@ -98,17 +98,27 @@ static Headers headers(final byte[]
rawAggregationWithHeaders) {
* Extract the raw aggregation bytes from serialized
AggregationWithHeaders,
* stripping the headers prefix.
*/
- static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
+ public static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
if (aggregationWithHeaders == null) {
return null;
}
+ // If the header is empty, then copy the value bytes directly
+ if (aggregationWithHeaders.length > 0 && aggregationWithHeaders[0] ==
0x00) {
+ // Strip header size's varint byte, and empty headers consume no
bytes
+ final byte[] res = new byte[aggregationWithHeaders.length - 1];
+ System.arraycopy(aggregationWithHeaders, 1, res, 0, res.length);
+ return res;
+ }
+
final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
- readHeaders(buffer);
+ // Skip the headers bytes without deserizization or copying
Review Comment:
```suggestion
// Skip the headers bytes without deserialization or copying
```
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java:
##########
@@ -98,17 +98,27 @@ static Headers headers(final byte[]
rawAggregationWithHeaders) {
* Extract the raw aggregation bytes from serialized
AggregationWithHeaders,
* stripping the headers prefix.
*/
- static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
+ public static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
if (aggregationWithHeaders == null) {
return null;
}
+ // If the header is empty, then copy the value bytes directly
+ if (aggregationWithHeaders.length > 0 && aggregationWithHeaders[0] ==
0x00) {
+ // Strip header size's varint byte, and empty headers consume no
bytes
+ final byte[] res = new byte[aggregationWithHeaders.length - 1];
+ System.arraycopy(aggregationWithHeaders, 1, res, 0, res.length);
+ return res;
+ }
+
final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
- readHeaders(buffer);
+ // Skip the headers bytes without deserizization or copying
+ final int headersSize = ByteUtils.readVarint(buffer);
+ buffer.position(buffer.position() + headersSize);
return readBytes(buffer, buffer.remaining());
}
- private static Headers readHeaders(final ByteBuffer buffer) {
+ public static Headers readHeaders(final ByteBuffer buffer) {
Review Comment:
Can we optimize `readHeaders` as well?