[ 
https://issues.apache.org/jira/browse/KAFKA-13422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17444438#comment-17444438
 ] 

Rajini Sivaram commented on KAFKA-13422:
----------------------------------------

[~RivenSun] We should have probably added that check earlier. But changing it 
now from picking the first one to disallowing multiple may be a breaking 
change. We can log an error instead of throwing exception if that helps.

> Even if the correct username and password are configured, when ClientBroker 
> or KafkaClient tries to establish a SASL connection to ServerBroker, an 
> exception is thrown: (Authentication failed: Invalid username or password)
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13422
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13422
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, core
>    Affects Versions: 2.7.1, 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>         Attachments: CustomerAuthCallbackHandler.java, 
> LoginContext_login_debug.png, SaslClientCallbackHandler_handle_debug.png
>
>
>  
> h1. Foreword:
> When deploying a Kafka cluster with a higher version (2.7.1), I encountered 
> an exception of communication identity authentication failure between 
> brokers. In the current latest version 3.0.0, this problem can also be 
> reproduced.
> h1. Problem recurring:
> h2. 1)broker Version is 3.0.0
> h3. The content of kafka_server_jaas.conf of each broker is exactly the same, 
> the content is as follows:
>  
>  
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="admin_scram_password";
>  
> };
> {code}
>  
>  
> h3. broker server.properties:
> One of the broker configuration files is provided, and the content of the 
> configuration files of other brokers is only different from the localPublicIp 
> of advertised.listeners.
>  
> {code:java}
> broker.id=1
> broker.rack=us-east-1a
> advertised.listeners=SASL_PLAINTEXT://localPublicIp:9779,SASL_SSL://localPublicIp:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://localPublicIp:9669
> log.dirs=/asyncmq/kafka/data_1,/asyncmq/kafka/data_2
> zookeeper.connect=***
> listeners=SASL_PLAINTEXT://:9779,SASL_SSL://:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://:9669
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,PLAIN_PLUGIN_SSL:SASL_SSL
> listener.name.plain_plugin_ssl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler
> #ssl config
> ssl.keystore.password=***
> ssl.key.password=***
> ssl.truststore.password=***
> ssl.keystore.location=***
> ssl.truststore.location=***
> ssl.client.auth=none
> ssl.endpoint.identification.algorithm=
> #broker communicate config
> #security.inter.broker.protocol=SASL_PLAINTEXT
> inter.broker.listener.name=INTERNAL_SSL
> sasl.mechanism.inter.broker.protocol=PLAIN
> #sasl authentication config
> sasl.kerberos.service.name=kafka
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI
> delegation.token.master.key=***
> delegation.token.expiry.time.ms=86400000
> delegation.token.max.lifetime.ms=3153600000000
> {code}
>  
>  
> Then start all brokers at the same time. Each broker has actually been 
> started successfully, but when establishing a connection between the 
> controller node and all brokers, the identity authentication has always 
> failed. The connection between brokers cannot be established normally, 
> causing the entire Kafka cluster to be unable to provide external services.
> h3. The server log keeps printing abnormally like crazy:
> The real ip sensitive information of the broker in the log, I use ****** 
> instead of here
>  
> {code:java}
> [2021-10-29 14:16:19,831] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Started socket server acceptors and processors 
> (kafka.network.SocketServer)
> [2021-10-29 14:16:19,836] INFO Kafka version: 3.0.0 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka commitId: 8cb0a5e9d3441962 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka startTimeMs: 1635516979831 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,837] INFO [KafkaServer id=3] started 
> (kafka.server.KafkaServer)
> [2021-10-29 14:16:20,249] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Failed authentication with /****** (Authentication failed: Invalid 
> username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 14:16:20,680] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Failed authentication with /****** (Authentication failed: Invalid 
> username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 14:16:21,109] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Failed authentication with /****** (Authentication failed: Invalid 
> username or password) (org.apache.kafka.common.network.Selector)
> {code}
>  
>  
> h2. 2)Try to change the password of the PlainLoginModule communication 
> between brokers in kafka_server_jaas.conf
> change is kJTVDziatPgjXG82sFHc4O1EIuewmlvS --> DziatPgjXG82sFHc4O1EIuewmlvS
>  
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="DziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="DziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="admin_scram_password";
>  
> };
> {code}
>  
>  
>  
> Restart all broker machines and find that connections can be established 
> normally between the controller and all brokers, which can be verified by the 
> netstat -anp | grep 9009 command
> Real ip sensitive information, I use ****** instead
>  
> {code:java}
> [root@ip-10-30-0-64 kafka]# netstat -anp | grep 9009
> tcp6       0      0 :::9009                 :::*                    LISTEN    
>   24852/java          
> tcp6       0      0 ******:47502        ******:9009         ESTABLISHED 
> 24852/java          
> tcp6       0      0 ******:9009         ******:41164        ESTABLISHED 
> 24852/java          
> tcp6       0      0 ******:9009         ******:41168        ESTABLISHED 
> 24852/java 
> {code}
>  
> The entire cluster can provide external services, which can be verified by 
> creating a topic through the script bin/kafka-topics.sh.
> h1. Preliminary guess:
> 1)Does the admin password kJTVDziatPgjXG82sFHc4O1EIuewmlvS contain special 
> characters not allowed by Kafka password?
> 2)Whether the content of the kafka_server_jaas.conf file does not conform to 
> the standard format of the Kafka official website?
> 3)Whether the end newline character of each line in kafka_server_jaas.conf 
> does not conform to the newline character of the Linux system?
> After consulting data and analysis, the above conjectures are not correct
> 1)kJTVDziatPgjXG82sFHc4O1EIuewmlvS does not contain special characters not 
> allowed by Kafka password
> 2)The content of the kafka_server_jaas.conf file conforms to the standard 
> format of Kafka official website, please refer 
> to[https://kafka.apache.org/documentation/#security_sasl]
> 3)Open the kafka_server_jaas.conf file through the vim command, and through 
> the :set list command, you can see that the end newline character of each 
> line is the standard linux system file newline character
>  
> So in order to improve the analysis of the reasons, try to think that the log 
> keeps outputting Invalid username or password, what is the username&password 
> passed by ClientBroker? What is the username&password expected by 
> ServerBroker?
>  
> h1. sasl.server.callback.handler.class implement
>  
> Refer to the 
> [sasl.server.callback.handler.class|https://kafka.apache.org/documentation/#brokerconfigs_sasl.server.callback.handler.class]
>  parameter and write the implementation class of the interface 
> AuthenticateCallbackHandler :CustomerAuthCallbackHandler.
> In fact, the code implementation of CustomerAuthCallbackHandler is almost 
> exactly the same as Kafka's own default implementation class 
> PlainServerCallbackHandler, except that the log output is temporarily added 
> to the native authenticate method. The complete code is included in the 
> attachment"CustomerAuthCallbackHandler.java"
>   
> {code:java}
> private final static Logger log = 
> LoggerFactory.getLogger(CustomerAuthCallbackHandler.class);
> ......
> protected boolean authenticate(String username, char[] password) throws 
> IOException {
>         if (username == null)
>             return false;
>         else {
>             String expectedPassword = 
> JaasContext.configEntryOption(jaasConfigEntries,
>                     JAAS_USER_PREFIX + username,
>                     PlainLoginModule.class.getName());
>             boolean authenticateSuccess = expectedPassword != null && 
> Utils.isEqualConstantTime(password, expectedPassword.toCharArray());
>             log.info("CustomerAuthCallbackHandler authenticate [{}] | user 
> [{}] password is [{}] , expectedPassword is [{}] ", authenticateSuccess, 
> username, new String(password), expectedPassword);
>             return authenticateSuccess;
>         }
>     }
> {code}
>  
>  
>  
>  
> Each broker's configuration file server.properties adds a new line of 
> configuration
>  
> {code:java}
> listener.name.internal_ssl.plain.sasl.server.callback.handler.class=us.zoom.mq.security.plain.CustomerAuthCallbackHandler
> {code}
>  
> Rollback the password of admin in the content of kafka_server_jaas.conf, the 
> content is as follows:
>  
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="admin_scram_password";
>  
> };
> {code}
>  
>  
> Restart all brokers, you can observe the log:
> The real ip sensitive information of the broker in the log, I use ****** 
> instead of here
>  
> {code:java}
> [2021-10-29 15:31:09,886] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Started data-plane acceptor and processor(s) for endpoint : 
> ListenerName(SASL_PLAINTEXT) (kafka.network.SocketServer)
> [2021-10-29 15:31:09,925] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Started data-plane acceptor and processor(s) for endpoint : 
> ListenerName(PLAIN_PLUGIN_SSL) (kafka.network.SocketServer)
> [2021-10-29 15:31:09,926] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Started socket server acceptors and processors 
> (kafka.network.SocketServer)
> [2021-10-29 15:31:09,932] INFO Kafka version: 3.0.0 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 15:31:09,932] INFO Kafka commitId: 8cb0a5e9d3441962 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 15:31:09,932] INFO Kafka startTimeMs: 1635521469926 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 15:31:09,933] INFO [KafkaServer id=3] started 
> (kafka.server.KafkaServer)
> [2021-10-29 15:31:10,305] INFO CustomerAuthCallbackHandler authenticate 
> [false] | user [admin] password is [admin_scram_password] , expectedPassword 
> is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS]  
> (us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
> [2021-10-29 15:31:10,306] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Failed authentication with /****** (Authentication failed: Invalid 
> username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 15:31:10,734] INFO CustomerAuthCallbackHandler authenticate 
> [false] | user [admin] password is [admin_scram_password] , expectedPassword 
> is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS]  
> (us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
> [2021-10-29 15:31:10,735] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Failed authentication with /****** (Authentication failed: Invalid 
> username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 15:31:11,165] INFO CustomerAuthCallbackHandler authenticate 
> [false] | user [admin] password is [admin_scram_password] , expectedPassword 
> is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS]  
> (us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
> [2021-10-29 15:31:11,165] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Failed authentication with /****** (Authentication failed: Invalid 
> username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 15:31:11,596] INFO CustomerAuthCallbackHandler authenticate 
> [false] | user [admin] password is [admin_scram_password] , expectedPassword 
> is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS]  
> (us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
> {code}
>  
>  
> Through the log, we clearly know that the reason for Authentication failed is 
> that the username passed by ClientBroker is correct when the connection 
> between brokers is established, but the password and expectedPassword do not 
> match, and the value of password is configured in the ScramLoginModule in the 
> kafka_server_jaas.conf file password.
> At this point of analysis, we can only initially understand that the password 
> value passed by ClientBroker is wrong, but we still don't know the reason for 
> the wrong password. We can only continue to analyze RC by reading the source 
> code of the Kafka server startup process.
>  
> h1. Kafka server Startup process source code analysis
> The startup entry is in the core project, the main method of the scala class 
> kafka.Kafka
> KafkaServer's startup(), this method completes the initialization of many key 
> modules, such as logManager, socketServer, kafkaController, tokenManager, 
> groupCoordinator and other modules
> In the startup() method of KafkaServer, and the key pieces of code for this 
> problem as follows:
> 1)socketServer.startup(startProcessingRequests = false)
> This method completes the creation of Acceptor and Processors for 
> ControlPlane and DataPlane
>  
> 2)socketServer.startProcessingRequests(authorizerFutures)
> This method completes the start of Acceptor and Processors for ControlPlane 
> and DataPlane
>  
> 3)Further trace to the startAcceptorAndProcessors method in SocketServer
> This method will start all Processors threads for each listener.name
> See method startProcessors(processors: Seq[Processor], processorThreadPrefix: 
> String)
>  
> 4)Analyze configureNewConnections() in the run method of the Processor thread 
> class
> There is a line of key code in this 
> methodselector.register(connectionId(channel.socket), channel)
> Continue to analyze the code at the bottom
> registerChannel(id, socketChannel, SelectionKey.OP_READ) -->
> KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key) -->
> SaslChannelBuilder.buildChannel(...)
>  
> 5)The method buildChannel(...) of SaslChannelBuilder
> In this method, the type of Mode determines whether it is 
> buildServerAuthenticator or buildClientAuthenticator
> The most critical difference between the two methods is that the former is 
> createSaslServer and the latter is createSaslClient.
> SaslServer is responsible for ServerBroker verifying user passwords, and the 
> logic of PlainLoginModule verifying passwords can refer to the default 
> implementation class PlainServerCallbackHandler
> SaslClient is responsible for obtaining user passwords on KafkaClient
> So our focus should be buildClientAuthenticator, so the value of the instance 
> variable Mode mode of SaslChannelBuilder should be CLIENT.
> So we can go back and trace the construction method :
> SaslChannelBuilder public SaslChannelBuilder(...)
> 6)the construction method public SaslChannelBuilder(...)
> This method is only called in the private static ChannelBuilder create(...) 
> method of ChannelBuilders,
> Continue to trace up to static ChannelBuilder clientChannelBuilder(...)
> In fact, traced here, you can find that the caller of the 
> clientChannelBuilder(...) method is either ClientBroker or ClientUtils class. 
> The latter is for KafkaProducer/KafkaConsumer/KafkaAdminClient
> So in order to further analyze the root cause, I decided to use KafkaProducer 
> to simulate ClientBroker request connection. Their underlying mechanisms are 
> almost the same, except that some configuration items are only supported by 
> ClientBroker or the section of static JAAS configuration is different.
> h1. Source code analysis of KafkaProducer's Sasl authentication process
>  
> ClientUtils#clientChannelBuilder(…) → ChannelBuilder#create(...) → 
> SaslChannelBuilder#void configure(Map<String, ?> configs)
> The key operations completed in the above methods
> 1)JaasContext load
> If the kafka_Client_jaas.conf file is specified through the 
> java.security.auth.login.config environment variable, the method of loading 
> into JaasContext is the method getAppConfigurationEntry(String var1) in 
> sun.security.provider.ConfigFile class.
>  
> {code:java}
>  public AppConfigurationEntry[] getAppConfigurationEntry(String var1) {
>         return this.spi.engineGetAppConfigurationEntry(var1);
>     }
> {code}
>  
>  
> 2)createClientCallbackHandler(Map<String, ?> configs)
> The default ClientCallbackHandler implementation class is 
> SaslClientCallbackHandler. The role of SaslClientCallbackHandler is that 
> KafkaClient takes the username and password from the saved authentication 
> credentials in Subject.java in the LoginManager of its own Channel. *This is 
> particularly critical, and we will analyze it in detail later.*
> 3)Initialization of LoginManager and loading of LoginContext
> Here we directly trace the construction method of LoginManager
> private LoginManager(...) → AbstractLogin#login()
> Then there are two key operations in the AbstractLogin#login() method
>  
> {code:java}
>     @Override
>     public LoginContext login() throws LoginException {
>         loginContext = new LoginContext(contextName, null, 
> loginCallbackHandler, configuration);
>         loginContext.login();
>         log.info("Successfully logged in.");
>         return loginContext;
>     }
> {code}
>  
> The role of new LoginContext(...) is
> (1)Get the configuration variables in JaasContext, where all the credentials 
> of kafka_Client_jaas.conf are stored
> (2)Complete the initialization of the instance variable moduleStack array
>  
> loginContext.login() → invokePriv(LOGIN_METHOD) → invoke(String methodName)
> (1)In these methods
>  Updated the Object module field of each element of the moduleStack array
> (2) Obtain all public methods of each type of LoginModule class through 
> reflection
>  
> {code:java}
>   methods = moduleStack[i].module.getClass().getMethods();
> {code}
>  
> And get the index of the initialize method in the methods array through the 
> INIT_METHOD constant
> Then execute the initialize method of each type of LoginModule class through 
> reflection
>  
> {code:java}
>   methods[mIndex].invoke(moduleStack[i].module, initArgs);
> {code}
>  
> This method is very important. In this method invokePriv(LOGIN_METHOD), by 
> looping through the moduleStack variables, LoginContext will execute all the 
> initialize methods of the LoginModule class you configured, and the 
> initialize methods of PlainLoginModule and ScramLoginModule will load the 
> current username&password configured in kafka_Client_jaas.conf into the 
> Subject of the LoginManager corresponding to the Channel. The data structure 
> of the two fields storing username and password in Subject are both 
> SecureSet, and in SecureSet, LinkedList is used to store the corresponding 
> elements. *So the key point: the order of the elements in the two Credentials 
> in the Subject must be added in the order of all LoginModules in 
> kafka_Client_jaas.conf*
> 4)Let's go back and analyze the SaslClientCallbackHandler mentioned in the 
> second step above
> Where is this class used? In fact, KafkaClient is ready to initiate the 
> operation of establishing a connection. 
> ClientFactoryImpl#createSaslClient(...) will use SaslClientCallbackHandler.
> The stack of the method is:
> NetworkClient#initiateConnect(Node node, long now)
> -->
> Selector#connect(String id, InetSocketAddress address, int sendBufferSize, 
> int receiveBufferSize)
> -->
> Selector#registerChannel(String id, SocketChannel socketChannel, int 
> interestedOps)
> -->
> Selector#buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, 
> SelectionKey key)
> -->
> SaslChannelBuilder#buildChannel(...) –> 
> SaslChannelBuilder#buildClientAuthenticator
> -->
> SaslClientAuthenticator#SaslClient createSaslClient()
> -->
> Sasl.createSaslClient(...)
> -->
> ClientFactoryImpl 的createSaslClient(...) .
> The source code of this method ClientFactoryImpl#createSaslClient(...) is as 
> follows:
> {code:java}
> public SaslClient createSaslClient(String[] var1, String var2, String var3, 
> String var4, Map<String, ?> var5, CallbackHandler var6) throws SaslException {
>     for(int var7 = 0; var7 < var1.length; ++var7) {
>         if (var1[var7].equals(myMechs[0]) && 
> PolicyUtils.checkPolicy(mechPolicies[0], var5)) {
>             return new ExternalClient(var2);
>         }
>         Object[] var8;
>         if (var1[var7].equals(myMechs[1]) && 
> PolicyUtils.checkPolicy(mechPolicies[1], var5)) {
>             var8 = this.getUserInfo("CRAM-MD5", var2, var6);
>             return new CramMD5Client((String)var8[0], 
> (byte[])((byte[])var8[1]));
>         }
>         if (var1[var7].equals(myMechs[2]) && 
> PolicyUtils.checkPolicy(mechPolicies[2], var5)) {
>             var8 = this.getUserInfo("PLAIN", var2, var6);
>             return new PlainClient(var2, (String)var8[0], 
> (byte[])((byte[])var8[1]));
>         }
>     }
>     return null;
> }
> {code}
>  
>  
> You can see the CallbackHandler passed in from the upper layer, which is the 
> parameter var6. In fact, it is Kafka's own SaslClientCallbackHandler. Then 
> continue to look at the source code of getUserInfo. You can clearly see that 
> NameCallback and PasswordCallback are constructed, and the handle method of 
> SaslClientCallbackHandler is executed: var3.handle(new Callback[]\{var6, 
> var7});
>  
> {code:java}
> private Object[] getUserInfo(String var1, String var2, CallbackHandler var3) 
> throws SaslException {
>         if (var3 == null) {
>             throw new SaslException("Callback handler to get 
> username/password required");
>         } else {
>             try {
>                 String var4 = var1 + " authentication id: ";
>                 String var5 = var1 + " password: ";
>                 NameCallback var6 = var2 == null ? new NameCallback(var4) : 
> new NameCallback(var4, var2);
>                 PasswordCallback var7 = new PasswordCallback(var5, false);
>                 var3.handle(new Callback[]{var6, var7});
>                 char[] var8 = var7.getPassword();
>                 byte[] var9;
>                 if (var8 != null) {
>                     var9 = (new String(var8)).getBytes("UTF8");
>                     var7.clearPassword();
>                 } else {
>                     var9 = null;
>                 }
>                 String var10 = var6.getName();
>                 return new Object[]{var10, var9};
>             } catch (IOException var11) {
>                 throw new SaslException("Cannot get password", var11);
>             } catch (UnsupportedCallbackException var12) {
>                 throw new SaslException("Cannot get userid/password", var12);
>             }
>         }
>     }
> {code}
>  
>  
>  
> The most important role of SaslClientCallbackHandler is its handle(Callback[] 
> callbacks) method, which takes out username and password from Subject
> By analyzing the source code analysis, when Subject takes elements from each 
> type of Credentials, *it will change the data structure of the Credentials 
> SecureSet into a HashSet*, and then call *HashSet<String>.iterator().next()* 
> to get *the first* item of each type of HashSet Elements, as username and 
> password in the corresponding Callback
>  
> {code:java}
>  for (Callback callback : callbacks) {
>             if (callback instanceof NameCallback) {
>                 NameCallback nc = (NameCallback) callback;
>                 if (subject != null && 
> !subject.getPublicCredentials(String.class).isEmpty()) {
>                     
> nc.setName(subject.getPublicCredentials(String.class).iterator().next());
>                 } else
>                     nc.setName(nc.getDefaultName());
>             } else if (callback instanceof PasswordCallback) {
>                 if (subject != null && 
> !subject.getPrivateCredentials(String.class).isEmpty()) {
>                     char[] password = 
> subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
>                     ((PasswordCallback) callback).setPassword(password);
>                 } else {
>                     String errorMessage = "Could not login: the client is 
> being asked for a password, but the Kafka" +
>                              " client code does not currently support 
> obtaining a password from the user.";
>                     throw new UnsupportedCallbackException(callback, 
> errorMessage);
>                 }
>             }
>             
>       .......
>       }
> {code}
>  
>  
> *The key point is: when KafkaClient takes the username and password from the* 
> *Subject, it must be the first element in the HashSet<String>* *of the 
> content of each* *Credentials* *element, and the index order of the elements 
> in the HashSet<String> depends on the element's* {color:#ff0000}*hash( 
> )*{color} *value.*
> h1. KafkaProducer Sasl identity authentication process Debug
>  
> h2. Precondition:
> h3. 1)kafka_server_jaas.conf Configuration:
> All the broker machines java.security.auth.login.config use this 
> configuration, the broker starts normally, the Kafka cluster is healthy, and 
> can provide services to the outside world
>  
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="admin_scram_password";
>  
> };
> {code}
>  
>  
> h3. 2)kafkaProducer key Configuration
> {code:java}
> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "******:9669"); 
> props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
> System.setProperty("java.security.auth.login.config","******\\kafka_Client_jaas.conf");
> props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
> props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
> "******\\client.truststore.jks");
> props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "******");
> props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
> KafkaProducer<String, String> producer = new KafkaProducer<String, 
> String>(props);
> {code}
>  
> h3. 3)kafka_Client_jaas.conf File,Simulate the JAAS configuration of 
> ClientBroker
>  
> {code:java}
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="DziatPgjXG82sFHc4O1EIuewmlvS";
>  
> };
> {code}
>  
>  
> h3. Start the Debug for authentication process
> h4. 1)“LoginSucceeded = true” in the login() method of LoginContext; place 
> breakpoint in this line code
> Code debug picture see LoginContext_login_debug
>   
>  We can see: *The order of the elements in the two Credentials in the* 
> Subject *must be added in the order of all LoginModules in 
> kafka_Client_jaas.conf*
>  
> h4. 2)Place breakpoint debugging to handle(Callback[] callbacks) of 
> SaslClientCallbackHandler
> Code debug picture see SaslClientCallbackHandler_handle_debug
>   
>  You can see that the password field character array of PasswordCallback here 
> starts with “DziatPgjXG”, which corresponds to the password of 
> ScramLoginModule in the kafka_Client_jaas.conf file: 
> "DziatPgjXG82sFHc4O1EIuewmlvS"
>  
> h4. 3)Cancel all breakpoints and run the KafkaProducer program
> You can see the Producer log
> The real ip sensitive information of the broker in the log, I use ****** 
> instead of here
> {code:java}
> [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - 
> Successfully logged in.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 8cb0a5e9d3441962
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1635534803428
> [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
> Bootstrap broker ******:9669 (id: -3 rack: null) disconnected
> [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
> Bootstrap broker ******:9669 (id: -1 rack: null) disconnected
> [kafka-producer-network-thread | producer-1] INFO 
> org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] 
> Failed authentication with ******/****** (Authentication failed: Invalid 
> username or password)
> [kafka-producer-network-thread | producer-1] ERROR 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
> Connection to node -2 (******/******:9669) failed authentication due to: 
> Authentication failed: Invalid username or password
> [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
> Bootstrap broker ******:9669 (id: -2 rack: null) disconnected
> [main] ERROR ProducerTest - the producer has a error:Authentication failed: 
> Invalid username or password
> [kafka-producer-network-thread | producer-1] INFO 
> org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] 
> Failed authentication with ******/****** (Authentication failed: Invalid 
> username or password)
> [kafka-producer-network-thread | producer-1] ERROR 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
> Connection to node -3 (******/******:9669) failed authentication due to: 
> Authentication failed: Invalid username or password
> [kafka-producer-network-thread | producer-1] WARN 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
> Bootstrap broker ******:9669 (id: -3 rack: null) disconnected
> [main] ERROR ProducerTest - the producer has a error:Authentication failed: 
> Invalid username or password
> {code}
>  
> You can see that the Producer log is also printing exceptions: Authentication 
> failed: Invalid username or password
> Because the password of the PlainLoginModule expected by ServerBroker should 
> be “kJTVDziatPgjXG82sFHc4O1EIuewmlvS”, but the password on the KafkaProducer 
> side was wrong, it was taken as the password in the ScramLoginModule: 
> “DziatPgjXG82sFHc4O1EIuewmlvS”
>  
>  
> h1. Root Cause:
> h2. 1. The Credentials stored in the Subject contain the username&password of 
> {color:#ff0000}all LoginModules{color} in kafka_Client_jaas.conf
> The JDK's LoginContext class initialization code is as follows
> {code:java}
> public LoginContext(String name, Subject subject,
>                         CallbackHandler callbackHandler,
>                         Configuration config) throws LoginException {
>         this.config = config;
>         if (config != null) {
>             creatorAcc = java.security.AccessController.getContext();
>         }
>         init(name);
>        ....
>        }
> {code}
>  
>  
> When JAAS configuration is configured through 
> java.security.auth.login.config, the config type here is actually the 
> sun.security.provider.ConfigFile type of the JDK .
> Then when the above init(name); code is executed, 
> config.getAppConfigurationEntry(name) will read all the LoginModules in the 
> kafka_Client_jaas.conf file again, and use the entries variable to complete 
> the assignment of the instance variable moduleStack in the following code.
>  
> {code:java}
> // get the LoginModules configured for this application
>         AppConfigurationEntry[] entries = 
> config.getAppConfigurationEntry(name);
>         if (entries == null) {
>             if (sm != null && creatorAcc == null) {
>                 sm.checkPermission(new AuthPermission
>                                 ("createLoginContext." + OTHER));
>             }
>             entries = config.getAppConfigurationEntry(OTHER);
>             if (entries == null) {
>                 MessageFormat form = new MessageFormat(ResourcesMgr.getString
>                         ("No.LoginModules.configured.for.name"));
>                 Object[] source = {name};
>                 throw new LoginException(form.format(source));
>             }
>         }
>         moduleStack = new ModuleInfo[entries.length];
> {code}
>  
> *Eventually, the* Credentials *stored in the* Subject *will contain the 
> username & password of all* LoginModules *in kafka_Client_jaas.conf.*
> h2. 2.In the Subject class in JDK, when two kinds of Credentials take 
> elements, the order of taking elements may not be the same as the order in 
> which Credentials store elements. The order of taking elements depends on the 
> order of the {color:#ff0000}hash index{color} of the elements in HashSet.
>  
> h1. Suggestion & Solutions:
>  
> h2. 1.Modification of JaasContext and JaasConfig class construction methods
> h3.  
> 1)When KafkaClient initializes Map<String, JaasContext> jaasContexts, the 
> construction method of JaasContext needs to pass in the clientSaslMechanism 
> configured by KafkaClient
> And use clientSaslMechanism to filter the entries returned by 
> configuration.getAppConfigurationEntry(name). When clientSaslMechanism = 
> PLAIN, the final configurationEntries instance variable should only contain 
> the PlainLoginModule content in kafka_Client_jaas.conf;When 
> clientSaslMechanism =SCRAM-SHA-256, configurationEntries should only contain 
> the content of ScramLoginModule 
> {code:java}
> public JaasContext(String name, Type type, Configuration configuration, 
> Password dynamicJaasConfig) {
>         this.name = name;
>         this.type = type;
>         this.configuration = configuration;
>         AppConfigurationEntry[] entries = 
> configuration.getAppConfigurationEntry(name);
>         if (entries == null)
>             throw new IllegalArgumentException("Could not find a '" + name + 
> "' entry in this JAAS configuration.");
>         
>         //Add here the code that uses clientSaslMechanism to verify and 
> filter entries
>         
>         this.configurationEntries = Collections.unmodifiableList(new 
> ArrayList<>(Arrays.asList(entries)));
>         this.dynamicJaasConfig = dynamicJaasConfig;
>     }
> {code}
> The advantages of this are two points:
>  (1)For mode == Mode.CLIENT, even if JAAS configuration is configured through 
> java.security.auth.login.config, the JaasContext of KafkaClient (including 
> KafkaProducer or KafkaConsumer, etc.) can reach the ClientBroker side 
> customized configuration 
> "[listener.name|http://listener.name/].\{listenerName}.\{saslMechanism}.sasl.jaas.config";
>  semantics,See link 
> [https://kafka.apache.org/documentation/#security_jaas_broker] . 
>  To achieve the goal: In Map<String, JaasContext> jaasContexts, the 
> configurationEntries in the JaasContext corresponding to each saslMechanism 
> only contains the contents of the LoginModule part corresponding to 
> saslMechanism.
>  (2)For mode == Mode.SERVER,
>  SaslChannelBuilder#configure(Map<String, ?> configs)
>   
> {code:java}
> public void configure(Map<String, ?> configs) throws KafkaException {
>         try {
>             this.configs = configs;
>             if (mode == Mode.SERVER) {
>                 createServerCallbackHandlers(configs);
>                 createConnectionsMaxReauthMsMap(configs);
>             } else
>                 createClientCallbackHandler(configs);
>             for (Map.Entry<String, AuthenticateCallbackHandler> entry : 
> saslCallbackHandlers.entrySet()) {
>                 String mechanism = entry.getKey();
>                 entry.getValue().configure(configs, mechanism, 
> jaasContexts.get(mechanism).configurationEntries());
>             }
>             ......
>             
>             }
> {code}
> Because we have completed the verification of configurationEntries in 
> jaasContexts.get(mechanism), when executing entry.getValue().configure(...) 
> method, in particular, PlainServerCallbackHandler executes configure(...), 
> The instance variable jaasConfigEntries in PlainServerCallbackHandler will 
> become more pure, jaasConfigEntries will no longer contain the content of 
> other LoginModule
> h3. 2)JaasConfig Class need to provide a new construction method
> In the construction method of JaasContext, the parameter Configuration 
> configuration is uniformly converted into JaasConfig configuration. Because 
> of the behavior in the init(String name) method of the LoginContext class of 
> the JDK, we cannot change it.
>  * 
> {code:java}
> public JaasConfig(String loginContextName, List<AppConfigurationEntry> 
> configurationEntries) {
> this.loginContextName = loginContextName;
> if (configurationEntries == null || configurationEntries.size() == 0)
>         throw new IllegalArgumentException("JAAS config property does not 
> contain any login modules");
> this.configEntries = configurationEntries;
> }
> {code}
> Then in the construction method of JaasContext, use the configurationEntries 
> after verification and filtering to restructure the configuration
>   
> {code:java}
> public JaasContext(String name, Type type, Configuration configuration, 
> Password dynamicJaasConfig) {
>         this.name = name;
>         this.type = type;
>         this.configuration = configuration;
>         AppConfigurationEntry[] entries = 
> configuration.getAppConfigurationEntry(name);
>         if (entries == null)
>             throw new IllegalArgumentException("Could not find a '" + name + 
> "' entry in this JAAS configuration.");
>         
>         //Add here the code that uses clientSaslMechanism to verify and 
> filter entries
>         
>         this.configurationEntries = Collections.unmodifiableList(new 
> ArrayList<>(Arrays.asList(entries)));
>         
>         if (configuration instanceof JaasConfig)
>             this.configuration = configuration;
>         else
>             this.configuration = new JaasConfig(name, configurationEntries);
>         
>         this.dynamicJaasConfig = dynamicJaasConfig;
>     }
> {code}
>  
> h2. 2.Each type of LoginModule in kafka_Client_jaas.conf should only be 
> configured once
> For example, in the following configuration, JaasContext is loaded, and an 
> exception should be thrown when executing the construction method of 
> JaasContext
>  
> {code:java}
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin2"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_tom="tom";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="DziatPgjXG82sFHc4O1EIuewmlvS";
>  
> };
> {code}
>  
>  
> h2. 3. Discussion for Subject class in JDK
> The Subject class in the JDK, when fetching elements, SecureSet converts 
> HashSet. I don't know what the JDK considers. We can try to submit an issue 
> to the JDK official. When we can fetch elements, the Set type  uses 
> SecureSet/LinkedHashSet,
> like SecureSet → SecureSet/LinkedHashSet .
> But I recommend making changes on the upper application side, Kafka level.
> First of all, whether it is Map<String, JaasContext> jaasContexts or 
> Map<String, LoginManager> loginManagers variables, the key is the mechanism. 
> Therefore, *the value corresponding to each mechanism should be purer and 
> should not contain the contents of the LoginModule of other mechanisms*. This 
> way, it is also avoided that there are elements greater than 1 in each 
> Credentials in the Subject.
> Secondly, because JDK is designing Subject, Credentials may not pay attention 
> to the semantics of "*first element*". What the JDK promises is to return 
> every element in Credentials to you and allow you to change the returned data.
> See the comments of the getPrivateCredentials method
> {code:java}
> Return a Set of private credentials associated with this Subject that are 
> instances or subclasses of the specified Class.
> The caller must have permission to access all of the requested Credentials, 
> or a SecurityException will be thrown.
> The returned Set is not backed by this Subject's internal private Credential 
> Set. A new Set is created and returned for each method invocation. 
> Modifications to the returned Set will not affect the internal private 
> Credential Set.
> Params:
> c – the returned Set of private credentials will all be instances of this 
> class.
> Type parameters:
> <T> – the type of the class modeled by c
> Returns:
> a Set of private credentials that are instances of the specified Class.
> Throws:
> NullPointerException – if the specified Class is null.
> public <T> Set<T> getPrivateCredentials(Class<T> c) 
> {code}
>  
> However, when Kafka takes elements from the two Credentials in the Subject, 
> it uses the semantics of "*take the first element*", which may conflict with 
> the JDK's concept when designing the Subject. Because you should get all the 
> elements of each Credentials for identity authentication.
> {code:java}
> subject.getPrivateCredentials(String.class).iterator().next()
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to