mannoopj commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2728216580
##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -492,27 +497,36 @@ static DirectoryType calculate(
}
}
- static void writeDynamicQuorumSnapshot(
+ public static void writeBoostrapSnapshot(
String writeLogDir,
- DynamicVoters initialControllers,
+ BootstrapMetadata bootstrapMetadata,
+ Optional<DynamicVoters> initialControllers,
short kraftVersion,
String controllerListenerName
) {
- File parentDir = new File(writeLogDir);
- File clusterMetadataDirectory = new File(parentDir,
String.format("%s-%d",
- CLUSTER_METADATA_TOPIC_PARTITION.topic(),
- CLUSTER_METADATA_TOPIC_PARTITION.partition()));
- VoterSet voterSet =
initialControllers.toVoterSet(controllerListenerName);
- RecordsSnapshotWriter.Builder builder = new
RecordsSnapshotWriter.Builder().
- setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()).
- setMaxBatchSizeBytes(KafkaRaftClient.MAX_BATCH_SIZE_BYTES).
- setRawSnapshotWriter(FileRawSnapshotWriter.create(
- clusterMetadataDirectory.toPath(),
- Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
- setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)).
- setVoterSet(Optional.of(voterSet));
- try (RecordsSnapshotWriter<ApiMessageAndVersion> writer =
builder.build(new MetadataRecordSerde())) {
- writer.freeze();
+ try {
+ File parentDir = new File(writeLogDir);
+ File clusterMetadataDirectory = new File(parentDir,
String.format("%s-%d",
+ CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+ CLUSTER_METADATA_TOPIC_PARTITION.partition()));
+ RecordsSnapshotWriter.Builder builder = new
RecordsSnapshotWriter.Builder().
+ setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()).
+ setMaxBatchSizeBytes(KafkaRaftClient.MAX_BATCH_SIZE_BYTES).
+ setRawSnapshotWriter(FileRawSnapshotWriter.create(
+ clusterMetadataDirectory.toPath(),
+ Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
+ setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion));
+ if (initialControllers.isPresent()) {
+ VoterSet voterSet =
initialControllers.get().toVoterSet(controllerListenerName);
+ builder.setVoterSet(Optional.of(voterSet));
+ }
+ try (RecordsSnapshotWriter<ApiMessageAndVersion> writer =
builder.build(new MetadataRecordSerde())) {
+ writer.append(bootstrapMetadata.records());
+ writer.freeze();
+ }
+ } catch (UncheckedIOException e) {
Review Comment:
I added it I think originally to add more context for
(disk/path/permissions) causes of UncheckedIOException.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]