[ https://issues.apache.org/jira/browse/KAFKA-8016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stanislav Kozlovski updated KAFKA-8016:
---------------------------------------
    Description:
I think the consumer heartbeat thread may be subject to a race condition that can crash it.

I have seen the following client exception after a consumer group rebalance:
{code:java}
INFO Fetcher Resetting offset for partition _ to offset 32110985.
INFO Fetcher Resetting offset for partition _ to offset 32108462.
java.lang.IllegalStateException: No current assignment for partition X
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93)
    at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589)
    at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93)
    at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704)
    at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
{code}
The logs also contained this message at around the same time:
{code:java}
INFO ConsumerCoordinator Revoking previously assigned partitions [X, ...]{code}

After investigating, I see that there might be a race condition:

[Updating the fetch positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213] in the client [involves sending a `ListOffsetsRequest` to the broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603].
It is possible for the heartbeat thread to initiate the code that handles the response in its run loop ([1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036]->[2|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]).

updateFetchPositions() is called from the public methods `Consumer#position()` and `Consumer#poll()`.
The problem is that [onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479] may mutate the `subscriptions` variable while the offset response handling by the heartbeat thread takes place. This results in `subscriptions.seek()` throwing an IllegalStateException.

  was:
I have seen the following client exception after a consumer group rebalance:
{code:java}
INFO Fetcher Resetting offset for partition _ to offset 32110985.
INFO Fetcher Resetting offset for partition _ to offset 32108462.
java.lang.IllegalStateException: No current assignment for partition X
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93)
    at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589)
    at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93)
    at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704)
    at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
{code}
The logs also had this message in a close timeframe:
{code:java}
INFO ConsumerCoordinator Revoking previously assigned partitions [X, ...]{code}

After investigating, I see that there might be a race condition:

[Updating the fetch positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213] in the client [involves sending a `ListOffsetsRequest` request to the broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603].
When we receive a successful response, we [call `resetOffsetIfNeeded()`|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L616] for every fetched offset.
{code:java}
private void resetOffsetIfNeeded(TopicPartition partition, Long requestedResetTimestamp, OffsetData offsetData) {
    // we might lose the assignment while fetching the offset, or the user might seek to a different offset,
    // so verify it is still assigned and still in need of the requested reset
    if (!subscriptions.isAssigned(partition)) {
        log.debug("Skipping reset of partition {} since it is no longer assigned", partition);
    } else if (!subscriptions.isOffsetResetNeeded(partition)) {
        log.debug("Skipping reset of partition {} since reset is no longer needed", partition);
    } else if (!requestedResetTimestamp.equals(offsetResetStrategyTimestamp(partition))) {
        log.debug("Skipping reset of partition {} since an alternative reset has been requested", partition);
    } else {
        log.info("Resetting offset for partition {} to offset {}.", partition, offsetData.offset);
        subscriptions.seek(partition, offsetData.offset);
    }
}{code}
It is possible for the Heartbeat thread to initiate the callback that handles the response in its run loop ([1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036]->[2|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]), ultimately accessing the `Fetcher#subscriptions` variable.

The problem seems to be that [onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479] may mutate the `Fetcher#subscriptions` variable while the offset response handling by the heartbeat thread takes place. This results in `subscriptions.seek()` throwing an IllegalStateException and crashing the heartbeat thread.
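To illustrate the suspected check-then-act race, here is a minimal sketch that replays the interleaving deterministically on a single thread. `ToySubscriptions`, `RaceSketch`, and `seekIfAssigned` are hypothetical names invented for this illustration; they are not Kafka classes, and the atomic variant is only one possible way to close such a window, not necessarily how the client should be or was fixed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the consumer's subscription state.
// None of these names are Kafka's real classes or methods.
class ToySubscriptions {
    private final Map<String, Long> positions = new HashMap<>();

    synchronized void assign(String partition) { positions.put(partition, 0L); }

    synchronized void revokeAll() { positions.clear(); }

    synchronized boolean isAssigned(String partition) { return positions.containsKey(partition); }

    // Mirrors the failure mode in the stack trace: seeking an unassigned partition throws.
    synchronized void seek(String partition, long offset) {
        if (!positions.containsKey(partition))
            throw new IllegalStateException("No current assignment for partition " + partition);
        positions.put(partition, offset);
    }

    // Assumed alternative: check and act under one lock, so a revocation cannot slip in between.
    synchronized boolean seekIfAssigned(String partition, long offset) {
        if (!positions.containsKey(partition)) return false;
        positions.put(partition, offset);
        return true;
    }
}

class RaceSketch {
    // Replays the suspected interleaving step by step; returns true if seek() threw.
    static boolean replayUnsafeInterleaving() {
        ToySubscriptions subs = new ToySubscriptions();
        subs.assign("X");
        boolean stillAssigned = subs.isAssigned("X");     // "heartbeat thread": check passes
        subs.revokeAll();                                  // "application thread": rebalance revokes X
        try {
            if (stillAssigned) subs.seek("X", 32110985L); // "heartbeat thread": acts on a stale check
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Same ordering, but the check and the seek happen atomically; returns whether the seek was applied.
    static boolean replayAtomicVariant() {
        ToySubscriptions subs = new ToySubscriptions();
        subs.assign("X");
        subs.revokeAll();
        return subs.seekIfAssigned("X", 32110985L);
    }

    public static void main(String[] args) {
        System.out.println("unsafe interleaving threw: " + replayUnsafeInterleaving());
        System.out.println("atomic variant applied seek: " + replayAtomicVariant());
    }
}
```

Each individual method in the sketch is synchronized, yet the unsafe replay still fails, because the `isAssigned` check and the later `seek` are two separate critical sections. That matches the shape of the `resetOffsetIfNeeded()` guard quoted above, under the assumption that the revocation can run between them.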
> Race condition resulting in IllegalStateException inside Consumer Heartbeat thread when consumer joins group
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8016
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8016
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Major
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)