Pierre-Henri Dezanneau created KAFKA-6799:
---------------------------------------------

             Summary: Consumer livelock during consumer group rebalance
                 Key: KAFKA-6799
                 URL: https://issues.apache.org/jira/browse/KAFKA-6799
             Project: Kafka
          Issue Type: Bug
          Components: clients, consumer
    Affects Versions: 1.1.0, 0.11.0.2, 1.0.0
            Reporter: Pierre-Henri Dezanneau


We have the following environment:
* 1 kafka cluster with 3 brokers
* 1 topic with 3 partitions
* 1 producer
* 1 consumer group with 3 consumers

From this setup, we remove one broker from the cluster the hard way, by
simply killing it. Quite often, the consumer group is then not rebalanced
correctly: all 3 consumers stop consuming and get stuck in a loop, forever.

The thread dump shows that the consumer threads aren't blocked but run forever 
in {{AbstractCoordinator.ensureCoordinatorReady}}, holding a lock due to the 
{{synchronized}} keyword on the calling method. Heartbeat threads are blocked, 
waiting for the consumer threads to release the lock. This situation prevents 
all consumers from consuming any more records.
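The locking pattern described above can be sketched without Kafka at all. The following is only an illustration under stated assumptions: {{FakeCoordinator}}, {{ensureReady}}, {{heartbeat}} and {{markCoordinatorKnown}} are hypothetical names, not Kafka's API, and a short sleep stands in for the network poll. It shows one thread looping inside a synchronized method while a second thread waits on the same monitor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class MonitorStarvationSketch {

    // Hypothetical stand-in for the coordinator object; NOT Kafka's actual class.
    static class FakeCoordinator {
        private volatile boolean coordinatorKnown = false;
        final CountDownLatch monitorHeld = new CountDownLatch(1);

        // Loops until the condition holds, keeping the monitor the whole time.
        synchronized void ensureReady() {
            monitorHeld.countDown();
            while (!coordinatorKnown) {
                try {
                    Thread.sleep(10); // stands in for the network poll; the monitor stays held
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        // Needs the same monitor, so it cannot run while ensureReady() is looping.
        synchronized void heartbeat() {
        }

        void markCoordinatorKnown() {
            coordinatorKnown = true;
        }
    }

    // Returns the state of the heartbeat-like thread while the monitor is held.
    static Thread.State observeHeartbeatState() {
        FakeCoordinator coordinator = new FakeCoordinator();
        Thread consumer = new Thread(coordinator::ensureReady, "consumer");
        Thread heartbeat = new Thread(coordinator::heartbeat, "heartbeat");
        consumer.setDaemon(true);
        heartbeat.setDaemon(true);
        try {
            consumer.start();
            coordinator.monitorHeld.await(); // consumer now owns the monitor
            heartbeat.start();
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (heartbeat.getState() != Thread.State.BLOCKED
                    && System.nanoTime() < deadline) {
                Thread.sleep(5);
            }
            Thread.State observed = heartbeat.getState();
            coordinator.markCoordinatorKnown(); // end the loop so both threads can finish
            consumer.join(2000);
            heartbeat.join(2000);
            return observed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("heartbeat state while monitor is held: "
                + observeHeartbeatState());
    }
}
```

In this sketch the loop ends once {{markCoordinatorKnown}} is called. In the situation reported here, the equivalent condition apparently never becomes true, so the monitor is never released.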

We built a simple project that seems to reliably demonstrate this:
{code:sh}
$ mkdir -p /tmp/sandbox && cd /tmp/sandbox
$ git clone https://github.com/phdezann/helloworld-kafka-livelock
$ cd helloworld-kafka-livelock && ./spin.sh
...
$ livelock detecte
{code}

{code:sh|title=Consumer thread|borderStyle=solid}
"kafka-consumer-1@10733" daemon prio=5 tid=0x31 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
         blocks kafka-coordinator-heartbeat-thread | helloWorldGroup@10728
          at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
          at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
          at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
          at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
          - locked <0x2a15> (a sun.nio.ch.EPollSelectorImpl)
          - locked <0x2a16> (a java.util.Collections$UnmodifiableSet)
          - locked <0x2a17> (a sun.nio.ch.Util$3)
          at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
          at org.apache.kafka.common.network.Selector.select(Selector.java:684)
          at org.apache.kafka.common.network.Selector.poll(Selector.java:408)
          at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:261)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
          at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:156)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
          - locked <0x2a0c> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
          at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:279)
          at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
          at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
          at org.helloworld.kafka.bus.HelloWorldKafkaListener.lambda$createConsumerInDedicatedThread$0(HelloWorldKafkaListener.java:45)
          at org.helloworld.kafka.bus.HelloWorldKafkaListener$$Lambda$42.1776656466.run(Unknown Source:-1)
          at java.lang.Thread.run(Thread.java:748)
{code}

{code:sh|title=Heartbeat thread|borderStyle=solid}
"kafka-coordinator-heartbeat-thread | helloWorldGroup@10728" daemon prio=5 tid=0x36 nid=NA waiting for monitor entry
  java.lang.Thread.State: BLOCKED
         waiting for kafka-consumer-1@10733 to release lock on <0x2a0c> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
          at java.lang.Object.wait(Object.java:-1)
          at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:955)
{code}
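The "waiting for ... to release lock" relation shown in the dumps can also be collected from inside the JVM. This is a rough sketch using the standard {{java.lang.management}} API. The class name {{BlockedThreadReport}} and the report format are made up for illustration, and the sketch is not part of the reproduction project.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

class BlockedThreadReport {

    // Returns one line per thread that is BLOCKED on a monitor owned by another thread.
    static List<String> blockedThreads() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        List<String> report = new ArrayList<>();
        // (false, false): locked monitors and ownable synchronizers are not needed
        // to learn which lock a thread waits on or who owns it.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            if (info.getThreadState() == Thread.State.BLOCKED
                    && info.getLockOwnerName() != null) {
                report.add(info.getThreadName()
                        + " waits for " + info.getLockOwnerName()
                        + " to release " + info.getLockName());
            }
        }
        return report;
    }

    public static void main(String[] args) {
        // Prints nothing when no thread is currently blocked on a monitor.
        blockedThreads().forEach(System.out::println);
    }
}
```

Calling {{blockedThreads()}} periodically from a watchdog thread, and flagging a thread that stays blocked on the same owner across several samples, is one possible way to notice the situation described in this report. The sampling interval and threshold would have to be chosen per application.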




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
