Hi Kishore Senji,

Thanks for the reply.

Do you have some suggestions before the fix came up? Try not to modify the
retention.ms? Or disable the auto rebalance? Cause this problem is 100%
reproduceable in my scenario (two times got dead lock in two retention.ms
modification), and I even found some data loss which I'm still looking for
the reason.

Any idea on why shrinking the retention.ms causes the network unstable?

And yes I use the comma for clarity :)

2015-08-17 8:59 GMT+08:00 Kishore Senji <kse...@gmail.com>:

> Interesting problem you ran in to. It seems like this broker was chosen as
> the Controller and onControllerFailure() method was called. This will
> schedule the checkAndTriggerPartitionRebalance method to execute after 5
> seconds (when auto rebalance enabled). In the mean time this broker lost
> zookeeper connection and so this broker resigns from the Controller status
> and so the onControllerResignation() method is called and this method will
> try to shutdown the auto rebalance executor.  But it is doing it by holding
> the lock and this is what caused the dead lock in your case.
>
> I do not think we need to hold the lock to shutdown the executor. This
> could be the fix we might need.
>
> retention.ms config parameter should not have commas in the value. Are you
> just using it here to clarify for us.
>
> It so happened in your
> On Sun, Aug 16, 2015 at 1:52 AM Zhao Weinan <zhaow...@gmail.com> wrote:
>
> > Hi guys,
> >
> > I got this problem, after changing one topic's config to
> > retention.ms=86,400,000
> > from 864,000,000, the brokers start to shedule and do deletions of
> outdated
> > index of that topic.
> >
> > Then for some reason some brokers' connection with zookeeper were
> expired,
> > suddenly lots of ERRORs showed up in logs/server.log: At controller
> > broker(id=5) are:
> >
> > > *ERROR [ReplicaFetcherThread-2-4], Error for partition [XXXXX,4] to
> > broker
> > > 4:class kafka.common.NotLeaderForPartitionException
> > > (kafka.server.ReplicaFetcherThread).*
> > >
> > At other broker which the controller broker try to fetch are:
> >
> > > *[Replica Manager on Broker 4]: Fetch request with correlation id
> 1920630
> > > from client ReplicaFetcherThread-2-4 on partition [XXXXX,4] failed due
> to
> > > Leader not local for partition [XXXXXXX,4] on broker 4
> > > (kafka.server.ReplicaManager).*
> > >
> >
> > In controller broker's server.log there are zk reconnections:
> >
> > > *INFO Client session timed out, have not heard from server in 5126ms
> for
> > > sessionid 0x54e0aaa9582b8e4, closing socket connection and attempting
> > > reconnect (org.apache.zookeeper.ClientCnxn)*
> > > *INFO zookeeper state changed (Disconnected)
> > > (org.I0Itec.zkclient.ZkClient)*
> > > *NFO Opening socket connection to server xxxxxxxxxxx. Will not attempt
> to
> > > authenticate using SASL (java.lang.SecurityException: Unable to locate
> a
> > > login configuration) (org.apache.zookeeper.ClientCnxn)*
> > > *INFO Socket connection established to xxxxxxxxxxxxx, initiating
> session
> > > (org.apache.zookeeper.ClientCnxn)*
> > > *INFO Session establishment complete on server xxxxxxxxxx, sessionid =
> > > 0x54e0aaa9582b8e4, negotiated timeout = 6000
> > > (org.apache.zookeeper.ClientCnxn)*
> > > *INFO zookeeper state changed (SyncConnected)
> > > (org.I0Itec.zkclient.ZkClient)*
> > >
> > But on zookeeper /brokers/ids/ there is no controller broker's id 5.
> >
> > Then I tried to restart the controller broker, found the process won't
> > quit.
> >
> > Then I jstacked it, found the broker process kind of stucked, some
> > keypoints pasted as below. It seems zk-client-expired-callback aquired
> the
> > controllerLock and wait the kafka-scheduler Executor to exit (for one
> day),
> > but some thread in that Executor is doing Rebalance job which need to
> > aquire the controllerLock, then the broker is in DEAD LOCK and will be
> > totally lost from zookeeper for ONE DAY if I'm corrected? And since it's
> > still hold outdated view of the cluster, it will try to try to follower
> up
> > the Leaders which maybe not actual Leader, caused the ERRORs as above
> > mentioned.
> >
> > I'm using 8 Kafka brokers @0.8.2.1 with 3 Zookeeper server @3.4.6, all
> on
> > different host in same data center, the cluster load is about 200K
> messages
> > in and 30M bytes in and 80M bytes out totally.
> >
> > Does some one has the same issue? Any suggestion is appreciated.
> >
> >
> > jstack:
> >
> > > "kafka-scheduler-0" daemon prio=10 tid=0x0000000057967800 nid=0x2994
> > > waiting on condition [0x0000000046dac000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >         at sun.misc.Unsafe.park(Native Method)
> > >         - parking to wait for  <0x00000000c2ec6418> (a
> > > java.util.concurrent.locks.ReentrantLock$NonfairSync)
> > >         at
> > > java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> > >         at
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> > >         at
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
> > >         at
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
> > >         at
> > >
> >
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
> > >         at
> > > java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> > >         at kafka.utils.Utils$.inLock(Utils.scala:533)
> > >         at
> > >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1131)
> > >         at
> > >
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1127)
> > >         at
> > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > >         at
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >         at
> > >
> >
> kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1127)
> > >         at
> > >
> >
> kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:326)
> > >         at
> > >
> >
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:99)
> > >         at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
> > >         at
> > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> > >         at
> > >
> >
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> > >         at
> > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> > >         at
> > >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> > >         at
> > >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
> > >         at
> > >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
> > >         at
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > >         at
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > >         at java.lang.Thread.run(Thread.java:662)
> > >
> > "ZkClient-EventThread-15-xxxxxxxxxxxxxxxxxxxxx" daemon prio=10
> > > tid=0x00002aaab42ba800 nid=0x24c4 waiting on condition [
> > > 0x00000000402e8000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >         at sun.misc.Unsafe.park(Native Method)
> > >         - parking to wait for  <0x00000000f4c450b8> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >         at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
> > >         at
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
> > >         at
> > >
> >
> java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1253)
> > >         at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:88)
> > >         at
> > >
> >
> kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
> > >         at
> > >
> >
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1108)
> > >         at
> > >
> >
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
> > >         at
> > >
> >
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
> > >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > >         at
> > >
> >
> kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1107)
> > >         at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> > >         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> > >
> >
>

Reply via email to