mingyen066 commented on code in PR #21439:
URL: https://github.com/apache/kafka/pull/21439#discussion_r2804735091


##########
clients/src/main/java/org/apache/kafka/common/record/internal/ControlRecordType.java:
##########
@@ -59,46 +58,45 @@ public enum ControlRecordType {
 
     private static final Logger log = LoggerFactory.getLogger(ControlRecordType.class);
 
-    static final short CURRENT_CONTROL_RECORD_KEY_VERSION = 0;
-    static final int CURRENT_CONTROL_RECORD_KEY_SIZE = 4;
-    private static final Schema CONTROL_RECORD_KEY_SCHEMA_VERSION_V0 = new Schema(
-            new Field("version", Type.INT16),
-            new Field("type", Type.INT16));
-
     private final short type;
+    private final ByteBuffer buffer;
 
     ControlRecordType(short type) {
         this.type = type;
+        ControlRecordTypeSchema schema = new ControlRecordTypeSchema().setType(type);
+        buffer = MessageUtil.toVersionPrefixedByteBuffer(ControlRecordTypeSchema.HIGHEST_SUPPORTED_VERSION, schema);
     }
 
     public short type() {
         return type;
     }
 
-    public Struct recordKey() {
+    public ByteBuffer recordKey() {
         if (this == UNKNOWN)
             throw new IllegalArgumentException("Cannot serialize UNKNOWN control record type");
+        return buffer.duplicate();
+    }
 
-        Struct struct = new Struct(CONTROL_RECORD_KEY_SCHEMA_VERSION_V0);
-        struct.set("version", CURRENT_CONTROL_RECORD_KEY_VERSION);
-        struct.set("type", type);
-        return struct;
+    public int controlRecordKeySize() {
+        return buffer.remaining();
     }
 
     public static short parseTypeId(ByteBuffer key) {
-        if (key.remaining() < CURRENT_CONTROL_RECORD_KEY_SIZE)
-            throw new InvalidRecordException("Invalid value size found for end control record key. Must have " +
-                    "at least " + CURRENT_CONTROL_RECORD_KEY_SIZE + " bytes, but found only " + key.remaining());
-
-        short version = key.getShort(0);
-        if (version < 0)
+        // We should duplicate the original buffer since it will be read again in some cases, for example,
+        // read by KafkaRaftClient and RaftClient.Listener
+        ByteBuffer buffer = key.duplicate();
+        short version = buffer.getShort();

Review Comment:
   Sure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to