[ 
https://issues.apache.org/jira/browse/KAFKA-13437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13437:
-----------------------------
    Affects Version/s: 3.0.0

> Broker parameter optimization: security.inter.broker.protocol and 
> num.network.threads
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13437
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13437
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>
> h1. 1. security.inter.broker.protocol
> First, see the documentation for this parameter:
> {code:java}
> security.inter.broker.protocol: Security protocol used to communicate between 
> brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. It is an 
> error to set this and inter.broker.listener.name properties at the same time. 
> {code}
> The documentation does not say that, when this configuration is used, the 
> resulting InterBrokerListenerName is simply the value of 
> security.inter.broker.protocol. I originally expected the broker to look up 
> a suitable listenerName in the listener.security.protocol.map configuration. 
> Instead, the broker fails to start:
>  
> {code:java}
> [2021-11-09 06:28:44,058] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
> java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are SASL_PLAINTEXT,INTERNAL_SSL,PLAIN_PLUGIN_SSL
>         at scala.Predef$.require(Predef.scala:337)
>         at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1952)
>         at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1897)
>         at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1394)
>         at kafka.Kafka$.buildServer(Kafka.scala:67)
>         at kafka.Kafka$.main(Kafka.scala:87)
>         at kafka.Kafka.main(Kafka.scala)
>  {code}
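>  
> A minimal configuration sketch that should trigger the error above. The 
> listener names are taken from the error message; the ports, the protocol 
> mapping, and the value of security.inter.broker.protocol are assumptions, 
> since the actual server.properties is not included in this report:
> {code}
> # Hypothetical server.properties fragment (ports and mapping are assumed)
> listeners=SASL_PLAINTEXT://:9092,INTERNAL_SSL://:9093,PLAIN_PLUGIN_SSL://:9094
> listener.security.protocol.map=SASL_PLAINTEXT:SASL_PLAINTEXT,INTERNAL_SSL:SSL,PLAIN_PLUGIN_SSL:SASL_SSL
> # Resolves to a listener literally named "SSL", which is not defined above
> security.inter.broker.protocol=SSL
> {code}
> Under the same assumptions, naming the listener explicitly avoids the 
> failure:
> {code}
> # Replaces security.inter.broker.protocol; the two must not both be set
> inter.broker.listener.name=INTERNAL_SSL
> {code}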
>  
>  
> h1. 2. num.network.threads
> The network threads configured by this parameter are not shared across 
> listeners. Instead, each listener creates its own set of 
> num.network.threads network processors, so the Kafka process opens more 
> threads than it needs and wastes resources.
> For example:
> listenerNameA: used for communication between brokers
> listenerNameB: used by clients to produce and fetch messages
> listenerNameC: used by Kafka operators to manage the cluster and send 
> control-type requests, such as deleting topics or adding partitions
> In this setup, listenerNameB should get more network threads, while the 
> other two listeners could run with fewer.
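>  
> A sketch of the current behaviour, assuming three hypothetical listeners 
> named BROKER, CLIENT and ADMIN that stand in for listenerNameA, 
> listenerNameB and listenerNameC (names, ports, protocols and the thread 
> count are illustrative only):
> {code}
> # Hypothetical server.properties fragment
> listeners=BROKER://:9091,CLIENT://:9092,ADMIN://:9093
> listener.security.protocol.map=BROKER:SSL,CLIENT:SASL_SSL,ADMIN:SSL
> inter.broker.listener.name=BROKER
> # Applies to every listener: 3 listeners x 8 = 24 network processor threads
> num.network.threads=8
> {code}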
>  
> h1. Root cause
> 1. See the "getInterBrokerListenerNameAndSecurityProtocol" method in 
> KafkaConfig.scala:
> {code:scala}
> private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
>   Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match {
>     case Some(_) if originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) =>
>       throw new ConfigException(s"Only one of ${KafkaConfig.InterBrokerListenerNameProp} and " +
>         s"${KafkaConfig.InterBrokerSecurityProtocolProp} should be set.")
>     case Some(name) =>
>       val listenerName = ListenerName.normalised(name)
>       val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
>         throw new ConfigException(s"Listener with name ${listenerName.value} defined in " +
>           s"${KafkaConfig.InterBrokerListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
>       (listenerName, securityProtocol)
>     case None =>
>       val securityProtocol = getSecurityProtocol(getString(KafkaConfig.InterBrokerSecurityProtocolProp),
>         KafkaConfig.InterBrokerSecurityProtocolProp)
>       (ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
>   }
> }
> {code}
> In the last branch, ListenerName.forSecurityProtocol(securityProtocol) 
> forces InterBrokerListenerName to equal securityProtocol.name, without 
> consulting listenerSecurityProtocolMap.
> 2. See the "addDataPlaneProcessors" method in SocketServer.scala.
> For each EndPoint, this method creates newProcessorsPerListener processors, 
> and the value of newProcessorsPerListener is config.numNetworkThreads.
>  
> h1. Suggestion
>  # Optimize the getInterBrokerListenerNameAndSecurityProtocol method: use 
> listenerSecurityProtocolMap to find a suitable listenerName for 
> security.inter.broker.protocol. If several keys in 
> listenerSecurityProtocolMap map to the value of 
> security.inter.broker.protocol, return the listenerName of the first 
> matching key.
>  # Allow the number of network threads to be configured separately for each 
> listenerName, following the pattern of sasl.server.callback.handler.class: 
> num.network.threads serves as the default value, and 
> *listener.name.\{listenerName}.num.network.threads* overrides it for an 
> individual listener.
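>  
> A sketch of how the second suggestion might look in server.properties. The 
> per-listener key is the proposal made in this issue, not an option that 
> exists in 3.0.0. The listener names BROKER, CLIENT and ADMIN and all values 
> are hypothetical, and the lowercase listener name in the key is assumed to 
> follow the existing listener.name.* prefix convention:
> {code}
> # Default for listeners without an override (here: BROKER)
> num.network.threads=3
> # Proposed override: more threads for the client-facing listener
> listener.name.client.num.network.threads=8
> # Proposed override: fewer threads for the administrative listener
> listener.name.admin.num.network.threads=1
> {code}
> With these values the broker would create 3 + 8 + 1 = 12 network processors, 
> compared with 3 x 8 = 24 under a single global setting of 8.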



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
