jsancio commented on a change in pull request #11503: URL: https://github.com/apache/kafka/pull/11503#discussion_r751630771
########## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ########## @@ -2007,12 +2008,47 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor") require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." + " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(",")) - require(!processRoles.contains(ControllerRole) || controllerListeners.nonEmpty, - s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the server has the controller role") + if (usesSelfManagedQuorum) { + require(controlPlaneListenerName.isEmpty, + s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode.") + if (processRoles.contains(ControllerRole)) { + // has controller role (and optionally broker role as well) + // controller.listener.names must be non-empty + // every one must appear in listeners + // each port appearing in controller.quorum.voters must match the port in exactly one controller listener + require(controllerListeners.nonEmpty, + s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role") + val listenerNameValues = listeners.map(_.listenerName.value).toSet + require(controllerListenerNames.forall(cln => listenerNameValues.contains(cln)), + s"${KafkaConfig.ControllerListenerNamesProp} must only contain values appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role") + RaftConfig.parseVoterConnections(quorumVoters).asScala.foreach { case (nodeId, addressSpec) => + addressSpec match { + case inetAddressSpec: RaftConfig.InetAddressSpec => { + val quorumVotersPort = inetAddressSpec.address.getPort + val controllerListenersWithSamePort = controllerListeners.filter(_.port == quorumVotersPort) + require(controllerListenersWithSamePort.size == 1, + s"Port in ${KafkaConfig.QuorumVotersProp} for controller node with ${KafkaConfig.NodeIdProp}=$nodeId ($quorumVotersPort) does not match the port for any controller listener in ${KafkaConfig.ControllerListenerNamesProp}") + } + case _ => + } + } + } else { + // only broker role + // controller.listener.names must be non-empty + // none of them can appear in listeners + require(controllerListenerNames.exists(_.nonEmpty), + s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value when running KRaft with just the broker role") + require(controllerListeners.isEmpty, + s"${KafkaConfig.ControllerListenerNamesProp} must not contain a value appearing in the '${KafkaConfig.ListenersProp}' configuration when running KRaft with just the broker role") + } + } val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet val listenerNames = listeners.map(_.listenerName).toSet if (processRoles.isEmpty || processRoles.contains(BrokerRole)) { + require(advertisedListenerNames.nonEmpty, + "There must be at least one advertised listener." + ( + if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in ${ControllerListenerNamesProp}?" else "")) Review comment: Why do we need this? Doesn't Kafka advertise all of the names in `listeners` if `advertisedListenerNames` is not set? ########## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ########## @@ -2007,12 +2008,47 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor") require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." + " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(",")) - require(!processRoles.contains(ControllerRole) || controllerListeners.nonEmpty, - s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the server has the controller role") + if (usesSelfManagedQuorum) { + require(controlPlaneListenerName.isEmpty, + s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode.") + if (processRoles.contains(ControllerRole)) { + // has controller role (and optionally broker role as well) + // controller.listener.names must be non-empty + // every one must appear in listeners + // each port appearing in controller.quorum.voters must match the port in exactly one controller listener + require(controllerListeners.nonEmpty, + s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role") + val listenerNameValues = listeners.map(_.listenerName.value).toSet + require(controllerListenerNames.forall(cln => listenerNameValues.contains(cln)), + s"${KafkaConfig.ControllerListenerNamesProp} must only contain values appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role") + RaftConfig.parseVoterConnections(quorumVoters).asScala.foreach { case (nodeId, addressSpec) => Review comment: This checks the port of every voter. We should only check the port of this voter/controller. Instead of `foreach` maybe `get(nodeId)`? This must be `Some` because I think we have another validation check for the id of this controller is in the voter set. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org