kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2538773554


##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -442,14 +442,16 @@ SnapshotManifest loadSnapshot(
         while (reader.hasNext()) {
             Batch<ApiMessageAndVersion> batch = reader.next();
             loadControlRecords(batch);
-            for (ApiMessageAndVersion record : batch.records()) {
-                try {
-                    delta.replay(record.message());
-                } catch (Throwable e) {
-                    faultHandler.handleFault("Error loading metadata log 
record " + snapshotIndex +
-                            " in snapshot at offset " + 
reader.lastContainedLogOffset(), e);
+            if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID) && 
!batch.records().isEmpty()) {
+                for (ApiMessageAndVersion record : batch.records()) {
+                    try {
+                        delta.replay(record.message());
+                    } catch (Throwable e) {
+                        faultHandler.handleFault("Error loading metadata log 
record " + snapshotIndex +
+                                " in snapshot at offset " + 
reader.lastContainedLogOffset(), e);
+                    }
+                    snapshotIndex++;

Review Comment:
   This is incorrect because the code is replaying records ONLY if the snapshot 
being loaded is the `0-0.checkpoint` with metadata records. The code should 
skip over replaying records in that case, not replay them. For all other 
checkpoints, the code can remain as is.
   
   ```suggestion
               if 
(!reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
                   for (ApiMessageAndVersion record : batch.records()) {
                       try {
                           delta.replay(record.message());
                       } catch (Throwable e) {
                           faultHandler.handleFault("Error loading metadata log 
record " + snapshotIndex +
                                   " in snapshot at offset " + 
reader.lastContainedLogOffset(), e);
                       }
                       snapshotIndex++;
   ```



##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -442,14 +442,16 @@ SnapshotManifest loadSnapshot(
         while (reader.hasNext()) {
             Batch<ApiMessageAndVersion> batch = reader.next();
             loadControlRecords(batch);
-            for (ApiMessageAndVersion record : batch.records()) {
-                try {
-                    delta.replay(record.message());
-                } catch (Throwable e) {
-                    faultHandler.handleFault("Error loading metadata log 
record " + snapshotIndex +
-                            " in snapshot at offset " + 
reader.lastContainedLogOffset(), e);
+            if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID) && 
!batch.records().isEmpty()) {
+                for (ApiMessageAndVersion record : batch.records()) {
+                    try {
+                        delta.replay(record.message());
+                    } catch (Throwable e) {
+                        faultHandler.handleFault("Error loading metadata log 
record " + snapshotIndex +
+                                " in snapshot at offset " + 
reader.lastContainedLogOffset(), e);
+                    }
+                    snapshotIndex++;

Review Comment:
   This is incorrect because the code is replaying records ONLY if the snapshot 
being loaded is the `0-0.checkpoint` with metadata records. The code should 
skip over replaying records in that case, not replay them. For all other 
checkpoints, the code should replay records.
   
   ```suggestion
               if 
(!reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
                   for (ApiMessageAndVersion record : batch.records()) {
                       try {
                           delta.replay(record.message());
                       } catch (Throwable e) {
                           faultHandler.handleFault("Error loading metadata log 
record " + snapshotIndex +
                                   " in snapshot at offset " + 
reader.lastContainedLogOffset(), e);
                       }
                       snapshotIndex++;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to