[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020027#comment-17020027 ]

leibo edited comment on KAFKA-8532 at 1/22/20 12:56 AM:
--------------------------------------------------------

[~junrao] I did some further analysis, as below:

The zk client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // init state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the initial state of the zk heartbeat thread SendThread is *CONNECTING*, and after the zk session has expired, zookeeper.getState() is *CONNECTING*.
{code:java}
// code placeholder
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        // when the zk client state is CONNECTING or NOT_CONNECTED, the zk session is
        // obviously not valid, so we cannot regard zookeeper as alive
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
According to the code above, when the zk session expires, ClientCnxn.SendThread attempts to reconnect to the zk server and establish a new session, and its initial state is CONNECTING. If the zk-session-expiry thread enters the reinitialize method while the zookeeper state is CONNECTING, it skips the *if (!connectionState.isAlive)* branch and executes callAfterInitializingSession, which triggers the controller-event-thread to process RegisterBrokerAndReelect while the zk session is still invalid.

was (Author: lbdai3190):

[~junrao] I did some further analysis, as below:

The zk client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // init state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the initial state of the zk heartbeat thread SendThread is *CONNECTING*.
{code:java}
// code placeholder
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        // when the zk client state is CONNECTING or NOT_CONNECTED, the zk session is
        // obviously not valid, so we cannot regard zookeeper as alive
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
According to the code above, when the zk session expires, ClientCnxn.SendThread attempts to reconnect to the zk server and establish a new session, and its initial state is CONNECTING. If the zk-session-expiry thread enters the reinitialize method while the zookeeper state is CONNECTING, it skips the *if (!connectionState.isAlive)* branch and executes callAfterInitializingSession, which triggers the controller-event-thread to process RegisterBrokerAndReelect while the zk session is still invalid.

> controller-event-thread deadlock with zk-session-expiry-handler0
> -----------------------------------------------------------------
>
>                 Key: KAFKA-8532
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8532
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.1.1
>            Reporter: leibo
>            Priority: Blocker
>         Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, js0.log, js1.log, js2.log
>
> We have observed a serious deadlock between the controller-event-thread and the zk-session-expiry-handler thread. When this issue occurs, the only way to recover the kafka cluster is to restart the kafka server. The following is the jstack log of the controller-event-thread and the zk-session-expiry-handler thread.
>
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x0005ee3f7000> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // waiting for the controller-event-thread to process the Expire event
> at kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
> at kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
> at kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
> at kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
> at kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
> at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown Source)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
> at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
> at kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
> at kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown Source)
> at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
> at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown Source)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>    Locked ownable synchronizers:
> - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
>
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 nid=0x310 waiting on condition [0x7fccb55c8000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
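A note on the state check discussed above: the following small, self-contained Java example (hypothetical demo code, not part of Kafka or ZooKeeper) illustrates why the isAlive guard lets a CONNECTING client through, assuming the States enum quoted in the comment:
{code:java}
import org.apache.zookeeper.ZooKeeper.States;

public class StateCheckDemo {
    public static void main(String[] args) {
        // Initial state of a SendThread that is reconnecting after session expiry.
        States state = States.CONNECTING;

        // isAlive() only rules out CLOSED and AUTH_FAILED, so CONNECTING passes the check...
        System.out.println("isAlive: " + state.isAlive());         // prints: isAlive: true
        // ...while isConnected() correctly reports that no session is established yet.
        System.out.println("isConnected: " + state.isConnected()); // prints: isConnected: false

        // A guard of the form `if (!state.isAlive()) { recreate session }` therefore
        // skips recreation for a CONNECTING client, yet the after-initialization
        // callbacks still run against a session that is not valid.
    }
}
{code}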
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020027#comment-17020027 ]

leibo edited comment on KAFKA-8532 at 1/21/20 8:51 AM:
-------------------------------------------------------

[~junrao] I did some further analysis, as below:

The zk client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // init state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the initial state of the zk heartbeat thread SendThread is *CONNECTING*.
{code:java}
// code placeholder
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        // when the zk client state is CONNECTING or NOT_CONNECTED, the zk session is
        // obviously not valid, so we cannot regard zookeeper as alive
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
According to the code above, when the zk session expires, ClientCnxn.SendThread attempts to reconnect to the zk server and establish a new session, and its initial state is CONNECTING. If the zk-session-expiry thread enters the reinitialize method while the zookeeper state is CONNECTING, it skips the *if (!connectionState.isAlive)* branch and executes callAfterInitializingSession, which triggers the controller-event-thread to process RegisterBrokerAndReelect while the zk session is still invalid.

was (Author: lbdai3190):

The zk client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // init state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the initial state of the zk heartbeat thread SendThread is *CONNECTING*.
{code:java}
// code placeholder
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        // when the zk client state is CONNECTING or NOT_CONNECTED, the zk session is
        // obviously not valid, so we cannot regard zookeeper as alive
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
According to the code above, when the zk session expires, ClientCnxn.SendThread attempts to reconnect to the zk server and establish a new session, and its initial state is CONNECTING. If the zk-session-expiry thread enters the reinitialize method while the zookeeper state is CONNECTING, it skips the *if (!connectionState.isAlive)* branch and executes callAfterInitializingSession, which triggers the controller-event-thread to process RegisterBrokerAndReelect while the zk session is still invalid.
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020027#comment-17020027 ]

leibo edited comment on KAFKA-8532 at 1/21/20 8:49 AM:
-------------------------------------------------------

The zk client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // init state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the initial state of the zk heartbeat thread SendThread is *CONNECTING*.
{code:java}
// code placeholder
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        // when the zk client state is CONNECTING or NOT_CONNECTED, the zk session is
        // obviously not valid, so we cannot regard zookeeper as alive
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
According to the code above, when the zk session expires, ClientCnxn.SendThread attempts to reconnect to the zk server and establish a new session, and its initial state is CONNECTING. If the zk-session-expiry thread enters the reinitialize method while the zookeeper state is CONNECTING, it skips the *if (!connectionState.isAlive)* branch and executes callAfterInitializingSession, which triggers the controller-event-thread to process RegisterBrokerAndReelect while the zk session is still invalid.

was (Author: lbdai3190):

{code:java}
// code placeholder
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        // when the zk client state is CONNECTING or NOT_CONNECTED, the zk session is
        // obviously not valid, so we cannot regard zookeeper as alive
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019851#comment-17019851 ]

leibo edited comment on KAFKA-8532 at 1/21/20 8:24 AM:
-------------------------------------------------------

[~junrao] I agree with your opinion that "all pending ZK requests should complete with a SessionExpired error through the response callback".

As shown in the attachment controllerDeadLockAnalysis-2020-01-20.png I uploaded: RegisterBrokerAndReelect needs an established zookeeper session to finish processing on the controller-event-thread, and Expire needs to be processed by the controller-event-thread in order to establish a new zookeeper session, but the controller-event-thread is busy handling RegisterBrokerAndReelect.

Based on the current analysis, I think there are two ways to deal with this problem:
1. Use a stricter judgment condition to prevent the ControllerEvent RegisterBrokerAndReelect from being executed when the zk session has expired.
2. Find out why handleRequest blocks when the zookeeper session has expired.

was (Author: lbdai3190):

[~junrao] I agree with your opinion that "all pending ZK requests should complete with a SessionExpired error through the response callback".

As shown in the attachment controllerDeadLockAnalysis-2020-01-20.png I uploaded: RegisterBrokerAndReelect needs an established zookeeper session to finish processing on the controller-event-thread, and Expire needs to be processed by the controller-event-thread in order to establish a new zookeeper session, but the controller-event-thread is busy handling RegisterBrokerAndReelect.

Based on the current analysis, I think there are two ways to deal with this problem:
1. Use a stricter judgment condition to prevent the ControllerEvent RegisterBrokerAndReelect from being executed when the zk session has expired.
2. Find out why handleRequest blocks when the zookeeper session has expired.
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294 ]

leibo edited comment on KAFKA-8532 at 1/21/20 1:44 AM:
-------------------------------------------------------

[~junrao], [~yuzhih...@gmail.com] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in the handleRequest method will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment *controllerDeadLockAnalysis-2020-01-20.png*.
{code:java}
// code placeholder
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) { // this check may be inaccurate
    // If isConnected is used to decide whether the zookeeper session needs to be reinitialized,
    // the program will not skip this block and then execute operations that require zookeeper to be connected.
    if (!connectionState.isConnected) {
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }
  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
Zookeeper code:
{code:java}
// code placeholder
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.

was (Author: lbdai3190):

[~junrao], [~yuzhih...@gmail.com] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in the handleRequest method will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment controllerDeadLockAnalysis-2020-01-20.png.
{code:java}
// code placeholder
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) { // this check may be inaccurate
    // If isConnected is used to decide whether the zookeeper session needs to be reinitialized,
    // the program will not skip this block and then execute operations that require zookeeper to be connected.
    if (!connectionState.isConnected) {
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }
  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
Zookeeper code:
{code:java}
// code placeholder
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294 ]

leibo edited comment on KAFKA-8532 at 1/21/20 1:44 AM:
-------------------------------------------------------

[~junrao], [~yuzhih...@gmail.com] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in the handleRequest method will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the *attachment* *controllerDeadLockAnalysis-2020-01-20.png*.
{code:java}
// code placeholder
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) { // this check may be inaccurate
    // If isConnected is used to decide whether the zookeeper session needs to be reinitialized,
    // the program will not skip this block and then execute operations that require zookeeper to be connected.
    if (!connectionState.isConnected) {
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }
  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
Zookeeper code:
{code:java}
// code placeholder
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.

was (Author: lbdai3190):

[~junrao], [~yuzhih...@gmail.com] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in the handleRequest method will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment *controllerDeadLockAnalysis-2020-01-20.png*.
{code:java}
// code placeholder
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) { // this check may be inaccurate
    // If isConnected is used to decide whether the zookeeper session needs to be reinitialized,
    // the program will not skip this block and then execute operations that require zookeeper to be connected.
    if (!connectionState.isConnected) {
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }
  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
Zookeeper code:
{code:java}
// code placeholder
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294 ]

leibo edited comment on KAFKA-8532 at 1/21/20 1:43 AM:
-------------------------------------------------------

[~junrao], [~yuzhih...@gmail.com] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in the handleRequest method will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment controllerDeadLockAnalysis-2020-01-20.png.
{code:java}
// code placeholder
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) { // this check may be inaccurate
    // If isConnected is used to decide whether the zookeeper session needs to be reinitialized,
    // the program will not skip this block and then execute operations that require zookeeper to be connected.
    if (!connectionState.isConnected) {
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }
  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
Zookeeper code:
{code:java}
// code placeholder
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.

was (Author: lbdai3190):

[~junrao], [~yuzhih...@gmail.com] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in the handleRequest method will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment controllerDeadLockAnalysis-2020-01-20.png.
{code:java}
// code placeholder
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) { // this check may be inaccurate
    // If isConnected is used to decide whether the zookeeper session needs to be reinitialized,
    // the program will not skip this block and then execute operations that require zookeeper to be connected.
    if (!connectionState.isConnected) {
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }
  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
Zookeeper code:
{code:java}
// code placeholder
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED; // this check is inaccurate
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294 ]

leibo edited comment on KAFKA-8532 at 1/21/20 1:43 AM:
-------------------------------------------------------

[~junrao], [~yuzhih...@gmail.com] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in the handleRequest method will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment controllerDeadLockAnalysis-2020-01-20.png.
{code:java}
// code placeholder
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) { // this check may be inaccurate
    // If isConnected is used to decide whether the zookeeper session needs to be reinitialized,
    // the program will not skip this block and then execute operations that require zookeeper to be connected.
    if (!connectionState.isConnected) {
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }
  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
Zookeeper code:
{code:java}
// code placeholder
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED; // this check is inaccurate
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.

was (Author: lbdai3190):

[~junrao], [~yuzhih...@gmail.com] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in the handleRequest method will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment controllerDeadLockAnalysis-2020-01-20.png. I hope you will take a look at it.
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294 ]

leibo edited comment on KAFKA-8532 at 1/20/20 8:28 AM:
-------------------------------------------------------

[~junrao], [~yuzhih...@gmail.com] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in the handleRequest method will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment controllerDeadLockAnalysis-2020-01-20.png. I hope you will take a look at it.

was (Author: lbdai3190):

[~junrao] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in the handleRequest method will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment controllerDeadLockAnalysis-2020-01-20.png. I hope you will take a look at it.
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294 ]

leibo edited comment on KAFKA-8532 at 1/20/20 8:26 AM:
-------------------------------------------------------

[~junrao] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in the handleRequest method will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment controllerDeadLockAnalysis-2020-01-20.png. I hope you will take a look at it.

was (Author: lbdai3190):

[~junrao] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in handleRequest will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment controllerDeadLockAnalysis-2020-01-20.png.
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294 ]

leibo edited comment on KAFKA-8532 at 1/20/20 8:21 AM:
-------------------------------------------------------

[~junrao] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in handleRequest will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment controllerDeadLockAnalysis-2020-01-20.png.

was (Author: lbdai3190):

[~junrao] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in handleRequest will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment controllerDeadLockAnalysis2020-01-20.png:

!controllerDeadLockAnalysis-2020-01-20.png!
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294 ]

leibo edited comment on KAFKA-8532 at 1/20/20 8:20 AM:
-------------------------------------------------------

[~junrao] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in handleRequest will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment controllerDeadLockAnalysis2020-01-20.png:

!controllerDeadLockAnalysis-2020-01-20.png!

was (Author: lbdai3190):

[~junrao] Thank you for your reply.

1. I have tested the exception in handleRequest(), and as you said, exceptions thrown in handleRequest will not block on countDownLatch.await().

2. I rechecked the jstack log in the attachment; my analysis result is in the attachment controllerDeadLockAnalysis2020-01-20.png:

!controllerDeadLockAnalysis2020-01-20.png!
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368 ] Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:26 AM: Looking at js2.log , I am not sure deadlock was observed. Maybe handleRequests() took very long to execute (yet to be verified by whether there was exception in this code path from server log). I wonder if we can utilize the following form of await: {code} public boolean await(long timeout, TimeUnit unit) {code} so that the execution time of handleRequests() can be bounded. {code} diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 6a0809e16..32e4380c0 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String, countDownLatch.countDown() } } - countDownLatch.await() + countDownLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS) responseQueue.asScala.toBuffer } } {code} was (Author: yuzhih...@gmail.com): Looking at js2.log , I am not sure deadlock was observed. Maybe handleRequests() took very long to execute (yet to be verified by whether there was exception in this code path from server log). I wonder if we can utilize the following form of await: {code} public boolean await(long timeout, TimeUnit unit) {code} so that the execution time of handleRequests() can be bounded. {code} diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 6a0809e16..32e4380c0 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String, countDownLatch.countDown() } } - countDownLatch.await() + countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS) responseQueue.asScala.toBuffer } } {code} > controller-event-thread deadlock with zk-session-expiry-handler0 > > > Key: KAFKA-8532 > URL: https://issues.apache.org/jira/browse/KAFKA-8532 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 >Reporter: leibo >Priority: Blocker > Attachments: js.log, js0.log, js1.log, js2.log > > > We have observed a serious deadlock between controller-event-thead and > zk-session-expirey-handle thread. When this issue occurred, it's only one way > to recovery the kafka cluster is restart kafka server. The follows is the > jstack log of controller-event-thead and zk-session-expiry-handle thread. 
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 > tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0005ee3f7000> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // > 等待controller-event-thread线程处理expireEvent > at > kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533) > at > kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173) > at > kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374) > at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown > Source) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209) > at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428) > at >
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018152#comment-17018152 ] Ted Yu edited comment on KAFKA-8532 at 1/17/20 4:13 PM:
How about making the following change?
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 7b995931f..6a0809e16 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -158,14 +158,11 @@ class ZooKeeperClient(connectString: String,
       inReadLock(initializationLock) {
         send(request) { response =>
           responseQueue.add(response)
-          inFlightRequests.release()
-          countDownLatch.countDown()
         }
       }
-    } catch {
-      case e: Throwable =>
-        inFlightRequests.release()
-        throw e
+    } finally {
+      inFlightRequests.release()
+      countDownLatch.countDown()
     }
   }
   countDownLatch.await()
{code}
countDownLatch is handled consistently with inFlightRequests. I have run core:test, which passed. I can send out a PR.

was (Author: yuzhih...@gmail.com):
How about making the following change?
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 7b995931f..6a0809e16 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -158,14 +158,11 @@ class ZooKeeperClient(connectString: String,
       inReadLock(initializationLock) {
         send(request) { response =>
           responseQueue.add(response)
-          inFlightRequests.release()
-          countDownLatch.countDown()
         }
       }
-    } catch {
-      case e: Throwable =>
-        inFlightRequests.release()
-        throw e
+    } finally {
+      inFlightRequests.release()
+      countDownLatch.countDown()
     }
   }
   countDownLatch.await()
{code}
countDownLatch is handled consistently with inFlightRequests.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 > tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0005ee3f7000> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // > 等待controller-event-thread线程处理expireEvent > at > kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533) > at > kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173) > at > kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374) > at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown > Source) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209) > at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374) > at >
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042 ] leibo edited comment on KAFKA-8532 at 9/18/19 4:27 AM:
---
[~junrao], [~ijuma] My analysis: When kafka is disconnected from zookeeper, the controller-event-thread is handling an ISRChangeNotification event; because kafka is disconnected from zk, controller event handling is blocked in retryRequestUntilConnected -> handleRequests. At the same time, the zk-session-expiry-handler thread tries to reinitialize a new zk connection and puts a controller event named Expire into the ControllerEventManager's LinkedBlockingQueue; because the controller-event-thread is blocked on the ISRChangeNotification event, the Expire event will never be handled by the ControllerEventThread. The problem is obvious: ISRChangeNotification and Expire block each other. The problem is here:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)
    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be executed, and this method will block here forever.

was (Author: lbdai3190):
[~junrao], [~ijuma] My analysis: When kafka is disconnected from zookeeper, the controller-event-thread is handling an ISRChangeNotification event; because kafka is disconnected from zk, controller event handling is blocked in retryRequestUntilConnected -> handleRequests. At the same time, the zk-session-expiry-handler thread tries to reinitialize a new zk connection and puts a controller event named Expire into the ControllerEventManager's LinkedBlockingQueue; because the controller-event-thread is blocked on the ISRChangeNotification event, the Expire event will never be handled. The problem is obvious: ISRChangeNotification and Expire block each other. The problem is here:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)
    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be executed, and this method will block here forever.
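For illustration, a minimal sketch of the mutual blocking described above: a single event thread stuck inside one event starves the Expire event queued behind it. The thread and latch names are illustrative stand-ins, not Kafka code:
{code:scala}
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}

object EventQueueDeadlockSketch {
  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[Runnable]()
    val lostResponse = new CountDownLatch(1)    // a ZK response that never arrives
    val expireProcessed = new CountDownLatch(1)

    // Plays the controller-event-thread: processes events strictly in order.
    val eventThread = new Thread(() => { while (true) queue.take().run() }, "controller-event-thread")
    eventThread.setDaemon(true)
    eventThread.start()

    // Event 1: ISRChangeNotification, blocked waiting for the lost response.
    queue.put(() => lostResponse.await())
    // Event 2: Expire, enqueued by zk-session-expiry-handler0.
    queue.put(() => expireProcessed.countDown())

    // zk-session-expiry-handler0 would block here forever; a timed await is
    // used so the sketch terminates and demonstrates the wedged queue.
    val processed = expireProcessed.await(2, TimeUnit.SECONDS)
    println(s"Expire processed: $processed") // prints false
  }
}
{code}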
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865266#comment-16865266 ] leibo edited comment on KAFKA-8532 at 6/17/19 11:15 AM:
[~ijuma] I have tried to reproduce this, but failed; it reappears a few days after I restart the kafka cluster. The following info may be helpful:
# We run the kafka cluster in docker containers managed by kubernetes.
# Until now, every time this issue occurred, we found the zookeeper container CPU was limited to 1 core while the host has more than 50 cores.
# When the zookeeper container is limited to 1 core (on a host with more than 50 cores), due to a docker problem, the zookeeper JVM running in the container ends up with more than 38 GC worker threads (according to the JVM documentation, 1 CPU core should correspond to 2 GC worker threads); these GC worker threads may slow zookeeper down and may cause the kafka-zookeeper session to time out.
# After we set the zookeeper JVM option *-XX:ParallelGCThreads=4*, this issue has not occurred again, but I think this optimization merely reduces the probability of the problem rather than solving it.
According to the above info, I think that when this problem occurred, zookeeper was not running well and the kafka cluster deadlock occurred; after some time zookeeper recovered to normal operation, but kafka did not.

was (Author: lbdai3190):
[~ijuma] I have tried to reproduce this, but failed; it reappears a few days after I restart the kafka cluster. The following info may be helpful:
# We run the kafka cluster in docker containers managed by kubernetes.
# Until now, every time this issue occurred, we found the zookeeper container CPU was limited to 1 core while the host has more than 50 cores.
# When the zookeeper container is limited to 1 core (on a host with more than 50 cores), due to a docker problem, the zookeeper JVM running in the container ends up with more than 38 GC worker threads (according to the JVM documentation, 1 CPU core should correspond to 2 GC worker threads); these GC worker threads may slow zookeeper down and may cause the kafka-zookeeper session to time out.
# After we set the zookeeper JVM option *-XX:ParallelGCThreads=4*, this issue has not occurred again, but I think this optimization merely reduces the probability of the problem rather than solving it.
According to the above info, I think that when this problem occurred, zookeeper was not running well and the kafka cluster deadlock occurred; after some time zookeeper recovered to normal operation, but kafka did not.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 > tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0005ee3f7000> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // > 等待controller-event-thread线程处理expireEvent > at > kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533) > at > kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173) > at > kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374) > at > kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374) > at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown > Source) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668 ] leibo edited comment on KAFKA-8532 at 6/14/19 2:59 AM:
---
Here is some analysis of mine according to the jstack log and source code. When the deadlock occurred:
# The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expire event was put into the queue of ControllerEventManager, waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
# However, at the same time, the controller-event-thread was handling RegisterBrokerAndReelect (a type of ControllerEvent), as follows:
{code:java}
// KafkaController.scala
case object RegisterBrokerAndReelect extends ControllerEvent {
  override def state: ControllerState = ControllerState.ControllerChange

  override def process(): Unit = {
    zkClient.registerBroker(brokerInfo)
    Reelect.process()
  }
}
{code}
In this process, it is blocked at ZooKeeperClient.handleRequests, code as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)
    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the Zookeeper server is running normally and all its functions are working well. I suspect the key point is the *initializationLock*: if inReadLock succeeded, an exception would be thrown while kafka is disconnected from zookeeper, but we never see any such exception in the kafka server.log. The controller-event-thread may be unable to enter inReadLock(initializationLock) because the write lock was never released. There are two places that take inWriteLock(initializationLock):
1. close():
{code:java}
expiryScheduler.startup()
try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
catch {
  case e: Throwable =>
    close() // the write lock is taken inside this close()
    throw e
}

def close(): Unit = {
  info("Closing.")
  inWriteLock(initializationLock) {
    zNodeChangeHandlers.clear()
    zNodeChildChangeHandlers.clear()
    stateChangeHandlers.clear()
    zooKeeper.close()
    metricNames.foreach(removeMetric(_))
  }
  // Shutdown scheduler outside of lock to avoid deadlock if scheduler
  // is waiting for lock to process session expiry
  expiryScheduler.shutdown()
  info("Closed.")
}
{code}
2. reinitialize():
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _) // zk-session-expiry-handler0 is blocked here
  inWriteLock(initializationLock) { // program is not running here
    if (!connectionState.isAlive) {
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }
  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
So I think the problem occurs in the close() function.

was (Author: lbdai3190):
Here is some analysis of mine according to the jstack log and source code. When the deadlock occurred:
# The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expire event was put into the queue of ControllerEventManager, waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
# However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:
{code:java}
// KafkaController.scala
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    elect()
  }
}
{code}
In
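For illustration, a minimal sketch of the suspected lock interplay: while one thread holds the write lock (as close() or reinitialize() would), another thread cannot enter the read lock (as handleRequests() must). This is stand-in code, not Kafka's lock, and the thread name is illustrative:
{code:scala}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock

object ReadWriteLockSketch {
  def main(args: Array[String]): Unit = {
    val initializationLock = new ReentrantReadWriteLock()
    initializationLock.writeLock().lock() // main thread plays close()/reinitialize()

    var acquired = true
    val reader = new Thread(() => {
      // Plays handleRequests(): a timed tryLock is used so the sketch
      // terminates instead of parking forever like the controller-event-thread.
      acquired = initializationLock.readLock().tryLock(1, TimeUnit.SECONDS)
    }, "controller-event-thread")
    reader.start()
    reader.join()

    println(s"read lock acquired while write lock held elsewhere: $acquired") // false
  }
}
{code}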
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862772#comment-16862772 ] leibo edited comment on KAFKA-8532 at 6/13/19 12:20 PM:
[~te...@apache.org] I saw you have handled KAFKA-6879; I hope you can help me analyze this problem, thank you.

was (Author: lbdai3190):
[~te...@apache.org] As you have handled KAFKA-6879, I hope you can help me analyze this problem, thank you.
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Locked ownable synchronizers: > - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker) > "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 > nid=0x310 waiting on condition [0x7fccb55c8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0005d1be5a00> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at >
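The two stacks above form a closed wait cycle rather than a classic lock-ordering deadlock: zk-session-expiry-handler0 parks in Expire.waitUntilProcessingStarted until the controller-event-thread begins processing the Expire event, while the controller-event-thread is parked on a CountDownLatch that only a ZooKeeper response can count down, and no response can arrive until the expiry handler finishes reinitializing the session. A minimal sketch of that cycle (hypothetical names, not Kafka source; it hangs by design):
{code:java}
import java.util.concurrent.CountDownLatch

// Each thread waits on a latch that only the *other* thread can count down,
// mirroring the two WAITING (parking) frames in js.log.
object DeadlockSketch {
  val processingStarted = new CountDownLatch(1) // awaited by zk-session-expiry-handler0
  val responseReceived  = new CountDownLatch(1) // awaited by controller-event-thread

  val eventThread = new Thread(() => {
    // Handling Startup: requests were sent, now block for ZooKeeper responses.
    responseReceived.await()      // never counted down: the session is dead
    processingStarted.countDown() // so the queued Expire event is never started
  }, "controller-event-thread")

  val expiryHandler = new Thread(() => {
    // beforeInitializingSession: wait for the controller to start the Expire event.
    processingStarted.await()     // blocks forever, so reinitialize() never proceeds
    responseReceived.countDown()  // a new session would unblock the event thread
  }, "zk-session-expiry-handler0")

  def main(args: Array[String]): Unit = {
    eventThread.start()
    expiryHandler.start()
  }
}
{code}
A jstack of this toy program shows the same pair of CountDownLatch.await frames as the stacks above.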
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668 ] leibo edited comment on KAFKA-8532 at 6/13/19 6:46 AM: --- Here is some analysis of mine based on the jstack log and the source code. When the deadlock occurred:
# The zk-session-expiry-handler0 thread was reinitializing a new ZooKeeper instance; the Expire event had been put into the queue of ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this point, Kafka was still disconnected from ZooKeeper.*)
# However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:
{code:java}
// KafkaController.scala
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    elect()
  }
}
{code}
In this process, it is blocked at ZooKeeperClient.handleRequests, code as follows:
{code:java}
// ZooKeeperClient.scala
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the ZooKeeper server was running normally and all of its functions were working well. I suspect the key point is the *initializationLock*: if inReadLock had succeeded, an exception would have been thrown while Kafka was disconnected from ZooKeeper, but we never see any such exception in the Kafka server.log. The controller-event-thread cannot enter inReadLock(initializationLock), perhaps because the write lock was never released. There are two ways to enter inWriteLock(initializationLock): 1. close():
{code:java}
// ZooKeeperClient.scala
expiryScheduler.startup()
try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
catch {
  case e: Throwable =>
    close() // close() acquires the write lock here
    throw e
}

def close(): Unit = {
  info("Closing.")
  inWriteLock(initializationLock) {
    zNodeChangeHandlers.clear()
    zNodeChildChangeHandlers.clear()
    stateChangeHandlers.clear()
    zooKeeper.close()
    metricNames.foreach(removeMetric(_))
  }
  // Shutdown scheduler outside of lock to avoid deadlock if scheduler
  // is waiting for lock to process session expiry
  expiryScheduler.shutdown()
  info("Closed.")
}
{code}
2. reinitialize():
{code:java}
// ZooKeeperClient.scala
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _) // zk-session-expiry-handler0 is blocked here
  inWriteLock(initializationLock) { // execution never reaches this point
    if (!connectionState.isAlive) {
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }
  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
So I think the problem occurs in the close() function.

was (Author: lbdai3190): Here is some analysis of mine based on the jstack log and the source code. When the deadlock occurred:
# The zk-session-expiry-handler0 thread was reinitializing a new ZooKeeper instance; the Expire event had been put into the queue of ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this point, Kafka was still disconnected from ZooKeeper.*)
# However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as below:
{code:java}
// KafkaController.scala
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    elect()
  }
}
{code}
In this process, it is blocked
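For context, the inReadLock and inWriteLock helpers used above are thin wrappers around a java.util.concurrent ReadWriteLock; roughly as follows (a from-memory sketch; see kafka.utils.CoreUtils for the real definitions):
{code:java}
import java.util.concurrent.locks.{Lock, ReadWriteLock}

object LockSketch {
  // Acquire, run the body, and always release, even on exception.
  def inLock[T](lock: Lock)(fun: => T): T = {
    lock.lock()
    try fun
    finally lock.unlock()
  }

  def inReadLock[T](lock: ReadWriteLock)(fun: => T): T = inLock(lock.readLock)(fun)

  def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock(lock.writeLock)(fun)
}
{code}
Since initializationLock is a read-write lock, the inReadLock in handleRequests can only block while some thread holds (or is queued for) the write lock, which is why the analysis above focuses on the two inWriteLock call sites, close() and reinitialize().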
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668 ] leibo edited comment on KAFKA-8532 at 6/13/19 3:58 AM: --- Here is some analysis of mine based on the jstack log and the source code. When the deadlock occurred:
# The zk-session-expiry-handler0 thread was reinitializing a new ZooKeeper instance; the Expire event had been put into the queue of ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this point, Kafka was still disconnected from ZooKeeper.*)
# However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:
{code:java}
// KafkaController.scala
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    elect()
  }
}
{code}
In this process, it is blocked at ZooKeeperClient.handleRequests, code as follows:
{code:java}
// ZooKeeperClient.scala
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the ZooKeeper server was running normally and all of its functions were working well. I suspect the key point is the *initializationLock*: if inReadLock had succeeded, an exception would have been thrown while Kafka was disconnected from ZooKeeper, but we never see any such exception in the Kafka server.log. The controller-event-thread cannot enter inReadLock(initializationLock), perhaps because the write lock was never released. There are two ways to enter inWriteLock(initializationLock): 1. close():
{code:java}
// ZooKeeperClient.scala
def close(): Unit = {
  info("Closing.")
  inWriteLock(initializationLock) {
    zNodeChangeHandlers.clear()
    zNodeChildChangeHandlers.clear()
    stateChangeHandlers.clear()
    zooKeeper.close()
    metricNames.foreach(removeMetric(_))
  }
  // Shutdown scheduler outside of lock to avoid deadlock if scheduler
  // is waiting for lock to process session expiry
  expiryScheduler.shutdown()
  info("Closed.")
}
{code}
2. reinitialize():
{code:java}
// ZooKeeperClient.scala
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _) // zk-session-expiry-handler0 is blocked here
  inWriteLock(initializationLock) { // execution never reaches this point
    if (!connectionState.isAlive) {
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }
  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
So I think the problem occurs in the close() function.

was (Author: lbdai3190): Here is some analysis of mine based on the jstack log and the source code. When the deadlock occurred:
# The zk-session-expiry-handler0 thread was reinitializing a new ZooKeeper instance; the Expire event had been put into the queue of ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this point, Kafka was still disconnected from ZooKeeper.*)
# However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as below:
{code:java}
// KafkaController.scala
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    elect()
  }
}
{code}
In this process, it is blocked at
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668 ] leibo edited comment on KAFKA-8532 at 6/13/19 4:00 AM: --- Here is some analysis of mine based on the jstack log and the source code. When the deadlock occurred:
# The zk-session-expiry-handler0 thread was reinitializing a new ZooKeeper instance; the Expire event had been put into the queue of ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this point, Kafka was still disconnected from ZooKeeper.*)
# However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:
{code:java}
// KafkaController.scala
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    elect()
  }
}
{code}
In this process, it is blocked at ZooKeeperClient.handleRequests, code as follows:
{code:java}
// ZooKeeperClient.scala
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the ZooKeeper server was running normally and all of its functions were working well. I suspect the key point is the *initializationLock*: if inReadLock had succeeded, an exception would have been thrown while Kafka was disconnected from ZooKeeper, but we never see any such exception in the Kafka server.log. The controller-event-thread cannot enter inReadLock(initializationLock), perhaps because the write lock was never released. There are two ways to enter inWriteLock(initializationLock): 1. close():
{code:java}
// ZooKeeperClient.scala
expiryScheduler.startup()
try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
catch {
  case e: Throwable =>
    close() // close() acquires the write lock here
    throw e
}

def close(): Unit = {
  info("Closing.")
  inWriteLock(initializationLock) {
    zNodeChangeHandlers.clear()
    zNodeChildChangeHandlers.clear()
    stateChangeHandlers.clear()
    zooKeeper.close()
    metricNames.foreach(removeMetric(_))
  }
  // Shutdown scheduler outside of lock to avoid deadlock if scheduler
  // is waiting for lock to process session expiry
  expiryScheduler.shutdown()
  info("Closed.")
}
{code}
2. reinitialize():
{code:java}
// ZooKeeperClient.scala
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _) // zk-session-expiry-handler0 is blocked here
  inWriteLock(initializationLock) { // execution never reaches this point
    if (!connectionState.isAlive) {
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }
  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
So I think the problem occurs in the close() function.

was (Author: lbdai3190): Here is some analysis of mine based on the jstack log and the source code. When the deadlock occurred:
# The zk-session-expiry-handler0 thread was reinitializing a new ZooKeeper instance; the Expire event had been put into the queue of ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this point, Kafka was still disconnected from ZooKeeper.*)
# However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as below:
{code:java}
// KafkaController.scala
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    elect()
  }
}
{code}
In this process, it is blocked at
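The "Shutdown scheduler outside of lock" comment inside close() guards against exactly the kind of close()/expiry interaction suspected above, so it is worth spelling out: if close() joined the expiry scheduler while still holding the write lock, a scheduled session-expiry task queued behind that same lock could never run, and close() could never return. A sketch of the two orderings (hypothetical names, not Kafka source):
{code:java}
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock

object CloseOrderingSketch {
  val initializationLock = new ReentrantReadWriteLock()
  val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Stand-in for a scheduled reinitialize(): it needs the write lock.
  scheduler.submit(new Runnable {
    def run(): Unit = {
      initializationLock.writeLock.lock()
      try println("session reinitialized")
      finally initializationLock.writeLock.unlock()
    }
  })

  // Hazardous ordering: if the task above has not run yet, it is queued behind
  // the write lock we hold, so awaitTermination never returns -> deadlock.
  def closeInsideLock(): Unit = {
    initializationLock.writeLock.lock()
    try {
      scheduler.shutdown()
      scheduler.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS)
    } finally initializationLock.writeLock.unlock()
  }

  // Safe ordering, as ZooKeeperClient.close() does: release the lock first,
  // then shut the scheduler down.
  def closeOutsideLock(): Unit = {
    initializationLock.writeLock.lock()
    try { /* clear handlers, close the ZooKeeper handle */ }
    finally initializationLock.writeLock.unlock()
    scheduler.shutdown()
  }
}
{code}
Note, though, that in the js.log stacks neither thread is actually inside inWriteLock; zk-session-expiry-handler0 is parked before it, in callBeforeInitializingSession, which points back at the latch cycle sketched earlier rather than at the lock itself.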
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668 ] leibo edited comment on KAFKA-8532 at 6/13/19 3:34 AM: --- Here is some analysis of mine based on the jstack log and the source code. When the deadlock occurred:
# The zk-session-expiry-handler0 thread was reinitializing a new ZooKeeper instance; the Expire event had been put into the queue of ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this point, Kafka was still disconnected from ZooKeeper.*)
# However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:
{code:java}
// KafkaController.scala
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    elect()
  }
}
{code}
In this process, it is blocked at ZooKeeperClient.handleRequests, code as follows:
{code:java}
// ZooKeeperClient.scala
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the ZooKeeper server was running normally and all of its functions were working well. I suspect the key point is the *initializationLock*: if inReadLock had succeeded, an exception would have been thrown while Kafka was disconnected from ZooKeeper, but we never see any such exception in the Kafka server.log.

was (Author: lbdai3190): Here is some analysis of mine based on the jstack log and the source code. When the deadlock occurred:
# The zk-session-expiry-handler0 thread was reinitializing a new ZooKeeper instance; the Expire event had been put into the queue of ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this point, Kafka was still disconnected from ZooKeeper.*)
# However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as below:
{code:java}
// KafkaController.scala
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    elect()
  }
}
{code}
In this process, it is blocked at ZooKeeperClient.handleRequests, code as below:
{code:java}
// ZooKeeperClient.scala
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the ZooKeeper server was running normally and all of its functions were working well. I suspect the key point is the *initializationLock*: if inReadLock had succeeded, an exception would have been thrown while Kafka was disconnected from ZooKeeper, but we never see any such exception in the Kafka server.log. > controller-event-thread deadlock with zk-session-expiry-handler0 > > > Key: KAFKA-8532 > URL: https://issues.apache.org/jira/browse/KAFKA-8532 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 >Reporter: leibo >Priority: Blocker > Attachments: js.log > > > We have observed a serious deadlock between the controller-event-thread and > the zk-session-expiry-handler0 thread. When this issue occurs, the only way to > recover the Kafka cluster is to restart the Kafka server. Below is the jstack log > of the controller-event-thread and the zk-session-expiry-handler0 thread. > "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 > tid=0x7fcc9c01 nid=0xfb22 waiting on
[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0
[ https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668 ] leibo edited comment on KAFKA-8532 at 6/13/19 3:31 AM: --- Here is some analysis of mine based on the jstack log and the source code. When the deadlock occurred:
# The zk-session-expiry-handler0 thread was reinitializing a new ZooKeeper instance; the Expire event had been put into the queue of ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this point, Kafka was still disconnected from ZooKeeper.*)
# However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:
{code:java}
// KafkaController.scala
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    elect()
  }
}
{code}
In this process, it is blocked at ZooKeeperClient.handleRequests, code as follows:
{code:java}
// ZooKeeperClient.scala
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the ZooKeeper server was running normally and all of its functions were working well. I suspect the key point is the *initializationLock*: if inReadLock had succeeded, an exception would have been thrown while Kafka was disconnected from ZooKeeper, but we never see any such exception in the Kafka server.log.

was (Author: lbdai3190): Here is some analysis of mine based on the jstack log and the source code. When the deadlock occurred:
# The zk-session-expiry-handler0 thread was reinitializing a new ZooKeeper instance; the Expire event had been put into the queue of ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this point, Kafka was still disconnected from ZooKeeper.*)
# However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as below:
{code:java}
// KafkaController.scala
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    elect()
  }
}
{code}
In this process, it is blocked at ZooKeeperClient.handleRequests, code as below:
{code:java}
// ZooKeeperClient.scala
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
> controller-event-thread deadlock with zk-session-expiry-handler0 > > > Key: KAFKA-8532 > URL: https://issues.apache.org/jira/browse/KAFKA-8532 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 >Reporter: leibo >Priority: Blocker > Attachments: js.log > > > We have observed a serious deadlock between the controller-event-thread and > the zk-session-expiry-handler0 thread. When this issue occurs, the only way to > recover the Kafka cluster is to restart the Kafka server. Below is the jstack log > of the controller-event-thread and the zk-session-expiry-handler0 thread. > "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 > tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0005ee3f7000> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at >