[ 
https://issues.apache.org/jira/browse/KAFKA-19958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kavpreet Grewal updated KAFKA-19958:
------------------------------------
    Description: 
There is a race condition where a client using 
OAuthBearerSaslClientCallbackHandler might not have the authentication token in 
the Subject context due to it being cleared during token refresh.

 

*Details:*

When ExpiringCredentialRefreshingLogin is used, it starts a token refresh 
thread that 
[calls|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L107]
 ExpiringCredentialRefreshingLogin.reLogin() when the token needs to be 
refreshed. In reLogin, we will 
[logout|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L370]
 before we log back in and refresh the token. During logout in 
[OAuthBearerLoginModule|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java#L366],
 we remove the token from the Subject private credentials context. So the token 
is removed before it is renewed and added back to the context in 
[commit|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java#L396]
 (which is called right after login).

This means there is a brief period of time where the context has no token. If a 
consumer client needs a new token (for any reason) during this period, it will 
be unable to retrieve a token from the context. So the callback handler in 
[OAuthBearerSaslClientCallbackHandler|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java#L102]
 would fail.

This is more likely to occur when a large number of data consumers (executors) 
are used, and a high volume of data is processed (resulting in numerous polls). 
When this happens, the Kafka consumer will fail with the error below.

 

*Example error:*
{code:java}
javax.security.sasl.SaslException: No OAuth Bearer tokens in Subject's private 
credentials
        at 
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient.evaluateChallenge(OAuthBearerSaslClient.java:120)
        at 
kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)
        at 
java.security.AccessController.doPrivileged(AccessController.java:712)
        at javax.security.auth.Subject.doAs(Subject.java:439)
        at 
kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)
        at 
kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)
        at 
kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)
        at 
kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)
        at 
kafkashaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
        at 
kafkashaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
        at 
kafkashaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
        at 
kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
        at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:280)
        at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
        at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
        at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
        at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:573)
        at 
kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1272)
        at 
kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1236)
        at 
kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
 {code}

  was:
There is a race condition where a client using 
OAuthBearerSaslClientCallbackHandler might not have the authentication token in 
the Subject context due to it being cleared during token refresh.

 

*Details:*

When ExpiringCredentialRefreshingLogin is used, it starts a token refresh 
thread that 
[calls|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L107]
 ExpiringCredentialRefreshingLogin.reLogin() when the token needs to be 
refreshed. In reLogin, we will 
[logout|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L370]
 before we log back in and refresh the token. During logout in 
[OAuthBearerLoginModule|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java#L366],
 we remove the token from the Subject private credentials context. So the token 
is removed before it is renewed and added back to the context in 
[commit|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java#L396]
 (which is called right after login).

This means there is a brief period of time where the context has no token. If a 
consumer client needs a new token (for any reason) during this period, it will 
be unable to retrieve a token from the context. So the callback handler in 
[OAuthBearerSaslClientCallbackHandler|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java#L102]
 would fail.

This is more prone to happening when there are many data consumers (executors) 
being used, and there is a high volume of data (so there are many polls). When 
this happens, the Kafka consumer will fail with the error below.

 

*Example error:*
{code:java}
javax.security.sasl.SaslException: No OAuth Bearer tokens in Subject's private 
credentials
        at 
kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient.evaluateChallenge(OAuthBearerSaslClient.java:120)
        at 
kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)
        at 
java.security.AccessController.doPrivileged(AccessController.java:712)
        at javax.security.auth.Subject.doAs(Subject.java:439)
        at 
kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)
        at 
kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)
        at 
kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)
        at 
kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)
        at 
kafkashaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
        at 
kafkashaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
        at 
kafkashaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
        at 
kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
        at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:280)
        at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
        at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
        at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
        at 
kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:573)
        at 
kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1272)
        at 
kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1236)
        at 
kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
 {code}


> Race condition between OAuthBearerSaslClientCallbackHandler and 
> ExpiringCredentialRefreshingLogin.reLogin
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19958
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19958
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Kavpreet Grewal
>            Priority: Major
>
> There is a race condition where a client using 
> OAuthBearerSaslClientCallbackHandler might not have the authentication token 
> in the Subject context due to it being cleared during token refresh.
>  
> *Details:*
> When ExpiringCredentialRefreshingLogin is used, it starts a token refresh 
> thread that 
> [calls|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L107]
>  ExpiringCredentialRefreshingLogin.reLogin() when the token needs to be 
> refreshed. In reLogin, we will 
> [logout|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/expiring/ExpiringCredentialRefreshingLogin.java#L370]
>  before we log back in and refresh the token. During logout in 
> [OAuthBearerLoginModule|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java#L366],
>  we remove the token from the Subject private credentials context. So the 
> token is removed before it is renewed and added back to the context in 
> [commit|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java#L396]
>  (which is called right after login).
> This means there is a brief period of time where the context has no token. If 
> a consumer client needs a new token (for any reason) during this period, it 
> will be unable to retrieve a token from the context. So the callback handler 
> in 
> [OAuthBearerSaslClientCallbackHandler|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java#L102]
>  would fail.
> This is more likely to occur when a large number of data consumers 
> (executors) are used, and a high volume of data is processed (resulting in 
> numerous polls). When this happens, the Kafka consumer will fail with the 
> error below.
>  
> *Example error:*
> {code:java}
> javax.security.sasl.SaslException: No OAuth Bearer tokens in Subject's 
> private credentials
>       at 
> kafkashaded.org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient.evaluateChallenge(OAuthBearerSaslClient.java:120)
>       at 
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)
>       at 
> java.security.AccessController.doPrivileged(AccessController.java:712)
>       at javax.security.auth.Subject.doAs(Subject.java:439)
>       at 
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)
>       at 
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)
>       at 
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)
>       at 
> kafkashaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)
>       at 
> kafkashaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
>       at 
> kafkashaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
>       at 
> kafkashaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
>       at 
> kafkashaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:280)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:573)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1272)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1236)
>       at 
> kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to