kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2511539025
##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,7 +1020,13 @@ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
             Batch<ApiMessageAndVersion> batch = reader.next();
             long offset = batch.lastOffset();
             List<ApiMessageAndVersion> messages = batch.records();
-
+            if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
+                // For bootstrap snapshots, extract feature levels from all data records
+                if (!messages.isEmpty()) {
+                    bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap");
+                    return;
+                }
+            }
Review Comment:
Let's add an in-line comment describing what we're doing here for KIP-1170,
since it is not super obvious to the reader.
```suggestion
            // KIP-1170: The 0-0.checkpoint can contain metadata records. If it does,
            // they should be considered the bootstrap metadata for the cluster.
            if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID) && !messages.isEmpty()) {
                bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap");
                return;
            }
```
##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -396,32 +396,34 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {

     @Override
     public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
-        eventQueue.append(() -> {
-            try {
-                long numLoaded = metrics.incrementHandleLoadSnapshotCount();
-                String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
-                log.info("handleLoadSnapshot({}): incrementing HandleLoadSnapshotCount to {}.",
-                    snapshotName, numLoaded);
-                MetadataDelta delta = new MetadataDelta.Builder().
-                    setImage(image).
-                    build();
-                SnapshotManifest manifest = loadSnapshot(delta, reader);
-                log.info("handleLoadSnapshot({}): generated a metadata delta between offset {} " +
-                    "and this snapshot in {} us.", snapshotName,
-                    image.provenance().lastContainedOffset(),
-                    NANOSECONDS.toMicros(manifest.elapsedNs()));
-                MetadataImage image = delta.apply(manifest.provenance());
-                batchLoader.resetToImage(image);
-                maybePublishMetadata(delta, image, manifest);
-            } catch (Throwable e) {
-                // This is a general catch-all block where we don't expect to end up;
-                // failure-prone operations should have individual try/catch blocks around them.
-                faultHandler.handleFault("Unhandled fault in MetadataLoader#handleLoadSnapshot. " +
-                    "Snapshot offset was " + reader.lastContainedLogOffset(), e);
-            } finally {
-                reader.close();
-            }
-        });
+        if (!reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
+            eventQueue.append(() -> {
+                try {
+                    long numLoaded = metrics.incrementHandleLoadSnapshotCount();
+                    String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
+                    log.info("handleLoadSnapshot({}): incrementing HandleLoadSnapshotCount to {}.",
+                        snapshotName, numLoaded);
+                    MetadataDelta delta = new MetadataDelta.Builder().
+                        setImage(image).
+                        build();
+                    SnapshotManifest manifest = loadSnapshot(delta, reader);
Review Comment:
I think the better way to "skip loading metadata records for the 0-0.checkpoint"
is to modify `MetadataLoader#loadSnapshot()`, since `handleLoadSnapshot()` also
does other things, like updating metrics, that we want to keep doing for the
`0-0.checkpoint`.
In `loadSnapshot()`, we can skip over iterating `batch.records()` when the
snapshot is the `0-0.checkpoint`, similar to what we're doing in
`QuorumController#handleLoadSnapshot()`.
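For example, roughly (a sketch only; I'm assuming `loadSnapshot()` iterates the
reader's batches and replays each record into the delta, so the actual change
may look a bit different):
```java
        while (reader.hasNext()) {
            Batch<ApiMessageAndVersion> batch = reader.next();
            // KIP-1170: records in the 0-0.checkpoint are bootstrap metadata and are
            // consumed by QuorumController#handleLoadSnapshot(), so don't replay them
            // into the delta here. The reader is still drained, so the metrics and
            // manifest bookkeeping stay the same.
            if (!reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
                for (ApiMessageAndVersion record : batch.records()) {
                    delta.replay(record.message());
                }
            }
        }
```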
##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -1628,6 +1629,7 @@ public void testActivationRecordsPartialBootstrap() {
             logMsg -> { },
             0L,
             BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
+            false,
Review Comment:
We should remove these booleans in this test file.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -443,7 +443,7 @@ private void updateListenersProgress(long highWatermark) {
     }

     private Optional<SnapshotReader<T>> latestSnapshot() {
-        return log.latestSnapshot().map(reader ->
+        Optional<SnapshotReader<T>> snapshot = log.latestSnapshot().map(reader ->
Review Comment:
Please revert the changes to this file.
##########
metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java:
##########
@@ -152,6 +154,19 @@ public void testDirectories(int numDirs) throws Exception {
         }
     }

+    @Test
+    public void testBrokerRoleSkipsBootstrapSnapshot() throws Exception {
+        try (TestEnv testEnv = new TestEnv(1)) {
+            FormatterContext context = testEnv.newFormatter();
+            context.formatter.setWriteBootstrapSnapshot(false);
+            context.formatter.run();
+            File clusterMetadataDir = new File(testEnv.directory(0), String.format("%s-%d",
Review Comment:
`testEnv.directory(0)` is the cluster metadata directory in these tests.
Look at `testEnv.newFormatter()`. When you create the file here, you are
looking for `tempDirectoryPath/__cluster-metadata-0`, which does not exist.
```suggestion
            File clusterMetadataDir = new File(testEnv.directory(0), "00000000000000000000-0000000000.checkpoint");
```
##########
metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java:
##########
@@ -122,6 +124,17 @@ private BatchAndType nextControlBatch(FileChannelRecordBatch input) {
                     messages.add(new ApiMessageAndVersion(message, (short) 0));
                     break;
                 }
+                case KRAFT_VERSION: {
+                    KRaftVersionRecord message = new KRaftVersionRecord();
+                    message.read(new ByteBufferAccessor(record.value()), (short) 0);
+                    messages.add(new ApiMessageAndVersion(message, (short) 0));
+                    break;
+                }
+                case KRAFT_VOTERS:
+                    VotersRecord message = new VotersRecord();
+                    message.read(new ByteBufferAccessor(record.value()), (short) 0);
+                    messages.add(new ApiMessageAndVersion(message, (short) 0));
+                    break;
Review Comment:
Why did we add this? I think this should be removed. If it is causing a test
failure, that indicates that we are reading KRaft control records in the
`metadata` module, which is incorrect.
##########
metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java:
##########
@@ -39,9 +38,7 @@ public class BootstrapDirectoryTest {
     static final List<ApiMessageAndVersion> SAMPLE_RECORDS1 = List.of(
         new ApiMessageAndVersion(new FeatureLevelRecord().
             setName(MetadataVersion.FEATURE_NAME).
-            setFeatureLevel((short) 7), (short) 0),
-        new ApiMessageAndVersion(new NoOpRecord(), (short) 0),
-        new ApiMessageAndVersion(new NoOpRecord(), (short) 0));
+            setFeatureLevel((short) 7), (short) 0));
Review Comment:
We should revert the changes to this file, since I think we're in agreement
this should only deal with the old `bootstrap.checkpoint` file.
##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java:
##########
@@ -479,6 +482,14 @@ private void formatNode(
             return;
         }
         formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
+        Feature.PRODUCTION_FEATURES.forEach(feature -> {
+            String featureName = feature.featureName();
+            if (!MetadataVersion.FEATURE_NAME.equals(featureName)
+                    && !KRaftVersion.FEATURE_NAME.equals(featureName)) {
+                short level = nodes.bootstrapMetadata().featureLevel(featureName);
+                formatter.setFeatureLevel(featureName, level);
+            }
+        });
Review Comment:
We should be able to remove the changes to this file.
##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -73,6 +76,28 @@ public BootstrapMetadata read() throws Exception {
         }
     }

+    public BootstrapMetadata read() throws Exception {
+        Path path = Paths.get(directoryPath);
+        if (!Files.isDirectory(path)) {
+            if (Files.exists(path)) {
+                throw new RuntimeException("Path " + directoryPath + " exists, but is not " +
+                    "a directory.");
+            } else {
+                throw new RuntimeException("No such directory as " + directoryPath);
+            }
+        }
+        Path binaryBootstrapPath = Paths.get(directoryPath,
+            String.format("%s-%d",
+                CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+                CLUSTER_METADATA_TOPIC_PARTITION.partition()),
+            BINARY_BOOTSTRAP_CHECKPOINT_FILENAME);
+        if (!Files.exists(binaryBootstrapPath)) {
+            return readFromConfiguration();
+        } else {
+            return readFromBinaryFile(binaryBootstrapPath.toString());
+        }
+    }
+
Review Comment:
It looks like a lot of tests call `BootstrapDirectory#read()`, but we want
to avoid explicitly reading the `0-0.checkpoint` twice in the actual
implementation, since we'll load it from KRaft in `QuorumController`.
I think I'm okay with having 2 separate methods, since going forward we should
only call `read()` from tests, to assert things about the `0-0.checkpoint`
(which would otherwise be unreachable until KRaft loads it into memory).
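As a sketch of the split I have in mind (using the `maybeReadLegacyBootstrapCheckpoint`
name suggested below and the helpers from the diff above; exact details are up
for discussion):
```java
    // Production path: only the legacy bootstrap.checkpoint is consulted here;
    // the 0-0.checkpoint is loaded through KRaft in QuorumController.
    public BootstrapMetadata maybeReadLegacyBootstrapCheckpoint() throws Exception {
        Path legacyPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME);
        if (!Files.exists(legacyPath)) {
            return readFromConfiguration();
        }
        return readFromBinaryFile(legacyPath.toString());
    }

    // Test-only: reads the 0-0.checkpoint directly, so tests can assert on its
    // contents before KRaft has loaded it into memory.
    public BootstrapMetadata read() throws Exception {
        Path checkpointPath = Paths.get(directoryPath,
            String.format("%s-%d",
                CLUSTER_METADATA_TOPIC_PARTITION.topic(),
                CLUSTER_METADATA_TOPIC_PARTITION.partition()),
            BINARY_BOOTSTRAP_CHECKPOINT_FILENAME);
        if (Files.exists(checkpointPath)) {
            return readFromBinaryFile(checkpointPath.toString());
        }
        return maybeReadLegacyBootstrapCheckpoint();
    }
```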
##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -55,7 +58,7 @@ public BootstrapDirectory(
         this.directoryPath = Objects.requireNonNull(directoryPath);
     }

-    public BootstrapMetadata read() throws Exception {
+    public BootstrapMetadata readBootstrapCheckpoint() throws Exception {
Review Comment:
Let's rename this to `maybeReadLegacyBootstrapCheckpoint()`
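i.e.:
```suggestion
    public BootstrapMetadata maybeReadLegacyBootstrapCheckpoint() throws Exception {
```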
##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -42,6 +43,8 @@
public class BootstrapDirectory {
Review Comment:
Hmm, looked at `StorageToolTest` and `FormatterTest` again, and I am okay
with your solution of having two separate methods. In that case, we should
deprecate `maybeReadLegacyBootstrapCheckpoint()` and
`BINARY_BOOTSTRAP_FILENAME`.
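For example (a sketch; the javadoc wording is just a suggestion, and I'm
assuming the constant keeps its current `"bootstrap.checkpoint"` value):
```java
    /**
     * @deprecated This only covers the legacy bootstrap.checkpoint flow; new
     * clusters bootstrap from the 0-0.checkpoint, which KRaft loads directly.
     */
    @Deprecated
    public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint";
```
with the same `@Deprecated` treatment on `maybeReadLegacyBootstrapCheckpoint()`.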
##########
kafka.code-workspace:
##########
@@ -0,0 +1,56 @@
+{
Review Comment:
Please remove this and the other Gradle files. I assume Cursor created them to
run things locally? See the `README.md` for the `./gradlew` commands to use for
testing/verifying locally.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]