Furthermore, the problem is not restricted to ReplicaFetcherThread. The Kafka consumer server also leaks sockets, because SendThread uses the same code. See the stack trace below:
2014-01-23 06:48:09,699 INFO [org.apache.zookeeper.ClientCnxn] (OurKafkaMessageFetcher-blah1-SendThread(pkafka3.our.com:2181)) Opening socket connection to server pkafka2.our.com/10.58.0.191:2181
2014-01-23 06:48:10,124 INFO [org.apache.zookeeper.ClientCnxn] (OurKafkaMessageFetcher-blah2-SendThread(pkafka1.our.com:2181)) Opening socket connection to server pkafka2.our.com/10.58.0.191:2181
2014-01-23 06:48:10,272 INFO [org.apache.zookeeper.ClientCnxn] (OurKafkaMessageFetcher-blah3-SendThread(pkafka2.our.com:2181)) Opening socket connection to server 10.58.0.190/10.58.0.190:2181
2014-01-23 06:48:11,033 INFO [org.apache.zookeeper.ClientCnxn] (OurKafkaMessageFetcher-blah1-SendThread(pkafka2.our.com:2181)) Client session timed out, have not heard from server in 3257ms for sessionid 0x3436ae2b16a0071, closing socket connection and attempting reconnect
2014-01-23 06:48:11,070 WARN [kafka.consumer.ConsumerFetcherThread] (ConsumerFetcherThread-blah1KafkaGroup_pjb1.our.com-1390441665650-635d2718-0-2) [ConsumerFetcherThread-blah1KafkaGroup_pjb1.our.com-1390441665650-635d2718-0-2], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 176434; ClientId: blah1KafkaGroup-ConsumerFetcherThread-blah1KafkaGroup_pjb1.our.com-1390441665650-635d2718-0-2; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [blah,0] -> PartitionFetchInfo(0,1048576),[blah,4] -> PartitionFetchInfo(0,1048576)
java.nio.channels.UnresolvedAddressException
    at sun.nio.ch.Net.checkAddress(Net.java:30)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
    at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
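
To illustrate how a failed connect like the one in this trace can leave a socket behind, here is a minimal Java sketch. It assumes the leak comes from a channel that is opened before connect() and never closed when connect() throws UnresolvedAddressException; I have not confirmed that this is exactly what BlockingChannel does. The class ConnectSketch, the helper connectOrClose, and the host name are hypothetical and are not taken from the Kafka code.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

final class ConnectSketch {

    // Hypothetical helper: connect the given channel, and close it if the
    // connect attempt fails, so the already-opened socket is not left behind.
    static void connectOrClose(SocketChannel channel, InetSocketAddress address)
            throws IOException {
        try {
            channel.connect(address);
        } catch (RuntimeException | IOException e) {
            // UnresolvedAddressException is a RuntimeException; without this
            // close() the channel stays open after the failure.
            channel.close();
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        // createUnresolved() skips DNS lookup, so connect() is guaranteed to
        // fail with UnresolvedAddressException. The host is a placeholder.
        InetSocketAddress address =
                InetSocketAddress.createUnresolved("no-such-host.invalid", 9092);

        SocketChannel channel = SocketChannel.open();
        try {
            connectOrClose(channel, address);
        } catch (RuntimeException e) {
            System.out.println("connect failed: " + e.getClass().getName());
        }
        System.out.println("channel still open: " + channel.isOpen());
    }
}
```

If the catch block is removed, the channel remains open after the exception, and a thread that keeps retrying with a fresh channel each time would accumulate open sockets.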