aliehsaeedii commented on code in PR #21408:
URL: https://github.com/apache/kafka/pull/21408#discussion_r2770851535
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java:
##########
@@ -113,30 +127,21 @@ static byte[] rawValue(final byte[] rawValueTimestampHeaders) {
}
final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
- final int headerSize = ByteUtils.readVarint(buffer);
- buffer.position(buffer.position() + headerSize + Long.BYTES);
-
- final byte[] rawValue = new byte[buffer.remaining()];
- buffer.get(rawValue);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ buffer.position(buffer.position() + headersSize + Long.BYTES);
- return rawValue;
+ return readBytes(buffer, buffer.remaining());
}
/**
* Extract timestamp from serialized ValueTimestampHeaders.
*/
static long timestamp(final byte[] rawValueTimestampHeaders) {
- if (rawValueTimestampHeaders == null) {
Review Comment:
Why did you remove this `if (rawValueTimestampHeaders == null)` check?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java:
##########
@@ -104,6 +98,26 @@ public void setIfUnset(final SerdeGetter getter) {
initNullableDeserializer(valueDeserializer, getter);
}
+ /**
+ * Reads the specified number of bytes from the buffer with validation.
+ *
+ * @param buffer the ByteBuffer to read from
+ * @param length the number of bytes to read
+ * @return the byte array containing the read bytes
+ * @throws IllegalArgumentException if buffer doesn't have enough bytes
+ */
+ private static byte[] readBytes(final ByteBuffer buffer, final int length) {
+ if (buffer.remaining() < length) {
+ throw new IllegalArgumentException(
Review Comment:
Not sure, but `SerializationException` might be more specific and a better fit here.
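A rough sketch of what that could look like, not a definitive implementation. It assumes `org.apache.kafka.common.errors.SerializationException` is the intended type; the nested stand-in class, the wrapper class name `ReadBytesSketch`, and the error message are only there so the snippet compiles on its own and are not part of the PR:

```java
import java.nio.ByteBuffer;

public class ReadBytesSketch {

    // Hypothetical stand-in for org.apache.kafka.common.errors.SerializationException,
    // declared here only so the sketch compiles without Kafka on the classpath.
    static class SerializationException extends RuntimeException {
        SerializationException(final String message) {
            super(message);
        }
    }

    // Same validation as in the diff, but signalling malformed input with a
    // serialization-specific exception instead of IllegalArgumentException.
    static byte[] readBytes(final ByteBuffer buffer, final int length) {
        if (buffer.remaining() < length) {
            // The message text is an assumption, not taken from the PR.
            throw new SerializationException(
                "Expected " + length + " bytes but only " + buffer.remaining() + " remain");
        }
        final byte[] bytes = new byte[length];
        buffer.get(bytes);
        return bytes;
    }
}
```

Callers that already catch serialization failures would then see truncated input reported the same way as other malformed records.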