dajac commented on a change in pull request #10007:
URL: https://github.com/apache/kafka/pull/10007#discussion_r568370315



##########
File path: 
clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
##########
@@ -1777,6 +1785,74 @@ public void 
testInsufficientScopeSaslOauthBearerMechanism() throws Exception {
                 "{\"status\":\"insufficient_scope\", 
\"scope\":\"[LOGIN_TO_KAFKA]\"}");
     }
 
+    @Test
+    public void testSslClientAuthDisabledForSaslSslListener() throws Exception 
{
+        verifySslClientAuthForSaslSslListener(true, SslClientAuth.NONE);
+    }
+
+    @Test
+    public void testSslClientAuthRequestedForSaslSslListener() throws 
Exception {
+        verifySslClientAuthForSaslSslListener(true, SslClientAuth.REQUESTED);
+    }
+
+    @Test
+    public void testSslClientAuthRequiredForSaslSslListener() throws Exception 
{
+        verifySslClientAuthForSaslSslListener(true, SslClientAuth.REQUIRED);
+    }
+
+    @Test
+    public void testSslClientAuthRequestedOverriddenForSaslSslListener() 
throws Exception {
+        verifySslClientAuthForSaslSslListener(false, SslClientAuth.REQUESTED);
+    }
+
+    @Test
+    public void testSslClientAuthRequiredOverriddenForSaslSslListener() throws 
Exception {
+        verifySslClientAuthForSaslSslListener(false, SslClientAuth.REQUIRED);
+    }
+
+    private void verifySslClientAuthForSaslSslListener(boolean 
useListenerPrefix,
+                                                       SslClientAuth 
configuredClientAuth) throws Exception {
+
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+        configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
+        String listenerPrefix = useListenerPrefix ? 
ListenerName.forSecurityProtocol(securityProtocol).configPrefix() : "";
+        saslServerConfigs.put(listenerPrefix  + 
BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, configuredClientAuth.name());

Review comment:
       nit: There is an extra space before `+`.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##########
@@ -112,6 +118,7 @@ private static ChannelBuilder create(SecurityProtocol 
securityProtocol,
                                          Time time,
                                          LogContext logContext) {
         Map<String, Object> configs = channelBuilderConfigs(config, 
listenerName);
+        String sslClientAuthOverride = null;

Review comment:
       This variable is only used for `SASL_SSL` and `SASL_PLAINTEXT`. Could we 
move its declaration within the `case` block?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##########
@@ -129,6 +136,22 @@ private static ChannelBuilder create(SecurityProtocol 
securityProtocol,
                     jaasContexts = new HashMap<>(enabledMechanisms.size());
                     for (String mechanism : enabledMechanisms)
                         jaasContexts.put(mechanism, 
JaasContext.loadServerContext(listenerName, mechanism, configs));
+
+                    // SSL client authentication is enabled in brokers for 
SASL_SSL only if listener-prefixed config is specified.
+                    if (listenerName != null && securityProtocol == 
SecurityProtocol.SASL_SSL) {
+                        String configuredClientAuth = (String) 
configs.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);
+                        String listenerClientAuth = (String) 
config.originalsWithPrefix(listenerName.configPrefix(), true)
+                                
.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);

Review comment:
       Constructing the config for the listener prefix seems a bit wasteful 
just to check that a config exists. I wonder if this is really necessary and if 
we could check `listenerName.configPrefix() + 
BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG` in `config` directly instead. 
Maybe it does not matter too much here as we do this only once after all. What 
do you think? 

##########
File path: 
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##########
@@ -129,6 +136,22 @@ private static ChannelBuilder create(SecurityProtocol 
securityProtocol,
                     jaasContexts = new HashMap<>(enabledMechanisms.size());
                     for (String mechanism : enabledMechanisms)
                         jaasContexts.put(mechanism, 
JaasContext.loadServerContext(listenerName, mechanism, configs));
+
+                    // SSL client authentication is enabled in brokers for 
SASL_SSL only if listener-prefixed config is specified.
+                    if (listenerName != null && securityProtocol == 
SecurityProtocol.SASL_SSL) {
+                        String configuredClientAuth = (String) 
configs.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);
+                        String listenerClientAuth = (String) 
config.originalsWithPrefix(listenerName.configPrefix(), true)
+                                
.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);
+                        if (listenerClientAuth == null) {
+                            sslClientAuthOverride = 
SslClientAuth.NONE.name().toLowerCase(Locale.ROOT);

Review comment:
       I was a bit confused by this at first as I was wondering why 
`listenerClientAuth` is not set to `listenerClientAuth` here are well. `null` 
means that we let the ssl factory get the configuration from `configs` later 
on. We could perhaps add a comment to make this clearer.
   
   Alternatively, could we just override `SSL_CLIENT_AUTH_CONFIG` in `configs` 
directly here instead of doing it in `SslFactory#configure`? This might make 
the handling clearer here. I don't feel strong about this. What do you think?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##########
@@ -129,6 +136,22 @@ private static ChannelBuilder create(SecurityProtocol 
securityProtocol,
                     jaasContexts = new HashMap<>(enabledMechanisms.size());
                     for (String mechanism : enabledMechanisms)
                         jaasContexts.put(mechanism, 
JaasContext.loadServerContext(listenerName, mechanism, configs));
+
+                    // SSL client authentication is enabled in brokers for 
SASL_SSL only if listener-prefixed config is specified.
+                    if (listenerName != null && securityProtocol == 
SecurityProtocol.SASL_SSL) {
+                        String configuredClientAuth = (String) 
configs.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);
+                        String listenerClientAuth = (String) 
config.originalsWithPrefix(listenerName.configPrefix(), true)
+                                
.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);

Review comment:
       Ah right. I can't think of a better way neither.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##########
@@ -129,6 +136,22 @@ private static ChannelBuilder create(SecurityProtocol 
securityProtocol,
                     jaasContexts = new HashMap<>(enabledMechanisms.size());
                     for (String mechanism : enabledMechanisms)
                         jaasContexts.put(mechanism, 
JaasContext.loadServerContext(listenerName, mechanism, configs));
+
+                    // SSL client authentication is enabled in brokers for 
SASL_SSL only if listener-prefixed config is specified.
+                    if (listenerName != null && securityProtocol == 
SecurityProtocol.SASL_SSL) {
+                        String configuredClientAuth = (String) 
configs.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);
+                        String listenerClientAuth = (String) 
config.originalsWithPrefix(listenerName.configPrefix(), true)
+                                
.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);
+                        if (listenerClientAuth == null) {
+                            sslClientAuthOverride = 
SslClientAuth.NONE.name().toLowerCase(Locale.ROOT);

Review comment:
       That makes sense. I did not think about the reconfiguration case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to