[ https://issues.apache.org/jira/browse/KAFKA-19958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18043612#comment-18043612 ]

jagadeesh commented on KAFKA-19958:
-----------------------------------

Hi [~kirktrue] 

If no one is actively working on this, I’d like to take the issue.

I’m new to this part of the project and wanted to validate my understanding 
before starting.

From debugging, the behavior only occurs when 
{{logoutRequiredBeforeLoggingBackIn}} is {{true}}. In that case the 
refresh thread performs:
 # {{logout()}}
 # then a new {{login()}}
This temporarily removes the expiring credential from the {{Subject}}. 
During that window, another thread may try to access the credential and fail, 
because the token is briefly unavailable.
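To make the window concrete, it can be reproduced in isolation with {{javax.security.auth.Subject}} alone. This is just a toy stand-in ({{FakeToken}} is not Kafka's {{OAuthBearerToken}}); it shows what a concurrent reader would observe between logout and commit:

```java
import javax.security.auth.Subject;

// Toy stand-in for Kafka's OAuthBearerToken, just to populate the Subject.
record FakeToken(String value) {}

public class RaceWindowSketch {
    public static void main(String[] args) {
        Subject subject = new Subject();
        subject.getPrivateCredentials().add(new FakeToken("expiring"));

        // Refresh thread, step 1: logout() clears the private credentials.
        subject.getPrivateCredentials().clear();

        // The window: a reader looking up the token here finds nothing,
        // which is exactly the "No OAuth Bearer tokens" failure mode.
        System.out.println("during window: "
                + subject.getPrivateCredentials(FakeToken.class).size()); // 0

        // Refresh thread, step 2: login() + commit() add the new token back.
        subject.getPrivateCredentials().add(new FakeToken("fresh"));
        System.out.println("after commit: "
                + subject.getPrivateCredentials(FakeToken.class).size()); // 1
    }
}
```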

Given that, I wanted to ask:
h4. 1. Should we temporarily store the credential?

For example, during:
{code:java}
if (hasExpiringCredential && logoutRequiredBeforeLoggingBackIn) {
    loginContext.logout();
}
{code}
Before the logout, should we retain the credential so that other threads can 
still read a valid token during the re-login sequence?
Or is the expectation that once {{logout()}} is called, no other thread should 
continue using the previous credential?
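A rough sketch of what I mean by option 1 (all names here are hypothetical, not Kafka's; the real change would live around {{ExpiringCredentialRefreshingLogin.reLogin()}}): stash the last good token in a side reference so readers can fall back to it while the {{Subject}} is empty:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of "retain the credential across re-login".
public class RetainDuringRelogin {
    private final AtomicReference<String> lastGoodToken = new AtomicReference<>();
    private volatile String subjectToken; // stand-in for the Subject's private credential

    RetainDuringRelogin(String initial) {
        subjectToken = initial;
    }

    // logout(): stash the expiring token before the Subject is cleared.
    void beginLogout() {
        lastGoodToken.set(subjectToken);
        subjectToken = null;
    }

    // login() + commit(): publish the fresh token, then drop the stashed one.
    void finishLogin(String fresh) {
        subjectToken = fresh;
        lastGoodToken.set(null);
    }

    // Reader path: fall back to the retained token during the window.
    String currentToken() {
        String t = subjectToken;
        if (t != null)
            return t;
        String retained = lastGoodToken.get();
        // Re-check in case finishLogin() ran between the two reads above.
        return (retained != null) ? retained : subjectToken;
    }
}
```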
h4. 2. Or should the solution be retry logic when the credential is missing?

Meaning: if a caller requests the OAuth credential at the moment when the 
refresh thread has logged out but not yet finished logging back in, we retry 
instead of failing immediately.
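Sketched as a small helper (again hypothetical names; the real caller would be the token lookup in {{OAuthBearerSaslClientCallbackHandler}}), option 2 would bound the retries before surfacing the {{SaslException}}:

```java
import java.util.function.Supplier;

// Hypothetical bounded retry around a credential lookup that may transiently
// return null while the refresh thread is between logout() and commit().
public class RetryOnMissingToken {
    static String awaitToken(Supplier<String> lookup, int attempts, long sleepMs)
            throws InterruptedException {
        for (int i = 0; i < attempts; i++) {
            String token = lookup.get();
            if (token != null)
                return token;
            Thread.sleep(sleepMs); // the window is short, so a small backoff suffices
        }
        throw new IllegalStateException(
                "No OAuth Bearer tokens in Subject's private credentials");
    }
}
```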

I haven't yet traced all the consumer/producer paths where the credential is 
accessed, so I want to confirm the preferred direction before diving deeper.

Thanks!

> Race condition between OAuthBearerSaslClientCallbackHandler and 
> ExpiringCredentialRefreshingLogin.reLogin
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19958
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19958
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer, security
>    Affects Versions: 4.1.0
>            Reporter: Kavpreet Grewal
>            Priority: Major
>              Labels: OAuth2, oauth2
>             Fix For: 4.3.0
>
>
> There is a race condition where a client using 
> OAuthBearerSaslClientCallbackHandler might not have the authentication token 
> in the subject's private credentials context due to it being cleared during 
> token refresh.
>  
> *Details:*
> When ExpiringCredentialRefreshingLogin is used, it starts a token refresh 
> thread that 
> [calls|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L107]
>  ExpiringCredentialRefreshingLogin.reLogin() when the token needs to be 
> refreshed. In reLogin, we will 
> [logout|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L370]
>  before we log back in and refresh the token. During logout in 
> [OAuthBearerLoginModule|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java#L366],
>  we remove the token from the Subject private credentials context. So the 
> token is removed before it is renewed and added back to the context in 
> [commit|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java#L396]
>  (which is called right after login).
> This means there is a brief period of time where the context has no token. If 
> a consumer client needs a new token (for any reason) during this period, it 
> will be unable to retrieve a token from the context. So the callback handler 
> in 
> [OAuthBearerSaslClientCallbackHandler|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java#L102]
>  would fail.
> This is more likely to occur when a large number of data consumers 
> (executors) are used, and a high volume of data is processed (resulting in 
> numerous polls). When this happens, the Kafka consumer will fail with the 
> error below.
>  
> *Example error:*
> {code:java}
> javax.security.sasl.SaslException: No OAuth Bearer tokens in Subject's 
> private credentials
>       at 
> kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient.evaluateChallenge(OAuthBearerSaslClient.java:120)
>       at 
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)
>       at 
> java.security.AccessController.doPrivileged(AccessController.java:712)
>       at javax.security.auth.Subject.doAs(Subject.java:439)
>       at 
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)
>       at 
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)
>       at 
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)
>       at 
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)
>       at 
> kafkashaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
>       at 
> kafkashaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
>       at 
> kafkashaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
>       at 
> kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:280)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:573)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1272)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1236)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
