kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2538773554
##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -442,14 +442,16 @@ SnapshotManifest loadSnapshot(
         while (reader.hasNext()) {
             Batch<ApiMessageAndVersion> batch = reader.next();
             loadControlRecords(batch);
-            for (ApiMessageAndVersion record : batch.records()) {
-                try {
-                    delta.replay(record.message());
-                } catch (Throwable e) {
-                    faultHandler.handleFault("Error loading metadata log record " + snapshotIndex +
-                        " in snapshot at offset " + reader.lastContainedLogOffset(), e);
+            if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID) && !batch.records().isEmpty()) {
+                for (ApiMessageAndVersion record : batch.records()) {
+                    try {
+                        delta.replay(record.message());
+                    } catch (Throwable e) {
+                        faultHandler.handleFault("Error loading metadata log record " + snapshotIndex +
+                            " in snapshot at offset " + reader.lastContainedLogOffset(), e);
+                    }
+                    snapshotIndex++;
Review Comment:
This condition is inverted: as written, the code replays records ONLY when the
snapshot being loaded is the `0-0.checkpoint` and the batch contains metadata
records. That is exactly the case where replay should be skipped. For all other
checkpoints, the original replay loop can stay as it was.
```suggestion
                if (!reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
                    for (ApiMessageAndVersion record : batch.records()) {
                        try {
                            delta.replay(record.message());
                        } catch (Throwable e) {
                            faultHandler.handleFault("Error loading metadata log record " + snapshotIndex +
                                " in snapshot at offset " + reader.lastContainedLogOffset(), e);
                        }
                        snapshotIndex++;
```
##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -442,14 +442,16 @@ SnapshotManifest loadSnapshot(
         while (reader.hasNext()) {
             Batch<ApiMessageAndVersion> batch = reader.next();
             loadControlRecords(batch);
-            for (ApiMessageAndVersion record : batch.records()) {
-                try {
-                    delta.replay(record.message());
-                } catch (Throwable e) {
-                    faultHandler.handleFault("Error loading metadata log record " + snapshotIndex +
-                        " in snapshot at offset " + reader.lastContainedLogOffset(), e);
+            if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID) && !batch.records().isEmpty()) {
+                for (ApiMessageAndVersion record : batch.records()) {
+                    try {
+                        delta.replay(record.message());
+                    } catch (Throwable e) {
+                        faultHandler.handleFault("Error loading metadata log record " + snapshotIndex +
+                            " in snapshot at offset " + reader.lastContainedLogOffset(), e);
+                    }
+                    snapshotIndex++;
Review Comment:
This condition is inverted: as written, the code replays records ONLY when the
snapshot being loaded is the `0-0.checkpoint` and the batch contains metadata
records. That is exactly the case where replay should be skipped. For all other
checkpoints, the code should replay records.
```suggestion
                if (!reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
                    for (ApiMessageAndVersion record : batch.records()) {
                        try {
                            delta.replay(record.message());
                        } catch (Throwable e) {
                            faultHandler.handleFault("Error loading metadata log record " + snapshotIndex +
                                " in snapshot at offset " + reader.lastContainedLogOffset(), e);
                        }
                        snapshotIndex++;
```
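To make the intended behavior concrete, here is a minimal, self-contained
sketch of the skip-on-bootstrap rule. It does not use the real Kafka classes:
`SnapshotId`, `BOOTSTRAP_SNAPSHOT_ID`, and `replaySnapshot` are hypothetical
stand-ins, records are modeled as plain strings, and the bootstrap id is
assumed to be offset 0 / epoch 0 based on the `0-0.checkpoint` file name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class BootstrapSkipSketch {
    // Hypothetical stand-in for a snapshot id (end offset plus epoch).
    static final class SnapshotId {
        final long offset;
        final int epoch;

        SnapshotId(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SnapshotId)) return false;
            SnapshotId other = (SnapshotId) o;
            return offset == other.offset && epoch == other.epoch;
        }

        @Override
        public int hashCode() {
            return Objects.hash(offset, epoch);
        }
    }

    // Assumption: the bootstrap checkpoint is identified by offset 0, epoch 0.
    static final SnapshotId BOOTSTRAP_SNAPSHOT_ID = new SnapshotId(0L, 0);

    // Returns the records that would be replayed for the given snapshot.
    // Records in the bootstrap snapshot are skipped; all others are replayed.
    static List<String> replaySnapshot(SnapshotId id, List<List<String>> batches) {
        List<String> replayed = new ArrayList<>();
        for (List<String> batch : batches) {
            if (!id.equals(BOOTSTRAP_SNAPSHOT_ID)) {
                replayed.addAll(batch);
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        List<List<String>> batches = List.of(List.of("a", "b"), List.of("c"));
        System.out.println(replaySnapshot(BOOTSTRAP_SNAPSHOT_ID, batches));
        System.out.println(replaySnapshot(new SnapshotId(100L, 3), batches));
    }
}
```

With the inverted condition from the PR, the two cases would swap: the
bootstrap snapshot would be the only one whose records get replayed.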