[
https://issues.apache.org/jira/browse/KAFKA-19958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kirk True updated KAFKA-19958:
------------------------------
Component/s: clients
> Race condition between OAuthBearerSaslClientCallbackHandler and
> ExpiringCredentialRefreshingLogin.reLogin
> ---------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-19958
> URL: https://issues.apache.org/jira/browse/KAFKA-19958
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Kavpreet Grewal
> Priority: Major
>
> There is a race condition where a client using
> OAuthBearerSaslClientCallbackHandler might not have the authentication token
> in the subject's private credentials context due to it being cleared during
> token refresh.
>
> *Details:*
> When ExpiringCredentialRefreshingLogin is used, it starts a token refresh
> thread that
> [calls|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L107]
> ExpiringCredentialRefreshingLogin.reLogin() when the token needs to be
> refreshed. In reLogin, we will
> [logout|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L370]
> before we log back in and refresh the token. During logout in
> [OAuthBearerLoginModule|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java#L366],
> we remove the token from the Subject private credentials context. So the
> token is removed before it is renewed and added back to the context in
> [commit|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java#L396]
> (which is called right after login).
> This means there is a brief period of time where the context has no token. If
> a consumer client needs a new token (for any reason) during this period, it
> will be unable to retrieve a token from the context. So the callback handler
> in
> [OAuthBearerSaslClientCallbackHandler|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java#L102]
> would fail.
> This is more likely to occur when a large number of data consumers
> (executors) are used, and a high volume of data is processed (resulting in
> numerous polls). When this happens, the Kafka consumer will fail with the
> error below.
>
> *Example error:*
> {code:java}
> javax.security.sasl.SaslException: No OAuth Bearer tokens in Subject's
> private credentials
> at
> kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient.evaluateChallenge(OAuthBearerSaslClient.java:120)
> at
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)
> at
> java.security.AccessController.doPrivileged(AccessController.java:712)
> at javax.security.auth.Subject.doAs(Subject.java:439)
> at
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)
> at
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)
> at
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)
> at
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)
> at
> kafkashaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
> at
> kafkashaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
> at
> kafkashaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> at
> kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
> at
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:280)
> at
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
> at
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
> at
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
> at
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:573)
> at
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1272)
> at
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1236)
> at
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)