mumrah commented on code in PR #16892:
URL: https://github.com/apache/kafka/pull/16892#discussion_r1719889001


##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -890,47 +890,49 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
 
     // validate KRaft-related configs
     val voterIds = QuorumConfig.parseVoterIds(quorumVoters)
-    def validateNonEmptyQuorumVotersForKRaft(): Unit = {
-      if (voterIds.isEmpty) {
-        throw new ConfigException(s"If using 
${KRaftConfigs.PROCESS_ROLES_CONFIG}, ${QuorumConfig.QUORUM_VOTERS_CONFIG} must 
contain a parseable set of voters.")
+    def validateQuorumVotersAndQuorumBootstrapServerForKRaft(): Unit = {
+      if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
+        throw new ConfigException(
+          s"""If using ${KRaftConfigs.PROCESS_ROLES_CONFIG}, either 
${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
+          | contain the set of bootstrap controllers or 
${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
+          | set of controllers.""".stripMargin
+        )
       }
     }
-    def validateNonEmptyQuorumVotersForMigration(): Unit = {
-      if (voterIds.isEmpty) {
-        throw new ConfigException(s"If using 
${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, ${QuorumConfig.QUORUM_VOTERS_CONFIG} 
must contain a parseable set of voters.")
+    def validateQuorumVotersAndQuorumBootstrapServerForMigration(): Unit = {
+      if (voterIds.isEmpty && quorumBootstrapServers.isEmpty) {
+        throw new ConfigException(
+          s"""If using ${KRaftConfigs.MIGRATION_ENABLED_CONFIG}, either 
${QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG} must
+          | contain the set of bootstrap controllers or 
${QuorumConfig.QUORUM_VOTERS_CONFIG} must contain a parseable
+          | set of controllers.""".stripMargin)
       }
     }
     def validateControlPlaneListenerEmptyForKRaft(): Unit = {
       require(controlPlaneListenerName.isEmpty,
         s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not 
supported in KRaft mode.")
     }
-    def 
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): 
Unit = {
-      require(advertisedBrokerListenerNames.forall(aln => 
!controllerListenerNames.contains(aln.value())),
-        s"The advertised.listeners config must not contain KRaft controller 
listeners from ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when 
${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the broker role because Kafka 
clients that send requests via advertised listeners do not send requests to 
KRaft controllers -- they only send requests to KRaft brokers.")
-    }
     def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): 
Unit = {
-      require(voterIds.contains(nodeId),
+      require(voterIds.isEmpty || voterIds.contains(nodeId),
         s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' 
role, the node id $nodeId must be included in the set of voters 
${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
     }
-    def validateControllerListenerExistsForKRaftController(): Unit = {
-      require(controllerListeners.nonEmpty,
+    def validateAdvertisedControllerListenersNonEmptyForKRaftController(): 
Unit = {
+      require(effectiveAdvertisedControllerListeners.nonEmpty,
         s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at 
least one value appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' 
configuration when running the KRaft controller role")
     }
     def 
validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit 
= {
       val listenerNameValues = listeners.map(_.listenerName.value).toSet
       require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
         s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain 
values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration 
when running the KRaft controller role")
     }
-    def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
+    def validateAdvertisedBrokerListenersNonEmptyForBroker(): Unit = {
       require(advertisedBrokerListenerNames.nonEmpty,
-        "There must be at least one advertised listener." + (
+        "There must be at least one broker advertised listener." + (
           if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all 
listeners appear in ${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG}?" else ""))
     }
     if (processRoles == Set(ProcessRole.BrokerRole)) {
       // KRaft broker-only
-      validateNonEmptyQuorumVotersForKRaft()
+      validateQuorumVotersAndQuorumBootstrapServerForKRaft()
       validateControlPlaneListenerEmptyForKRaft()
-      
validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()

Review Comment:
   Gotcha — so anything in `advertised.listeners` that is _not_ present in
`controller.listener.names` is treated as a broker listener.
Makes sense.



##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -82,7 +82,7 @@ public final class KRaftControlRecordStateMachine {
      * @param logContext the log context
      */
     public KRaftControlRecordStateMachine(
-        Optional<VoterSet> staticVoterSet,
+        VoterSet staticVoterSet,

Review Comment:
   I like how you coalesced the "empty" behavior down into `VoterSet` 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to