This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ef9c9486dac KAFKA-14676: Include all SASL configs in login cache key 
to ensure clients in a JVM can use different OAuth configs (#13211)
ef9c9486dac is described below

commit ef9c9486dac8d6076e2897580b10a699697205ac
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Sun Feb 12 19:49:12 2023 +0000

    KAFKA-14676: Include all SASL configs in login cache key to ensure clients 
in a JVM can use different OAuth configs (#13211)
    
    We currently cache login managers in static maps for both static JAAS 
config using system property and for JAAS config specified using Kafka config 
sasl.jaas.config. In addition to the JAAS config, the login manager callback 
handler is included in the key, but all other configs are ignored. This 
implementation is based on the assumption clients that require different logins 
(e.g. username/password) use different JAAS configs, because login properties 
are included in the JAAS config ra [...]
    
    This PR includes all SASL configs prefixed with sasl. to be included in the 
key so that logins are only shared if all the sasl configs are identical.
    
    Reviewers: Manikumar Reddy <manikumar.re...@gmail.com>, Kirk True 
<k...@mustardgrain.com>
---
 .../security/authenticator/LoginManager.java       | 17 ++++++---
 .../security/authenticator/LoginManagerTest.java   | 43 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 6613fd147f8..f84a8ac9b75 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -99,14 +99,14 @@ public class LoginManager {
             LoginManager loginManager;
             Password jaasConfigValue = jaasContext.dynamicJaasConfig();
             if (jaasConfigValue != null) {
-                LoginMetadata<Password> loginMetadata = new 
LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass);
+                LoginMetadata<Password> loginMetadata = new 
LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass, configs);
                 loginManager = DYNAMIC_INSTANCES.get(loginMetadata);
                 if (loginManager == null) {
                     loginManager = new LoginManager(jaasContext, 
saslMechanism, configs, loginMetadata);
                     DYNAMIC_INSTANCES.put(loginMetadata, loginManager);
                 }
             } else {
-                LoginMetadata<String> loginMetadata = new 
LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass);
+                LoginMetadata<String> loginMetadata = new 
LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass, configs);
                 loginManager = STATIC_INSTANCES.get(loginMetadata);
                 if (loginManager == null) {
                     loginManager = new LoginManager(jaasContext, 
saslMechanism, configs, loginMetadata);
@@ -198,17 +198,23 @@ public class LoginManager {
         final T configInfo;
         final Class<? extends Login> loginClass;
         final Class<? extends AuthenticateCallbackHandler> loginCallbackClass;
+        final Map<String, Object> saslConfigs;
 
         LoginMetadata(T configInfo, Class<? extends Login> loginClass,
-                      Class<? extends AuthenticateCallbackHandler> 
loginCallbackClass) {
+                      Class<? extends AuthenticateCallbackHandler> 
loginCallbackClass,
+                      Map<String, ?> configs) {
             this.configInfo = configInfo;
             this.loginClass = loginClass;
             this.loginCallbackClass = loginCallbackClass;
+            this.saslConfigs = new HashMap<>();
+            configs.entrySet().stream()
+                    .filter(e -> e.getKey().startsWith("sasl."))
+                    .forEach(e -> saslConfigs.put(e.getKey(), e.getValue())); 
// value may be null
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(configInfo, loginClass, loginCallbackClass);
+            return Objects.hash(configInfo, loginClass, loginCallbackClass, 
saslConfigs);
         }
 
         @Override
@@ -219,7 +225,8 @@ public class LoginManager {
             LoginMetadata<?> loginMetadata = (LoginMetadata<?>) o;
             return Objects.equals(configInfo, loginMetadata.configInfo) &&
                    Objects.equals(loginClass, loginMetadata.loginClass) &&
-                   Objects.equals(loginCallbackClass, 
loginMetadata.loginCallbackClass);
+                   Objects.equals(loginCallbackClass, 
loginMetadata.loginCallbackClass) &&
+                   Objects.equals(saslConfigs, loginMetadata.saslConfigs);
         }
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
index 92edfae77f9..3e3300e78e4 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
+import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.JaasContext;
@@ -109,6 +110,48 @@ public class LoginManagerTest {
         verifyLoginManagerRelease(staticScramLogin, 2, scramJaasContext, 
configs);
     }
 
+    @Test
+    public void testLoginManagerWithDifferentConfigs() throws Exception {
+        Map<String, Object> configs1 = new HashMap<>();
+        configs1.put("sasl.jaas.config", dynamicPlainContext);
+        configs1.put("client.id", "client");
+        configs1.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, 
"http://host1:1234";);
+        Map<String, Object> configs2 = new HashMap<>(configs1);
+        configs2.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, 
"http://host2:1234";);
+        JaasContext dynamicContext = JaasContext.loadClientContext(configs1);
+        Map<String, Object> configs3 = new HashMap<>(configs1);
+        configs3.put("client.id", "client3");
+
+        LoginManager dynamicLogin1 = 
LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, 
configs1);
+        LoginManager dynamicLogin2 = 
LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, 
configs2);
+
+        assertEquals(dynamicPlainContext, dynamicLogin1.cacheKey());
+        assertEquals(dynamicPlainContext, dynamicLogin2.cacheKey());
+        assertNotSame(dynamicLogin1, dynamicLogin2);
+
+        assertSame(dynamicLogin1, 
LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, 
configs1));
+        assertSame(dynamicLogin2, 
LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, 
configs2));
+        assertSame(dynamicLogin1, 
LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, 
new HashMap<>(configs1)));
+        assertSame(dynamicLogin1, 
LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, 
configs3));
+
+
+        JaasContext staticContext = 
JaasContext.loadClientContext(Collections.emptyMap());
+        LoginManager staticLogin1 = 
LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", 
DefaultLogin.class, configs1);
+        LoginManager staticLogin2 = 
LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", 
DefaultLogin.class, configs2);
+        assertNotSame(staticLogin1, dynamicLogin1);
+        assertNotSame(staticLogin2, dynamicLogin2);
+        assertNotSame(staticLogin1, staticLogin2);
+        assertSame(staticLogin1, 
LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", 
DefaultLogin.class, configs1));
+        assertSame(staticLogin2, 
LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", 
DefaultLogin.class, configs2));
+        assertSame(staticLogin1, 
LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", 
DefaultLogin.class, new HashMap<>(configs1)));
+        assertSame(staticLogin1, 
LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", 
DefaultLogin.class, configs3));
+
+        verifyLoginManagerRelease(dynamicLogin1, 4, dynamicContext, configs1);
+        verifyLoginManagerRelease(dynamicLogin2, 2, dynamicContext, configs2);
+        verifyLoginManagerRelease(staticLogin1, 4, staticContext, configs1);
+        verifyLoginManagerRelease(staticLogin2, 2, staticContext, configs2);
+    }
+
     private void verifyLoginManagerRelease(LoginManager loginManager, int 
acquireCount, JaasContext jaasContext,
                                            Map<String, ?> configs) throws 
Exception {
 

Reply via email to