That is exactly what we are doing. But in the shutdown path we see the following exception.
2013-12-19 23:21:54,249 FATAL [kafka.consumer.ZookeeperConsumerConnector] (EFKafkaMessageFetcher-retryLevelThree) [TestretryLevelThreeKafkaGroup_pshah-MacBook-Pro.local-1387494830478-efdbacea], error during consumer connector shutdown
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1279)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
    at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
    at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:132)
    at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:165)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:105)
    at com.ecofactor.kafka.consumer.AbstractEFKafkaThreadPooledConsumer$KafkaMessageFetcher.disconnect(AbstractEFKafkaThreadPooledConsumer.java:106)
    at com.ecofactor.kafka.consumer.AbstractEFKafkaThreadPooledConsumer.shutdown(AbstractEFKafkaThreadPooledConsumer.java:399)
    at com.ecofactor.kafka.consumer.AbstractRetryTopicConsumer.handleTimeouts(AbstractRetryTopicConsumer.java:127)
    at com.ecofactor.kafka.consumer.AbstractEFKafkaThreadPooledConsumer$KafkaMessageFetcher.run(AbstractEFKafkaThreadPooledConsumer.java:141)

The result of this exception is that the rest of the shutdown logic does not get executed. That remaining logic includes closing the ZooKeeper connections.
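
For what it's worth, here is a minimal sketch of what I think may be going on. It is plain JDK code, not Kafka code. CountDownLatch.await() throws InterruptedException as soon as it is entered if the calling thread's interrupt flag is already set, even when the latch is already open. In the trace the shutdown is called from the fetcher thread itself (KafkaMessageFetcher.run), so if something interrupted that thread earlier, the await inside ShutdownableThread.shutdown would fail this way. That is my assumption, not something I have confirmed. The class and method names below (InterruptedShutdownSketch, blockingShutdown, runShutdown) are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptedShutdownSketch {

    // Hypothetical stand-in for a shutdown call that waits on a latch,
    // similar in shape to the await() frame in the stack trace.
    static void blockingShutdown(CountDownLatch done) throws InterruptedException {
        done.await();
    }

    // Runs blockingShutdown on a worker thread whose interrupt flag is already set.
    // Returns true if the shutdown ran to completion, false if it was interrupted.
    static boolean runShutdown(boolean clearFlagFirst) {
        AtomicBoolean completed = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            CountDownLatch done = new CountDownLatch(1);
            done.countDown();                    // latch is open, nothing to wait for
            Thread.currentThread().interrupt();  // simulate an interrupt that arrived earlier
            // Thread.interrupted() returns the flag and clears it.
            boolean pending = clearFlagFirst && Thread.interrupted();
            try {
                blockingShutdown(done);
                completed.set(true);
            } catch (InterruptedException e) {
                pending = true;                  // await() cleared the flag when it threw
            } finally {
                if (pending) {
                    Thread.currentThread().interrupt(); // restore the flag for later code
                }
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("flag left set: completed=" + runShutdown(false));
        System.out.println("flag cleared:  completed=" + runShutdown(true));
    }
}
```

If this is the cause, one possible workaround on our side would be to clear the interrupt flag before calling the connector's shutdown() and restore it afterwards, as the second case does. Whether that is appropriate depends on who is interrupting the fetcher thread and why.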