[ https://issues.apache.org/jira/browse/KAFKA-17734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kirk True reassigned KAFKA-17734:
---------------------------------
Assignee: Kirk True
> KafkaConsumer.close(0) can block indefinitely in ConsumerNetworkClient.poll
> ---------------------------------------------------------------------------
>
> Key: KAFKA-17734
> URL: https://issues.apache.org/jira/browse/KAFKA-17734
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.8.0
> Environment: openjdk version "17.0.12" 2024-07-16
> OpenJDK Runtime Environment (build 17.0.12+7-Ubuntu-1ubuntu222.04)
> OpenJDK 64-Bit Server VM (build 17.0.12+7-Ubuntu-1ubuntu222.04, mixed mode, sharing)
> Reporter: Kyle Kingsbury
> Assignee: Kirk True
> Priority: Major
>
> This might be related to KAFKA-17263, but I've got a slightly different
> stack trace.
> With the official Java Kafka client library, version 3.8.0, calls to
> `consumer.close()` can stall indefinitely even though I've provided a
> zero-second timeout. I've tried spawning a separate thread that calls
> `consumer.wakeup()` after one second, just to make sure the client gets woken
> up, and that doesn't seem to work either. The jstack output shows the thread
> calling `consumer.close()` stuck in AbstractCoordinator.maybeLeaveGroup ->
> ConsumerNetworkClient.pollNoWakeup -> ConsumerNetworkClient.poll. That poll
> fires a pending request future's completion handler, which goes *back* into
> the AbstractCoordinator (JoinGroupResponseHandler -> onLeaderElected). From
> there, ConsumerCoordinator.updateGroupSubscription calls
> ConsumerNetworkClient.ensureFreshMetadata -> awaitMetadataUpdate, which
> issues another ConsumerNetworkClient.poll() that ends in a blocking
> `select()` call.
> It would be really nice if there were a way to reliably release client
> resources (thread pools, network connections, etc.) that didn't do in-line
> network I/O.
> {{"jepsen worker 24" #24101 prio=5 os_prio=0 cpu=6680.43ms elapsed=10760.37s tid=0x00007462200059e0 nid=0x4909a runnable [0x00007474133fd000]
> java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPoll.wait(java.base@17.0.12/Native Method)
> at sun.nio.ch.EPollSelectorImpl.doSelect(java.base@17.0.12/EPollSelectorImpl.java:118)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@17.0.12/SelectorImpl.java:129)
> - locked <0x00000002dcbe20d0> (a sun.nio.ch.Util$2)
> - locked <0x00000002dcbe2080> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(java.base@17.0.12/SelectorImpl.java:141)
> at org.apache.kafka.common.network.Selector.select(Selector.java:878)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:469)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:595)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:243)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:165)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.ensureFreshMetadata(ConsumerNetworkClient.java:176)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.updateGroupSubscription(ConsumerCoordinator.java:557)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected(ConsumerCoordinator.java:636)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onLeaderElected(AbstractCoordinator.java:766)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:117)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:670)
> - locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:631)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1310)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1285)
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:616)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:428)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:313)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:322)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1181)
> - locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:1131)
> - locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:986)
> at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.lambda$close$3(LegacyKafkaConsumer.java:1135)
> at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer$$Lambda$466/0x0000747234d46198.run(Unknown Source)
> at org.apache.kafka.common.utils.Utils.swallow(Utils.java:1042)
> at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1135)
> at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1104)
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1777)
> at jepsen.redpanda.client$close_consumer_BANG_$fn__7227.invoke(client.clj:212)
> at jepsen.redpanda.client$close_consumer_BANG_.invokeStatic(client.clj:211)
> at jepsen.redpanda.client$close_consumer_BANG_.invoke(client.clj:200)
> at jepsen.redpanda.workload.queue.Client.close_BANG_(queue.clj:782)
> at jepsen.client.Validate.close_BANG_(client.clj:81)
> at jepsen.generator.interpreter.ClientWorker.close_BANG_(interpreter.clj:69)
> at jepsen.generator.interpreter.ClientWorker.invoke_BANG_(interpreter.clj:47)
> at jepsen.generator.interpreter$spawn_worker$fn__13745$fn__13746.invoke(interpreter.clj:140)
> at jepsen.generator.interpreter$spawn_worker$fn__13745.invoke(interpreter.clj:123)
> at clojure.core$binding_conveyor_fn$fn__5842.invoke(core.clj:2047)
> at clojure.lang.AFn.call(AFn.java:18)
> at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
> at java.lang.Thread.run(java.base@17.0.12/Thread.java:840)}}
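The trace shows the completion handler for an earlier JoinGroup request running on the caller's thread, inside the `pollNoWakeup` that `maybeLeaveGroup` issues during `close()`, and that handler then starting a metadata wait with a nested `poll()`. The following is a toy model of that reentrancy, not Kafka's code. The class and method names are hypothetical and only loosely mirror the frames in the trace; `select()` is modeled as advancing a simulated clock by the full poll timeout; and the nested wait is assumed to have a deadline of its own that the zero timeout passed to `close()` does not shorten.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model (hypothetical names) of a poll loop whose completion handlers reenter it. */
class ReentrantPollModel {
    private final Queue<Runnable> completedRequests = new ArrayDeque<>();
    private final boolean brokerAnswers; // assumption: whether the metadata request ever completes
    private long simulatedNowMs = 0;

    ReentrantPollModel(boolean brokerAnswers) {
        this.brokerAnswers = brokerAnswers;
    }

    /** Runs queued completion handlers on the caller's thread, then "blocks" for timeoutMs. */
    void poll(long timeoutMs) {
        Runnable handler;
        while ((handler = completedRequests.poll()) != null) {
            handler.run(); // arbitrary handler code executes inside this poll() call
        }
        simulatedNowMs += timeoutMs; // stand-in for select() waiting out its timeout
    }

    /** Loosely mirrors awaitMetadataUpdate: keeps polling until its own deadline. */
    boolean awaitMetadataUpdate(long waitMs) {
        long deadline = simulatedNowMs + waitMs;
        while (simulatedNowMs < deadline) {
            if (brokerAnswers) {
                return true;
            }
            poll(100); // nested poll, started from a handler running inside the outer poll
        }
        return false;
    }

    /** Loosely mirrors close(0): one zero-timeout poll; returns simulated ms spent inside it. */
    long closeWithZeroTimeout(long nestedWaitMs) {
        long start = simulatedNowMs;
        // A JoinGroup-style response has already completed; its handler starts a metadata wait.
        completedRequests.add(() -> awaitMetadataUpdate(nestedWaitMs));
        poll(0);
        return simulatedNowMs - start;
    }

    public static void main(String[] args) {
        System.out.println(new ReentrantPollModel(true).closeWithZeroTimeout(60_000));  // 0
        System.out.println(new ReentrantPollModel(false).closeWithZeroTimeout(60_000)); // 60000
    }
}
```

In the model, the simulated time spent in `closeWithZeroTimeout` is set by the nested wait, not by the outer zero-timeout poll. The reported hang has the same shape, with the nested metadata wait apparently never finishing.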
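Until `close()` stops doing nested network I/O, a caller can at least bound how long its own thread waits. The helper below is a hypothetical, JDK-only sketch (`BoundedClose` and `closeWithin` are illustrative names, not part of Kafka): it runs `close()` on a separate daemon thread and stops waiting after a deadline. It does not release anything the stuck `close()` still holds (the coordinator lock, sockets, buffers); it only keeps the calling thread, such as a Jepsen worker, from hanging along with it. `KafkaConsumer` implements `java.io.Closeable`, so it can be passed directly, or wrapped in a lambda to call `close(Duration.ZERO)` as in the report. Since `KafkaConsumer` is not safe for concurrent access, this assumes no other thread is using the consumer at the time.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: bounds how long the caller waits for close(), not how long close() runs. */
final class BoundedClose {
    private BoundedClose() {}

    /**
     * Calls resource.close() on a daemon thread and waits at most {@code limit}.
     * Returns true if close() returned or threw within the limit, false otherwise.
     * On false, the close thread keeps running and nothing it holds is reclaimed.
     */
    static boolean closeWithin(AutoCloseable resource, Duration limit) {
        ExecutorService executor = Executors.newSingleThreadExecutor(task -> {
            Thread t = new Thread(task, "bounded-close");
            t.setDaemon(true); // a hung close() must not keep the JVM alive
            return t;
        });
        try {
            Future<Object> done = executor.submit(() -> {
                resource.close();
                return null;
            });
            done.get(limit.toNanos(), TimeUnit.NANOSECONDS);
            return true;
        } catch (ExecutionException e) {
            return true; // close() threw, but it did return
        } catch (TimeoutException e) {
            return false; // still stuck; the daemon thread is abandoned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        } finally {
            executor.shutdown(); // accept no new tasks; the worker exits if close() ever returns
        }
    }

    public static void main(String[] args) {
        System.out.println(closeWithin(() -> {}, Duration.ofSeconds(1)));                    // true
        System.out.println(closeWithin(() -> Thread.sleep(60_000), Duration.ofMillis(200))); // false
    }
}
```

For example, `BoundedClose.closeWithin(() -> consumer.close(Duration.ZERO), Duration.ofSeconds(5))`, where `consumer` is the caller's `KafkaConsumer`. Creating a fresh single-thread executor per call means one hung `close()` cannot block the next one.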
--
This message was sent by Atlassian Jira
(v8.20.10#820010)