[
https://issues.apache.org/jira/browse/KAFKA-13751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17515032#comment-17515032
]
RivenSun commented on KAFKA-13751:
----------------------------------
Hi [~dajac] [~ijuma]
Could you give some suggestions for this issue?
Thanks.
> On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms
> ----------------------------------------------------------------------------
>
> Key: KAFKA-13751
> URL: https://issues.apache.org/jira/browse/KAFKA-13751
> Project: Kafka
> Issue Type: Bug
> Components: security
> Affects Versions: 3.0.1
> Reporter: RivenSun
> Priority: Critical
>
> h1. Phenomenon:
> SASL/OAUTHBEARER, whether implemented by default or customized by the user,
> is not compatible with other SASL mechanisms.
> h3.
> case1:
> kafka_server_jaas_oauth.conf
> {code:java}
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="admin"
> user_admin="admin"
> user_alice="alice";
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
> required;
> org.apache.kafka.common.security.scram.ScramLoginModule required
> username="admin"
> password="admin_scram";
> }; {code}
> server.properties
> {code:java}
> advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669
>
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER{code}
> Error when starting kafka:
> server.log
> {code:java}
> [2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException:
> Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
> at
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
> at kafka.network.Processor.<init>(SocketServer.scala:724)
> at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
> at
> kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
> at
> kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
> at
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
> at
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
> at
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at
> kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
> at kafka.network.SocketServer.startup(SocketServer.scala:122)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
> at
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
> at kafka.Kafka$.main(Kafka.scala:82)
> at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null
> JAAS mechanism configuration (size was 3)
> at
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117)
> at
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139)
> ... 17 more
> [2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down
> (kafka.server.KafkaServer)
> [2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket
> server request processors (kafka.network.SocketServer) {code}
> The default implementation class of oauthbearer's
> `sasl.server.callback.handler.class` is
> OAuthBearerUnsecuredValidatorCallbackHandler.
> In the OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method,
> the jaasConfigEntries parameter is verified.
> What I want to say is that {*}the verification logic here is completely
> reasonable{*}, but the jaasConfigEntries passed in from the upper layer
> should not contain the AppConfigurationEntry of other loginModules. There are
> several other codes for the check of the same keyword *"Must supply exactly 1
> non-null JAAS mechanism configuration".*
> Rootcause elaborates later.
> By the way, at present, KafkaServer allows {*}the same LoginModule to be
> configured multiple times in kafkaJaasConfigFile{*}, which will also lead to
> the phenomenon of case1.
> kafka_server_jaas_oauth.conf eg:
> {code:java}
> KafkaServer {
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
> required;
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
> required;
>
> }; {code}
>
> h3. case2:
> On the basis of case1, modify the default implementation of oauthbearer's
> `sasl.server.callback.handler.class`
> server.properties add new configuration
> {code:java}
> listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
> listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
> listener.name.oauth.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
> {code}
> The specific implementation class code is omitted here.
> When we start kafka again, we still encounter exceptions
> server.log
> {code:java}
> [2022-03-16 14:47:46,037] ERROR Unrecognized SASL Login callback
> (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
> javax.security.auth.callback.UnsupportedCallbackException: Unrecognized SASL
> Login callback
> at
> org.apache.kafka.common.security.authenticator.AbstractLogin$DefaultLoginCallbackHandler.handle(AbstractLogin.java:105)
> at
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
> at
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
> at
> java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
> at
> java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
> at
> java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
> at
> java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
> at
> java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
> at
> java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
> at
> org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
> at
> org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
> at
> org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
> at
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
> at
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
> at
> org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
> at kafka.network.Processor.<init>(SocketServer.scala:724)
> at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
> at
> kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
> at
> kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
> at
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
> at
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
> at
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at
> kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
> at kafka.network.SocketServer.startup(SocketServer.scala:122)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
> at
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
> at kafka.Kafka$.main(Kafka.scala:82)
> at kafka.Kafka.main(Kafka.scala)
> [2022-03-16 14:47:46,048] ERROR [KafkaServer id=1] Fatal error during
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.KafkaException:
> javax.security.auth.login.LoginException: An internal error occurred while
> retrieving token from callback handler
> at
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
> at
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
> at
> org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
> at kafka.network.Processor.<init>(SocketServer.scala:724)
> at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
> at
> kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
> at
> kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
> at
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
> at
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
> at
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at
> kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
> at kafka.network.SocketServer.startup(SocketServer.scala:122)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
> at
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
> at kafka.Kafka$.main(Kafka.scala:82)
> at kafka.Kafka.main(Kafka.scala)
> Caused by: javax.security.auth.login.LoginException: An internal error
> occurred while retrieving token from callback handler
> at
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
> at
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
> at
> java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
> at
> java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
> at
> java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
> at
> java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
> at
> java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
> at
> java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
> at
> org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
> at
> org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
> at
> org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
> at
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
> ... 17 more
> [2022-03-16 14:47:46,049] INFO [KafkaServer id=1] shutting down
> (kafka.server.KafkaServer)
> [2022-03-16 14:47:46,052] INFO [SocketServer brokerId=1] Stopping socket
> server request processors (kafka.network.SocketServer) {code}
> By analyzing the stack, it is easy to see the problem:
> 1) AbstractLogin.login should not call the OAuthBearerLoginModule.login
> method, only OAuthBearerRefreshingLogin.login can call the
> OAuthBearerLoginModule.login method.
> 2) OAuthBearerLoginModule.login should not call the
> AbstractLogin$DefaultLoginCallbackHandler.handle method, it should call the
> OAuthBearerUnsecuredLoginCallbackHandler.handle method.
>
> h1. Summary
> Analyze the stack, read the source code, the root cause is:
> In the ChannelBuilders#create method, if the corresponding JaasContext of
> each SASL mechanism is returned by the *JaasContext#defaultContext()* method,
> the *JaasContext.configuration* and *JaasContext.configurationEntries* fields
> both contain the AppConfigurationEntry of other mechanisms. And then in the
> subsequent code stack, resulting in case1 or case2.
> In fact, you can find from some source codes of Kafka saslModule that Kafka
> has this {*}original intention: JaasContext per saslMechanism should be
> pure{*}.
> 1) In the ChannelBuilders#create(...) method, when constructing the
> SaslChannelBuilder, `Map<String, JaasContext> jaasContexts` is passed in
> 2) In the SaslChannelBuilder#configure(...) method, after constructing
> LoginManager, LoginManager is also stored with `Map<String, LoginManager>
> loginManagers`.
> The keys of these two maps are {*}saslMechanism{*}, but their values both
> contain all saslMechanism information. Affected by this behavior,
> jaasConfigEntries in OAuthBearerUnsecuredValidatorCallbackHandler also
> contains other saslMechanism information, other implementation classes of
> sasl.server.callback.handler.class have the same problem.
> In fact, I have reported an issue a long time ago, and KAFKA-13422 has the
> same RC and Solution as this issue.
> https://issues.apache.org/jira/browse/KAFKA-13422
> For more detailed RC analysis and solutions you can read the last two parts
> of the description of KAFKA-13422: Root Cause and Suggestion & Solutions.
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)