kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2538749900


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,7 +1022,10 @@ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
                         Batch<ApiMessageAndVersion> batch = reader.next();
                         long offset = batch.lastOffset();
                         List<ApiMessageAndVersion> messages = batch.records();
-
+                        // KIP-1170: The 0-0.checkpoint can contain metadata records. If it does, they should be considered the bootstrap metadata for the cluster.
+                        if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID) && !messages.isEmpty()) {
+                            bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap");
+                        }

Review Comment:
   This is incorrect. Please follow my suggestion from 
https://github.com/apache/kafka/pull/20707#discussion_r2511539025. This code 
still iterates through `messages` when loading a `0-0.checkpoint` with metadata 
records.



##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -442,14 +442,16 @@ SnapshotManifest loadSnapshot(
         while (reader.hasNext()) {
             Batch<ApiMessageAndVersion> batch = reader.next();
             loadControlRecords(batch);
-            for (ApiMessageAndVersion record : batch.records()) {
-                try {
-                    delta.replay(record.message());
-                } catch (Throwable e) {
-                    faultHandler.handleFault("Error loading metadata log record " + snapshotIndex +
-                            " in snapshot at offset " + reader.lastContainedLogOffset(), e);
+            if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID) && !batch.records().isEmpty()) {
+                for (ApiMessageAndVersion record : batch.records()) {
+                    try {
+                        delta.replay(record.message());
+                    } catch (Throwable e) {
+                        faultHandler.handleFault("Error loading metadata log record " + snapshotIndex +
+                                " in snapshot at offset " + reader.lastContainedLogOffset(), e);
+                    }
+                    snapshotIndex++;

Review Comment:
   This is incorrect. This code replays records ONLY if the snapshot being 
loaded is the `0-0.checkpoint` with metadata records. The code should skip over 
replaying records in that case, not replay them. For all other checkpoints, the 
code can remain as is.
   
   ```suggestion
               if (!reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
                   for (ApiMessageAndVersion record : batch.records()) {
                       try {
                           delta.replay(record.message());
                       } catch (Throwable e) {
                           faultHandler.handleFault("Error loading metadata log record " + snapshotIndex +
                                   " in snapshot at offset " + reader.lastContainedLogOffset(), e);
                       }
                       snapshotIndex++;
   ```
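
To make the intended behavior concrete, here is a minimal standalone sketch of the guard described above. It is not the Kafka code: `SnapshotId`, `recordsToReplay`, and the string records are hypothetical stand-ins, and the sketch assumes the bootstrap snapshot ID corresponds to offset 0 and epoch 0, as the `0-0.checkpoint` name suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

final class BootstrapSkipSketch {
    // Hypothetical stand-in for a snapshot id (end offset and epoch); not a Kafka class.
    static final class SnapshotId {
        final long offset;
        final int epoch;

        SnapshotId(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof SnapshotId)) return false;
            SnapshotId other = (SnapshotId) o;
            return offset == other.offset && epoch == other.epoch;
        }

        @Override
        public int hashCode() {
            return Objects.hash(offset, epoch);
        }
    }

    // Assumption: the bootstrap checkpoint is identified by offset 0, epoch 0.
    static final SnapshotId BOOTSTRAP_SNAPSHOT_ID = new SnapshotId(0L, 0);

    // Collects the records that would be replayed for a snapshot.
    // Records in the bootstrap checkpoint are skipped; all other snapshots are replayed in full.
    static List<String> recordsToReplay(SnapshotId id, List<List<String>> batches) {
        List<String> replayed = new ArrayList<>();
        for (List<String> batch : batches) {
            if (!id.equals(BOOTSTRAP_SNAPSHOT_ID)) {
                replayed.addAll(batch);
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("r1", "r2"),
                Arrays.asList("r3"));
        // Bootstrap checkpoint: nothing is replayed.
        System.out.println(recordsToReplay(BOOTSTRAP_SNAPSHOT_ID, batches)); // prints []
        // Any other snapshot: every record is replayed in order.
        System.out.println(recordsToReplay(new SnapshotId(100L, 3), batches)); // prints [r1, r2, r3]
    }
}
```

The condition in the diff has the opposite polarity: it replays only when the snapshot is the bootstrap checkpoint, so ordinary snapshots would load no records at all.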


