jsancio commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1670726026


##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -192,7 +192,7 @@ class BrokerServer(
 
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
-      metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
+      metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => 
raftManager.client.kraftVersion())

Review Comment:
   Did you try this instead? I think Scala should be able to infer that you 
want the lambda and not evaluate the method.
   ```java
         metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, 
raftManager.client.kraftVersion)
   ```



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -541,8 +545,12 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 
   override def features(): FinalizedFeatures = {
     val image = _currentImage
+    val finalizedFeatures = new java.util.HashMap[String, java.lang.Short]
+    finalizedFeatures.putAll(image.features().finalizedVersions())

Review Comment:
   Not a big deal given the current size of `finalizedVersions()`, but maybe 
passing it to the constructor is better going forward.
   ```java
       val finalizedFeatures = new java.util.HashMap[String, java.lang.Short](
           image.features().finalizedVersions()
       )
   ```



##########
core/src/main/scala/kafka/server/ControllerServer.scala:
##########
@@ -162,7 +162,7 @@ class ControllerServer(
       authorizer = config.createNewAuthorizer()
       authorizer.foreach(_.configure(config.originals))
 
-      metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
+      metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => 
raftManager.client.kraftVersion())

Review Comment:
   Same here. Did you try using this?
   ```java
         metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, 
raftManager.client.kraftVersion)
   ```



##########
core/src/main/scala/kafka/server/MetadataCache.scala:
##########
@@ -121,7 +122,10 @@ object MetadataCache {
     new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, 
zkMigrationEnabled)
   }
 
-  def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = {
-    new KRaftMetadataCache(brokerId)
+  def kRaftMetadataCache(
+    brokerId: Int,
+    kraftVersionSupplier: Supplier[KRaftVersion] = () => 
KRaftVersion.KRAFT_VERSION_0

Review Comment:
   This default parameter value seems risky if used in `src/main`. Why do you 
want it to be a default parameter? Also, not using default parameter values 
will make it easier to refactor this code to Java when the time comes to do that.



##########
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##########
@@ -250,4 +251,11 @@ default void beginShutdown() {}
      * or 0 if there have not been any records written.
      */
     long logEndOffset();
+
+    /**
+     * Returns the current kraft.version.
+     *
+     * @return the current kraft.version.
+     */

Review Comment:
   If you agree with my previous comments, let's mention in the Javadoc that 
this method returns the uncommitted kraft.version.



##########
raft/src/test/java/org/apache/kafka/raft/FileQuorumStateStoreTest.java:
##########
@@ -64,7 +66,7 @@ void testWriteReadElectedLeader(short kraftVersion) throws 
IOException {
         );
 
         final Optional<ElectionState> expected;
-        if (kraftVersion == 1) {
+        if (kraftVersion.featureLevel() > 0) {

Review Comment:
   If you agree with one of my previous messages about adding 
`KRaftVersion::isReconfigSupported`, maybe we can use that predicate here. This 
comment applies to a few lines in `src/test`.



##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -258,7 +261,9 @@ private void handleBatch(Batch<?> batch, OptionalLong 
overrideOffset) {
 
                 case KRAFT_VERSION:
                     synchronized (kraftVersionHistory) {
-                        kraftVersionHistory.addAt(currentOffset, 
((KRaftVersionRecord) record.message()).kRaftVersion());
+                        kraftVersionHistory.addAt(currentOffset,
+                            KRaftVersion.fromFeatureLevel(
+                                ((KRaftVersionRecord) 
record.message()).kRaftVersion()));

Review Comment:
   Do you mind using this indentation in the `raft` module? Each parameter is 
on its own line:
   ```java
                           kraftVersionHistory.addAt(
                               currentOffset,
                               KRaftVersion.fromFeatureLevel(
                                   ((KRaftVersionRecord) 
record.message()).kRaftVersion()
                               )
                           );
   ```



##########
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java:
##########
@@ -237,12 +238,12 @@ public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> 
serde) {
                             
.setLastContainedLogTimestamp(lastContainedLogTimestamp)
                     );
 
-                    if (kraftVersion > 0) {
+                    if (kraftVersion != KRaftVersion.KRAFT_VERSION_0) {

Review Comment:
   How about moving this predicate to `KRaftVersion::isReconfigSupported`? I 
think we are going to use this predicate in a few places.
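
   A rough sketch of what such a predicate could look like. `SketchKRaftVersion` 
is a hypothetical stand-in for `KRaftVersion`, and the semantics (reconfiguration 
is supported for every level except 0) are assumed from the comparison in the 
diff above, not taken from the final implementation.

```java
// Hypothetical stand-in for org.apache.kafka.server.common.KRaftVersion.
// Only the pieces needed to show the predicate are reproduced here.
enum SketchKRaftVersion {
    KRAFT_VERSION_0(0),
    KRAFT_VERSION_1(1);

    private final short featureLevel;

    SketchKRaftVersion(int featureLevel) {
        this.featureLevel = (short) featureLevel;
    }

    short featureLevel() {
        return featureLevel;
    }

    // Assumed semantics: every version other than KRAFT_VERSION_0 supports
    // reconfiguration, mirroring `kraftVersion != KRaftVersion.KRAFT_VERSION_0`.
    boolean isReconfigSupported() {
        return this != KRAFT_VERSION_0;
    }
}
```

   Call sites would then read `if (kraftVersion.isReconfigSupported())` instead 
of comparing against a specific constant or feature level.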



##########
server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum KRaftVersion implements FeatureVersion {
+
+    // Version 1 enables KIP-853.
+    KRAFT_VERSION_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION),
+    KRAFT_VERSION_1(1, MetadataVersion.IBP_3_8_IV0);
+
+    public static final String FEATURE_NAME = "kraft.version";
+
+    private final short featureLevel;
+    private final MetadataVersion bootstrapMetadataVersion;
+
+    KRaftVersion(
+        int featureLevel,
+        MetadataVersion bootstrapMetadataVersion
+    ) {
+        this.featureLevel = (short) featureLevel;
+        this.bootstrapMetadataVersion = bootstrapMetadataVersion;
+    }
+
+    @Override
+    public short featureLevel() {
+        return featureLevel;
+    }
+
+    public static KRaftVersion fromFeatureLevel(short version) {
+        switch (version) {
+            case 0:
+                return KRAFT_VERSION_0;
+            case 1:
+                return KRAFT_VERSION_1;
+            default:
+                throw new RuntimeException("Unknown KRaft feature level: " + 
(int) version);
+        }
+    }
+
+    @Override
+    public String featureName() {
+        return FEATURE_NAME;
+    }
+
+    @Override
+    public MetadataVersion bootstrapMetadataVersion() {
+        return bootstrapMetadataVersion;
+    }
+
+    @Override
+    public Map<String, Short> dependencies() {
+        return Collections.emptyMap();
+    }
+
+    public short quorumStateVersion() {
+        switch (this) {
+            case KRAFT_VERSION_0:
+                return (short) 0;
+            case KRAFT_VERSION_1:
+                return (short) 1;
+        }
+        throw new RuntimeException("Unknown KRaft feature level: " + this);
+    }

Review Comment:
   I wanted to make sure that there is a test that fails if someone adds a 
KRaftVersion without a matching `quorumStateVersion()` version.
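
   One possible shape for that check, written as plain Java rather than a JUnit 
test so it stays self-contained. `SketchVersion` and `QuorumStateVersionCheck` 
are hypothetical names. The idea is only that iterating over `values()` reaches 
the trailing `throw` for any constant that has no `case`.

```java
// Hypothetical stand-in for KRaftVersion with the same switch structure
// as quorumStateVersion() in the diff above.
enum SketchVersion {
    V0, V1;

    short quorumStateVersion() {
        switch (this) {
            case V0:
                return (short) 0;
            case V1:
                return (short) 1;
        }
        // Reached only when a constant was added without a matching case.
        throw new RuntimeException("Unknown KRaft feature level: " + this);
    }
}

final class QuorumStateVersionCheck {
    // Calls quorumStateVersion() on every constant and returns how many were
    // checked. A constant without a mapping makes this method throw.
    static int checkAllMapped() {
        int checked = 0;
        for (SketchVersion version : SketchVersion.values()) {
            version.quorumStateVersion();
            checked++;
        }
        return checked;
    }
}
```

   In the real test suite the loop body would presumably be wrapped in an 
assertion such as JUnit's `assertDoesNotThrow`.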



##########
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##########
@@ -40,7 +40,8 @@ public enum Features {
      * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersion} when implementing a new feature.
      */
     TEST_VERSION("test.feature.version", TestFeatureVersion.values()),
-    GROUP_VERSION("group.version", GroupVersion.values());
+    GROUP_VERSION("group.version", GroupVersion.values()),
+    KRAFT_VERSION("kraft.version", KRaftVersion.values());

Review Comment:
   Sounds good. Thanks!



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3027,6 +3028,11 @@ public long logEndOffset() {
         return log.endOffset().offset();
     }
 
+    @Override
+    public KRaftVersion kraftVersion() {
+        return 
partitionState.kraftVersionAtOffset(quorum.highWatermark().get().offset());
+    }

Review Comment:
   Hmm, are we guaranteed that the HWM is known when `kraftVersion()` is called? 
Depending on when this method is called, it is possible that the HWM is not 
known yet.
   
   I was originally thinking of exposing the uncommitted `kraftVersion()`. This 
is probably okay since in most cases it is only used to describe features. In 
that case we can call `partitionState.lastKraftVersion()` and keep the current 
method signature.
   
   If we want to expose the committed kraft.version then we should change the 
signature to `Optional<KRaftVersion> kraftVersion()`. The offset used should be 
`quorum.highWatermark().map(hwm -> hwm.offset() - 1)`. The HWM is an exclusive 
offset (the offset after the latest committed offset), while `...AtOffset(long)` 
is inclusive. I know that the HWM is exclusive because the log layer maintains 
the invariant `startOffset <= HWM <= LEO`: `LEO` is exclusive, and Kafka 
needs to be able to express that the `startOffset` is not committed. E.g.
   
   1. 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/UnifiedLog.scala#L144
   2. 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/UnifiedLog.scala#L1859
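
   A minimal sketch of the offset arithmetic described above, assuming a 
simplified history keyed by offset. `CommittedVersionSketch`, `addAt`, 
`versionAtOffset`, and `committedVersion` are hypothetical names, not the 
`KRaftControlRecordStateMachine` API, and feature levels are plain `short` 
values here instead of `KRaftVersion`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class CommittedVersionSketch {
    // Offset of each version record -> feature level written at that offset.
    private final TreeMap<Long, Short> history = new TreeMap<>();

    void addAt(long offset, short featureLevel) {
        history.put(offset, featureLevel);
    }

    // Inclusive lookup: the latest entry at or before `offset`.
    // Defaults to level 0 when no version record has been seen (assumption).
    short versionAtOffset(long offset) {
        Map.Entry<Long, Short> entry = history.floorEntry(offset);
        return entry == null ? (short) 0 : entry.getValue();
    }

    // The high watermark is exclusive, so the last committed offset is
    // hwm - 1. An unknown high watermark yields an empty result instead
    // of throwing.
    Optional<Short> committedVersion(Optional<Long> highWatermark) {
        return highWatermark.map(hwm -> versionAtOffset(hwm - 1));
    }
}
```

   With a version record at offset 5, a high watermark of 5 still reports 
level 0 because offset 5 is not yet committed. A high watermark of 6 reports 
the new level.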



##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -140,9 +143,9 @@ public VoterSet lastVoterSet() {
     /**
      * Returns the last kraft version.
      */
-    public short lastKraftVersion() {
+    public KRaftVersion lastKraftVersion() {
         synchronized (kraftVersionHistory) {
-            return 
kraftVersionHistory.lastEntry().map(LogHistory.Entry::value).orElse((short) 0);
+            return 
kraftVersionHistory.lastEntry().map(LogHistory.Entry::value).orElse(KRAFT_VERSION_0);

Review Comment:
   Minor but I tend to prefer removing the `import static` and using 
`KRaftVersion.KRAFT_VERSION_0`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
