[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-21 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020027#comment-17020027
 ] 

leibo edited comment on KAFKA-8532 at 1/22/20 12:56 AM:


[~junrao] I did some further analysis, as below:

The ZK client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // initial state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the heartbeat thread SendThread starts in state *CONNECTING*.

And when the ZK session has expired, zookeeper.getState() is *CONNECTING*.
{code:java}
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        // When the client state is CONNECTING or NOT_CONNECTED the session is not
        // valid, so "alive" cannot be taken to mean the session is usable.
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
From the code above we can see that when the ZK session expires, ClientCnxn.SendThread 
attempts to reconnect to the ZK server and establish a new session, and its state 
during that window is CONNECTING. If the zk-session-expiry-handler thread enters the 
reinitialize method while the ZooKeeper state is CONNECTING, the 
*(!connectionState.isAlive)* branch is skipped (CONNECTING still counts as alive), so 
no new session is created, yet callAfterInitializingSession is still executed, which 
triggers the controller-event-thread to process RegisterBrokerAndReelect while the ZK 
session is invalid.
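
To make the state distinction concrete, here is a minimal, self-contained sketch (my own illustration, not Kafka or ZooKeeper code; it only assumes the ZooKeeper client jar is on the classpath) showing that CONNECTING passes the isAlive() check while failing isConnected():
{code:java}
import org.apache.zookeeper.ZooKeeper.States;

// Minimal sketch: a client that is reconnecting (state CONNECTING) still reports
// isAlive() == true, so a guard written as `if (!connectionState.isAlive)` does not
// fire, even though isConnected() correctly reports that no usable session exists yet.
public class StatesCheckDemo {
    public static void main(String[] args) {
        States reconnecting = States.CONNECTING;
        System.out.println("isAlive()     = " + reconnecting.isAlive());     // prints true
        System.out.println("isConnected() = " + reconnecting.isConnected()); // prints false
    }
}
{code}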


was (Author: lbdai3190):
[~junrao] I did some further analysis, as below:

The ZK client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // initial state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the heartbeat thread SendThread starts in state *CONNECTING*.
{code:java}
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        // When the client state is CONNECTING or NOT_CONNECTED the session is not
        // valid, so "alive" cannot be taken to mean the session is usable.
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
From the code above we can see that when the ZK session expires, ClientCnxn.SendThread 
attempts to reconnect to the ZK server and establish a new session, and its state 
during that window is CONNECTING. If the zk-session-expiry-handler thread enters the 
reinitialize method while the ZooKeeper state is CONNECTING, the 
*(!connectionState.isAlive)* branch is skipped (CONNECTING still counts as alive), so 
no new session is created, yet callAfterInitializingSession is still executed, which 
triggers the controller-event-thread to process RegisterBrokerAndReelect while the ZK 
session is invalid.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurs, the only way to recover 
> the Kafka cluster is to restart the Kafka server. The following is the jstack log 
> of the controller-event-thread and the zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-21 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020027#comment-17020027
 ] 

leibo edited comment on KAFKA-8532 at 1/21/20 8:51 AM:
---

[~junrao] I did some further analysis, as below:

The ZK client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // initial state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the heartbeat thread SendThread starts in state *CONNECTING*.
{code:java}
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        // When the client state is CONNECTING or NOT_CONNECTED the session is not
        // valid, so "alive" cannot be taken to mean the session is usable.
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
From the code above we can see that when the ZK session expires, ClientCnxn.SendThread 
attempts to reconnect to the ZK server and establish a new session, and its state 
during that window is CONNECTING. If the zk-session-expiry-handler thread enters the 
reinitialize method while the ZooKeeper state is CONNECTING, the 
*(!connectionState.isAlive)* branch is skipped (CONNECTING still counts as alive), so 
no new session is created, yet callAfterInitializingSession is still executed, which 
triggers the controller-event-thread to process RegisterBrokerAndReelect while the ZK 
session is invalid.


was (Author: lbdai3190):
The ZK client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // initial state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the heartbeat thread SendThread starts in state *CONNECTING*.
{code:java}
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        // When the client state is CONNECTING or NOT_CONNECTED the session is not
        // valid, so "alive" cannot be taken to mean the session is usable.
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
From the code above we can see that when the ZK session expires, ClientCnxn.SendThread 
attempts to reconnect to the ZK server and establish a new session, and its state 
during that window is CONNECTING. If the zk-session-expiry-handler thread enters the 
reinitialize method while the ZooKeeper state is CONNECTING, the 
*(!connectionState.isAlive)* branch is skipped (CONNECTING still counts as alive), so 
no new session is created, yet callAfterInitializingSession is still executed, which 
triggers the controller-event-thread to process RegisterBrokerAndReelect while the ZK 
session is invalid.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurs, the only way to recover 
> the Kafka cluster is to restart the Kafka server. The following is the jstack log 
> of the controller-event-thread and the zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-21 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020027#comment-17020027
 ] 

leibo edited comment on KAFKA-8532 at 1/21/20 8:49 AM:
---

The ZK client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // initial state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the heartbeat thread SendThread starts in state *CONNECTING*.
{code:java}
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        // When the client state is CONNECTING or NOT_CONNECTED the session is not
        // valid, so "alive" cannot be taken to mean the session is usable.
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
From the code above we can see that when the ZK session expires, ClientCnxn.SendThread 
attempts to reconnect to the ZK server and establish a new session, and its state 
during that window is CONNECTING. If the zk-session-expiry-handler thread enters the 
reinitialize method while the ZooKeeper state is CONNECTING, the 
*(!connectionState.isAlive)* branch is skipped (CONNECTING still counts as alive), so 
no new session is created, yet callAfterInitializingSession is still executed, which 
triggers the controller-event-thread to process RegisterBrokerAndReelect while the ZK 
session is invalid.


was (Author: lbdai3190):
{code:java}
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        // When the client state is CONNECTING or NOT_CONNECTED the session is not
        // valid, so "alive" cannot be taken to mean the session is usable.
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     * */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurs, the only way to recover 
> the Kafka cluster is to restart the Kafka server. The following is the jstack log 
> of the controller-event-thread and the zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-21 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019851#comment-17019851
 ] 

leibo edited comment on KAFKA-8532 at 1/21/20 8:24 AM:
---

[~junrao]

I agree with your opinion that "all pending ZK requests should complete with a 
SessionExpired error through the response callback".

As shown in the attachment controllerDeadLockAnalysis-2020-01-20.png that I uploaded:

RegisterBrokerAndReelect needs an established ZooKeeper session to finish being 
processed by the controller-event-thread, while the Expire event must be processed by 
the controller-event-thread in order to establish a new ZooKeeper session, but the 
controller-event-thread is busy handling RegisterBrokerAndReelect.

 

Based on the current analysis, I see two ways to deal with this problem:

1. Use a stricter check to prevent the ControllerEvent RegisterBrokerAndReelect from 
being executed when the ZK session has expired (see the sketch after this list).

2. Find out why handleRequest blocks when the ZooKeeper session has expired.
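
For illustration only, here is a minimal, self-contained sketch of the stricter check in option 1; the class and method names are my own and this is not Kafka's actual controller code. The idea is simply to gate session-dependent events on isConnected() rather than isAlive():
{code:java}
import org.apache.zookeeper.ZooKeeper.States;

// Hypothetical guard for option 1 (names are mine, not Kafka's): a session-dependent
// controller event such as RegisterBrokerAndReelect is only allowed through once the
// client reports a usable session.
public class StricterGuardSketch {
    static boolean mayProcessSessionDependentEvent(States state) {
        // isConnected() excludes CONNECTING and NOT_CONNECTED, unlike isAlive().
        return state.isConnected();
    }

    public static void main(String[] args) {
        System.out.println(mayProcessSessionDependentEvent(States.CONNECTING)); // false
        System.out.println(mayProcessSessionDependentEvent(States.CONNECTED));  // true
    }
}
{code}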


was (Author: lbdai3190):
[~junrao]

I agree with your opinion that "all pending ZK requests should complete with a 
SessionExpired error through the response callback".

As shown in the attachment controllerDeadLockAnalysis-2020-01-20.png that I uploaded:

RegisterBrokerAndReelect needs an established ZooKeeper session to finish being 
processed by the controller-event-thread, while the Expire event must be processed by 
the controller-event-thread in order to establish a new ZooKeeper session, but the 
controller-event-thread is busy handling RegisterBrokerAndReelect.

 

Based on the current analysis, I see two ways to deal with this problem:

1. Use a stricter check to prevent the ControllerEvent RegisterBrokerAndReelect from 
being executed when the ZK session has expired.

2. Find out why handleRequest blocks when the ZooKeeper session has expired.

 

 

 

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurs, the only way to recover 
> the Kafka cluster is to restart the Kafka server. The following is the jstack log 
> of the controller-event-thread and the zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-20 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294
 ] 

leibo edited comment on KAFKA-8532 at 1/21/20 1:44 AM:
---

[~junrao], [~yuzhih...@gmail.com]  Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in the handleRequest method do not block on 
countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment *controllerDeadLockAnalysis-2020-01-20.png*.
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)

  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) {  // this check may be inaccurate here
    if (!connectionState.isConnected) { // If isConnected is used to decide whether the ZooKeeper session needs
                                        // to be reinitialized, the program will not skip this block and then go
                                        // on to execute operations that require ZooKeeper to be connected.
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}{code}
ZooKeeper code:
{code:java}
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     * */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.


was (Author: lbdai3190):
[~junrao], [~yuzhih...@gmail.com]  Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in the handleRequest method do not block on 
countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment controllerDeadLockAnalysis-2020-01-20.png.
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)

  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) {  // this check may be inaccurate here
    if (!connectionState.isConnected) { // If isConnected is used to decide whether the ZooKeeper session needs
                                        // to be reinitialized, the program will not skip this block and then go
                                        // on to execute operations that require ZooKeeper to be connected.
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}{code}
ZooKeeper code:
{code:java}
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     * */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
>   

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-20 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294
 ] 

leibo edited comment on KAFKA-8532 at 1/21/20 1:44 AM:
---

[~junrao], [~yuzhih...@gmail.com]  Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in the handleRequest method do not block on 
countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment *controllerDeadLockAnalysis-2020-01-20.png*.
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)

  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) {  // this check may be inaccurate here
    if (!connectionState.isConnected) { // If isConnected is used to decide whether the ZooKeeper session needs
                                        // to be reinitialized, the program will not skip this block and then go
                                        // on to execute operations that require ZooKeeper to be connected.
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}{code}
ZooKeeper code:
{code:java}
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     * */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.


was (Author: lbdai3190):
[~junrao], [~yuzhih...@gmail.com]  Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in the handleRequest method do not block on 
countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment *controllerDeadLockAnalysis-2020-01-20.png*.
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)

  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) {  // this check may be inaccurate here
    if (!connectionState.isConnected) { // If isConnected is used to decide whether the ZooKeeper session needs
                                        // to be reinitialized, the program will not skip this block and then go
                                        // on to execute operations that require ZooKeeper to be connected.
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}{code}
ZooKeeper code:
{code:java}
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     * */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
>   

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-20 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294
 ] 

leibo edited comment on KAFKA-8532 at 1/21/20 1:43 AM:
---

[~junrao], [~yuzhih...@gmail.com]  Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in the handleRequest method do not block on 
countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment controllerDeadLockAnalysis-2020-01-20.png.
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)

  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) {  // this check may be inaccurate here
    if (!connectionState.isConnected) { // If isConnected is used to decide whether the ZooKeeper session needs
                                        // to be reinitialized, the program will not skip this block and then go
                                        // on to execute operations that require ZooKeeper to be connected.
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}{code}
ZooKeeper code:
{code:java}
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     * */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.


was (Author: lbdai3190):
[~junrao], [~yuzhih...@gmail.com]  Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in the handleRequest method do not block on 
countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment controllerDeadLockAnalysis-2020-01-20.png.
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)

  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) {  // this check may be inaccurate here
    if (!connectionState.isConnected) { // If isConnected is used to decide whether the ZooKeeper session needs
                                        // to be reinitialized, the program will not skip this block and then go
                                        // on to execute operations that require ZooKeeper to be connected.
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}{code}
ZooKeeper code:
{code:java}
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;   // this check is not accurate
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     * */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-20 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294
 ] 

leibo edited comment on KAFKA-8532 at 1/21/20 1:43 AM:
---

[~junrao], [~yuzhih...@gmail.com]  Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in the handleRequest method do not block on 
countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment controllerDeadLockAnalysis-2020-01-20.png.
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)

  inWriteLock(initializationLock) {
    // if (!connectionState.isAlive) {  // this check may be inaccurate here
    if (!connectionState.isConnected) { // If isConnected is used to decide whether the ZooKeeper session needs
                                        // to be reinitialized, the program will not skip this block and then go
                                        // on to execute operations that require ZooKeeper to be connected.
      zooKeeper.close()
      info(s"Initializing a new session to $connectString.")
      // retry forever until ZooKeeper can be instantiated
      var connected = false
      while (!connected) {
        try {
          zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
          connected = true
        } catch {
          case e: Exception =>
            info("Error when recreating ZooKeeper, retrying after a short sleep", e)
            Thread.sleep(1000)
        }
      }
    }
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}{code}
ZooKeeper code:
{code:java}
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;   // this check is not accurate
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     * */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
I hope you will take a look at it.


was (Author: lbdai3190):
[~junrao], [~yuzhih...@gmail.com]  Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in the handleRequest method do not block on 
countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment controllerDeadLockAnalysis-2020-01-20.png. I hope you will take a 
look at it.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurs, the only way to recover 
> the Kafka cluster is to restart the Kafka server. The following is the jstack log 
> of the controller-event-thread and the zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-20 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294
 ] 

leibo edited comment on KAFKA-8532 at 1/20/20 8:28 AM:
---

[~junrao], [~yuzhih...@gmail.com]  Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in the handleRequest method do not block on 
countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment controllerDeadLockAnalysis-2020-01-20.png. I hope you will take a 
look at it.


was (Author: lbdai3190):
[~junrao] Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in the handleRequest method do not block on 
countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment controllerDeadLockAnalysis-2020-01-20.png. I hope you will take a 
look at it.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurs, the only way to recover 
> the Kafka cluster is to restart the Kafka server. The following is the jstack log 
> of the controller-event-thread and the zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-20 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294
 ] 

leibo edited comment on KAFKA-8532 at 1/20/20 8:26 AM:
---

[~junrao] Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in the handleRequest method do not block on 
countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment controllerDeadLockAnalysis-2020-01-20.png. I hope you will take a 
look at it.


was (Author: lbdai3190):
[~junrao] Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in handleRequest do not block on countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment controllerDeadLockAnalysis-2020-01-20.png.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurs, the only way to recover 
> the Kafka cluster is to restart the Kafka server. The following is the jstack log 
> of the controller-event-thread and the zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-20 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294
 ] 

leibo edited comment on KAFKA-8532 at 1/20/20 8:21 AM:
---

[~junrao] Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in handleRequest do not block on countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment controllerDeadLockAnalysis-2020-01-20.png.


was (Author: lbdai3190):
[~junrao] Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in handleRequest do not block on countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment controllerDeadLockAnalysis2020-01-20.png.

!controllerDeadLockAnalysis-2020-01-20.png!

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurs, the only way to recover 
> the Kafka cluster is to restart the Kafka server. The following is the jstack log 
> of the controller-event-thread and the zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-20 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019294#comment-17019294
 ] 

leibo edited comment on KAFKA-8532 at 1/20/20 8:20 AM:
---

[~junrao] Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in handleRequest do not block on countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment controllerDeadLockAnalysis2020-01-20.png.

!controllerDeadLockAnalysis-2020-01-20.png!


was (Author: lbdai3190):
[~junrao] Thank you for your reply.

1. I have tested throwing an exception in handleRequest(), and as you said, 
exceptions thrown in handleRequest do not block on countDownLatch.await().

2. I rechecked the jstack log in the attachments; my analysis result is in the 
attachment controllerDeadLockAnalysis2020-01-20.png.
!controllerDeadLockAnalysis2020-01-20.png!

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and the 
> zk-session-expiry-handler thread. When this issue occurs, the only way to recover 
> the Kafka cluster is to restart the Kafka server. The following is the jstack log 
> of the controller-event-thread and the zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:26 AM:


Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether an exception was logged in this code path in the server log).
I wonder if we can utilize the following form of await:
{code}
public boolean await(long timeout, TimeUnit unit)
{code}
so that the execution time of handleRequests() can be bounded.
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
{code}
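
For illustration only, a minimal sketch in plain Java (not Kafka code; the timeout value and the log message are assumptions) of how the timed form of await behaves when nothing ever counts the latch down:
{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedAwaitSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // Nothing counts the latch down here, simulating a response callback
        // that is never invoked after the session is gone.
        boolean completed = latch.await(6000, TimeUnit.MILLISECONDS);
        if (!completed) {
            // With the plain await() the caller would block forever; with the
            // timed form it can give up and let the retry loop re-check the
            // connection state instead.
            System.out.println("Timed out waiting for responses, retrying");
        }
    }
}
{code}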


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether an exception was logged in this code path in the server log).
I wonder if we can utilize the following form of await:
{code}
public boolean await(long timeout, TimeUnit unit)
{code}
so that the execution time of handleRequests() can be bounded.
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
{code}

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between controller-event-thead and 
> zk-session-expirey-handle thread. When this issue occurred, it's only one way 
> to recovery the kafka cluster is restart kafka server. The  follows is the 
> jstack log of controller-event-thead and zk-session-expiry-handle thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> 等待controller-event-thread线程处理expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:22 AM:


Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether an exception was logged in this code path in the server log).
I wonder if we can utilize the following form of await:
{code}
public boolean await(long timeout, TimeUnit unit)
{code}
so that the execution time of handleRequests() can be bounded.
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
{code}


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether an exception was logged in this code path in the server log).
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.
```
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
```

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between controller-event-thead and 
> zk-session-expirey-handle thread. When this issue occurred, it's only one way 
> to recovery the kafka cluster is restart kafka server. The  follows is the 
> jstack log of controller-event-thead and zk-session-expiry-handle thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> 等待controller-event-thread线程处理expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:21 AM:


Looking at js2.log , I am not sure deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by whether there 
was exception in this code path from server log).
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.
```
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
```


was (Author: yuzhih...@gmail.com):
Looking at js2.log , I am not sure deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by whether there 
was exception in this code path from server log).
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between controller-event-thead and 
> zk-session-expirey-handle thread. When this issue occurred, it's only one way 
> to recovery the kafka cluster is restart kafka server. The  follows is the 
> jstack log of controller-event-thead and zk-session-expiry-handle thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> 等待controller-event-thread线程处理expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/17/20 10:42 PM:
-

Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether an exception was logged in this code path in the server log).
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute.
I wonder if we can utilize the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between controller-event-thead and 
> zk-session-expirey-handle thread. When this issue occurred, it's only one way 
> to recovery the kafka cluster is restart kafka server. The  follows is the 
> jstack log of controller-event-thead and zk-session-expiry-handle thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> 等待controller-event-thread线程处理expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018152#comment-17018152
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/17/20 4:13 PM:


How about making the following change?
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 7b995931f..6a0809e16 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -158,14 +158,11 @@ class ZooKeeperClient(connectString: String,
           inReadLock(initializationLock) {
             send(request) { response =>
               responseQueue.add(response)
-              inFlightRequests.release()
-              countDownLatch.countDown()
             }
           }
-        } catch {
-          case e: Throwable =>
-            inFlightRequests.release()
-            throw e
+        } finally {
+          inFlightRequests.release()
+          countDownLatch.countDown()
         }
       }
       countDownLatch.await()
{code}
countDownLatch is handled consistently with inFlightRequests.

I have run through core:test which passed.

I can send out a PR.
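
As a self-contained illustration of the pattern in the patch above (plain Java, not the Kafka code; the exception and message are assumptions), releasing the permit and counting the latch down in finally guarantees the waiter is released on every path, even when send throws:
{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

public class FinallyCountDownSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Semaphore inFlight = new Semaphore(10);

        inFlight.acquire();
        try {
            // Stand-in for send(request) { ... }; assume it can throw before
            // the response callback is ever registered.
            throw new IllegalStateException("session expired");
        } catch (IllegalStateException e) {
            System.out.println("send failed: " + e.getMessage());
        } finally {
            // Both the permit and the latch are released on every path,
            // so the await below cannot hang.
            inFlight.release();
            latch.countDown();
        }

        latch.await();
        System.out.println("await returned");
    }
}
{code}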


was (Author: yuzhih...@gmail.com):
How about making the following change?
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 7b995931f..6a0809e16 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -158,14 +158,11 @@ class ZooKeeperClient(connectString: String,
   inReadLock(initializationLock) {
 send(request) { response =>
   responseQueue.add(response)
-  inFlightRequests.release()
-  countDownLatch.countDown()
 }
   }
-} catch {
-  case e: Throwable =>
-inFlightRequests.release()
-throw e
+   } finally {
+  inFlightRequests.release()
+  countDownLatch.countDown()
 }
   }
   countDownLatch.await()
{code}
countDownLatch is handled consistently with inFlightRequests.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between controller-event-thead and 
> zk-session-expirey-handle thread. When this issue occurred, it's only one way 
> to recovery the kafka cluster is restart kafka server. The  follows is the 
> jstack log of controller-event-thead and zk-session-expiry-handle thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> 等待controller-event-thread线程处理expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042
 ] 

leibo edited comment on KAFKA-8532 at 9/18/19 4:27 AM:
---

[~junrao], [~ijuma]

My analysis: 

When Kafka is disconnected from ZooKeeper, the controller-event-thread is
handling an ISRChangeNotification event; because Kafka is disconnected from zk,
the controller event handling is blocked in the retryRequestUntilConnected ->
handleRequests path.

At this time, the zk-session-expiry-handler thread is trying to reinitialize a
new zk connection to zookeeper; it puts a controller event named Expire onto the
ControllerEventManager LinkedBlockingQueue. Because the controller-event-thread
is blocked on the ISRChangeNotification event, the Expire event will never be
handled by the ControllerEventThread.

The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  if (requests.isEmpty)
    Seq.empty
  else {
    val countDownLatch = new CountDownLatch(requests.size)
    val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

    requests.foreach { request =>
      inFlightRequests.acquire()
      try {
        inReadLock(initializationLock) {
          send(request) { response =>
            responseQueue.add(response)
            inFlightRequests.release()
            countDownLatch.countDown()
          }
        }
      } catch {
        case e: Throwable =>
          inFlightRequests.release()
          throw e
      }
    }
    countDownLatch.await()
    responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be
executed, and this method will block here forever.
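
To make the mutual wait concrete, here is a minimal two-thread sketch in plain Java (not Kafka code; the thread names, latches, and timeouts are only illustrative assumptions): each thread blocks on a latch that only the other thread can release.
{code:java}
import java.util.concurrent.CountDownLatch;

public class MutualWaitSketch {
    public static void main(String[] args) throws InterruptedException {
        // Released only when the "controller" thread gets a ZooKeeper response.
        CountDownLatch responseLatch = new CountDownLatch(1);
        // Released only when the "controller" thread starts processing Expire.
        CountDownLatch expireProcessed = new CountDownLatch(1);

        Thread controllerEventThread = new Thread(() -> {
            try {
                // Stand-in for handleRequests(): waits for a response that will
                // never arrive because the session is gone.
                responseLatch.await();
                // Only after that would the queued Expire event be processed.
                expireProcessed.countDown();
            } catch (InterruptedException ignored) { }
        }, "controller-event-thread");

        Thread sessionExpiryHandler = new Thread(() -> {
            try {
                // Stand-in for beforeInitializingSession(): waits for the
                // controller to process Expire before reconnecting, so the
                // response above can never be produced either.
                expireProcessed.await();
                responseLatch.countDown();
            } catch (InterruptedException ignored) { }
        }, "zk-session-expiry-handler0");

        // Daemon threads so this demo JVM can exit; in the real broker both
        // threads simply stay blocked until the process is restarted.
        controllerEventThread.setDaemon(true);
        sessionExpiryHandler.setDaemon(true);
        controllerEventThread.start();
        sessionExpiryHandler.start();

        controllerEventThread.join(2000);
        System.out.println("both threads are still waiting on each other");
    }
}
{code}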


was (Author: lbdai3190):
[~junrao], [~ijuma]

My analysis: 

When Kafka is disconnected from ZooKeeper, the controller-event-thread is
handling an ISRChangeNotification event; because Kafka is disconnected from zk,
the controller event handling is blocked in the retryRequestUntilConnected ->
handleRequests path.

At this time, the zk-session-expiry-handler thread is trying to reinitialize a
new zk connection to zookeeper; it puts a controller event named Expire onto the
ControllerEventManager LinkedBlockingQueue. Because the controller-event-thread
is blocked on the ISRChangeNotification event, the Expire event will never be
handled.

The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
//代码占位符
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be
executed, and this method will block here forever.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between controller-event-thead and 
> zk-session-expirey-handle thread. When this issue occurred, it's only one way 
> to recovery the kafka cluster is restart kafka server. The  follows is the 
> jstack log of controller-event-thead and zk-session-expiry-handle thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042
 ] 

leibo edited comment on KAFKA-8532 at 9/18/19 4:26 AM:
---

[~junrao], [~ijuma]

My analysis: 

When Kafka is disconnected from ZooKeeper, the controller-event-thread is
handling an ISRChangeNotification event; because Kafka is disconnected from zk,
the controller event handling is blocked in the retryRequestUntilConnected ->
handleRequests path.

At this time, the zk-session-expiry-handler thread is trying to reinitialize a
new zk connection to zookeeper; it puts a controller event named Expire onto the
ControllerEventManager LinkedBlockingQueue. Because the controller-event-thread
is blocked on the ISRChangeNotification event, the Expire event will never be
handled.

The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
//代码占位符
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be
executed, and this method will block here forever.


was (Author: lbdai3190):
[~junrao], [~ijuma]

My analysis: 

When Kafka is disconnected from ZooKeeper, the controller-event-thread is
handling an ISRChangeNotification event; because Kafka is disconnected from zk,
the controller event handling is blocked in the retryRequestUntilConnected ->
handleRequests path.

At this time, the zk-session-expiry-handler thread is trying to reinitialize a
new zk connection to zookeeper; it puts a controller event named Expire onto the
ControllerEventManager LinkedBlockingQueue. Because the controller-event-thread
is blocked on the ISRChangeNotification event, the Expire event will never be
handled.

The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
//代码占位符
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be
executed, and this method will block here forever.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between controller-event-thead and 
> zk-session-expirey-handle thread. When this issue occurred, it's only one way 
> to recovery the kafka cluster is restart kafka server. The  follows is the 
> jstack log of controller-event-thead and zk-session-expiry-handle thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042
 ] 

leibo edited comment on KAFKA-8532 at 9/18/19 4:26 AM:
---

[~junrao], [~ijuma]

My analysis: 

When Kafka is disconnected from ZooKeeper, the controller-event-thread is
handling an ISRChangeNotification event; because Kafka is disconnected from zk,
the controller event handling is blocked in the retryRequestUntilConnected ->
handleRequests path.

At this time, the zk-session-expiry-handler thread is trying to reinitialize a
new zk connection to zookeeper; it puts a controller event named Expire onto the
ControllerEventManager LinkedBlockingQueue. Because the controller-event-thread
is blocked on the ISRChangeNotification event, the Expire event will never be
handled.

The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
//代码占位符
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be
executed, and this method will block here forever.


was (Author: lbdai3190):
[~junrao], [~ijuma]

My analysis: 

When Kafka is disconnected from ZooKeeper, the controller-event-thread is
handling an ISRChangeNotification event; because Kafka is disconnected from zk,
the controller event handling is blocked in the retryRequestUntilConnected ->
handleRequests path.

At this time, the zk-session-expiry-handler thread is trying to reinitialize a
new zk connection to zookeeper; it puts a controller event named Expire onto the
ControllerEventManager LinkedBlockingQueue. Because the controller-event-thread
is blocked on the ISRChangeNotification event, the Expire event will never be
handled.

The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
//代码占位符
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be
executed, and this method will block here forever.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between controller-event-thead and 
> zk-session-expirey-handle thread. When this issue occurred, it's only one way 
> to recovery the kafka cluster is restart kafka server. The  follows is the 
> jstack log of controller-event-thead and zk-session-expiry-handle thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042
 ] 

leibo edited comment on KAFKA-8532 at 9/18/19 4:24 AM:
---

[~junrao]

My analysis: 

When Kafka is disconnected from ZooKeeper, the controller-event-thread is
handling an ISRChangeNotification event; because Kafka is disconnected from zk,
the controller event handling is blocked in the retryRequestUntilConnected ->
handleRequests path.

At this time, the zk-session-expiry-handler thread is trying to reinitialize a
new zk connection to zookeeper; it puts a controller event named Expire onto the
ControllerEventManager LinkedBlockingQueue. Because the controller-event-thread
is blocked on the ISRChangeNotification event, the Expire event will never be
handled.

The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
//代码占位符
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be
executed, and this method will block here forever.


was (Author: lbdai3190):
My analysis: 

When Kafka is disconnected from ZooKeeper, the controller-event-thread is
handling an ISRChangeNotification event; because Kafka is disconnected from zk,
the controller event handling is blocked in the retryRequestUntilConnected ->
handleRequests path.

At this time, the zk-session-expiry-handler thread is trying to reinitialize a
new zk connection to zookeeper; it puts a controller event named Expire onto the
ControllerEventManager LinkedBlockingQueue. Because the controller-event-thread
is blocked on the ISRChangeNotification event, the Expire event will never be
handled.

The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
//代码占位符
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be
executed, and this method will block here forever.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between controller-event-thead and 
> zk-session-expirey-handle thread. When this issue occurred, it's only one way 
> to recovery the kafka cluster is restart kafka server. The  follows is the 
> jstack log of controller-event-thead and zk-session-expiry-handle thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-09-17 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932042#comment-16932042
 ] 

leibo edited comment on KAFKA-8532 at 9/18/19 4:25 AM:
---

[~junrao], [~ijuma]

My analysis: 

When Kafka is disconnected from ZooKeeper, the controller-event-thread is
handling an ISRChangeNotification event; because Kafka is disconnected from zk,
the controller event handling is blocked in the retryRequestUntilConnected ->
handleRequests path.

At this time, the zk-session-expiry-handler thread is trying to reinitialize a
new zk connection to zookeeper; it puts a controller event named Expire onto the
ControllerEventManager LinkedBlockingQueue. Because the controller-event-thread
is blocked on the ISRChangeNotification event, the Expire event will never be
handled.

The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
//代码占位符
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be
executed, and this method will block here forever.


was (Author: lbdai3190):
[~junrao]

My analysis: 

When Kafka is disconnected from ZooKeeper, the controller-event-thread is
handling an ISRChangeNotification event; because Kafka is disconnected from zk,
the controller event handling is blocked in the retryRequestUntilConnected ->
handleRequests path.

At this time, the zk-session-expiry-handler thread is trying to reinitialize a
new zk connection to zookeeper; it puts a controller event named Expire onto the
ControllerEventManager LinkedBlockingQueue. Because the controller-event-thread
is blocked on the ISRChangeNotification event, the Expire event will never be
handled.

The problem is obvious: ISRChangeNotification and Expire block each other.

The problem is here:
{code:java}
//代码占位符
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
We can see that if an exception occurs, countDownLatch.countDown() will not be
executed, and this method will block here forever.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between controller-event-thead and 
> zk-session-expirey-handle thread. When this issue occurred, it's only one way 
> to recovery the kafka cluster is restart kafka server. The  follows is the 
> jstack log of controller-event-thead and zk-session-expiry-handle thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-06-17 Thread leibo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865266#comment-16865266
 ] 

leibo edited comment on KAFKA-8532 at 6/17/19 11:15 AM:


[~ijuma] I have tried to reproduce this but failed; it reappears a few days after I
restart the kafka cluster.

The following info may be helpful:
 # We are running the kafka cluster in docker containers managed by kubernetes.
 # Until now, every time this issue occurred, we found that the zookeeper container
CPU was limited to 1 core while the host has more than 50 cores.
 # When the zookeeper container is limited to 1 core (on a host with more than 50
cores), because of how docker exposes the host CPUs, the zookeeper JVM running in the
container ends up with more than 38 GC worker threads (according to the JVM
documentation, 1 core should correspond to 2 GC worker threads). These GC worker
threads may slow down zookeeper and may cause the kafka-zookeeper session to time out.
 # After we set the zookeeper JVM flag *-XX:ParallelGCThreads=4*, this issue has not
occurred again, but I think this tuning only reduces the probability of the problem;
it does not solve it.

Based on the above, I think that when this problem occurs zookeeper is not running
well and the kafka cluster deadlocks; after some time zookeeper returns to normal,
but kafka does not recover.


was (Author: lbdai3190):
[~ijuma] I have tried to reproduce this but failed; it reappears a few days after I
restart the kafka cluster.

The following info may be helpful:
 # We are running the kafka cluster in docker containers managed by kubernetes.
 # Until now, every time this issue occurred, we found that the zookeeper container
CPU was limited to 1 core while the host has more than 50 cores.
 # When the zookeeper container is limited to 1 core (on a host with more than 50
cores), because of how docker exposes the host CPUs, the zookeeper JVM running in the
container ends up with more than 38 GC worker threads (according to the JVM
documentation, 1 core should correspond to 2 GC worker threads). These GC worker
threads may slow down zookeeper and may cause the kafka-zookeeper session to time out.
 # After we set the zookeeper JVM flag *-XX:ParallelGCThreads=4*, this issue has not
occurred again, but I think this tuning only reduces the probability of the problem;
it does not solve it.

Based on the above, I think that when this problem occurs zookeeper is not running
well and the kafka cluster deadlocks; after some time zookeeper returns to normal,
but kafka does not recover.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log
>
>
> We have observed a serious deadlock between controller-event-thead and 
> zk-session-expirey-handle thread. When this issue occurred, it's only one way 
> to recovery the kafka cluster is restart kafka server. The  follows is the 
> jstack log of controller-event-thead and zk-session-expiry-handle thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> 等待controller-event-thread线程处理expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-06-16 Thread leibo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865266#comment-16865266
 ] 

leibo edited comment on KAFKA-8532 at 6/17/19 2:53 AM:
---

[~ijuma] I have tried to reproduce this but failed; it reappears a few days after I
restart the kafka cluster.

The following info may be helpful:
 # We are running the kafka cluster in docker containers managed by kubernetes.
 # Until now, every time this issue occurred, we found that the zookeeper container
CPU was limited to 1 core while the host has more than 50 cores.
 # When the zookeeper container is limited to 1 core (on a host with more than 50
cores), because of how docker exposes the host CPUs, the zookeeper JVM running in the
container ends up with more than 38 GC worker threads (according to the JVM
documentation, 1 core should correspond to 2 GC worker threads). These GC worker
threads may slow down zookeeper and may cause the kafka-zookeeper session to time out.
 # After we set the zookeeper JVM flag *-XX:ParallelGCThreads=4*, this issue has not
occurred again, but I think this tuning only reduces the probability of the problem;
it does not solve it.

Based on the above, I think that when this problem occurs zookeeper is not running
well and the kafka cluster deadlocks; after some time zookeeper returns to normal,
but kafka does not recover.


was (Author: lbdai3190):
[~ijuma] I have tried to reproduce this but failed; it reappears a few days after I
restart the kafka cluster.

The following info may be helpful:
 # We are running the kafka cluster in docker containers managed by kubernetes.
 # Until now, every time this issue occurred, we found that the zookeeper container
CPU was limited to 1 core while the host has more than 50 cores.
 # When the zookeeper container is limited to 1 core (on a host with more than 50
cores), because of how docker exposes the host CPUs, the zookeeper JVM running in the
container ends up with more than 38 GC worker threads (according to the JVM
documentation, 1 core should correspond to 2 GC worker threads). These GC worker
threads may slow down zookeeper and may cause the kafka-zookeeper session to time out.
 # After we set the zookeeper JVM flag *-XX:ParallelGCThreads=4*, this issue has not
occurred again, but I think this tuning only reduces the probability of the problem;
it does not solve it.

Based on the above, I think that when this problem occurs zookeeper is not running
well and the kafka cluster deadlocks; after some time zookeeper returns to normal,
but kafka does not recover.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log
>
>
> We have observed a serious deadlock between controller-event-thead and 
> zk-session-expirey-handle thread. When this issue occurred, it's only one way 
> to recovery the kafka cluster is restart kafka server. The  follows is the 
> jstack log of controller-event-thead and zk-session-expiry-handle thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> 等待controller-event-thread线程处理expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-06-13 Thread leibo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668
 ] 

leibo edited comment on KAFKA-8532 at 6/14/19 2:59 AM:
---

Here is some of my analysis, based on the jstack log and the source code:

When the deadlock occurred:
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper
instance; the Expire event was put onto the ControllerEventManager queue and was
waiting to be handled by the controller-event-thread. (*At this time, kafka was
still disconnected from zookeeper.*)
 # At the same time, the controller-event-thread was handling
RegisterBrokerAndReelect (a ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object RegisterBrokerAndReelect extends ControllerEvent {
  override def state: ControllerState = ControllerState.ControllerChange

  override def process(): Unit = {
zkClient.registerBroker(brokerInfo)
Reelect.process()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
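
Note that the latch above is awaited without a timeout, so if the registered send callbacks are never invoked (for example because the ZooKeeper session is effectively gone), the calling thread stays parked forever. Below is a minimal, hypothetical sketch of that behaviour (illustrative only, not Kafka code).
{code:java}
// Hypothetical illustration: a caller waits on a latch that is only counted down
// by an asynchronous callback. If the callback never fires, await() without a
// timeout blocks indefinitely, just like countDownLatch.await() in handleRequests.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchBlockDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch responseLatch = new CountDownLatch(1);

        // No callback ever calls responseLatch.countDown() in this demo,
        // standing in for a ZooKeeper response that never arrives.
        boolean completed = responseLatch.await(2, TimeUnit.SECONDS);
        System.out.println("completed within timeout = " + completed); // false

        // responseLatch.await();  // the no-timeout variant would hang forever here
    }
}
{code}
This matches the parked controller-event-thread shown in the attached jstack output.
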
At this time, the Zookeeper server was running normally and all of its functions were working well.

I suspect the key point is the *initializationLock*: if the read lock had been acquired successfully, the subsequent request would have failed with an exception because kafka was disconnected from zookeeper, but we never see any such exception in the kafka server.log.

So the controller-event-thread probably cannot enter inReadLock(initializationLock), perhaps because the write lock has not been released.
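
To illustrate this suspicion: with a read-write lock of the kind used for initializationLock, a thread attempting to take the read lock blocks for as long as another thread holds the write lock. Below is a minimal, hypothetical sketch (not Kafka code), assuming a java.util.concurrent ReentrantReadWriteLock; the class name is illustrative only.
{code:java}
// Hypothetical illustration: if one thread holds the write lock and never releases
// it, a second thread blocks inside readLock(), which is the behaviour suspected
// for controller-event-thread at inReadLock(initializationLock).
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadBlockedByWriterDemo {
    public static void main(String[] args) throws InterruptedException {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        Thread writer = new Thread(() -> {
            lock.writeLock().lock();   // acquired and intentionally never released
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException ignored) {
            }
        });
        writer.setDaemon(true);
        writer.start();
        Thread.sleep(200); // give the writer time to grab the write lock

        // A bounded tryLock is used so the demo terminates; a plain
        // readLock().lock() would block forever, like the suspected hang.
        boolean gotReadLock = lock.readLock().tryLock(1, TimeUnit.SECONDS);
        System.out.println("read lock acquired = " + gotReadLock); // false
    }
}
{code}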

There are two ways to enter inWriteLock(initializationLock):

 1.  close():
{code:java}
expiryScheduler.startup()
try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
catch {
  case e: Throwable =>
close() // the write lock is acquired when close() is called here
throw e
}

def close(): Unit = {
  info("Closing.")
  inWriteLock(initializationLock) {
zNodeChangeHandlers.clear()
zNodeChildChangeHandlers.clear()
stateChangeHandlers.clear()
zooKeeper.close()
metricNames.foreach(removeMetric(_))
  }
  // Shutdown scheduler outside of lock to avoid deadlock if scheduler
  // is waiting for lock to process session expiry
  expiryScheduler.shutdown()
  info("Closed.")
}
{code}
2.  reinitialize()
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock 
potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the 
initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)  //  
zk-session-expiry-handler0 is blocked here

  inWriteLock(initializationLock) {//  program is not running here
if (!connectionState.isAlive) {
  zooKeeper.close()
  info(s"Initializing a new session to $connectString.")
  // retry forever until ZooKeeper can be instantiated
  var connected = false
  while (!connected) {
try {
  zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, 
ZooKeeperClientWatcher)
  connected = true
} catch {
  case e: Exception =>
info("Error when recreating ZooKeeper, retrying after a short 
sleep", e)
Thread.sleep(1000)
}
  }
}
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
So I think the problem occurs in the close() function.
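
Putting the two threads together, the wait-for cycle described above can be reduced to the following minimal, hypothetical sketch (class, thread, and variable names are illustrative, not Kafka code): the controller thread is parked on a response latch that nothing will ever count down, while the expiry-handler thread is parked waiting for the controller to start processing the queued Expire event.
{code:java}
// Hypothetical reduction of the reported deadlock:
//  - the "controller" thread blocks in the middle of handling an event, waiting on
//    a response latch that is never counted down (the ZooKeeper session is gone,
//    so no response callback will ever fire);
//  - the "expiry handler" thread enqueues an Expire event and then waits for the
//    controller thread to start processing it, which never happens.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class ControllerExpiryDeadlockSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch zkResponse = new CountDownLatch(1);            // never counted down
        CountDownLatch expireProcessingStarted = new CountDownLatch(1);
        LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();

        Thread controller = new Thread(() -> {
            try {
                // Currently handling an event that makes a blocking ZK call:
                zkResponse.await();                 // <-- parked forever
                // Only after that would it take the next event from the queue:
                String next = eventQueue.take();
                if ("Expire".equals(next)) {
                    expireProcessingStarted.countDown();
                }
            } catch (InterruptedException ignored) {
            }
        }, "controller-event-thread");

        Thread expiryHandler = new Thread(() -> {
            try {
                eventQueue.put("Expire");           // enqueue the Expire event
                expireProcessingStarted.await();    // <-- parked forever too
                // reinitialize() would continue here, but never gets the chance.
            } catch (InterruptedException ignored) {
            }
        }, "zk-session-expiry-handler0");

        controller.setDaemon(true);
        expiryHandler.setDaemon(true);
        controller.start();
        expiryHandler.start();
        controller.join(2000);
        expiryHandler.join(2000);
        System.out.println("controller alive = " + controller.isAlive());        // true
        System.out.println("expiry handler alive = " + expiryHandler.isAlive()); // true
    }
}
{code}
Neither thread can make progress, which matches the report that only a broker restart recovers the cluster.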

 


was (Author: lbdai3190):
Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-06-13 Thread leibo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862772#comment-16862772
 ] 

leibo edited comment on KAFKA-8532 at 6/13/19 12:20 PM:


[~te...@apache.org] I saw that you handled KAFKA-6879; I hope you can help me analyze this problem, thank you.


was (Author: lbdai3190):
[~te...@apache.org] As you handled KAFKA-6879, I hope you can help me analyze this problem, thank you.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the kafka cluster is to restart the kafka server. The following 
> is the jstack log of the controller-event-thread and zk-session-expiry-handler 
> thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expireEvent
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-06-13 Thread leibo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668
 ] 

leibo edited comment on KAFKA-8532 at 6/13/19 6:46 AM:
---

Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the Zookeeper server was running normally and all of its functions were working well.

I suspect the key point is the *initializationLock*: if the read lock had been acquired successfully, the subsequent request would have failed with an exception because kafka was disconnected from zookeeper, but we never see any such exception in the kafka server.log.

So the controller-event-thread probably cannot enter inReadLock(initializationLock), perhaps because the write lock has not been released.

There are two ways to enter inWriteLock(initializationLock):

 1.  close():
{code:java}
expiryScheduler.startup()
try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
catch {
  case e: Throwable =>
close() // the write lock is acquired when close() is called here
throw e
}

def close(): Unit = {
  info("Closing.")
  inWriteLock(initializationLock) {
zNodeChangeHandlers.clear()
zNodeChildChangeHandlers.clear()
stateChangeHandlers.clear()
zooKeeper.close()
metricNames.foreach(removeMetric(_))
  }
  // Shutdown scheduler outside of lock to avoid deadlock if scheduler
  // is waiting for lock to process session expiry
  expiryScheduler.shutdown()
  info("Closed.")
}
{code}
2.  reinitialize()
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock 
potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the 
initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)  //  
zk-session-expiry-handler0 is blocked here

  inWriteLock(initializationLock) {//  program is not running here
if (!connectionState.isAlive) {
  zooKeeper.close()
  info(s"Initializing a new session to $connectString.")
  // retry forever until ZooKeeper can be instantiated
  var connected = false
  while (!connected) {
try {
  zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, 
ZooKeeperClientWatcher)
  connected = true
} catch {
  case e: Exception =>
info("Error when recreating ZooKeeper, retrying after a short 
sleep", e)
Thread.sleep(1000)
}
  }
}
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
So I think the problem occurs in the close() function.

 


was (Author: lbdai3190):
Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process,  it is blocked 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-06-13 Thread leibo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668
 ] 

leibo edited comment on KAFKA-8532 at 6/13/19 6:45 AM:
---

Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the Zookeeper server was running normally and all of its functions were working well.

I suspect the key point is the *initializationLock*: if the read lock had been acquired successfully, the subsequent request would have failed with an exception because kafka was disconnected from zookeeper, but we never see any such exception in the kafka server.log.

So the controller-event-thread probably cannot enter inReadLock(initializationLock), perhaps because the write lock has not been released.

There are two ways to enter inWriteLock(initializationLock):

 1.  close():
{code:java}
expiryScheduler.startup()
try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
catch {
  case e: Throwable =>
close() // the write lock is acquired when close() is called here
throw e
}

def close(): Unit = {
  info("Closing.")
  inWriteLock(initializationLock) {
zNodeChangeHandlers.clear()
zNodeChildChangeHandlers.clear()
stateChangeHandlers.clear()
zooKeeper.close()
metricNames.foreach(removeMetric(_))
  }
  // Shutdown scheduler outside of lock to avoid deadlock if scheduler
  // is waiting for lock to process session expiry
  expiryScheduler.shutdown()
  info("Closed.")
}
{code}
2.  reinitialize()
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock 
potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the 
initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)  //  
zk-session-expiry-handler0 is blocked here

  inWriteLock(initializationLock) {//  program is not running here
if (!connectionState.isAlive) {
  zooKeeper.close()
  info(s"Initializing a new session to $connectString.")
  // retry forever until ZooKeeper can be instantiated
  var connected = false
  while (!connected) {
try {
  zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, 
ZooKeeperClientWatcher)
  connected = true
} catch {
  case e: Exception =>
info("Error when recreating ZooKeeper, retrying after a short 
sleep", e)
Thread.sleep(1000)
}
  }
}
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
So I think the problem occurs in the close() function.

 


was (Author: lbdai3190):
Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process,  it is blocked at 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-06-12 Thread leibo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668
 ] 

leibo edited comment on KAFKA-8532 at 6/13/19 3:58 AM:
---

Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the Zookeeper server was running normally and all of its functions were working well.

I suspect the key point is the *initializationLock*: if the read lock had been acquired successfully, the subsequent request would have failed with an exception because kafka was disconnected from zookeeper, but we never see any such exception in the kafka server.log.

So the controller-event-thread probably cannot enter inReadLock(initializationLock), perhaps because the write lock has not been released.

There are two ways to enter inWriteLock(initializationLock):

 1.  close():

 
{code:java}
def close(): Unit = {
  info("Closing.")
  inWriteLock(initializationLock) {
zNodeChangeHandlers.clear()
zNodeChildChangeHandlers.clear()
stateChangeHandlers.clear()
zooKeeper.close()
metricNames.foreach(removeMetric(_))
  }
  // Shutdown scheduler outside of lock to avoid deadlock if scheduler
  // is waiting for lock to process session expiry
  expiryScheduler.shutdown()
  info("Closed.")
}
{code}
2.  reinitialize()
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock 
potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the 
initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)  //  
zk-session-expiry-handler0 is blocked here

  inWriteLock(initializationLock) {//  program is not running here
if (!connectionState.isAlive) {
  zooKeeper.close()
  info(s"Initializing a new session to $connectString.")
  // retry forever until ZooKeeper can be instantiated
  var connected = false
  while (!connected) {
try {
  zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, 
ZooKeeperClientWatcher)
  connected = true
} catch {
  case e: Exception =>
info("Error when recreating ZooKeeper, retrying after a short 
sleep", e)
Thread.sleep(1000)
}
  }
}
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
So I think the problem occurs in the close() function.

 


was (Author: lbdai3190):
Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-06-12 Thread leibo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668
 ] 

leibo edited comment on KAFKA-8532 at 6/13/19 4:00 AM:
---

Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the Zookeeper server was running normally and all of its functions were working well.

I suspect the key point is the *initializationLock*: if the read lock had been acquired successfully, the subsequent request would have failed with an exception because kafka was disconnected from zookeeper, but we never see any such exception in the kafka server.log.

So the controller-event-thread probably cannot enter inReadLock(initializationLock), perhaps because the write lock has not been released.

There are two ways to enter inWriteLock(initializationLock):

 1.  close():
{code:java}
expiryScheduler.startup()
try waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
catch {
  case e: Throwable =>
close() // the write lock is acquired when close() is called here
throw e
}

def close(): Unit = {
  info("Closing.")
  inWriteLock(initializationLock) {
zNodeChangeHandlers.clear()
zNodeChildChangeHandlers.clear()
stateChangeHandlers.clear()
zooKeeper.close()
metricNames.foreach(removeMetric(_))
  }
  // Shutdown scheduler outside of lock to avoid deadlock if scheduler
  // is waiting for lock to process session expiry
  expiryScheduler.shutdown()
  info("Closed.")
}
{code}
2.  reinitialize()
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock 
potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the 
initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)  //  
zk-session-expiry-handler0 is blocked here

  inWriteLock(initializationLock) {//  program is not running here
if (!connectionState.isAlive) {
  zooKeeper.close()
  info(s"Initializing a new session to $connectString.")
  // retry forever until ZooKeeper can be instantiated
  var connected = false
  while (!connected) {
try {
  zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, 
ZooKeeperClientWatcher)
  connected = true
} catch {
  case e: Exception =>
info("Error when recreating ZooKeeper, retrying after a short 
sleep", e)
Thread.sleep(1000)
}
  }
}
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
So I think the problem occurs in the close() function.

 


was (Author: lbdai3190):
Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process,  it is blocked at 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-06-12 Thread leibo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668
 ] 

leibo edited comment on KAFKA-8532 at 6/13/19 3:58 AM:
---

Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the Zookeeper server was running normally and all of its functions were working well.

I suspect the key point is the *initializationLock*: if the read lock had been acquired successfully, the subsequent request would have failed with an exception because kafka was disconnected from zookeeper, but we never see any such exception in the kafka server.log.

So the controller-event-thread probably cannot enter inReadLock(initializationLock), perhaps because the write lock has not been released.

There are two ways to enter inWriteLock(initializationLock):

 1.  close():

 
{code:java}
def close(): Unit = {
  info("Closing.")
  inWriteLock(initializationLock) {
zNodeChangeHandlers.clear()
zNodeChildChangeHandlers.clear()
stateChangeHandlers.clear()
zooKeeper.close()
metricNames.foreach(removeMetric(_))
  }
  // Shutdown scheduler outside of lock to avoid deadlock if scheduler
  // is waiting for lock to process session expiry
  expiryScheduler.shutdown()
  info("Closed.")
}
{code}
2.  reinitialize()
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock 
potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the 
initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)  //  
zk-session-expiry-handler0 is blocked here

  inWriteLock(initializationLock) {//  program is not running here
if (!connectionState.isAlive) {
  zooKeeper.close()
  info(s"Initializing a new session to $connectString.")
  // retry forever until ZooKeeper can be instantiated
  var connected = false
  while (!connected) {
try {
  zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, 
ZooKeeperClientWatcher)
  connected = true
} catch {
  case e: Exception =>
info("Error when recreating ZooKeeper, retrying after a short 
sleep", e)
Thread.sleep(1000)
}
  }
}
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
So I think the problem occurs in the close() function.

 


was (Author: lbdai3190):
Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-06-12 Thread leibo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668
 ] 

leibo edited comment on KAFKA-8532 at 6/13/19 3:58 AM:
---

Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the Zookeeper server was running normally and all of its functions were working well.

I suspect the key point is the *initializationLock*: if the read lock had been acquired successfully, the subsequent request would have failed with an exception because kafka was disconnected from zookeeper, but we never see any such exception in the kafka server.log.

So the controller-event-thread probably cannot enter inReadLock(initializationLock), perhaps because the write lock has not been released.

There are two ways to enter inWriteLock(initializationLock):

 1.  close():
{code:java}
def close(): Unit = {
  info("Closing.")
  inWriteLock(initializationLock) {
zNodeChangeHandlers.clear()
zNodeChildChangeHandlers.clear()
stateChangeHandlers.clear()
zooKeeper.close()
metricNames.foreach(removeMetric(_))
  }
  // Shutdown scheduler outside of lock to avoid deadlock if scheduler
  // is waiting for lock to process session expiry
  expiryScheduler.shutdown()
  info("Closed.")
}
{code}
2.  reinitialize()
{code:java}
private def reinitialize(): Unit = {
  // Initialization callbacks are invoked outside of the lock to avoid deadlock 
potential since their completion
  // may require additional Zookeeper requests, which will block to acquire the 
initialization lock
  stateChangeHandlers.values.foreach(callBeforeInitializingSession _)  //  
zk-session-expiry-handler0 is blocked here

  inWriteLock(initializationLock) {//  program is not running here
if (!connectionState.isAlive) {
  zooKeeper.close()
  info(s"Initializing a new session to $connectString.")
  // retry forever until ZooKeeper can be instantiated
  var connected = false
  while (!connected) {
try {
  zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, 
ZooKeeperClientWatcher)
  connected = true
} catch {
  case e: Exception =>
info("Error when recreating ZooKeeper, retrying after a short 
sleep", e)
Thread.sleep(1000)
}
  }
}
  }

  stateChangeHandlers.values.foreach(callAfterInitializingSession _)
}
{code}
So I think the problem occurs in the close() function.

 


was (Author: lbdai3190):
Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
  

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-06-12 Thread leibo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668
 ] 

leibo edited comment on KAFKA-8532 at 6/13/19 3:34 AM:
---

Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the Zookeeper server was running normally and all of its functions were working well.

I suspect the key point is the *initializationLock*: if the read lock had been acquired successfully, the subsequent request would have failed with an exception because kafka was disconnected from zookeeper, but we never see any such exception in the kafka server.log.


was (Author: lbdai3190):
Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the Zookeeper server was running normally and all of its functions were working well.

I suspect the key point is the *initializationLock*: if the read lock had been acquired successfully, the subsequent request would have failed with an exception because kafka was disconnected from zookeeper, but we never see any such exception in the kafka server.log.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the kafka cluster is to restart the kafka server. Below is the jstack 
> log of the controller-event-thread and zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2019-06-12 Thread leibo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862668#comment-16862668
 ] 

leibo edited comment on KAFKA-8532 at 6/13/19 3:31 AM:
---

Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}
{code}
At this time, the Zookeeper server was running normally and all of its functions were working well.

I suspect the key point is the *initializationLock*: if the read lock had been acquired successfully, the subsequent request would have failed with an exception because kafka was disconnected from zookeeper, but we never see any such exception in the kafka server.log.


was (Author: lbdai3190):
Here is some analysis of mine according to the jstack log and source code:

When the deadlock occurred,
 # The zk-session-expiry-handler0 thread was reinitializing a new zookeeper instance; the expireEvent had been put into the queue of the ControllerEventManager and was waiting to be handled by the controller-event-thread. (*At this time, kafka was still disconnected from zookeeper.*)
 # However, at the same time, the controller-event-thread was handling Startup (a type of ControllerEvent), as follows:

{code:java}
//KafkaController.scala
case object Startup extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {

zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
  }
}
{code}
In this process, it is blocked in ZooKeeperClient.handleRequests; the code is as follows:
{code:java}
// code placeholder
def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] 
= {
  if (requests.isEmpty)
Seq.empty
  else {
val countDownLatch = new CountDownLatch(requests.size)
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)

requests.foreach { request =>
  inFlightRequests.acquire()
  try {
inReadLock(initializationLock) {
  send(request) { response =>
responseQueue.add(response)
inFlightRequests.release()
countDownLatch.countDown()
  }
}
  } catch {
case e: Throwable =>
  inFlightRequests.release()
  throw e
  }
}
countDownLatch.await()
responseQueue.asScala.toBuffer
  }
}

{code}

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the kafka cluster is to restart the kafka server. Below is the jstack 
> log of the controller-event-thread and zk-session-expiry-handler thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
>