[ 
https://issues.apache.org/jira/browse/KAFKA-8016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-8016:
---------------------------------------
    Description: 
I think the consumer heartbeat thread is susceptible to a race condition that can crash it.


I have seen the following client exception after a consumer group rebalance:
{code:java}
INFO  Fetcher  Resetting offset for partition _ to offset 32110985.
INFO  Fetcher  Resetting offset for partition _ to offset 32108462.

java.lang.IllegalStateException: No current assignment for partition X
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93)
    at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589)
    at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93)
    at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704)
    at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
{code}
The logs also contained this message from around the same time:
{code:java}
INFO ConsumerCoordinator Revoking previously assigned partitions [X, ...]{code}
 

After investigating, I believe there might be a race condition:

[Updating the fetch positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213] in the client [involves sending a `ListOffsetsRequest` to the broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603]. The Heartbeat thread can end up running the code that handles the response from its run loop ([1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036]->[2|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]).
 
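`updateFetchPositions()` is called from the public methods `Consumer#position()` and `Consumer#poll()`, i.e. on the application thread, so the request can be sent by one thread while its response is handled by another.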
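To illustrate that thread hand-off, here is a minimal, self-contained analogy that uses `java.util.concurrent.CompletableFuture` in place of Kafka's internal `RequestFuture`; the class and thread names are made up for illustration. As in the stack trace above, where `RequestFuture.complete()` fires listeners from `HeartbeatThread.run()`, a non-async callback registered before the response arrives runs on whichever thread completes the future:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Analogy only: CompletableFuture stands in for Kafka's internal RequestFuture.
public class CallbackThreadDemo {
    // Returns the name of the thread that ran the response handler.
    public static String handlerThreadName() {
        CompletableFuture<Long> listOffsetsResponse = new CompletableFuture<>();
        AtomicReference<String> handlerThread = new AtomicReference<>();

        // Registered by the caller (the "application thread") before any response exists,
        // like the listener attached when updateFetchPositions() sends the request.
        listOffsetsResponse.thenAccept(offset -> handlerThread.set(Thread.currentThread().getName()));

        // A separate thread delivers the response; the non-async callback runs on it.
        Thread heartbeat = new Thread(() -> listOffsetsResponse.complete(32110985L), "heartbeat-thread");
        heartbeat.start();
        try {
            heartbeat.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the heartbeat thread", e);
        }
        return handlerThread.get();
    }

    public static void main(String[] args) {
        System.out.println(handlerThreadName()); // heartbeat-thread
    }
}
```

Registering the callback before starting the other thread makes the outcome deterministic in this sketch. In the consumer, the equivalent is the `ListOffsetsRequest` listener being invoked by whichever thread fires the pending completions, which per the stack trace can be the heartbeat thread's `pollNoWakeup()`.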
The problem is that [onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479] may mutate the `subscriptions` variable while the heartbeat thread is handling the offset response. This results in `subscriptions.seek()` throwing an `IllegalStateException`, which crashes the heartbeat thread.
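The failure has the shape of a check-then-act race: the partition is still assigned when it is checked, but not when `seek()` runs. The sketch below forces that interleaving deterministically with latches. `ToySubscriptionState` is a hypothetical, heavily simplified stand-in, not Kafka's `SubscriptionState`, and `revokeAll()` merely stands in for whatever rebalance step removes the partition:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for SubscriptionState; not Kafka's actual class.
class ToySubscriptionState {
    private final Map<String, Long> assigned = new ConcurrentHashMap<>();

    void assign(String partition) { assigned.put(partition, -1L); }
    void revokeAll() { assigned.clear(); }
    boolean isAssigned(String partition) { return assigned.containsKey(partition); }

    void seek(String partition, long offset) {
        // Mirrors the error in the report when the partition is no longer assigned.
        if (assigned.replace(partition, offset) == null)
            throw new IllegalStateException("No current assignment for partition " + partition);
    }
}

public class CheckThenActRaceDemo {
    // Returns whatever the "heartbeat" thread threw, or null if it did not fail.
    public static Throwable run() {
        ToySubscriptionState subscriptions = new ToySubscriptionState();
        subscriptions.assign("X");
        CountDownLatch checked = new CountDownLatch(1);
        CountDownLatch revoked = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();

        Thread heartbeat = new Thread(() -> {
            try {
                if (subscriptions.isAssigned("X")) {     // check: passes
                    checked.countDown();
                    revoked.await();                      // window in which the assignment changes
                    subscriptions.seek("X", 32110985L);   // act: partition is gone, so this throws
                }
            } catch (Throwable t) {
                failure.set(t);
            }
        }, "heartbeat-thread");
        heartbeat.start();

        try {
            checked.await();
            subscriptions.revokeAll();                    // the "application thread" rebalances
            revoked.countDown();
            heartbeat.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        // java.lang.IllegalStateException: No current assignment for partition X
        System.out.println(run());
    }
}
```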

  was:
I have seen the following client exception after a consumer group rebalance:
{code:java}
INFO  Fetcher  Resetting offset for partition _ to offset 32110985.
INFO  Fetcher  Resetting offset for partition _ to offset 32108462.

java.lang.IllegalStateException: No current assignment for partition X
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93)
    at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589)
    at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93)
    at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704)
    at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
{code}
The logs also contained this message from around the same time:
{code:java}
INFO ConsumerCoordinator Revoking previously assigned partitions [X, ...]{code}
 

After investigating, I believe there might be a race condition:

[Updating the fetch positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213] in the client [involves sending a `ListOffsetsRequest` to the broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603]. When we receive a successful response, we [call `resetOffsetIfNeeded()`|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L616] for every fetched offset.
{code:java}
private void resetOffsetIfNeeded(TopicPartition partition, Long requestedResetTimestamp, OffsetData offsetData) {
    // we might lose the assignment while fetching the offset, or the user might seek to a different offset,
    // so verify it is still assigned and still in need of the requested reset
    if (!subscriptions.isAssigned(partition)) {
        log.debug("Skipping reset of partition {} since it is no longer assigned", partition);
    } else if (!subscriptions.isOffsetResetNeeded(partition)) {
        log.debug("Skipping reset of partition {} since reset is no longer needed", partition);
    } else if (!requestedResetTimestamp.equals(offsetResetStrategyTimestamp(partition))) {
        log.debug("Skipping reset of partition {} since an alternative reset has been requested", partition);
    } else {
        log.info("Resetting offset for partition {} to offset {}.", partition, offsetData.offset);
        subscriptions.seek(partition, offsetData.offset);
    }
}{code}

It is possible for the Heartbeat thread to invoke the callback that handles the response from its run loop ([1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036]->[2|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]), ultimately accessing the `Fetcher#subscriptions` variable.
The problem seems to be that [onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479] may mutate the `Fetcher#subscriptions` variable while the heartbeat thread is handling the offset response. The `isAssigned()` check does not prevent this, because the assignment can change between that check and the `seek()` call. This results in `subscriptions.seek()` throwing an `IllegalStateException` and crashing the heartbeat thread.
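One possible mitigation, sketched here under the assumption that every assignment change and every seek can share a single lock, is to make the "still assigned?" check and the seek one atomic step. `LockedSubscriptionState` and `seekIfStillAssigned()` are hypothetical names, and this is not necessarily how the issue was or should be fixed:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of one mitigation; not necessarily the fix adopted for KAFKA-8016.
// All access to the assignment goes through this object's monitor, so the
// "still assigned?" check and the seek below cannot be split by a concurrent revoke.
public class LockedSubscriptionState {
    // partition -> position; -1 means "assigned, no position yet" (illustrative convention)
    private final Map<String, Long> positions = new HashMap<>();

    public synchronized void assign(String partition) {
        positions.putIfAbsent(partition, -1L);
    }

    // Stands in for a rebalance removing the current assignment.
    public synchronized void revokeAll() {
        positions.clear();
    }

    public synchronized Long position(String partition) {
        return positions.get(partition);
    }

    // Check and act under one lock: skip (return false) instead of throwing
    // when the partition was revoked after the offset request was sent.
    public synchronized boolean seekIfStillAssigned(String partition, long offset) {
        if (!positions.containsKey(partition)) {
            return false;
        }
        positions.put(partition, offset);
        return true;
    }

    public static void main(String[] args) {
        LockedSubscriptionState subscriptions = new LockedSubscriptionState();
        subscriptions.assign("X");
        System.out.println(subscriptions.seekIfStillAssigned("X", 32110985L)); // true
        subscriptions.revokeAll();
        System.out.println(subscriptions.seekIfStillAssigned("X", 32108462L)); // false, no exception
    }
}
```

The trade-off is that every assignment read and write takes the monitor. An alternative worth weighing is to keep the heartbeat thread from firing `ListOffsetsRequest` completions at all, so that `subscriptions` is only mutated from the application thread.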

 


> Race condition resulting in IllegalStateException inside Consumer Heartbeat thread when consumer joins group
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8016
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8016
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
