jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1595915596


##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
##########
@@ -728,83 +730,114 @@ public static MemoryRecords withLeaderChangeMessage(
         ByteBuffer buffer,
         LeaderChangeMessage leaderChangeMessage
     ) {
-        writeLeaderChangeMessage(buffer, initialOffset, timestamp, 
leaderEpoch, leaderChangeMessage);
-        buffer.flip();
-        return MemoryRecords.readableRecords(buffer);
+        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+                initialOffset,
+                timestamp,
+                leaderEpoch,
+                buffer
+            )
+        ) {
+            builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+            return builder.build();
+        }
     }
 
-    private static void writeLeaderChangeMessage(
-        ByteBuffer buffer,
+    public static MemoryRecords withSnapshotHeaderRecord(
         long initialOffset,
         long timestamp,
         int leaderEpoch,
-        LeaderChangeMessage leaderChangeMessage
+        ByteBuffer buffer,
+        SnapshotHeaderRecord snapshotHeaderRecord
     ) {
-        try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
-            buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
-            TimestampType.CREATE_TIME, initialOffset, timestamp,
-            RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE,
-            false, true, leaderEpoch, buffer.capacity())
+        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+                initialOffset,
+                timestamp,
+                leaderEpoch,
+                buffer
+            )
         ) {
-            builder.appendLeaderChangeMessage(timestamp, leaderChangeMessage);
+            builder.appendSnapshotHeaderMessage(timestamp, 
snapshotHeaderRecord);
+            return builder.build();
         }
     }
 
-    public static MemoryRecords withSnapshotHeaderRecord(
+    public static MemoryRecords withSnapshotFooterRecord(
         long initialOffset,
         long timestamp,
         int leaderEpoch,
         ByteBuffer buffer,
-        SnapshotHeaderRecord snapshotHeaderRecord
+        SnapshotFooterRecord snapshotFooterRecord
     ) {
-        writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, 
leaderEpoch, snapshotHeaderRecord);
-        buffer.flip();
-        return MemoryRecords.readableRecords(buffer);
+        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+                initialOffset,
+                timestamp,
+                leaderEpoch,
+                buffer
+            )
+        ) {
+            builder.appendSnapshotFooterMessage(timestamp, 
snapshotFooterRecord);
+            return builder.build();
+        }
     }
 
-    private static void writeSnapshotHeaderRecord(
-        ByteBuffer buffer,
+    public static MemoryRecords withKRaftVersionRecord(
         long initialOffset,
         long timestamp,
         int leaderEpoch,
-        SnapshotHeaderRecord snapshotHeaderRecord
+        ByteBuffer buffer,
+        KRaftVersionRecord kraftVersionRecord
     ) {
-        try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
-            buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
-            TimestampType.CREATE_TIME, initialOffset, timestamp,
-            RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE,
-            false, true, leaderEpoch, buffer.capacity())
+        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+                initialOffset,
+                timestamp,
+                leaderEpoch,
+                buffer
+            )
         ) {
-            builder.appendSnapshotHeaderMessage(timestamp, 
snapshotHeaderRecord);
+            builder.appendKRaftVersionMessage(timestamp, kraftVersionRecord);
+            return builder.build();
         }
     }
 
-    public static MemoryRecords withSnapshotFooterRecord(
+    public static MemoryRecords withVotersRecord(
         long initialOffset,
         long timestamp,
         int leaderEpoch,
         ByteBuffer buffer,
-        SnapshotFooterRecord snapshotFooterRecord
+        VotersRecord votersRecord
     ) {
-        writeSnapshotFooterRecord(buffer, initialOffset, timestamp, 
leaderEpoch, snapshotFooterRecord);
-        buffer.flip();
-        return MemoryRecords.readableRecords(buffer);
+        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+                initialOffset,
+                timestamp,
+                leaderEpoch,
+                buffer
+            )
+        ) {
+            builder.appendVotersMessage(timestamp, votersRecord);
+            return builder.build();
+        }
     }
 
-    private static void writeSnapshotFooterRecord(
-        ByteBuffer buffer,
+    private static MemoryRecordsBuilder createKraftControlReccordBuilder(

Review Comment:
   I submitted this https://github.com/apache/kafka/pull/15912 as a followup to 
these comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to