kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2511539025


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,7 +1020,13 @@ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
                         Batch<ApiMessageAndVersion> batch = reader.next();
                         long offset = batch.lastOffset();
                         List<ApiMessageAndVersion> messages = batch.records();
-
+                        if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
+                            // For bootstrap snapshots, extract feature levels from all data records
+                            if (!messages.isEmpty()) {
+                                bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap");
+                                return;
+                            }
+                        }

Review Comment:
   Let's add an in-line comment describing what we're doing here for KIP-1170, since it is not super obvious to the reader.
   
   ```suggestion
                        // KIP-1170: The 0-0.checkpoint can contain metadata records. If it does,
                        // they should be considered the bootstrap metadata for the cluster.
                        if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID) && !messages.isEmpty()) {
                            bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap");
                            return;
                        }
   ```



##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -396,32 +396,34 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
 
     @Override
     public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
-        eventQueue.append(() -> {
-            try {
-                long numLoaded = metrics.incrementHandleLoadSnapshotCount();
-                String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
-                log.info("handleLoadSnapshot({}): incrementing HandleLoadSnapshotCount to {}.",
-                    snapshotName, numLoaded);
-                MetadataDelta delta = new MetadataDelta.Builder().
-                    setImage(image).
-                    build();
-                SnapshotManifest manifest = loadSnapshot(delta, reader);
-                log.info("handleLoadSnapshot({}): generated a metadata delta between offset {} " +
-                        "and this snapshot in {} us.", snapshotName,
-                        image.provenance().lastContainedOffset(),
-                        NANOSECONDS.toMicros(manifest.elapsedNs()));
-                MetadataImage image = delta.apply(manifest.provenance());
-                batchLoader.resetToImage(image);
-                maybePublishMetadata(delta, image, manifest);
-            } catch (Throwable e) {
-                // This is a general catch-all block where we don't expect to end up;
-                // failure-prone operations should have individual try/catch blocks around them.
-                faultHandler.handleFault("Unhandled fault in MetadataLoader#handleLoadSnapshot. " +
-                        "Snapshot offset was " + reader.lastContainedLogOffset(), e);
-            } finally {
-                reader.close();
-            }
-        });
+        if (!reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
+            eventQueue.append(() -> {
+                try {
+                    long numLoaded = metrics.incrementHandleLoadSnapshotCount();
+                    String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
+                    log.info("handleLoadSnapshot({}): incrementing HandleLoadSnapshotCount to {}.",
+                        snapshotName, numLoaded);
+                    MetadataDelta delta = new MetadataDelta.Builder().
+                        setImage(image).
+                        build();
+                    SnapshotManifest manifest = loadSnapshot(delta, reader);

Review Comment:
   I think the better way to "skip loading metadata records for 0-0.checkpoint" is to modify `MetadataLoader#loadSnapshot()`, since this method is also doing other things, like updating metrics, that we want to keep doing for the `0-0.checkpoint`.
   
   In `loadSnapshot()`, we can skip over iterating `batch.records()` if the snapshot is the `0-0.checkpoint`, similar to what we're doing in `QuorumController#handleLoadSnapshot()`.
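   
   For example, something along these lines inside `loadSnapshot()` (rough sketch only; the real method also does offset and manifest bookkeeping, so treat this as approximate rather than a drop-in change):
   ```java
   boolean isBootstrapSnapshot = reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID);
   while (reader.hasNext()) {
       Batch<ApiMessageAndVersion> batch = reader.next();
       if (isBootstrapSnapshot) {
           // KIP-1170: the 0-0.checkpoint only carries bootstrap records, which the
           // controller consumes separately; don't replay them into the image delta.
           continue;
       }
       for (ApiMessageAndVersion record : batch.records()) {
           delta.replay(record.message());
       }
   }
   // metrics/manifest handling below stays the same for all snapshots
   ```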



##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -1628,6 +1629,7 @@ public void testActivationRecordsPartialBootstrap() {
             logMsg -> { },
             0L,
             BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
+            false,

Review Comment:
   We should remove these booleans in this test file.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -443,7 +443,7 @@ private void updateListenersProgress(long highWatermark) {
     }
 
     private Optional<SnapshotReader<T>> latestSnapshot() {
-        return log.latestSnapshot().map(reader ->
+        Optional<SnapshotReader<T>> snapshot = log.latestSnapshot().map(reader ->

Review Comment:
   Please revert the changes to this file.



##########
metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java:
##########
@@ -152,6 +154,19 @@ public void testDirectories(int numDirs) throws Exception {
         }
     }
 
+    @Test
+    public void testBrokerRoleSkipsBootstrapSnapshot() throws Exception {
+        try (TestEnv testEnv = new TestEnv(1)) {
+            FormatterContext context = testEnv.newFormatter();
+            context.formatter.setWriteBootstrapSnapshot(false);
+            context.formatter.run();
+            File clusterMetadataDir = new File(testEnv.directory(0), String.format("%s-%d",

Review Comment:
   `testEnv.directory(0)` is the cluster metadata directory in these tests. Look at `testEnv.newFormatter()`. When you create the file here, you are looking for `tempDirectoryPath/__cluster_metadata-0`, which does not exist.
   ```suggestion
            File clusterMetadataDir = new File(testEnv.directory(0), "00000000000000000000-0000000000.checkpoint");
   ```



##########
metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java:
##########
@@ -122,6 +124,17 @@ private BatchAndType nextControlBatch(FileChannelRecordBatch input) {
                         messages.add(new ApiMessageAndVersion(message, (short) 0));
                         break;
                     }
+                    case KRAFT_VERSION: {
+                        KRaftVersionRecord message = new KRaftVersionRecord();
+                        message.read(new ByteBufferAccessor(record.value()), (short) 0);
+                        messages.add(new ApiMessageAndVersion(message, (short) 0));
+                        break;
+                    }
+                    case KRAFT_VOTERS:
+                        VotersRecord message = new VotersRecord();
+                        message.read(new ByteBufferAccessor(record.value()), (short) 0);
+                        messages.add(new ApiMessageAndVersion(message, (short) 0));
+                        break;

Review Comment:
   Why did we add this? I think this should be removed. If it is causing a test failure, that indicates that we are reading KRaft control records in the `metadata` module, which is incorrect.



##########
metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java:
##########
@@ -39,9 +38,7 @@ public class BootstrapDirectoryTest {
     static final List<ApiMessageAndVersion> SAMPLE_RECORDS1 = List.of(
             new ApiMessageAndVersion(new FeatureLevelRecord().
                     setName(MetadataVersion.FEATURE_NAME).
-                    setFeatureLevel((short) 7), (short) 0),
-            new ApiMessageAndVersion(new NoOpRecord(), (short) 0),
-            new ApiMessageAndVersion(new NoOpRecord(), (short) 0));
+                    setFeatureLevel((short) 7), (short) 0));

Review Comment:
   We should revert the changes to this file, since I think we're in agreement this should only deal with the old `bootstrap.checkpoint` file.



##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java:
##########
@@ -479,6 +482,14 @@ private void formatNode(
                 return;
             }
             formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
+            Feature.PRODUCTION_FEATURES.forEach(feature -> {
+                String featureName = feature.featureName();
+                if (!MetadataVersion.FEATURE_NAME.equals(featureName)
+                    && !KRaftVersion.FEATURE_NAME.equals(featureName)) {
+                    short level = nodes.bootstrapMetadata().featureLevel(featureName);
+                    formatter.setFeatureLevel(featureName, level);
+                }
+            });

Review Comment:
   We should be able to remove the changes to this file.



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -73,6 +76,28 @@ public BootstrapMetadata read() throws Exception {
         }
     }
 
+    public BootstrapMetadata read() throws Exception {
+        Path path = Paths.get(directoryPath);
+        if (!Files.isDirectory(path)) {
+            if (Files.exists(path)) {
+                throw new RuntimeException("Path " + directoryPath + " exists, but is not " +
+                        "a directory.");
+            } else {
+                throw new RuntimeException("No such directory as " + directoryPath);
+            }
+        }
+        Path binaryBootstrapPath = Paths.get(directoryPath, String.format("%s-%d",
+            CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+            CLUSTER_METADATA_TOPIC_PARTITION.partition()),
+            BINARY_BOOTSTRAP_CHECKPOINT_FILENAME);
+        if (!Files.exists(binaryBootstrapPath)) {
+            return readFromConfiguration();
+        } else {
+            return readFromBinaryFile(binaryBootstrapPath.toString());
+        }
+    }
+
+

Review Comment:
   It looks like a lot of tests call `BootstrapDirectory#read()`, but we want to avoid explicitly reading in the `0-0.checkpoint` twice in the actual implementation, since we'll load it from KRaft in `QuorumController`.
   
   I think I'm okay with having 2 separate methods, since going forward we should only call `read()` from tests to assert things about the `0-0.checkpoint` (which would otherwise be unreachable until KRaft loads it into memory).
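   
   To make that intent obvious, `read()` could resolve the checkpoint path once and stay test-only, e.g. (sketch; `bootstrapSnapshotPath()` is a hypothetical helper returning the `<topic>-<partition>/0-0.checkpoint` path built above):
   ```java
   // Test-only: production code lets KRaft load the 0-0.checkpoint and hand its
   // records to QuorumController#handleLoadSnapshot, so we must not read it twice.
   public BootstrapMetadata read() throws Exception {
       Path checkpoint = bootstrapSnapshotPath();  // hypothetical helper
       return Files.exists(checkpoint)
           ? readFromBinaryFile(checkpoint.toString())
           : readFromConfiguration();
   }
   ```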



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -55,7 +58,7 @@ public BootstrapDirectory(
         this.directoryPath = Objects.requireNonNull(directoryPath);
     }
 
-    public BootstrapMetadata read() throws Exception {
+    public BootstrapMetadata readBootstrapCheckpoint() throws Exception {

Review Comment:
   Let's rename this to `maybeReadLegacyBootstrapCheckpoint()`



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -42,6 +43,8 @@
 public class BootstrapDirectory {

Review Comment:
   Hmm, looked at `StorageToolTest` and `FormatterTest` again, and I am okay with your solution of having two separate methods. In that case, we should deprecate `maybeReadLegacyBootstrapCheckpoint()` and `BINARY_BOOTSTRAP_FILENAME`.
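   
   Roughly like this (sketch; the javadoc wording is just a suggestion):
   ```java
   /** @deprecated The formatter now writes bootstrap records into the 0-0.checkpoint (KIP-1170). */
   @Deprecated
   public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint";

   /** @deprecated Kept only to read directories formatted before KIP-1170. */
   @Deprecated
   public BootstrapMetadata maybeReadLegacyBootstrapCheckpoint() throws Exception {
       Path legacyPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME);
       return Files.exists(legacyPath)
           ? readFromBinaryFile(legacyPath.toString())
           : readFromConfiguration();
   }
   ```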



##########
kafka.code-workspace:
##########
@@ -0,0 +1,56 @@
+{

Review Comment:
   Please remove this and the other Gradle files. I assume that Cursor probably created them to run things locally? Look at the `README.md` for the `./gradlew` commands to run for testing/verifying locally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
