[ 
https://issues.apache.org/jira/browse/KAFKA-17734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-17734:
---------------------------------

    Assignee:     (was: Kirk True)

> KafkaConsumer.close(0) can block indefinitely in ConsumerNetworkClient.poll
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-17734
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17734
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.8.0
>         Environment: openjdk version "17.0.12" 2024-07-16
> OpenJDK Runtime Environment (build 17.0.12+7-Ubuntu-1ubuntu222.04)
> OpenJDK 64-Bit Server VM (build 17.0.12+7-Ubuntu-1ubuntu222.04, mixed mode, 
> sharing)
>            Reporter: Kyle Kingsbury
>            Priority: Major
>
> This might be related to KAFKA-17263, but I've got a slightly different 
> stacktrace.
> With the official Java Kafka client library, version 3.8.0, calls to 
> `consumer.close()` can stall indefinitely even though I've provided a 
> zero-second timeout. I've tried spawning a separate thread which calls 
> consumer.wakeup() after one second just to make sure the client gets woken 
> up, and that doesn't seem to work either. Jstack indicates the thread calling 
> consumer.close() is stuck on AbstractCoordinator.maybeLeaveGroup -> 
> ConsumerNetworkClient.poll, which in turn is stuck on a future completion 
> handler, which goes *back* into the AbstractCoordinator, which in turn hits 
> onLeaderElected, which eventually issues ConsumerNetworkClient.poll(), and 
> that looks to ground out in a blocking `select()` call.
> It would be really nice if there were a way to reliably release client 
> resources (thread pools, network connections, etc.) that didn't do inline 
> network I/O.
> {{"jepsen worker 24" #24101 prio=5 os_prio=0 cpu=6680.43ms elapsed=10760.37s 
> tid=0x00007462200059e0 nid=0x4909a runnable  [0x00007474133fd000]
>    java.lang.Thread.State: RUNNABLE
>       at sun.nio.ch.EPoll.wait(java.base@17.0.12/Native Method)
>       at 
> sun.nio.ch.EPollSelectorImpl.doSelect(java.base@17.0.12/EPollSelectorImpl.java:118)
>       at 
> sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@17.0.12/SelectorImpl.java:129)
>       - locked <0x00000002dcbe20d0> (a sun.nio.ch.Util$2)
>       - locked <0x00000002dcbe2080> (a sun.nio.ch.EPollSelectorImpl)
>       at 
> sun.nio.ch.SelectorImpl.select(java.base@17.0.12/SelectorImpl.java:141)
>       at org.apache.kafka.common.network.Selector.select(Selector.java:878)
>       at org.apache.kafka.common.network.Selector.poll(Selector.java:469)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:595)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:243)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:165)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.ensureFreshMetadata(ConsumerNetworkClient.java:176)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.updateGroupSubscription(ConsumerCoordinator.java:557)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected(ConsumerCoordinator.java:636)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onLeaderElected(AbstractCoordinator.java:766)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:117)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:670)
>       - locked <0x00000002dcbe2228> (a 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:631)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1310)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1285)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:616)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:428)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:313)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:322)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1181)
>       - locked <0x00000002dcbe2228> (a 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:1131)
>       - locked <0x00000002dcbe2228> (a 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:986)
>       at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.lambda$close$3(LegacyKafkaConsumer.java:1135)
>       at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer$$Lambda$466/0x0000747234d46198.run(Unknown
>  Source)
>       at org.apache.kafka.common.utils.Utils.swallow(Utils.java:1042)
>       at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1135)
>       at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1104)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1777)
>       at 
> jepsen.redpanda.client$close_consumer_BANG_$fn__7227.invoke(client.clj:212)
>       at 
> jepsen.redpanda.client$close_consumer_BANG_.invokeStatic(client.clj:211)
>       at jepsen.redpanda.client$close_consumer_BANG_.invoke(client.clj:200)
>       at jepsen.redpanda.workload.queue.Client.close_BANG_(queue.clj:782)
>       at jepsen.client.Validate.close_BANG_(client.clj:81)
>       at 
> jepsen.generator.interpreter.ClientWorker.close_BANG_(interpreter.clj:69)
>       at 
> jepsen.generator.interpreter.ClientWorker.invoke_BANG_(interpreter.clj:47)
>       at 
> jepsen.generator.interpreter$spawn_worker$fn__13745$fn__13746.invoke(interpreter.clj:140)
>       at 
> jepsen.generator.interpreter$spawn_worker$fn__13745.invoke(interpreter.clj:123)
>       at clojure.core$binding_conveyor_fn$fn__5842.invoke(core.clj:2047)
>       at clojure.lang.AFn.call(AFn.java:18)
>       at 
> java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
>       at java.lang.Thread.run(java.base@17.0.12/Thread.java:840)}}
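
The description asks for a way to release client resources without the caller blocking on network I/O. The sketch below is a possible caller-side stopgap, not something proposed in the issue and not a fix for the underlying bug. It runs a blocking close() on a daemon helper thread and stops waiting after a hard deadline. The names BoundedClose and closeWithDeadline are hypothetical, and the code uses only the JDK, so it works with any AutoCloseable.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: bounds how long the CALLER waits for close().
// It does not make the close itself non-blocking.
final class BoundedClose {
    private BoundedClose() {}

    /**
     * Runs resource.close() on a daemon thread and waits at most `deadline`.
     *
     * @return true if close() returned (normally or by throwing) in time,
     *         false if it was still running when the deadline passed
     */
    static boolean closeWithDeadline(AutoCloseable resource, Duration deadline)
            throws InterruptedException {
        ExecutorService closer = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-close");
            t.setDaemon(true); // a stuck close() must not keep the JVM alive
            return t;
        });
        // Callable form, so checked exceptions from close() are allowed.
        Future<?> pending = closer.submit(() -> {
            resource.close();
            return null;
        });
        try {
            pending.get(deadline.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException e) {
            return true; // close() threw, but it did return
        } catch (TimeoutException e) {
            // Best effort: interrupts the helper thread. Whether the stuck
            // call reacts to the interrupt is not guaranteed.
            pending.cancel(true);
            return false;
        } finally {
            closer.shutdown(); // accept no new tasks; does not wait
        }
    }
}
```

With a consumer this might be called as `BoundedClose.closeWithDeadline(() -> consumer.close(Duration.ZERO), Duration.ofSeconds(5))`. KafkaConsumer is not safe for concurrent use, so this assumes no other thread is inside a consumer call at the time. If the deadline passes, the helper thread may remain stuck in the frames shown above, and the sockets and other resources it holds are not released. Only the calling thread is freed.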



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
