[ https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16950702#comment-16950702 ]
Tom Lee edited comment on KAFKA-8950 at 10/14/19 4:08 AM:
----------------------------------------------------------

Hm, not sure I see what you mean. I think you're right that checkDisconnects could be called by the coordinator/heartbeat thread at the same time send is invoked by the fetcher (edit: or the other way around!), so no disagreement there.

But RequestFuture.addListener() will enqueue the listener in a ConcurrentLinkedQueue, then check whether the future has already succeeded or failed by reading an atomic reference before invoking fire{Success,Failure}, which in turn invokes the enqueued listener. So say we enqueue the listener and then "see" that the future is neither succeeded nor failed: it's then guaranteed that the heartbeat/coordinator thread will invoke the listener, because the enqueue "happened-before" the atomic reference write. On the other hand, if the atomic reference write "happened-before" our check of whether the future succeeded or failed, addListener will execute fireSuccess/fireFailure itself, which invokes the listener logic. It's certainly delicate, but I don't think it's incorrect.

I personally think the issue is that we're somehow entirely missing the complete()/raise() somewhere, but I'd be happy to be wrong if we could get it fixed. :)
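For what it's worth, here's a rough sketch of the ordering argument above, using simplified, hypothetical names (this is not the actual RequestFuture code, which also distinguishes success from failure via complete()/raise() and fireSuccess/fireFailure; the sketch collapses both into a single result slot):

{code:java}
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical, stripped-down stand-in for RequestFuture<T>: one result slot
// instead of separate success/failure paths, and plain Consumer<T> listeners.
class SketchFuture<T> {
    private final AtomicReference<T> result = new AtomicReference<>();
    private final ConcurrentLinkedQueue<Consumer<T>> listeners = new ConcurrentLinkedQueue<>();

    // Caller thread (e.g. the fetcher): enqueue first, then read the result.
    public void addListener(Consumer<T> listener) {
        listeners.add(listener);          // (1) enqueue into the concurrent queue
        T value = result.get();           // (2) read the atomic reference
        if (value != null) {
            fireCompletion(value);        // result already published: fire from here
        }
    }

    // Completing thread (e.g. heartbeat/coordinator via checkDisconnects):
    public void complete(T value) {
        if (result.compareAndSet(null, value)) {  // (A) publish the result
            fireCompletion(value);                // (B) drain and invoke listeners
        }
    }

    private void fireCompletion(T value) {
        Consumer<T> listener;
        // poll() hands each enqueued listener to exactly one draining thread
        while ((listener = listeners.poll()) != null) {
            listener.accept(value);
        }
    }
}
{code}

Either the enqueue at (1) happens-before the write at (A), in which case the completing thread's drain at (B) must find the listener in the queue, or the write at (A) happens-before the read at (2), in which case addListener sees the non-null result and fires the listener itself; poll() ensures it only runs once either way. Which is why I suspect the missing piece is a complete()/raise() that never happens rather than this race.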
> KafkaConsumer stops fetching
> ----------------------------
>
>                 Key: KAFKA-8950
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8950
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.3.0
>         Environment: linux
>            Reporter: Will James
>            Priority: Major
>
> We have a KafkaConsumer consuming from a single partition with enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no records from the broker with every poll, and from most of the Kafka metrics in the consumer it looks like it is fully caught up to the end of the log. We see that we are long polling for the max poll timeout, and that there is zero lag. In addition, we see that the heartbeat rate stays unchanged from before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to see that the consumer is committing the same offset on the auto commit interval; however, the offset does not move, and the lag from the broker's perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher is in a state where it believes there are nodesWithPendingFetchRequests. However, I can see the state of the fetch latency sensor, specifically the fetch rate, and see that the samples were not updated for a long period of time (actually, precisely the amount of time that the problem in our application was occurring, around 50 hours - we have alerting on other metrics but not the fetch rate, so we didn't notice the problem until a customer complained).
> In this example, the consumer was processing around 40 messages per second, with an average size of about 10 KB, although most of the other examples of this have happened with higher volume (250 messages per second, around 23 KB per message on average).
> I have spent some time investigating the issue on our end, and will continue to do so as time allows; however, I wanted to raise this as an issue because it may be affecting other people.
> Please let me know if you have any questions or need additional information. I doubt I can provide heap dumps unfortunately, but I can provide further information as needed.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)