kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2433217087
##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
Review Comment:
There is a clearer way to make these changes. This method is what writes the
`0-0.checkpoint` currently. We should pass the bootstrap metadata object here
and append the metadata records using
`RecordsSnapshotWriter.append(bootstrapMetadata.records())`.
##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -437,14 +438,40 @@ void doFormat(BootstrapMetadata bootstrapMetadata) throws
Exception {
directoryTypes.get(writeLogDir).description(), writeLogDir,
MetadataVersion.FEATURE_NAME, releaseVersion);
Files.createDirectories(Paths.get(writeLogDir));
- BootstrapDirectory bootstrapDirectory = new
BootstrapDirectory(writeLogDir);
+ File parentDir = new File(writeLogDir);
+ File clusterMetadataDirectory = new File(parentDir,
String.format("%s-%d",
+ CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+ CLUSTER_METADATA_TOPIC_PARTITION.partition()));
+ Files.createDirectories(clusterMetadataDirectory.toPath());
+ BootstrapDirectory bootstrapDirectory = new
BootstrapDirectory(clusterMetadataDirectory.getPath());
bootstrapDirectory.writeBinaryFile(bootstrapMetadata);
if
(directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) {
- writeDynamicQuorumSnapshot(writeLogDir,
+
writeDynamicQuorumSnapshot(clusterMetadataDirectory.getPath(),
initialControllers.get(),
featureLevels.get(KRaftVersion.FEATURE_NAME),
controllerListenerName);
+ File createdBoostrapCheckpoint = new
File(clusterMetadataDirectory.getPath() + "/" +
BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME);
+ File created000Checkpoint = new
File(clusterMetadataDirectory.getPath() + "/" +
BootstrapDirectory.BINARY_CHECKPOINT_FILENAME);
+ Files.write(
+ createdBoostrapCheckpoint.toPath(),
+ Files.readAllBytes(created000Checkpoint.toPath()),
+ StandardOpenOption.APPEND);
+ try {
+ created000Checkpoint.delete();
+
createdBoostrapCheckpoint.renameTo(created000Checkpoint);
+ } catch (Exception ex) {
+ throw new RuntimeException("Failed operation to
combine metadata and kraft records: ", ex);
+ }
+ } else {
+ File createdBoostrapCheckpoint = new
File(clusterMetadataDirectory.getPath() + "/" +
BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME);
+ File created000Checkpoint = new
File(clusterMetadataDirectory.getPath() + "/" +
BootstrapDirectory.BINARY_CHECKPOINT_FILENAME);
+ try {
+
createdBoostrapCheckpoint.renameTo(created000Checkpoint);
+ } catch (Exception ex) {
+ throw new RuntimeException("Failed to rename file: ",
ex);
+ }
Review Comment:
This code is confusing. Instead of doing this renaming and deleting. We
should instead remove the call to write the bootstrap metadata to disk on Line
447, since we're no longer writing `bootstrap.checkpoint` anymore, and follow
the other comments for writing metadata records to `0-0.checkpoint`.
We can check if an old `bootstrap.checkpoint` exists and delete it, since
IIRC that was part of the KIP.
##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -437,14 +438,40 @@ void doFormat(BootstrapMetadata bootstrapMetadata) throws
Exception {
directoryTypes.get(writeLogDir).description(), writeLogDir,
MetadataVersion.FEATURE_NAME, releaseVersion);
Files.createDirectories(Paths.get(writeLogDir));
- BootstrapDirectory bootstrapDirectory = new
BootstrapDirectory(writeLogDir);
+ File parentDir = new File(writeLogDir);
+ File clusterMetadataDirectory = new File(parentDir,
String.format("%s-%d",
+ CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+ CLUSTER_METADATA_TOPIC_PARTITION.partition()));
+ Files.createDirectories(clusterMetadataDirectory.toPath());
+ BootstrapDirectory bootstrapDirectory = new
BootstrapDirectory(clusterMetadataDirectory.getPath());
bootstrapDirectory.writeBinaryFile(bootstrapMetadata);
if
(directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) {
- writeDynamicQuorumSnapshot(writeLogDir,
+
writeDynamicQuorumSnapshot(clusterMetadataDirectory.getPath(),
Review Comment:
We should rename `writeDynamicQuorumSnapshot` to `writeZeroSnapshot`, and
always write it going forward. The semantics that change here are that we
should not write the KRaft control records (KRaft version and voter set) when
`!isDynamicMetadataDirectory()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]