[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020027#comment-17020027
 ] 

leibo edited comment on KAFKA-8532 at 1/21/20 8:51 AM:
-------------------------------------------------------

[~junrao] I did some further analysis, as below:

The zk client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
        SendThread(ClientCnxnSocket clientCnxnSocket) {
            super(ClientCnxn.makeThreadName("-SendThread()"));
            ClientCnxn.this.state = States.CONNECTING;        //init state
            this.clientCnxnSocket = clientCnxnSocket;
            this.setDaemon(true);
        }
{code}
As we can see, the zk heartbeat thread SendThread's initial state is *CONNECTING*:
{code:java}
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    // When the zk client state is CONNECTING or NOT_CONNECTED the session is
    // obviously not valid, so we cannot regard zookeeper as alive.
    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
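To make this concrete, here is a small self-contained replica of the enum above (copied locally only so it can run without a ZooKeeper dependency); it demonstrates that CONNECTING counts as "alive" even though it is not connected:
{code:java}
public class StatesDemo {
    // Local copy of org.apache.zookeeper.ZooKeeper.States, reproduced here
    // only so the demo compiles and runs stand-alone.
    public enum States {
        CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
        CLOSED, AUTH_FAILED, NOT_CONNECTED;

        public boolean isAlive() { return this != CLOSED && this != AUTH_FAILED; }
        public boolean isConnected() { return this == CONNECTED || this == CONNECTEDREADONLY; }
    }

    public static void main(String[] args) {
        // A freshly restarted SendThread is in CONNECTING: "alive" but not connected.
        States s = States.CONNECTING;
        System.out.println("isAlive=" + s.isAlive() + ", isConnected=" + s.isConnected());
        // prints: isAlive=true, isConnected=false
    }
}
{code}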
According to the code above, when the zk session expires, ClientCnxn.SendThread will attempt to reconnect to the zk server and establish a new session, and its initial state is CONNECTING. If zk-session-expiry-handler0 enters the reinitialize method while the zookeeper state is still CONNECTING, it skips the *if (!connectionState.isAlive)* branch and executes callAfterInitializingSession, triggering controller-event-thread to process RegisterBrokerAndReelect while the zk session is still invalid.
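The flawed guard can be sketched as follows. This is a hypothetical Java rendering of the logic only (the real kafka.zookeeper.ZooKeeperClient.reinitialize is Scala, and the field and method names here are paraphrased); it shows how a CONNECTING state slips past the !isAlive check, so the session is never recreated, yet the after-init callbacks still fire:
{code:java}
public class ReinitializeSketch {
    enum States {
        CONNECTING, CONNECTED, CLOSED, AUTH_FAILED;
        boolean isAlive() { return this != CLOSED && this != AUTH_FAILED; }
    }

    // State observed right after SendThread restarts following session expiry.
    States connectionState = States.CONNECTING;
    boolean sessionRecreated = false;
    boolean afterInitCallbacksFired = false;

    // Paraphrase of the reinitialize flow; names are illustrative.
    void reinitialize() {
        // callBeforeInitializingSession() omitted: that is where
        // zk-session-expiry-handler0 blocks on the controller's Expire latch.
        if (!connectionState.isAlive()) {   // false while CONNECTING, so skipped
            sessionRecreated = true;        // stands in for closing + recreating the zk client
        }
        afterInitCallbacksFired = true;     // triggers RegisterBrokerAndReelect regardless
    }

    public static void main(String[] args) {
        ReinitializeSketch sketch = new ReinitializeSketch();
        sketch.reinitialize();
        System.out.println("sessionRecreated=" + sketch.sessionRecreated
                + ", afterInitCallbacksFired=" + sketch.afterInitCallbacksFired);
        // prints: sessionRecreated=false, afterInitCallbacksFired=true
    }
}
{code}
In other words, reelection proceeds on a session that is still invalid; a guard on isConnected (or an explicit CONNECTING/NOT_CONNECTED check) would appear to close this window.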



> controller-event-thread deadlock with zk-session-expiry-handler0
> ----------------------------------------------------------------
>
>                 Key: KAFKA-8532
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8532
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.1.1
>            Reporter: leibo
>            Priority: Blocker
>         Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the kafka cluster is to restart the kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x00007fcc9c010000 nid=0xfb22 waiting on condition [0x00007fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x00000005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x0000000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x00007fceaeec4000 
> nid=0x310 waiting on condition [0x00007fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x00000005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:157)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1596)
>  at 
> kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1589)
>  at 
> kafka.zk.KafkaZkClient.deletePreferredReplicaElection(KafkaZkClient.scala:989)
>  at 
> kafka.controller.KafkaController.removePartitionsFromPreferredReplicaElection(KafkaController.scala:873)
>  at 
> kafka.controller.KafkaController.kafka$controller$KafkaController$$onPreferredReplicaElection(KafkaController.scala:631)
>  at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:266)
>  at 
> kafka.controller.KafkaController.kafka$controller$KafkaController$$elect(KafkaController.scala:1221)
>  at 
> kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1508)
>  at 
> kafka.controller.KafkaController$RegisterBrokerAndReelect$.process(KafkaController.scala:1517)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:89)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$Lambda$362/918424570.apply$mcV$sp(Unknown
>  Source)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:89)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Locked ownable synchronizers:
>  - None
>  
> This issue is the same as https://issues.apache.org/jira/browse/KAFKA-6879 ; I 
> suspect that KAFKA-6879 was not completely fixed. Please pay attention to this 
> issue.
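The two stack traces reduce to a latch cycle: zk-session-expiry-handler0 waits for the controller to start processing the Expire event, while controller-event-thread waits for a zk response that the expired session can never deliver. A minimal self-contained sketch of that cycle follows (timeouts are added only so the demo terminates; the thread names mirror the jstack, everything else is illustrative):
{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchCycleDemo {
    // Runs both waits concurrently; returns {expiryHandlerProceeded, controllerGotResponse}.
    public static boolean[] run(long timeoutMs) throws InterruptedException {
        CountDownLatch expireProcessingStarted = new CountDownLatch(1); // Expire.waitUntilProcessingStarted
        CountDownLatch zkResponseReceived = new CountDownLatch(1);      // handleRequests' response latch
        boolean[] result = new boolean[2];

        Thread expiryHandler = new Thread(() -> {
            try {
                // beforeInitializingSession: blocks until the controller starts
                // processing the Expire event -- which it never does here.
                result[0] = expireProcessingStarted.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ignored) { }
        }, "zk-session-expiry-handler0");

        Thread controller = new Thread(() -> {
            try {
                // RegisterBrokerAndReelect -> retryRequestsUntilConnected: waits for a
                // zk response that cannot arrive on the expired session.
                result[1] = zkResponseReceived.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ignored) { }
        }, "controller-event-thread");

        expiryHandler.start();
        controller.start();
        expiryHandler.join();
        controller.join();
        return result; // neither latch is ever counted down
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run(200);
        System.out.println("expiry handler proceeded: " + r[0]
                + ", controller got zk response: " + r[1]);
        // prints: expiry handler proceeded: false, controller got zk response: false
    }
}
{code}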



--
This message was sent by Atlassian Jira
(v8.3.4#803005)