[ 
https://issues.apache.org/jira/browse/KAFKA-13428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuhuo updated KAFKA-13428:
--------------------------
    Description: 
The Kafka server starts up in the following steps:

    1. socketServer.startup

    2. zkClient.registerBroker

    3. socketServer.startDataPlaneProcessors

After step 1, the port accepts connections, but the default processor queue
size is 20. If there are many connections, the Acceptor thread blocks on the
processor's queue put. Then, if registerBroker fails (for example, the zk
session has not expired, so the broker id still exists), the server starts
shutting down and never starts the network processors. Shutting down the
Acceptor thread never completes because the thread is still blocked on the
queue put, so the server hangs.

stack:
{code:java}
//代码占位符
...
"data-plane-kafka-socket-acceptor-ListenerName(ING_INSIDE)-PLAINTEXT-9094" #35 
prio=5 os_prio=0 tid=0x000055fe58048800 nid=0x6c5 runnable [0x00007f5f60f8b000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x00000006497b86a0> (a sun.nio.ch.Util$3)
        - locked <0x00000006497b8690> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000006497b86b0> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at kafka.network.Acceptor.run(SocketServer.scala:534)
        at java.lang.Thread.run(Thread.java:748)

"data-plane-kafka-socket-acceptor-ListenerName(INSIDE)-PLAINTEXT-9092" #34 
prio=5 os_prio=0 tid=0x000055fe55773800 nid=0x6c4 waiting on condition 
[0x00007f5f63315000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006497b88d8> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
        at kafka.network.Processor.accept(SocketServer.scala:1002)
        at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:633)
        at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:560)
        at kafka.network.Acceptor.run(SocketServer.scala:544)
        at java.lang.Thread.run(Thread.java:748)
...
"main" #1 prio=5 os_prio=0 tid=0x000055fe5519a000 nid=0x69f waiting on 
condition [0x00007f5f8a0cf000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006497b8df8> (a 
java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
        at kafka.network.Acceptor.shutdown(SocketServer.scala:517)
        at 
kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
        at 
kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
        at kafka.network.SocketServer$$Lambda$408/1620459733.apply(Unknown 
Source)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at 
scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
        at 
kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
        - locked <0x00000006497b8e98> (a kafka.network.SocketServer)
        at kafka.server.KafkaServer.$anonfun$shutdown$4(KafkaServer.scala:617)
        at kafka.server.KafkaServer$$Lambda$406/1338368149.apply$mcV$sp(Unknown 
Source)
        at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:67)
        at kafka.server.KafkaServer.shutdown(KafkaServer.scala:617)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:358)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
 
{code}

  was:
Kafka Server start as follow step:  

    1. socketServer.startup

    2. zkClient.registerBroker

    3. socketServer.startDataPlaneProcessors

after step1,the port can be connnected, but default processors queue size is 
20, if there is many connections,the AcceptorThread will wait on processors 
queue put. and then, if registerBroker error(such as zk session not expired, 
the broker id still exist), server will shutdown and never start network 
processors,  AcceptorThread will shutdown fail because thread still wait on 
queue, at last server is hang.

stack:
{code:java}
//代码占位符
...
"data-plane-kafka-socket-acceptor-ListenerName(ING_INSIDE)-PLAINTEXT-9094" #35 
prio=5 os_prio=0 tid=0x000055fe58048800 nid=0x6c5 runnable [0x00007f5f60f8b000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x00000006497b86a0> (a sun.nio.ch.Util$3)
        - locked <0x00000006497b8690> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000006497b86b0> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at kafka.network.Acceptor.run(SocketServer.scala:534)
        at java.lang.Thread.run(Thread.java:748)

"data-plane-kafka-socket-acceptor-ListenerName(INSIDE)-PLAINTEXT-9092" #34 
prio=5 os_prio=0 tid=0x000055fe55773800 nid=0x6c4 waiting on condition 
[0x00007f5f63315000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006497b88d8> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at 
java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
        at kafka.network.Processor.accept(SocketServer.scala:1002)
        at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:633)
        at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:560)
        at kafka.network.Acceptor.run(SocketServer.scala:544)
        at java.lang.Thread.run(Thread.java:748)
...
"main" #1 prio=5 os_prio=0 tid=0x000055fe5519a000 nid=0x69f waiting on 
condition [0x00007f5f8a0cf000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006497b8df8> (a 
java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
        at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
        at kafka.network.Acceptor.shutdown(SocketServer.scala:517)
        at 
kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
        at 
kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
        at kafka.network.SocketServer$$Lambda$408/1620459733.apply(Unknown 
Source)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at 
scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
        at 
kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
        - locked <0x00000006497b8e98> (a kafka.network.SocketServer)
        at kafka.server.KafkaServer.$anonfun$shutdown$4(KafkaServer.scala:617)
        at kafka.server.KafkaServer$$Lambda$406/1338368149.apply$mcV$sp(Unknown 
Source)
        at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:67)
        at kafka.server.KafkaServer.shutdown(KafkaServer.scala:617)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:358)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
 
{code}


> server hang on shutdown
> -----------------------
>
>                 Key: KAFKA-13428
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13428
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 2.5.0
>            Reporter: yuhuo
>            Priority: Major
>
> The Kafka server starts up in the following steps:
>     1. socketServer.startup
>     2. zkClient.registerBroker
>     3. socketServer.startDataPlaneProcessors
> After step 1, the port accepts connections, but the default processor queue
> size is 20. If there are many connections, the Acceptor thread blocks on the
> processor's queue put. Then, if registerBroker fails (for example, the zk
> session has not expired, so the broker id still exists), the server starts
> shutting down and never starts the network processors. Shutting down the
> Acceptor thread never completes because the thread is still blocked on the
> queue put, so the server hangs.
> stack:
> {code:java}
> //代码占位符
> ...
> "data-plane-kafka-socket-acceptor-ListenerName(ING_INSIDE)-PLAINTEXT-9094" 
> #35 prio=5 os_prio=0 tid=0x000055fe58048800 nid=0x6c5 runnable 
> [0x00007f5f60f8b000]
>    java.lang.Thread.State: RUNNABLE
>       at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>       at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>       at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>       at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>       - locked <0x00000006497b86a0> (a sun.nio.ch.Util$3)
>       - locked <0x00000006497b8690> (a java.util.Collections$UnmodifiableSet)
>       - locked <0x00000006497b86b0> (a sun.nio.ch.EPollSelectorImpl)
>       at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>       at kafka.network.Acceptor.run(SocketServer.scala:534)
>       at java.lang.Thread.run(Thread.java:748)
> "data-plane-kafka-socket-acceptor-ListenerName(INSIDE)-PLAINTEXT-9092" #34 
> prio=5 os_prio=0 tid=0x000055fe55773800 nid=0x6c4 waiting on condition 
> [0x00007f5f63315000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000006497b88d8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
>       at kafka.network.Processor.accept(SocketServer.scala:1002)
>       at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:633)
>       at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:560)
>       at kafka.network.Acceptor.run(SocketServer.scala:544)
>       at java.lang.Thread.run(Thread.java:748)
> ...
> "main" #1 prio=5 os_prio=0 tid=0x000055fe5519a000 nid=0x69f waiting on 
> condition [0x00007f5f8a0cf000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000006497b8df8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>       at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
>       at kafka.network.Acceptor.shutdown(SocketServer.scala:517)
>       at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
>       at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
>       at kafka.network.SocketServer$$Lambda$408/1620459733.apply(Unknown 
> Source)
>       at scala.collection.Iterator.foreach(Iterator.scala:941)
>       at scala.collection.Iterator.foreach$(Iterator.scala:941)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>       at 
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
>       at 
> kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
>       - locked <0x00000006497b8e98> (a kafka.network.SocketServer)
>       at kafka.server.KafkaServer.$anonfun$shutdown$4(KafkaServer.scala:617)
>       at kafka.server.KafkaServer$$Lambda$406/1338368149.apply$mcV$sp(Unknown 
> Source)
>       at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:67)
>       at kafka.server.KafkaServer.shutdown(KafkaServer.scala:617)
>       at kafka.server.KafkaServer.startup(KafkaServer.scala:358)
>       at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>       at kafka.Kafka$.main(Kafka.scala:82)
>       at kafka.Kafka.main(Kafka.scala)
>  
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to