[ 
https://issues.apache.org/jira/browse/KAFKA-13428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17437697#comment-17437697
 ] 

yuhuo commented on KAFKA-13428:
-------------------------------

Thanks [~dajac], that is the same problem.

> server hang on shutdown
> -----------------------
>
>                 Key: KAFKA-13428
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13428
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 2.5.0
>            Reporter: yuhuo
>            Priority: Major
>
> The Kafka server starts up in the following order:
>     1. socketServer.startup
>     2. zkClient.registerBroker
>     3. socketServer.startDataPlaneProcessors
> After step 1 the port accepts connections, but each processor's queue of new
> connections holds only 20 entries by default. If many clients connect, the
> acceptor thread blocks in put() on that queue. If registerBroker then fails
> (for example because the previous ZooKeeper session has not yet expired and
> the broker id is still registered), the server starts shutting down without
> ever starting the network processors. The acceptor thread cannot complete its
> shutdown because it is still waiting on the queue, so the server hangs.
> Stack trace:
> {code:java}
> // code placeholder
> ...
> "data-plane-kafka-socket-acceptor-ListenerName(ING_INSIDE)-PLAINTEXT-9094" #35 prio=5 os_prio=0 tid=0x000055fe58048800 nid=0x6c5 runnable [0x00007f5f60f8b000]
>    java.lang.Thread.State: RUNNABLE
>       at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>       at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>       at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>       at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>       - locked <0x00000006497b86a0> (a sun.nio.ch.Util$3)
>       - locked <0x00000006497b8690> (a java.util.Collections$UnmodifiableSet)
>       - locked <0x00000006497b86b0> (a sun.nio.ch.EPollSelectorImpl)
>       at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>       at kafka.network.Acceptor.run(SocketServer.scala:534)
>       at java.lang.Thread.run(Thread.java:748)
> "data-plane-kafka-socket-acceptor-ListenerName(INSIDE)-PLAINTEXT-9092" #34 prio=5 os_prio=0 tid=0x000055fe55773800 nid=0x6c4 waiting on condition [0x00007f5f63315000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000006497b88d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
>       at kafka.network.Processor.accept(SocketServer.scala:1002)
>       at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:633)
>       at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:560)
>       at kafka.network.Acceptor.run(SocketServer.scala:544)
>       at java.lang.Thread.run(Thread.java:748)
> ...
> "main" #1 prio=5 os_prio=0 tid=0x000055fe5519a000 nid=0x69f waiting on condition [0x00007f5f8a0cf000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000006497b8df8> (a java.util.concurrent.CountDownLatch$Sync)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>       at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
>       at kafka.network.Acceptor.shutdown(SocketServer.scala:517)
>       at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
>       at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
>       at kafka.network.SocketServer$$Lambda$408/1620459733.apply(Unknown Source)
>       at scala.collection.Iterator.foreach(Iterator.scala:941)
>       at scala.collection.Iterator.foreach$(Iterator.scala:941)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>       at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
>       at kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
>       - locked <0x00000006497b8e98> (a kafka.network.SocketServer)
>       at kafka.server.KafkaServer.$anonfun$shutdown$4(KafkaServer.scala:617)
>       at kafka.server.KafkaServer$$Lambda$406/1338368149.apply$mcV$sp(Unknown Source)
>       at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:67)
>       at kafka.server.KafkaServer.shutdown(KafkaServer.scala:617)
>       at kafka.server.KafkaServer.startup(KafkaServer.scala:358)
>       at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>       at kafka.Kafka$.main(Kafka.scala:82)
>       at kafka.Kafka.main(Kafka.scala)
>  
> {code}
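
For anyone reading along, the hang in the quoted description can be reproduced in miniature outside Kafka. The sketch below is not Kafka's code: the class AcceptorShutdownSketch, the method acceptorStopsCleanly, and the 50 ms poll interval are all hypothetical names and values chosen for illustration. It models an acceptor thread handing connections to a bounded queue that nobody drains, followed by a shutdown that flips a flag and waits on a latch, as the two parked threads in the stack trace do. The timed offer() variant is only one possible way to let the thread notice the shutdown request, and is not necessarily how the issue was resolved upstream.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the acceptor/processor hand-off; not Kafka's actual code.
class AcceptorShutdownSketch {
    // Same size as the default processor queue mentioned in the issue.
    static final int QUEUE_SIZE = 20;

    /**
     * Runs an "acceptor" that keeps enqueueing connections while no processor
     * drains the queue, then requests shutdown once the queue is full.
     * Returns true if the acceptor finished within waitMillis.
     */
    static boolean acceptorStopsCleanly(boolean useTimedOffer, long waitMillis) {
        BlockingQueue<Integer> newConnections = new ArrayBlockingQueue<>(QUEUE_SIZE);
        AtomicBoolean running = new AtomicBoolean(true);
        CountDownLatch queueFull = new CountDownLatch(1);
        CountDownLatch shutdownComplete = new CountDownLatch(1);

        Thread acceptor = new Thread(() -> {
            try {
                int conn = 0;
                while (running.get()) {
                    if (newConnections.remainingCapacity() == 0) {
                        queueFull.countDown();
                    }
                    if (useTimedOffer) {
                        // Bounded wait: returns regularly so the running flag is re-checked.
                        if (newConnections.offer(conn, 50, TimeUnit.MILLISECONDS)) {
                            conn++;
                        }
                    } else {
                        // Unbounded wait: parks indefinitely once the queue is full.
                        newConnections.put(conn++);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownComplete.countDown();
            }
        }, "acceptor-sketch");
        acceptor.setDaemon(true); // a stuck acceptor must not keep the JVM alive
        acceptor.start();

        try {
            queueFull.await();
            // Shutdown as in the stack trace: flip the flag, then wait on the latch.
            running.set(false);
            return shutdownComplete.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking put stops cleanly: " + acceptorStopsCleanly(false, 500));
        System.out.println("timed offer stops cleanly:  " + acceptorStopsCleanly(true, 2000));
    }
}
```

In the put() variant the acceptor never gets back to the running flag, so the latch is never counted down. This matches the "main" thread parked in CountDownLatch.await while the acceptor is parked in ArrayBlockingQueue.put.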



--
This message was sent by Atlassian Jira
(v8.3.4#803005)