Gerrrr commented on a change in pull request #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r836431097
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -92,6 +103,32 @@ public void setIfUnset(final SerdeGetter getter) {
         return buf.array();
     }
+    private byte[] serializeV1(final String topic, final SubscriptionResponseWrapper<V> data) {
+        final byte[] serializedData = data.getForeignValue() == null ? null : serializer.serialize(topic, data.getForeignValue());
+        final int serializedDataLength = serializedData == null ? 0 : serializedData.length;
+        final long[] originalHash = data.getOriginalValueHash();
+        final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
+        final int primaryPartitionLength = Integer.BYTES;
+        final int dataLength = 1 + hashLength + serializedDataLength + primaryPartitionLength;
+
+        final ByteBuffer buf = ByteBuffer.allocate(dataLength);
+
+        if (originalHash != null) {
+            buf.put(data.getVersion());
+        } else {
+            buf.put((byte) (data.getVersion() | (byte) 0x80));
+        }
+        buf.putInt(data.getPrimaryPartition());
Review comment:
I am not sure this is a good idea. The advantage is that we would remove the
duplicated parts in `serialize`. However, there are 2 downsides:
* We use the v0 path in 2 situations: when it is a *real* v0 record and when we
upgrade from v0 (i.e. a data record can actually carry a primary partition and
v1 as part of its state). This means that we can't just put `data.getVersion()`
into the buffer, but have to hardcode it to `0`. Making this logic generic
enough to handle v0, upgrade-from, and v1 would be cumbersome.
* Moving a fixed-size field to the end of the byte array would complicate
deserialization. Consider
https://github.com/apache/kafka/blob/d9f6c52e2fa3a7ebd46efdade63a0c392acc8d0c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java#L234-L242
Right now, the logic for finding the PK size is straightforward. It can be
summarized as "take everything from here to the end of the buffer". If we move
the primary partition to the end, we'll have to subtract another 4 bytes iff
the version is greater than 0.
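
To make the second point concrete, here is a rough sketch of the two layouts.
It is not the actual serde code: the class and method names are made up for
illustration, the hash field is left out, and the version byte is assumed to be
a plain `0` or `1` without the high-bit flag.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified illustration; not the real Kafka Streams serde.
public class TrailingFieldLayoutSketch {

    // Layout A (as in the diff): [version][primaryPartition if version > 0][payload...]
    static byte[] writePartitionFirst(final byte version, final int partition, final byte[] payload) {
        final int partitionLength = version > 0 ? Integer.BYTES : 0;
        final ByteBuffer buf = ByteBuffer.allocate(1 + partitionLength + payload.length);
        buf.put(version);
        if (version > 0) {
            buf.putInt(partition);
        }
        buf.put(payload);
        return buf.array();
    }

    static byte[] readPayloadPartitionFirst(final byte[] bytes) {
        final ByteBuffer buf = ByteBuffer.wrap(bytes);
        final byte version = buf.get();
        if (version > 0) {
            buf.getInt(); // skip the fixed-size field at a known offset
        }
        // The variable-length part is simply "everything that is left".
        final byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }

    // Layout B (the alternative): [version][payload...][primaryPartition if version > 0]
    static byte[] writePartitionLast(final byte version, final int partition, final byte[] payload) {
        final int partitionLength = version > 0 ? Integer.BYTES : 0;
        final ByteBuffer buf = ByteBuffer.allocate(1 + payload.length + partitionLength);
        buf.put(version);
        buf.put(payload);
        if (version > 0) {
            buf.putInt(partition);
        }
        return buf.array();
    }

    static byte[] readPayloadPartitionLast(final byte[] bytes) {
        final ByteBuffer buf = ByteBuffer.wrap(bytes);
        final byte version = buf.get();
        // The payload length now depends on the version: a trailing
        // 4-byte field has to be subtracted iff version > 0.
        final int trailerLength = version > 0 ? Integer.BYTES : 0;
        final byte[] payload = new byte[buf.remaining() - trailerLength];
        buf.get(payload);
        return payload;
    }
}
```

Both variants round-trip the payload, but in layout B every reader of the
variable-length part has to know about the version-dependent trailer.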
I'd say that code duplication is the lesser evil in this case. WDYT? I am happy
to move the primary partition to the end if you disagree.