[ 
https://issues.apache.org/jira/browse/KAFKA-13751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13751:
-----------------------------
    Description: 
h1. Phenomenon:

 SASL/OAUTHBEARER, whether implemented by default or customized by the user, is 
not compatible with other SASL mechanisms.
h3.  

case1:

kafka_server_jaas_oauth.conf
{code:java}
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin"
  user_admin="admin"
  user_alice="alice"; 

   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

   org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin"
  password="admin_scram";
}; {code}
 server.properties
{code:java}
advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669
 
listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER{code}
Error when starting kafka:
server.log
{code:java}
[2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
        at kafka.network.Processor.<init>(SocketServer.scala:724)
        at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
        at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
        at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
        at kafka.network.SocketServer.startup(SocketServer.scala:122)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null 
JAAS mechanism configuration (size was 3)
        at 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117)
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139)
        ... 17 more
[2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down 
(kafka.server.KafkaServer)
[2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket server 
request processors (kafka.network.SocketServer) {code}
The default implementation class of oauthbearer's 
`sasl.server.callback.handler.class` is 
OAuthBearerUnsecuredValidatorCallbackHandler. 
In the OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method, the 
jaasConfigEntries parameter is verified.
What I want to say is that {*}the verification logic here is completely 
reasonable{*}, but the jaasConfigEntries passed in from the upper layer should 
not contain the AppConfigurationEntry of other loginModules. There are several 
other codes for the check of the same keyword *"Must supply exactly 1 non-null 
JAAS mechanism configuration".*

Rootcause elaborates later.

By the way, at present, KafkaServer allows {*}the same LoginModule to be 
configured multiple times in kafkaJaasConfigFile{*}, which will also lead to 
the phenomenon of case1.

kafka_server_jaas_oauth.conf eg:
{code:java}
KafkaServer {
   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
 
}; {code}
 
h3. case2:

On the basis of case1, modify the default implementation of oauthbearer's 
`sasl.server.callback.handler.class`

 server.properties add new configuration
{code:java}
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
listener.name.oauth.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
 {code}
The specific implementation class code is omitted here.

When we start kafka again, we still encounter exceptions
server.log
{code:java}
[2022-03-16 14:47:46,037] ERROR Unrecognized SASL Login callback 
(org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
javax.security.auth.callback.UnsupportedCallbackException: Unrecognized SASL 
Login callback
        at 
org.apache.kafka.common.security.authenticator.AbstractLogin$DefaultLoginCallbackHandler.handle(AbstractLogin.java:105)
        at 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
        at 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
        at 
java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
        at 
java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
        at 
java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
        at 
java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
        at 
java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
        at 
java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
        at 
org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
        at 
org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
        at 
org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
        at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
        at 
org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
        at kafka.network.Processor.<init>(SocketServer.scala:724)
        at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
        at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
        at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
        at kafka.network.SocketServer.startup(SocketServer.scala:122)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
[2022-03-16 14:47:46,048] ERROR [KafkaServer id=1] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: 
javax.security.auth.login.LoginException: An internal error occurred while 
retrieving token from callback handler
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
        at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
        at 
org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
        at kafka.network.Processor.<init>(SocketServer.scala:724)
        at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
        at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
        at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
        at kafka.network.SocketServer.startup(SocketServer.scala:122)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
Caused by: javax.security.auth.login.LoginException: An internal error occurred 
while retrieving token from callback handler
        at 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
        at 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
        at 
java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
        at 
java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
        at 
java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
        at 
java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
        at 
java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
        at 
java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
        at 
org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
        at 
org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
        at 
org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
        ... 17 more
[2022-03-16 14:47:46,049] INFO [KafkaServer id=1] shutting down 
(kafka.server.KafkaServer)
[2022-03-16 14:47:46,052] INFO [SocketServer brokerId=1] Stopping socket server 
request processors (kafka.network.SocketServer) {code}
By analyzing the stack, it is easy to see the problem:
1) AbstractLogin.login should not call the OAuthBearerLoginModule.login method, 
only OAuthBearerRefreshingLogin.login can call the OAuthBearerLoginModule.login 
method.
2) OAuthBearerLoginModule.login should not call the 
AbstractLogin$DefaultLoginCallbackHandler.handle method, it should call the 
OAuthBearerUnsecuredLoginCallbackHandler.handle method.

 
h1. Summary

Analyze the stack, read the source code, the root cause is:
In the ChannelBuilders#create method, if the corresponding JaasContext of each 
SASL mechanism is returned by the *JaasContext#defaultContext()* method, the 
*JaasContext.configuration* and *JaasContext.configurationEntries* fields both 
contain the AppConfigurationEntry of other mechanisms. And then in the 
subsequent code stack, resulting in case1 or case2.
In fact, you can find from some source codes of Kafka saslModule that Kafka has 
this {*}original intention: JaasContext per saslMechanism should be pure{*}.
1) In the ChannelBuilders#create(...) method, when constructing the 
SaslChannelBuilder, `Map<String, JaasContext> jaasContexts` is passed in
2) In the SaslChannelBuilder#configure(...) method, after constructing 
LoginManager, LoginManager is also stored with `Map<String, LoginManager> 
loginManagers`.
The keys of these two maps are {*}saslMechanism{*}, but their values ​​both 
contain all saslMechanism information. Affected by this behavior, 
jaasConfigEntries in OAuthBearerUnsecuredValidatorCallbackHandler also contains 
other saslMechanism information, other implementation classes of 
sasl.server.callback.handler.class have the same problem.

In fact, I have reported an issue a long time ago, and KAFKA-13422 has the same 
RC and Solution as this issue. 
https://issues.apache.org/jira/browse/KAFKA-13422

For more detailed RC analysis and solutions you can read the last two parts of 
the description of KAFKA-13422: Root Cause and Suggestion & Solutions.

 

 

 

  was:
h1. Phenomenon:

 SASL/OAUTHBEARER, whether implemented by default or customized by the user, is 
not compatible with other SASL mechanisms.
h3. 
case1:

kafka_server_jaas_oauth.conf
{code:java}
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin"
  user_admin="admin"
  user_alice="alice"; 

   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

   org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin"
  password="admin_scram";
}; {code}
 server.properties
{code:java}
advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669
 
listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER{code}
Error when starting kafka:
server.log
{code:java}
[2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
        at kafka.network.Processor.<init>(SocketServer.scala:724)
        at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
        at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
        at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
        at kafka.network.SocketServer.startup(SocketServer.scala:122)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null 
JAAS mechanism configuration (size was 3)
        at 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117)
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139)
        ... 17 more
[2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down 
(kafka.server.KafkaServer)
[2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket server 
request processors (kafka.network.SocketServer) {code}
The default implementation class of oauthbearer's 
`sasl.server.callback.handler.class` is 
OAuthBearerUnsecuredValidatorCallbackHandler. 
In the OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method, the 
jaasConfigEntries parameter is verified.
What I want to say is that {*}the verification logic here is completely 
reasonable{*}, but the jaasConfigEntries passed in from the upper layer should 
not contain the AppConfigurationEntry of other loginModules. There are several 
other codes for the check of the same keyword *"Must supply exactly 1 non-null 
JAAS mechanism configuration".*

Rootcause elaborates later.


By the way, at present, KafkaServer allows {*}the same LoginModule to be 
configured multiple times in kafkaJaasConfigFile{*}, which will also lead to 
the phenomenon of case1.

kafka_server_jaas_oauth.conf eg:
{code:java}
KafkaServer {
   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
 
}; {code}
 




h3. case2:
On the basis of case1, modify the default implementation of oauthbearer's 
`sasl.server.callback.handler.class`



 server.properties add new configuration
{code:java}
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
listener.name.oauth.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
 {code}
The specific implementation class code is omitted here.

When we start kafka again, we still encounter exceptions
server.log
{code:java}
[2022-03-16 14:47:46,037] ERROR Unrecognized SASL Login callback 
(org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
javax.security.auth.callback.UnsupportedCallbackException: Unrecognized SASL 
Login callback
        at 
org.apache.kafka.common.security.authenticator.AbstractLogin$DefaultLoginCallbackHandler.handle(AbstractLogin.java:105)
        at 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
        at 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
        at 
java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
        at 
java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
        at 
java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
        at 
java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
        at 
java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
        at 
java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
        at 
org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
        at 
org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
        at 
org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
        at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
        at 
org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
        at kafka.network.Processor.<init>(SocketServer.scala:724)
        at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
        at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
        at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
        at kafka.network.SocketServer.startup(SocketServer.scala:122)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
[2022-03-16 14:47:46,048] ERROR [KafkaServer id=1] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: 
javax.security.auth.login.LoginException: An internal error occurred while 
retrieving token from callback handler
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
        at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
        at 
org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
        at kafka.network.Processor.<init>(SocketServer.scala:724)
        at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
        at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
        at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
        at kafka.network.SocketServer.startup(SocketServer.scala:122)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
Caused by: javax.security.auth.login.LoginException: An internal error occurred 
while retrieving token from callback handler
        at 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
        at 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
        at 
java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
        at 
java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
        at 
java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
        at 
java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
        at 
java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
        at 
java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
        at 
org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
        at 
org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
        at 
org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
        ... 17 more
[2022-03-16 14:47:46,049] INFO [KafkaServer id=1] shutting down 
(kafka.server.KafkaServer)
[2022-03-16 14:47:46,052] INFO [SocketServer brokerId=1] Stopping socket server 
request processors (kafka.network.SocketServer) {code}
By analyzing the stack, it is easy to see the problem:
1) AbstractLogin.login should not call the OAuthBearerLoginModule.login method, 
only OAuthBearerRefreshingLogin.login can call the OAuthBearerLoginModule.login 
method.
2) OAuthBearerLoginModule.login should not call the 
AbstractLogin$DefaultLoginCallbackHandler.handle method, it should call the 
OAuthBearerUnsecuredLoginCallbackHandler.handle method.

Analyze the stack, read the source code, the root cause is:
In the ChannelBuilders#create method, if the corresponding JaasContext of each 
SASL mechanism is returned by the *JaasContext#defaultContext()* method, the 
*JaasContext.configuration* and *JaasContext.configurationEntries* fields both 
contain the AppConfigurationEntry of other mechanisms. And then in the 
subsequent code stack, resulting in case1 or case2.
In fact, you can find from some source codes of Kafka saslModule that Kafka has 
this {*}original intention: JaasContext per saslMechanism should be pure{*}.
1) In the ChannelBuilders#create(...) method, when constructing the 
SaslChannelBuilder, `Map<String, JaasContext> jaasContexts` is passed in
2) In the SaslChannelBuilder#configure(...) method, after constructing 
LoginManager, LoginManager is also stored with `Map<String, LoginManager> 
loginManagers`.
The keys of these two maps are {*}saslMechanism{*}, but their values ​​both 
contain all saslMechanism information. Affected by this behavior, 
jaasConfigEntries in OAuthBearerUnsecuredValidatorCallbackHandler also contains 
other saslMechanism information, other implementation classes of 
sasl.server.callback.handler.class have the same problem.

In fact, I have reported an issue a long time ago, and KAFKA-13422 has the same 
RC and Solution as this issue. 
https://issues.apache.org/jira/browse/KAFKA-13422

For more detailed RC analysis and solutions you can read the last two parts of 
the description of KAFKA-13422: Root Cause and Suggestion & Solutions.







 

 

 


> On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-13751
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13751
>             Project: Kafka
>          Issue Type: Bug
>          Components: security
>    Affects Versions: 3.0.1
>            Reporter: RivenSun
>            Priority: Critical
>
> h1. Phenomenon:
>  SASL/OAUTHBEARER, whether implemented by default or customized by the user, 
> is not compatible with other SASL mechanisms.
> h3.  
> case1:
> kafka_server_jaas_oauth.conf
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="admin"
>   user_admin="admin"
>   user_alice="alice"; 
>    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule 
> required;
>    org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin"
>   password="admin_scram";
> }; {code}
>  server.properties
> {code:java}
> advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669
>  
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER{code}
> Error when starting kafka:
> server.log
> {code:java}
> [2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
>         at kafka.network.Processor.<init>(SocketServer.scala:724)
>         at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
>         at 
> kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
>         at 
> kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
>         at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>         at 
> kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
>         at kafka.network.SocketServer.startup(SocketServer.scala:122)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
>         at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>         at kafka.Kafka$.main(Kafka.scala:82)
>         at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null 
> JAAS mechanism configuration (size was 3)
>         at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139)
>         ... 17 more
> [2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down 
> (kafka.server.KafkaServer)
> [2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket 
> server request processors (kafka.network.SocketServer) {code}
> The default implementation class of oauthbearer's 
> `sasl.server.callback.handler.class` is 
> OAuthBearerUnsecuredValidatorCallbackHandler. 
> In the OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method, 
> the jaasConfigEntries parameter is verified.
> What I want to say is that {*}the verification logic here is completely 
> reasonable{*}, but the jaasConfigEntries passed in from the upper layer 
> should not contain the AppConfigurationEntry of other loginModules. There are 
> several other codes for the check of the same keyword *"Must supply exactly 1 
> non-null JAAS mechanism configuration".*
> Rootcause elaborates later.
> By the way, at present, KafkaServer allows {*}the same LoginModule to be 
> configured multiple times in kafkaJaasConfigFile{*}, which will also lead to 
> the phenomenon of case1.
> kafka_server_jaas_oauth.conf eg:
> {code:java}
> KafkaServer {
>    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule 
> required;
>   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule 
> required;
>  
> }; {code}
>  
> h3. case2:
> On the basis of case1, modify the default implementation of oauthbearer's 
> `sasl.server.callback.handler.class`
>  server.properties add new configuration
> {code:java}
> listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
> listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
> listener.name.oauth.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
>  {code}
> The specific implementation class code is omitted here.
> When we start kafka again, we still encounter exceptions
> server.log
> {code:java}
> [2022-03-16 14:47:46,037] ERROR Unrecognized SASL Login callback 
> (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
> javax.security.auth.callback.UnsupportedCallbackException: Unrecognized SASL 
> Login callback
>         at 
> org.apache.kafka.common.security.authenticator.AbstractLogin$DefaultLoginCallbackHandler.handle(AbstractLogin.java:105)
>         at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
>         at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
>         at 
> java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
>         at 
> java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
>         at 
> java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
>         at 
> java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
>         at 
> java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
>         at 
> java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
>         at 
> org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
>         at 
> org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
>         at 
> org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
>         at 
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
>         at 
> org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
>         at kafka.network.Processor.<init>(SocketServer.scala:724)
>         at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
>         at 
> kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
>         at 
> kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
>         at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>         at 
> kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
>         at kafka.network.SocketServer.startup(SocketServer.scala:122)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
>         at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>         at kafka.Kafka$.main(Kafka.scala:82)
>         at kafka.Kafka.main(Kafka.scala)
> [2022-03-16 14:47:46,048] ERROR [KafkaServer id=1] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.KafkaException: 
> javax.security.auth.login.LoginException: An internal error occurred while 
> retrieving token from callback handler
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
>         at 
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
>         at 
> org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
>         at kafka.network.Processor.<init>(SocketServer.scala:724)
>         at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
>         at 
> kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
>         at 
> kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
>         at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>         at 
> kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
>         at kafka.network.SocketServer.startup(SocketServer.scala:122)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
>         at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>         at kafka.Kafka$.main(Kafka.scala:82)
>         at kafka.Kafka.main(Kafka.scala)
> Caused by: javax.security.auth.login.LoginException: An internal error 
> occurred while retrieving token from callback handler
>         at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
>         at 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
>         at 
> java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
>         at 
> java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
>         at 
> java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
>         at 
> java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
>         at 
> java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
>         at 
> java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
>         at 
> org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
>         at 
> org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
>         at 
> org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
>         ... 17 more
> [2022-03-16 14:47:46,049] INFO [KafkaServer id=1] shutting down 
> (kafka.server.KafkaServer)
> [2022-03-16 14:47:46,052] INFO [SocketServer brokerId=1] Stopping socket 
> server request processors (kafka.network.SocketServer) {code}
> By analyzing the stack, it is easy to see the problem:
> 1) AbstractLogin.login should not call the OAuthBearerLoginModule.login 
> method, only OAuthBearerRefreshingLogin.login can call the 
> OAuthBearerLoginModule.login method.
> 2) OAuthBearerLoginModule.login should not call the 
> AbstractLogin$DefaultLoginCallbackHandler.handle method, it should call the 
> OAuthBearerUnsecuredLoginCallbackHandler.handle method.
>  
> h1. Summary
> Analyze the stack, read the source code, the root cause is:
> In the ChannelBuilders#create method, if the corresponding JaasContext of 
> each SASL mechanism is returned by the *JaasContext#defaultContext()* method, 
> the *JaasContext.configuration* and *JaasContext.configurationEntries* fields 
> both contain the AppConfigurationEntry of other mechanisms. And then in the 
> subsequent code stack, resulting in case1 or case2.
> In fact, you can find from some source codes of Kafka saslModule that Kafka 
> has this {*}original intention: JaasContext per saslMechanism should be 
> pure{*}.
> 1) In the ChannelBuilders#create(...) method, when constructing the 
> SaslChannelBuilder, `Map<String, JaasContext> jaasContexts` is passed in
> 2) In the SaslChannelBuilder#configure(...) method, after constructing 
> LoginManager, LoginManager is also stored with `Map<String, LoginManager> 
> loginManagers`.
> The keys of these two maps are {*}saslMechanism{*}, but their values ​​both 
> contain all saslMechanism information. Affected by this behavior, 
> jaasConfigEntries in OAuthBearerUnsecuredValidatorCallbackHandler also 
> contains other saslMechanism information, other implementation classes of 
> sasl.server.callback.handler.class have the same problem.
> In fact, I have reported an issue a long time ago, and KAFKA-13422 has the 
> same RC and Solution as this issue. 
> https://issues.apache.org/jira/browse/KAFKA-13422
> For more detailed RC analysis and solutions you can read the last two parts 
> of the description of KAFKA-13422: Root Cause and Suggestion & Solutions.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to