jsancio commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1639921798
##########
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java:
##########
@@ -146,7 +147,7 @@ final public static class Builder {
         private Time time = Time.SYSTEM;
         private int maxBatchSize = 1024;
         private MemoryPool memoryPool = MemoryPool.NONE;
-        private short kraftVersion = 1;
+        private KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_0; // TODO

Review Comment:
   TODO reminder. What is missing to make 1 the default for the builder?

##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##########
@@ -73,6 +80,22 @@ public static Optional<MetadataVersion> recordToMetadataVersion(ApiMessage recor
         return Optional.empty();
     }
+    private static ApiMessageAndVersion kraftVersionToRecord(KRaftVersion version) {
+        return new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName(KRaftVersion.FEATURE_NAME).
+            setFeatureLevel(version.featureLevel()), (short) 0);
+    }
+
+    private static Optional<KRaftVersion> recordToKRaftVersion(ApiMessage record) {
+        if (record instanceof FeatureLevelRecord) {
+            FeatureLevelRecord featureLevel = (FeatureLevelRecord) record;
+            if (featureLevel.name().equals(KRaftVersion.FEATURE_NAME)) {
+                return Optional.of(KRaftVersion.fromFeatureLevel(featureLevel.featureLevel()));
+            }
+        }
+        return Optional.empty();
+    }

Review Comment:
   I don't think we should do this. The kraft.version is written to the cluster metadata log using the KRaftVersion control record. These are the cases when KRaft will write this control record:

   1. The kafka-storage tool will write it to the checkpoint/snapshot if it is being formatted to support kraft.version 1. [Storage tool changes for KIP-853](https://issues.apache.org/jira/browse/KAFKA-16518)
   2. `KafkaRaftClient` will write the KRaftVersion control record to the checkpoint/snapshot using `RecordsSnapshotWriter.Builder` if the control record is part of the log segments. This is already in trunk.
   3. `KafkaRaftClient` will write the `KRaftVersion` control record when handling `UpdateFeatures`. [UpdateFeatures for kraft.version](https://issues.apache.org/jira/browse/KAFKA-16538)
   4. The first KRaft leader will write the `KRaftVersion` control record if it is in the bootstrapping checkpoint/snapshot but has never been written to the log segments. [Support for first leader bootstrapping the voter set](https://issues.apache.org/jira/browse/KAFKA-16532)

   I think that KRaftVersion should behave differently from other feature levels in that the KRaft version should be persisted and replicated using control records instead of metadata records. This is needed mainly because KRaft needs to read and process this record as uncommitted data. The controller and the metadata layer only see committed records.

##########
server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum KRaftVersion implements FeatureVersion {
+
+    // Version 1 enables KIP-853.
+    KRAFT_VERSION_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION),
+    KRAFT_VERSION_1(1, MetadataVersion.IBP_3_8_IV0);
+
+    public static final String FEATURE_NAME = "kraft.version";
+
+    private final short featureLevel;
+    private final MetadataVersion bootstrapMetadataVersion;
+
+    KRaftVersion(
+        int featureLevel,
+        MetadataVersion bootstrapMetadataVersion
+    ) {
+        this.featureLevel = (short) featureLevel;
+        this.bootstrapMetadataVersion = bootstrapMetadataVersion;
+    }
+
+    @Override
+    public short featureLevel() {
+        return featureLevel;
+    }
+
+    public static KRaftVersion fromFeatureLevel(short version) {
+        switch (version) {
+            case 0:
+                return KRAFT_VERSION_0;
+            case 1:
+                return KRAFT_VERSION_1;
+            default:
+                throw new RuntimeException("Unknown KRaft feature level: " + (int) version);
+        }
+    }
+
+    @Override
+    public String featureName() {
+        return FEATURE_NAME;
+    }
+
+    @Override
+    public MetadataVersion bootstrapMetadataVersion() {
+        return bootstrapMetadataVersion;
+    }
+
+    @Override
+    public Map<String, Short> dependencies() {
+        return Collections.emptyMap();
+    }
+
+    public short quorumStateVersion() {
+        switch (this) {
+            case KRAFT_VERSION_0:
+                return (short) 0;
+            case KRAFT_VERSION_1:
+                return (short) 1;
+        }
+        throw new RuntimeException("Unknown KRaft feature level: " + this);
+    }

Review Comment:
   Can we add a test for this? I want to warn developers against changing this method in an incompatible way.

##########
server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum KRaftVersion implements FeatureVersion {
+
+    // Version 1 enables KIP-853.

Review Comment:
   How about moving this comment right above `KRAFT_VERSION_1`?

##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##########
@@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() {
             metadataVersion, source);
     }
+    public BootstrapMetadata withMetadataVersion(MetadataVersion metadataVersion) {
+        List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+        for (ApiMessageAndVersion record : records) {
+            if (recordToMetadataVersion(record.message()).isPresent()) {
+                newRecords.add(metadataVersionToRecord(metadataVersion));
+            } else {
+                newRecords.add(record);
+            }
+        }
+        return new BootstrapMetadata(newRecords, metadataVersion, source);
+    }
+
+    public BootstrapMetadata withKRaftVersion(KRaftVersion version) {
+        List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+        boolean foundKRaftVersion = false;
+        for (ApiMessageAndVersion record : records) {
+            if (recordToKRaftVersion(record.message()).isPresent()) {
+                newRecords.add(kraftVersionToRecord(version));
+            } else {
+                newRecords.add(record);
+            }
+        }
+        if (!foundKRaftVersion) {
+            newRecords.add(kraftVersionToRecord(version));

Review Comment:
   Yes. @dengziming's comment is correct. See my other comment.
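
For the earlier request to test `quorumStateVersion()`, here is a minimal sketch of what such a check might pin down. It assumes a local stand-in enum (`SketchKRaftVersion`) copied from the quoted hunk so that the example compiles without Kafka on the classpath, and a hypothetical `QuorumStateVersionCheck` class. A real test would presumably sit next to `KRaftVersion` and use the project's test framework rather than hand-rolled assertions.

```java
// Stand-in for org.apache.kafka.server.common.KRaftVersion (assumption: copied
// from the quoted hunk, minus the MetadataVersion dependency).
enum SketchKRaftVersion {
    KRAFT_VERSION_0(0),
    // Version 1 enables KIP-853.
    KRAFT_VERSION_1(1);

    private final short featureLevel;

    SketchKRaftVersion(int featureLevel) {
        this.featureLevel = (short) featureLevel;
    }

    short featureLevel() {
        return featureLevel;
    }

    static SketchKRaftVersion fromFeatureLevel(short version) {
        switch (version) {
            case 0:
                return KRAFT_VERSION_0;
            case 1:
                return KRAFT_VERSION_1;
            default:
                throw new RuntimeException("Unknown KRaft feature level: " + (int) version);
        }
    }

    short quorumStateVersion() {
        switch (this) {
            case KRAFT_VERSION_0:
                return (short) 0;
            case KRAFT_VERSION_1:
                return (short) 1;
        }
        throw new RuntimeException("Unknown KRaft feature level: " + this);
    }
}

// Hypothetical check class: hard-codes the expected mapping so that an
// incompatible change to quorumStateVersion() fails loudly.
final class QuorumStateVersionCheck {
    static void check() {
        expect(SketchKRaftVersion.KRAFT_VERSION_0.quorumStateVersion() == 0,
            "kraft.version 0 must map to quorum state version 0");
        expect(SketchKRaftVersion.KRAFT_VERSION_1.quorumStateVersion() == 1,
            "kraft.version 1 must map to quorum state version 1");
        // Every feature level must round-trip through fromFeatureLevel.
        for (SketchKRaftVersion version : SketchKRaftVersion.values()) {
            expect(SketchKRaftVersion.fromFeatureLevel(version.featureLevel()) == version,
                "feature level round trip failed for " + version);
        }
    }

    private static void expect(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        check();
        System.out.println("quorumStateVersion mapping unchanged");
    }
}
```

Hard-coding the expected values, rather than deriving them from the enum, is deliberate: the point of the check is to fail whenever the mapping is edited.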
##########
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##########
@@ -40,7 +40,8 @@ public enum Features {
      * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature.
      */
     TEST_VERSION("test.feature.version", TestFeatureVersion.values()),
-    GROUP_VERSION("group.version", GroupVersion.values());
+    GROUP_VERSION("group.version", GroupVersion.values()),
+    KRAFT_VERSION("kraft.version", KRaftVersion.values());

Review Comment:
   Should we add a test that shows that ApiVersionsResponse returns what we expect and that it includes the kraft.version?

##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##########
@@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() {
             metadataVersion, source);
     }
+    public BootstrapMetadata withMetadataVersion(MetadataVersion metadataVersion) {
+        List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+        for (ApiMessageAndVersion record : records) {
+            if (recordToMetadataVersion(record.message()).isPresent()) {
+                newRecords.add(metadataVersionToRecord(metadataVersion));
+            } else {
+                newRecords.add(record);
+            }
+        }
+        return new BootstrapMetadata(newRecords, metadataVersion, source);
+    }
+
+    public BootstrapMetadata withKRaftVersion(KRaftVersion version) {
+        List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+        boolean foundKRaftVersion = false;
+        for (ApiMessageAndVersion record : records) {
+            if (recordToKRaftVersion(record.message()).isPresent()) {
+                newRecords.add(kraftVersionToRecord(version));

Review Comment:
   See my other comment, but we should not implement kraft.version in the metadata layer (metadata module). It should be implemented in the kraft layer (module).
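
The reasoning for keeping kraft.version in the KRaft layer (KRaft has to act on the record while it is still uncommitted, whereas the controller and the metadata layer only see committed records) can be pictured with a toy log. Everything in this sketch is hypothetical: `ToyLog`, its methods, and the string records are for illustration only and do not mirror `KafkaRaftClient` internals.

```java
// Toy model only (assumption): a log with a high watermark. Offsets below the
// high watermark count as committed; everything else is uncommitted.
final class ToyLog {
    private final java.util.List<String> records = new java.util.ArrayList<>();
    private int highWatermark = 0;

    void append(String record) {
        records.add(record);
    }

    void advanceHighWatermark(int newHighWatermark) {
        if (newHighWatermark < highWatermark || newHighWatermark > records.size()) {
            throw new IllegalArgumentException("invalid high watermark: " + newHighWatermark);
        }
        highWatermark = newHighWatermark;
    }

    // The raft layer reads every appended record, committed or not.
    java.util.List<String> visibleToRaftLayer() {
        return new java.util.ArrayList<>(records);
    }

    // The metadata layer is only handed records below the high watermark.
    java.util.List<String> visibleToMetadataLayer() {
        return new java.util.ArrayList<>(records.subList(0, highWatermark));
    }
}

final class ToyLogDemo {
    public static void main(String[] args) {
        ToyLog log = new ToyLog();
        // Placeholder string standing in for a KRaftVersion control record.
        log.append("KRaftVersion(kraftVersion=1)");

        System.out.println("raft layer sees: " + log.visibleToRaftLayer());
        System.out.println("metadata layer sees: " + log.visibleToMetadataLayer());

        log.advanceHighWatermark(1);
        System.out.println("after commit, metadata layer sees: " + log.visibleToMetadataLayer());
    }
}
```

In this model a version record carried as a metadata record would stay invisible to its consumer until the high watermark moves past it, which is the gap the control-record approach avoids.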