Hi Joe, This is not expected behaviour, please file a JIRA.
Ismael On Mon, Mar 18, 2019 at 7:29 AM Joe Ammann <j...@pyx.ch> wrote: > Hi all > > We're running several clusters (mostly with 3 brokers) with 2.1.1 > > We quite regularly see the pattern that one of the 3 brokers "detaches" > from ZK (the broker id is not registered anymore under /brokers/ids). We > assume that the root cause for this is that the brokers are running on VMs > (due to company policy, no alternative) and that the VM gets "stalled" for > several minutes due to missing resources on the VMware ESX host. > > This is not new behaviour with 2.1.1, we already saw it with 0.10.2.1 > before. > > The sequence of events is normally something like the following > - cluster is running ok > - one broker "gets stalled", not pingable anymore > - partitions go to underreplicated > - failed broker comes back and reports that ZK session was expired [1] > - some of the brokers that were ok report leader election problems [2] > - the failed/revived broker logs errors continuosly about expired session > [3] > > This goes on, until we restart the broker on the VM that had failed. Until > we do this restart, the failed broker seems to think it is working > perfectly ok. We're monitoring all brokers via JMX, and that one does not > report any problems. It claims in the JMX values to be the leader of a > number of partitions, and have 0 underreplicated partitions. Whilst the > other brokers rightfully report via JMX that they in turn do have > underreplicate paritions. This then causes alerts to go off about the > brokers that still work in degraded mode, while the one that is really > broken appears green/ok. > > Is this in any way expected behaviour? That a Kafka broker gets its ZK > session expired but continues to run (just issues the errors in [3]). I > would have expected that the broker would shutdown itsself in a similar > manner it does when it's unable to register with ZK on startup. > > Any idea how I could best detect this situation in monitoring? I'm > thinking about after polling the broker via JMX, I also poll ZK to check if > /brokers/ids/<brokerid> node exists. If not, restart that broker. > > BTW: I do know that probably the best answer is: "just run your ZK/Kafka > on hardware, not VMs". We're working on that, but company policies seem to > prefer outages over spending a little money). > > -- > CU, Joe > > [1] > > [2019-03-18 02:27:13,043] INFO [ZooKeeperClient] Session expired. > (kafka.zookeeper.ZooKeeperClient) > > [2] > > [2019-03-18 02:27:20,283] ERROR [Controller id=3 epoch=94562] Controller 3 > epoch 94562 failed to change state for partition __consumer_offsets-4 from > OnlinePartition to OnlinePartition (state.change.logger) > kafka.common.StateChangeFailedException: Failed to elect leader for > partition __consumer_offsets-4 under strategy > PreferredReplicaPartitionLeaderElectionStrategy > at > kafka.controller.PartitionStateMachine$$anonfun$doElectLeaderForPartitions$3.apply(PartitionStateMachine.scala:366) > at > kafka.controller.PartitionStateMachine$$anonfun$doElectLeaderForPartitions$3.apply(PartitionStateMachine.scala:364) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.controller.PartitionStateMachine.doElectLeaderForPartitions(PartitionStateMachine.scala:364) > at > kafka.controller.PartitionStateMachine.electLeaderForPartitions(PartitionStateMachine.scala:292) > at > kafka.controller.PartitionStateMachine.doHandleStateChanges(PartitionStateMachine.scala:210) > at > kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:133) > at > kafka.controller.KafkaController.kafka$controller$KafkaController$$onPreferredReplicaElection(KafkaController.scala:624) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:974) > at > kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance$3.apply(KafkaController.scala:955) > at scala.collection.immutable.Map$Map4.foreach(Map.scala:188) > at > kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerAutoLeaderRebalance(KafkaController.scala:955) > at > kafka.controller.KafkaController$AutoPreferredReplicaLeaderElection$.process(KafkaController.scala:986) > at > kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:89) > at > kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:89) > at > kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:89) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) > at > kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:88) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > > [3] > > [2019-03-18 02:28:34,493] ERROR Uncaught exception in scheduled task > 'isr-expiration' (kafka.utils.KafkaScheduler) > org.apache.zookeeper.KeeperException$SessionExpiredException: > KeeperErrorCode = Session expired for > /brokers/topics/__consumer_offsets/partitions/9/state > at > org.apache.zookeeper.KeeperException.create(KeeperException.java:130) > at > org.apache.zookeeper.KeeperException.create(KeeperException.java:54) > at > kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:539) > at > kafka.zk.KafkaZkClient.conditionalUpdatePath(KafkaZkClient.scala:717) > at > kafka.utils.ReplicationUtils$.updateLeaderAndIsr(ReplicationUtils.scala:33) > at > kafka.cluster.Partition.kafka$cluster$Partition$$updateIsr(Partition.scala:969) > at > kafka.cluster.Partition$$anonfun$2.apply$mcZ$sp(Partition.scala:642) > at kafka.cluster.Partition$$anonfun$2.apply(Partition.scala:633) > at kafka.cluster.Partition$$anonfun$2.apply(Partition.scala:633) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259) > at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:632) > at > kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2$$anonfun$apply$43.apply(ReplicaManager.scala:1349) > at > kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2$$anonfun$apply$43.apply(ReplicaManager.scala:1349) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1349) > at > kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1348) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.server.ReplicaManager.kafka$server$ReplicaManager$$maybeShrinkIsr(ReplicaManager.scala:1348) > at > kafka.server.ReplicaManager$$anonfun$2.apply$mcV$sp(ReplicaManager.scala:323) > at > kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java) > at java.lang.Thread.run(Thread.java:745) > > > >