[ https://issues.apache.org/jira/browse/KAFKA-212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13158807#comment-13158807 ]
Neha Narkhede commented on KAFKA-212:
-------------------------------------

>> which is how do we know if we are leaking threads?

Good question. From what I see, the entire thread run() method is guarded by a try-catch-finally block. In the finally block, we count down the latch. So if the thread itself runs into some exception/error, it will exit the run() method and shut itself down. The other case of thread shutdown is when the mirroring thread itself calls the shutdown API. Here, we wait until the current producer send operation succeeds before counting down the latch. In both cases, I don't see how we can leak threads.

>> I guess the patch doesn't make it better or worse, since we definitely don't
>> want to keep them in the list, but can you assess what happens if shutdown
>> fails?

Not true. The patch fixes the bug filed here. Your concerns are about the shutdown logic of the thread, which, if you suspect a bug there, can go in a separate JIRA.

>> Do we log it?

Yes, in any case of shutdown, it gets logged as a FATAL error.

>> Or is there a guarantee that the thread must shutdown in some bounded period
>> of time?

Maybe. The only case where it would not is if the producer send operation hangs indefinitely, which would be a serious bug in the producer send logic.
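
To make the two points above concrete, here is a minimal Java sketch of the pattern as I read it. It is not the Kafka code: the class names, the sleep standing in for the producer send, and the timeout on shutdown are all assumptions made for illustration. It shows run() counting down a latch in its finally block, and why a thread that has finished cannot be started again, which is the exception reported in this issue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names and structure are illustrative, not Kafka's.
class MirrorThreadSketch {

    /** Stand-in for a mirroring thread. */
    static class Worker extends Thread {
        private final CountDownLatch shutdownLatch = new CountDownLatch(1);
        private volatile boolean running = true;

        @Override
        public void run() {
            try {
                while (running) {
                    // Placeholder for "consume a message and send it via the producer".
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Reached on normal exit and on any exception or error alike,
                // so a waiting shutdown() caller is always released.
                shutdownLatch.countDown();
            }
        }

        /**
         * Asks the loop to stop and waits for run() to finish.
         * The timeout is an assumption added here; returns true if the
         * thread signalled completion within timeoutMs.
         */
        boolean shutdown(long timeoutMs) {
            running = false;
            try {
                return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Returns true if calling start() on an already-started thread is rejected. */
    static boolean restartIsRejected(Thread alreadyStarted) {
        try {
            alreadyStarted.start();
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Worker worker = new Worker();
        worker.start();
        boolean stopped = worker.shutdown(1000);
        worker.join();
        System.out.println("stopped in time: " + stopped);
        System.out.println("restart rejected: " + restartIsRejected(worker));
    }
}
```

The second half is the reason finished threads should not stay in the list: a java.lang.Thread can be started only once, so any later attempt to start the entries in that list fails for the ones that have already run.
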

> IllegalThreadStateException in topic watcher for Kafka mirroring
> ----------------------------------------------------------------
>
> Key: KAFKA-212
> URL: https://issues.apache.org/jira/browse/KAFKA-212
> Project: Kafka
> Issue Type: Bug
> Reporter: Neha Narkhede
> Assignee: Neha Narkhede
> Fix For: 0.7.1
>
> Attachments: KAFKA-212.patch
>
>
> If the kafka mirroring embedded consumer receives a new topic watcher
> notification, it runs into the following exception
> [2011-11-23 02:49:15,612] FATAL java.lang.IllegalThreadStateException (kafka.consumer.ZookeeperTopicEventWatcher)
> [2011-11-23 02:49:15,612] FATAL java.lang.IllegalThreadStateException
>   at java.lang.Thread.start(Thread.java:595)
>   at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$3.apply(KafkaServerStartable.scala:142)
>   at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$3.apply(KafkaServerStartable.scala:142)
>   at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>   at scala.collection.immutable.List.foreach(List.scala:45)
>   at kafka.server.EmbeddedConsumer.startNewConsumerThreads(KafkaServerStartable.scala:142)
>   at kafka.server.EmbeddedConsumer.handleTopicEvent(KafkaServerStartable.scala:109)
>   at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree2$1(ZookeeperTopicEventWatcher.scala:83)
>   at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:78)
>   at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>  (kafka.consumer.ZookeeperTopicEventWatcher)
> This happens since it tries to start a thread which has finished executing

--
This message is automatically generated by JIRA.