kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2433217087


##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########


Review Comment:
   There is a clearer way to make these changes. This method is what currently 
writes `0-0.checkpoint`. We should pass the bootstrap metadata object in here 
and append its metadata records using 
`RecordsSnapshotWriter.append(bootstrapMetadata.records())`. 
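
   A minimal sketch of that shape, assuming a writer with append/freeze/close 
steps. `SnapshotSink`, `InMemorySink`, and `writeSnapshot` are hypothetical 
stand-ins for illustration, not Kafka classes, and records are plain strings 
here rather than real metadata records:

```java
import java.util.ArrayList;
import java.util.List;

final class AppendSketch {
    /** Hypothetical stand-in for a snapshot writer; NOT Kafka's RecordsSnapshotWriter. */
    interface SnapshotSink extends AutoCloseable {
        void append(List<String> records);
        void freeze();
        @Override
        void close();
    }

    /** In-memory sink so the sketch runs without touching disk. */
    static final class InMemorySink implements SnapshotSink {
        final List<String> written = new ArrayList<>();
        boolean frozen = false;

        @Override
        public void append(List<String> records) {
            if (frozen) {
                throw new IllegalStateException("snapshot already frozen");
            }
            written.addAll(records);
        }

        @Override
        public void freeze() {
            frozen = true;
        }

        @Override
        public void close() {
            // Nothing to release for the in-memory stand-in.
        }
    }

    /** Appends the bootstrap records through the one writer, then finalizes it. */
    static void writeSnapshot(SnapshotSink sink, List<String> bootstrapRecords) {
        try (SnapshotSink s = sink) {
            s.append(bootstrapRecords);
            s.freeze();
        }
    }
}
```

   With this shape there is a single code path producing the checkpoint, so no 
second file has to be created and merged afterwards.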



##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -437,14 +438,40 @@ void doFormat(BootstrapMetadata bootstrapMetadata) throws 
Exception {
                     directoryTypes.get(writeLogDir).description(), writeLogDir,
                     MetadataVersion.FEATURE_NAME, releaseVersion);
                 Files.createDirectories(Paths.get(writeLogDir));
-                BootstrapDirectory bootstrapDirectory = new 
BootstrapDirectory(writeLogDir);
+                File parentDir = new File(writeLogDir);
+                File clusterMetadataDirectory = new File(parentDir, 
String.format("%s-%d",
+                        CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+                        CLUSTER_METADATA_TOPIC_PARTITION.partition()));
+                Files.createDirectories(clusterMetadataDirectory.toPath());
+                BootstrapDirectory bootstrapDirectory = new 
BootstrapDirectory(clusterMetadataDirectory.getPath());
                 bootstrapDirectory.writeBinaryFile(bootstrapMetadata);
                 if 
(directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) {
-                    writeDynamicQuorumSnapshot(writeLogDir,
+                    
writeDynamicQuorumSnapshot(clusterMetadataDirectory.getPath(),
                         initialControllers.get(),
                         featureLevels.get(KRaftVersion.FEATURE_NAME),
                         controllerListenerName);
+                    File createdBoostrapCheckpoint = new 
File(clusterMetadataDirectory.getPath() + "/" + 
BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME);
+                    File created000Checkpoint = new 
File(clusterMetadataDirectory.getPath() + "/" + 
BootstrapDirectory.BINARY_CHECKPOINT_FILENAME);
+                    Files.write(
+                            createdBoostrapCheckpoint.toPath(),
+                            Files.readAllBytes(created000Checkpoint.toPath()),
+                            StandardOpenOption.APPEND);
+                    try {
+                        created000Checkpoint.delete();
+                        
createdBoostrapCheckpoint.renameTo(created000Checkpoint);
+                    } catch (Exception ex) {
+                        throw new RuntimeException("Failed operation to 
combine metadata and kraft records: ", ex);
+                    }
+                } else {
+                    File createdBoostrapCheckpoint = new 
File(clusterMetadataDirectory.getPath() + "/" + 
BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME);
+                    File created000Checkpoint = new 
File(clusterMetadataDirectory.getPath() + "/" + 
BootstrapDirectory.BINARY_CHECKPOINT_FILENAME);
+                    try {
+                        
createdBoostrapCheckpoint.renameTo(created000Checkpoint);
+                    } catch (Exception ex) {
+                        throw new RuntimeException("Failed to rename file: ", 
ex);
+                    }

Review Comment:
   This code is confusing. Instead of renaming and deleting files, we should 
remove the call that writes the bootstrap metadata to disk on Line 447, since 
we no longer write `bootstrap.checkpoint`, and follow the other comments for 
writing metadata records to `0-0.checkpoint`.
   
   We can check if an old `bootstrap.checkpoint` exists and delete it, since 
IIRC that was part of the KIP.
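
   A minimal sketch of that cleanup, assuming the legacy file is named 
`bootstrap.checkpoint` as above. The class and method names are hypothetical. 
Note that `File.delete()` and `File.renameTo()` report ordinary failures 
through a `boolean` return value rather than an exception, so the `catch` 
blocks in the diff would not see a failed rename; `Files.deleteIfExists` 
throws `IOException` instead:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class LegacyBootstrapCleanup {
    // File name taken from the review comment; in Kafka the real constant
    // lives in BootstrapDirectory.
    static final String LEGACY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint";

    /** Deletes a leftover legacy file; returns true if one was removed. */
    static boolean deleteLegacyBootstrap(Path metadataDir) throws IOException {
        return Files.deleteIfExists(metadataDir.resolve(LEGACY_BOOTSTRAP_FILENAME));
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("format-sketch");
        Files.createFile(dir.resolve(LEGACY_BOOTSTRAP_FILENAME));
        System.out.println(deleteLegacyBootstrap(dir)); // true: file existed
        System.out.println(deleteLegacyBootstrap(dir)); // false: already gone
    }
}
```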



##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -437,14 +438,40 @@ void doFormat(BootstrapMetadata bootstrapMetadata) throws 
Exception {
                     directoryTypes.get(writeLogDir).description(), writeLogDir,
                     MetadataVersion.FEATURE_NAME, releaseVersion);
                 Files.createDirectories(Paths.get(writeLogDir));
-                BootstrapDirectory bootstrapDirectory = new 
BootstrapDirectory(writeLogDir);
+                File parentDir = new File(writeLogDir);
+                File clusterMetadataDirectory = new File(parentDir, 
String.format("%s-%d",
+                        CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+                        CLUSTER_METADATA_TOPIC_PARTITION.partition()));
+                Files.createDirectories(clusterMetadataDirectory.toPath());
+                BootstrapDirectory bootstrapDirectory = new 
BootstrapDirectory(clusterMetadataDirectory.getPath());
                 bootstrapDirectory.writeBinaryFile(bootstrapMetadata);
                 if 
(directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) {
-                    writeDynamicQuorumSnapshot(writeLogDir,
+                    
writeDynamicQuorumSnapshot(clusterMetadataDirectory.getPath(),

Review Comment:
   We should rename `writeDynamicQuorumSnapshot` to `writeZeroSnapshot` and 
always write it going forward. The semantic change is that we should not 
write the KRaft control records (KRaft version and voter set) when 
`!isDynamicMetadataDirectory()`.
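
   A rough sketch of the intended semantics, with records modelled as plain 
strings. `ZeroSnapshotSketch` and `zeroSnapshotRecords` are hypothetical names, 
and placing the control records before the metadata records is an assumption 
of this sketch:

```java
import java.util.ArrayList;
import java.util.List;

final class ZeroSnapshotSketch {
    /**
     * Returns the records a zero snapshot would hold, in write order.
     * The snapshot is always produced; only the KRaft control records
     * depend on whether the directory is a dynamic metadata directory.
     */
    static List<String> zeroSnapshotRecords(boolean dynamicMetadataDirectory,
                                            List<String> kraftControlRecords,
                                            List<String> bootstrapRecords) {
        List<String> out = new ArrayList<>();
        if (dynamicMetadataDirectory) {
            // KRaft version and voter set: dynamic quorum only.
            out.addAll(kraftControlRecords);
        }
        // Bootstrap metadata records are written in both cases.
        out.addAll(bootstrapRecords);
        return out;
    }
}
```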



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
