Unless I'm misreading something, that is controlled by the topic.metadata.refresh.interval.ms setting (defaults to 10 minutes), and I've not seen it run longer than that (unless there were other problems going on besides that).
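For reference, a sketch of where that setting would go, assuming the 0.8 producer and a properties file for it (the file name here is only an example). The value shown is the 10-minute default expressed in milliseconds:

```properties
# producer.properties (hypothetical file name)
# How often topic metadata, including partition leaders, is refreshed.
# 600000 ms = 10 minutes, the default mentioned above.
topic.metadata.refresh.interval.ms=600000
```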
I would check the JMX values under "kafka.server":type="ReplicaManager", particularly UnderReplicatedPartitions and possibly the ISR expand/shrink values. Those could indicate a problem on the brokers that is preventing things from settling down completely. You might also check whether you are seeing any heavy GCs (which can cause ZooKeeper connection issues, which would then complicate the ISR election).

-- 
Dave DeMaagd
ddema...@linkedin.com | 818 262 7958

(vkeylis2...@gmail.com - Fri, Jun 28, 2013 at 11:32:42AM -0700)
> David. What is the expected time frame for the exception to continue? An
> hour has passed since the short downtime and I still see the exception in
> the kafka service logs.
>
> Thanks,
> Vadim
>
>
> On Fri, Jun 28, 2013 at 11:25 AM, David DeMaagd <ddema...@linkedin.com> wrote:
>
> > Getting kafka.common.NotLeaderForPartitionException for a time after a
> > node is brought back online (especially if it is a short downtime) is
> > normal - that is because the consumers have not yet completely picked up
> > the new leader information. It should settle shortly.
> >
> > --
> > Dave DeMaagd
> > ddema...@linkedin.com | 818 262 7958
> >
> > (vkeylis2...@gmail.com - Fri, Jun 28, 2013 at 11:08:46AM -0700)
> > > I want to clarify that I restarted only one kafka node; all others were
> > > running and did not require a restart.
> > >
> > >
> > > On Fri, Jun 28, 2013 at 10:57 AM, Vadim Keylis <vkeylis2...@gmail.com> wrote:
> > >
> > > > Good morning. I have a cluster of 3 kafka nodes. They were all running
> > > > at the time. I needed to make a configuration change in the property
> > > > file and restart kafka. I do not have a broker shutdown tool, so I
> > > > simply used pkill -TERM -u ${KAFKA_USER} -f kafka.Kafka. That suddenly
> > > > caused the exception. How do I avoid this issue in the future?
> > > > What's the right way to shut down kafka to prevent the
> > > > NotLeaderForPartitionException?
> > > >
> > > > Thanks so much in advance,
> > > > Vadim
> > > >
> > > > [2013-06-28 10:46:53,281] WARN [KafkaApi-1] Fetch request with correlation id 1171435 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition [meetme,0] on broker 1 (kafka.server.KafkaApis)
> > > > [2013-06-28 10:46:53,282] WARN [KafkaApi-1] Fetch request with correlation id 1171436 from client ReplicaFetcherThread-0-1 on partition [meetme,0] failed due to Leader not local for partition [meetme,0] on broker 1 (kafka.server.KafkaApis)
> > > > [2013-06-28 10:46:53,448] WARN [ReplicaFetcherThread-0-2], error for partition [meetme,0] to broker 2 (kafka.server.ReplicaFetcherThread)
> > > > kafka.common.NotLeaderForPartitionException
> > > >         at sun.reflect.GeneratedConstructorAccessor2.newInstance(Unknown Source)
> > > >         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
> > > >         at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
> > > >         at java.lang.Class.newInstance0(Class.java:355)
> > > >         at java.lang.Class.newInstance(Class.java:308)
> > > >         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
> > > >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:157)
> > > >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:157)
> > > >         at kafka.utils.Logging$class.warn(Logging.scala:88)
> > > >         at kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
> > > >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:156)
> > > >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:112)
> > > >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
> > > >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
> > > >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:112)
> > > >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > > >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > [2013-06-28 10:46:53,476] INFO Closing socket connection to /10.98.21.112. (kafka.network.Processor)
> > > > [2013-06-28 10:46:53,686] INFO Closing socket connection to /10.98.21.112. (kafka.network.Processor)
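
The JMX check suggested at the top of this thread could be scripted roughly as follows. This is only a sketch under a few assumptions: that the broker was started with remote JMX enabled on a port you pass in, that the gauge is registered with a name="UnderReplicatedPartitions" key under the "kafka.server":type="ReplicaManager" bean mentioned above, and that it exposes its reading as a Value attribute. The class and helper names are made up for illustration.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class UnderReplicatedCheck {

    // Builds a quoted MBean name in the style quoted in this thread.
    // The name= key is an assumption about how the broker registers the gauge.
    static String mbeanName(String type, String name) {
        return "\"kafka.server\":type=\"" + type + "\",name=\"" + name + "\"";
    }

    // Standard RMI-based JMX service URL for a given host and port.
    static String serviceUrl(String host, int port) {
        return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("usage: UnderReplicatedCheck <broker-host> <jmx-port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);

        JMXServiceURL url = new JMXServiceURL(serviceUrl(host, port));
        // The connector is closed automatically when the block exits.
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName gauge =
                new ObjectName(mbeanName("ReplicaManager", "UnderReplicatedPartitions"));
            // Assumes the gauge exposes its current reading as "Value".
            Object value = conn.getAttribute(gauge, "Value");
            System.out.println(host + " UnderReplicatedPartitions = " + value);
        }
    }
}
```

Run it once per broker; a value that stays above zero well after the restart would point at the kind of broker-side problem described above rather than stale leader metadata.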