hachikuji commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r762175150



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -748,7 +749,8 @@ object KafkaConfig {
     "Different security (SSL and SASL) settings can be configured for each listener by adding a normalised " +
     "prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " +
     "INTERNAL listener, a config with name <code>listener.name.internal.ssl.keystore.location</code> would be set. " +
-    "If the config for the listener name is not set, the config will fallback to the generic config (i.e. <code>ssl.keystore.location</code>). "
+    "If the config for the listener name is not set, the config will fallback to the generic config (i.e. <code>ssl.keystore.location</code>). " +
+    "Note that in KRaft an additional default mapping CONTROLLER to PLAINTEXT is added."

Review comment:
       Can we generalize this comment now that the implementation has changed?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1959,10 +1961,18 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     }
   }
 
-  def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
-    getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
+  def effectiveListenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
+    val mapValue = getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
       .map { case (listenerName, protocolName) =>
-      ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+        ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+      }
+    if (usesSelfManagedQuorum && !originals.containsKey(ListenerSecurityProtocolMapProp)) {
+      // Nothing was specified explicitly, so we are using the default value; therefore, since we are using KRaft,
+      // add the PLAINTEXT mappings for all controller listener names that are not security protocols
+      mapValue ++ controllerListenerNames.filter(cln => cln.nonEmpty && !SecurityProtocol.values().exists(_.name.equals(cln))).map(

Review comment:
       Suppose that the user has the following configuration:
   
   ```
   listeners=SSL://:9092,CONTROLLER://:9093
   controller.listener.names=CONTROLLER
   ```
   
   With the change here, would we still default the CONTROLLER listener to 
PLAINTEXT? I can't shake the feeling that we would be better off forcing the 
user to be explicit about their intent. Perhaps in the one case when all of the 
other listeners are PLAINTEXT (as would be the case with quickstarts), then it 
is reasonable to also assume PLAINTEXT. But I'm inclined to be stricter if 
other security protocols are in use. What do you think?
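
To make the stricter option concrete, here is a rough, self-contained sketch of the rule I have in mind. It is not code from this PR: `ControllerListenerDefaults`, `effectiveMap`, and the string-based protocol names are all hypothetical stand-ins for the real `KafkaConfig` types, and the handling of a controller-only node (no other listeners) is an assumption that would need discussion.

```scala
// Hypothetical sketch only; none of these names exist in KafkaConfig.
object ControllerListenerDefaults {
  // Stand-in for SecurityProtocol.values(), using plain strings.
  val SecurityProtocolNames: Set[String] =
    Set("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL")

  /**
   * @param listenerNames           names taken from `listeners`
   * @param controllerListenerNames names taken from `controller.listener.names`
   * @param explicitMap             `listener.security.protocol.map` if the user set it, else None
   * @return the effective listener-to-protocol map, or an error message
   */
  def effectiveMap(
    listenerNames: Seq[String],
    controllerListenerNames: Seq[String],
    explicitMap: Option[Map[String, String]]
  ): Either[String, Map[String, String]] = explicitMap match {
    // The user was explicit, so nothing is inferred.
    case Some(userMap) => Right(userMap)
    case None =>
      // Default map: every security protocol name maps to itself.
      val defaults = SecurityProtocolNames.map(p => p -> p).toMap
      // Controller listeners that are not already covered by the default map.
      val unmappedControllers = controllerListenerNames
        .filter(n => n.nonEmpty && !SecurityProtocolNames.contains(n))
      val otherListeners = listenerNames.filterNot(controllerListenerNames.contains)
      // Assumption: with no other listeners at all, PLAINTEXT is still inferred.
      val allOthersPlaintext = otherListeners.forall(_ == "PLAINTEXT")

      if (unmappedControllers.isEmpty || allOthersPlaintext)
        Right(defaults ++ unmappedControllers.map(_ -> "PLAINTEXT"))
      else
        Left("listener.security.protocol.map must be set explicitly for controller listeners " +
          unmappedControllers.mkString(", ") + " because other listeners do not use PLAINTEXT")
  }
}
```

Under this sketch, the `SSL` plus `CONTROLLER` configuration above would be rejected with a message asking for an explicit mapping, while a quickstart-style `PLAINTEXT` plus `CONTROLLER` configuration would still get `CONTROLLER` mapped to `PLAINTEXT`.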



