[ https://issues.apache.org/jira/browse/KAFKA-18143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17903377#comment-17903377 ]

Ravi Gupta commented on KAFKA-18143:
------------------------------------

Hi [~lianetm] 

The issue in our production environment arises when the heartbeat thread 
encounters an "Authentication failed" error. In the current implementation, 
the heartbeat thread is terminated in that case. With the AWS MSK IAM 
authentication module we use, the failure is typically transient and resolves 
itself over time. However, once the heartbeat thread is gone, the broker 
eventually evicts the member, the consumer group rebalances, and the 
partitions are reassigned to other consumers.

Unfortunately, the original 'zombie' consumer is never notified of this 
rebalance, likely because its heartbeat thread has been terminated, and it 
continues to poll for messages. The key issue is that polling is not fenced, 
so the zombie consumer keeps receiving messages. Committing from the zombie 
consumer would likely trigger an exception, but we have no need to commit, so 
the zombie consumer keeps receiving duplicate records indefinitely.
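
To illustrate the pattern (a minimal sketch, not our actual application code; 
the endpoint, group id and topic are placeholders): with enable.auto.commit 
set to false and no explicit commit, the poll loop never sends a request that 
the broker could reject for the evicted member, so nothing surfaces the 
eviction to the application.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NoCommitPollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder endpoint
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");              // placeholder group id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");            // offsets are tracked outside Kafka
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));      // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Offsets are persisted to an internal data store here; nothing is committed to Kafka,
                    // so the broker never receives a request it could reject for this evicted (zombie) member.
                    System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
                }
                // No commitSync()/commitAsync() here; a commit from an evicted member would typically
                // fail (e.g. CommitFailedException), which is the only point where the eviction would show up.
            }
        }
    }
}
{code}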

Below is the exception stack trace from the heartbeat thread:

 
{code:java}
org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException: Request is missing Authentication Token (Service: AWSSecurityTokenService; Status Code: 403; Error Code: MissingAuthenticationToken; Request ID: 05ac8595-6a27-42b4-b765-052c593affd5; Proxy: null)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
Caused by: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials
        at software.amazon.msk.auth.iam.internals.IAMSaslClient.generateClientMessage(IAMSaslClient.java:149)
        at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:96)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:571)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:280)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:321)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1460)
{code}

> Kafka consumer keeps getting records on poll after eviction from group
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-18143
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18143
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.7.1, 3.9.0
>            Reporter: Ravi Gupta
>            Priority: Major
>
> My application polls records from an MSK Kafka cluster. The application 
> maintains the offset of each partition itself and therefore has autocommit 
> disabled. It never commits offsets to Kafka, as it persists them to an 
> internal data store.
> In production, we observed duplicate records. {*}The duplicate records don't 
> stop until we restart the instance with the zombie consumer (evicted from the 
> group but still polling){*}. A consumer turns into a zombie when it fails to 
> send heartbeats. This typically happens due to IAM authentication issues in 
> MSK, which sometimes last for an extended period.
> On further digging, I found that the partitions assigned to the zombie 
> consumer are reassigned to other active consumers, but the zombie consumer's 
> poll continues to return records.
> The question is - *should a zombie consumer get records on poll?*
> I have been able to reproduce it locally. Here is my local setup with the 
> issue reproduced:
>  * A single broker (Docker image) with three different external ports.
>  * Create one topic with two partitions.
>  * [Toxiproxy|https://github.com/Shopify/toxiproxy] proxies these ports.
>  * Two consumers (subscribed to the topic), each connecting to one of the 
> proxied ports, with the session timeout set to 10 seconds (see the 
> configuration sketch below).
>  * Introduce latency in one of the proxies to evict one of the consumers 
> from the group.
>  * Produce some messages for each partition.
>  * Both consumers keep getting the messages.
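> A minimal consumer configuration for this setup could look like the sketch 
> below (a hypothetical illustration; the bootstrap address points at one of 
> the Toxiproxy listeners, and all names and ports are placeholders; imports 
> are the usual org.apache.kafka.clients.consumer ones):
> {code:java}
> Properties props = new Properties();
> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:21001");  // Toxiproxy listener in front of the broker (placeholder port)
> props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");               // placeholder group id
> props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");           // offsets handled outside Kafka, as in production
> props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");           // 10 s session timeout from the repro steps
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> consumer.subscribe(Collections.singletonList("repro-topic"));           // placeholder name for the two-partition topic
> // Poll in a loop as usual; after latency is injected into this consumer's proxy and the broker
> // evicts it from the group, poll() still keeps returning records for its previously assigned partition.
> {code}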



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
