mumrah commented on code in PR #16892:
URL: https://github.com/apache/kafka/pull/16892#discussion_r1718794105


##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -279,19 +279,25 @@ private void handleBatch(Batch<?> batch, OptionalLong 
overrideOffset) {
             long currentOffset = overrideOffset.orElse(batch.baseOffset() + 
offsetDelta);
             switch (record.type()) {
                 case KRAFT_VOTERS:
+                    VoterSet voters = VoterSet.fromVotersRecord((VotersRecord) 
record.message());
+                    logger.info("Latest set of voters is {} at {}", voters, 
currentOffset);
                     synchronized (voterSetHistory) {
-                        voterSetHistory.addAt(currentOffset, 
VoterSet.fromVotersRecord((VotersRecord) record.message()));
+                        voterSetHistory.addAt(currentOffset, voters);
                     }
                     break;
 
                 case KRAFT_VERSION:
+                    KRaftVersion kraftVersion = KRaftVersion.fromFeatureLevel(
+                        ((KRaftVersionRecord) record.message()).kRaftVersion()
+                    );
+                    logger.info(
+                        "Latest {} is {} at {}",

Review Comment:
   These will look like: `Lastest kraft.version is 1 at 100`. Can we add 
"offset" in there to make it clear what the last number is?



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -890,47 +890,49 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
 
     // validate KRaft-related configs
     val voterIds = QuorumConfig.parseVoterIds(quorumVoters)
-    def validateNonEmptyQuorumVotersForKRaft(): Unit = {
-      if (voterIds.isEmpty) {
-        throw new ConfigException(s"If using 
${KRaftConfigs.PROCESS_ROLES_CONFIG}, ${QuorumConfig.QUORUM_VOTERS_CONFIG} must 
contain a parseable set of voters.")
+    def validateQuorumVotersAndQuorumBootstrapServerForKRaft(): Unit = {
+      if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
+        throw new ConfigException(
+          s"""If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, either 
${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
+          | contain the set of bootstrap controllers or 
${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
+          | set of controllers.""".stripMargin
+        )
       }
     }
-    def validateNonEmptyQuorumVotersForMigration(): Unit = {
-      if (voterIds.isEmpty) {
-        throw new ConfigException(s"If using 
${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, ${QuorumConfig.QUORUM_VOTERS_CONFIG} 
must contain a parseable set of voters.")
+    def validateQuorumVotersAndQuorumBootstrapServerForMigration(): Unit = {
+      if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
+        throw new ConfigException(
+          s"""If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, either 
${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
+          | contain the set of bootstrap controllers or 
${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
+          | set of controllers.""".stripMargin)
       }
     }
     def validateControlPlaneListenerEmptyForKRaft(): Unit = {
       require(controlPlaneListenerName.isEmpty,
         s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not 
supported in KRaft mode.")
     }
-    def 
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): 
Unit = {
-      require(advertisedBrokerListenerNames.forall(aln => 
!controllerListenerNames.contains(aln.value())),
-        s"The advertised.listeners config must not contain KRaft controller 
listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when 
${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the broker role because Kafka 
clients that send requests via advertised listeners do not send requests to 
KRaft controllers -- they only send requests to KRaft brokers.")
-    }
     def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): 
Unit = {
-      require(voterIds.contains(nodeId),
+      require(voterIds.isEmpty || voterIds.contains(nodeId),
         s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' 
role, the node id $nodeId must be included in the set of voters 
${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
     }
-    def validateControllerListenerExistsForKRaftController(): Unit = {
-      require(controllerListeners.nonEmpty,
+    def validateAdvertisedControllerListenersNonEmptyForKRaftController(): 
Unit = {
+      require(effectiveAdvertisedControllerListeners.nonEmpty,
         s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at 
least one value appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' 
configuration when running the KRaft controller role")
     }
     def 
validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit 
= {
       val listenerNameValues = listeners.map(_.listenerName.value).toSet
       require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
         s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain 
values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration 
when running the KRaft controller role")
     }
-    def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
+    def validateAdvertisedBrokerListenersNonEmptyForBroker(): Unit = {
       require(advertisedBrokerListenerNames.nonEmpty,
-        "There must be at least one advertised listener." + (
+        "There must be at least one broker advertised listener." + (
           if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all 
listeners appear in ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG}?" else ""))
     }
     if (processRoles == Set(ProcessRole.BrokerRole)) {
       // KRaft broker-only
-      validateNonEmptyQuorumVotersForKRaft()
+      validateQuorumVotersAndQuorumBootstrapServerForKRaft()
       validateControlPlaneListenerEmptyForKRaft()
-      
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()

Review Comment:
   Does remove this allow the broker to advertise the controller listener?



##########
raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java:
##########
@@ -83,13 +83,9 @@ public Optional<VoterSet> valueAtOrBefore(long offset) {
      * Returns the latest set of voters.
      */
     public VoterSet lastValue() {
-        Optional<LogHistory.Entry<VoterSet>> result = 
votersHistory.lastEntry();
-        if (result.isPresent()) {
-            return result.get().value();
-        }
-
-        return staticVoterSet
-            .orElseThrow(() -> new IllegalStateException("No voter set 
found"));
+        return votersHistory.lastEntry()
+            .map(LogHistory.Entry::value)
+            .orElse(staticVoterSet);

Review Comment:
   Is there any reason the caller would need to know that they are getting the 
static voters? 



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -952,32 +954,29 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
       if (controllerListenerNames.size > 1) {
         warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple 
entries; only the first will be used since 
${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames.asJava}")
       }
-      validateAdvertisedListenersNonEmptyForBroker()

Review Comment:
   Don't we still need brokers to advertise their listeners for clients?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to