[ https://issues.apache.org/jira/browse/KAFKA-13437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
RivenSun updated KAFKA-13437:
-----------------------------
    Affects Version/s: 3.0.0

> Broker parameter optimization: security.inter.broker.protocol and num.network.threads
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13437
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13437
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>
> h1. 1. security.inter.broker.protocol
> First, see the documentation for this parameter:
> {code:java}
> security.inter.broker.protocol
> Security protocol used to communicate between brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. It is an error to set this and inter.broker.listener.name properties at the same time.
> {code}
> The documentation does not mention that, when this configuration is used, the final value of InterBrokerListenerName is the same as the value of security.inter.broker.protocol. I originally assumed that a suitable listenerName would be looked up in the listener.security.protocol.map configuration.
> The result: broker startup fails.
>
> {code:java}
> [2021-11-09 06:28:44,058] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
> java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are SASL_PLAINTEXT,INTERNAL_SSL,PLAIN_PLUGIN_SSL
>     at scala.Predef$.require(Predef.scala:337)
>     at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1952)
>     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1897)
>     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1394)
>     at kafka.Kafka$.buildServer(Kafka.scala:67)
>     at kafka.Kafka$.main(Kafka.scala:87)
>     at kafka.Kafka.main(Kafka.scala)
> {code}
>
> h1. 2. num.network.threads
> The network threads configured by this parameter are not shared across listeners. Instead, each listener creates its own set of num.network.threads network processors, so the Kafka process opens many unnecessary threads and wastes resources.
> For example:
> listenerNameA: used for communication between brokers
> listenerNameB: used by clients to produce and fetch messages
> listenerNameC: used by Kafka operations staff to manage the cluster and send control requests, such as deleting topics or adding partitions
> In this setup, one would expect to increase the network threads of listenerNameB and reduce those of the other two listeners.
>
> h1. Root cause
> 1. See the "getInterBrokerListenerNameAndSecurityProtocol" method in KafkaConfig.scala:
> {code:java}
> private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
>   Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match {
>     // Both properties set explicitly: rejected
>     case Some(_) if originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) =>
>       throw new ConfigException(s"Only one of ${KafkaConfig.InterBrokerListenerNameProp} and " +
>         s"${KafkaConfig.InterBrokerSecurityProtocolProp} should be set.")
>     // inter.broker.listener.name set: look up its protocol in the map
>     case Some(name) =>
>       val listenerName = ListenerName.normalised(name)
>       val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
>         throw new ConfigException(s"Listener with name ${listenerName.value} defined in " +
>           s"${KafkaConfig.InterBrokerListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
>       (listenerName, securityProtocol)
>     // Only security.inter.broker.protocol: the listener name is derived from the protocol name
>     case None =>
>       val securityProtocol = getSecurityProtocol(getString(KafkaConfig.InterBrokerSecurityProtocolProp),
>         KafkaConfig.InterBrokerSecurityProtocolProp)
>       (ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
>   }
> }
> {code}
> ListenerName.forSecurityProtocol(securityProtocol) forces the value of InterBrokerListenerName to be securityProtocol.name.
> 2. See the "addDataPlaneProcessors" method in SocketServer.scala.
> This method creates newProcessorsPerListener processors for each EndPoint, and newProcessorsPerListener is set to config.numNetworkThreads.
>
> h1. Suggestion
> # Optimize the getInterBrokerListenerNameAndSecurityProtocol method.
> Use listenerSecurityProtocolMap to find a suitable listenerName for security.inter.broker.protocol.
> If several entries in listenerSecurityProtocolMap have a value equal to security.inter.broker.protocol, return the listenerName of the first such entry.
> # Allow the number of network threads to be configured separately for each listenerName, following the way the parameter sasl.server.callback.handler.class is defined:
> num.network.threads serves as the default value, and
> *listener.name.\{listenerName}.num.network.threads* serves as the per-listener override.
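The first suggestion (resolving the inter-broker listener name through the protocol map instead of reusing the protocol name) could look roughly like the sketch below. It is a standalone illustration, not Kafka code: `InterBrokerListenerSketch`, `parseProtocolMap` and `resolveListenerName` are hypothetical names, plain strings stand in for Kafka's `ListenerName` and `SecurityProtocol` types, and the protocol assigned to each listener in the usage note is assumed, since the issue lists only the listener names.

```scala
object InterBrokerListenerSketch {
  // Simplified stand-in for listener.security.protocol.map: (listener name, protocol)
  // pairs kept in configuration order so that "first match" is well defined.
  type ProtocolMap = Seq[(String, String)]

  /** Parses a value such as "INTERNAL_SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT". */
  def parseProtocolMap(raw: String): ProtocolMap =
    raw.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
      entry.split(":", 2) match {
        case Array(name, protocol) => (name.trim, protocol.trim)
        case _ => throw new IllegalArgumentException(s"Malformed map entry: $entry")
      }
    }

  /** Returns the first listener whose protocol equals security.inter.broker.protocol, if any. */
  def resolveListenerName(interBrokerProtocol: String, protocolMap: ProtocolMap): Option[String] =
    protocolMap.collectFirst {
      case (name, protocol) if protocol == interBrokerProtocol => name
    }
}
```

With an assumed map of `SASL_PLAINTEXT:SASL_PLAINTEXT,INTERNAL_SSL:SSL,PLAIN_PLUGIN_SSL:SASL_SSL`, calling `resolveListenerName("SSL", ...)` yields `Some("INTERNAL_SSL")`, whereas the current behaviour derives the listener name `SSL`, which is not a configured listener. A real change would also have to decide what to do when no entry matches.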
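The second suggestion (a per-listener override that falls back to the global num.network.threads) can be sketched in the same spirit. Again this is only an illustration under stated assumptions: `ListenerNetworkThreadsSketch` and `networkThreadsFor` are hypothetical names, the configuration is modelled as a plain `Map[String, String]`, and the listener name is lower-cased in the key on the assumption that the new setting would follow the convention of the existing `listener.name.{listenerName}.*` overrides.

```scala
import java.util.Locale

object ListenerNetworkThreadsSketch {
  val GlobalKey = "num.network.threads"
  val DefaultThreads = 3 // Kafka's documented default for num.network.threads

  // Per-listener key proposed in the issue (hypothetical); the lower-casing is an assumption.
  def listenerKey(listenerName: String): String =
    s"listener.name.${listenerName.toLowerCase(Locale.ROOT)}.$GlobalKey"

  /** Per-listener value if present, else the global value, else the default. */
  def networkThreadsFor(listenerName: String, props: Map[String, String]): Int = {
    val raw = props.get(listenerKey(listenerName)).orElse(props.get(GlobalKey))
    val threads = raw.map(_.trim.toInt).getOrElse(DefaultThreads)
    require(threads >= 1, s"$GlobalKey must be at least 1, got $threads")
    threads
  }
}
```

For the three listeners in the example above, setting `listener.name.listenernameb.num.network.threads=8` next to `num.network.threads=3` would give listenerNameB eight network threads while listenerNameA and listenerNameC keep three each.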