Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

2015-01-27 Thread Rajiv Kurian
I am using 0.8.1. The source is here: https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Here is the definition of disconnect(): private def disconnect() = { if(blockingChannel.isConnected) { debug("Disconnecting from " + host + ":" + port)

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

2015-01-27 Thread Rajiv Kurian
Here is the relevant stack trace: java.nio.channels.UnresolvedAddressException: null at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55] at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644) ~[na:1.7.0_55] at
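For readers following the trace, here is a minimal sketch of how a channel can stay open after connect() throws UnresolvedAddressException. It uses only java.nio and does not touch Kafka. The class name ChannelLeakDemo, its method names, and the host broker.invalid are hypothetical and chosen for illustration. It assumes the leak comes from cleanup that is guarded by an isConnected() check, as in the disconnect() quoted elsewhere in this thread, and is not a reproduction of Kafka's own code.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

public class ChannelLeakDemo {
    // Hypothetical broker address. createUnresolved() skips DNS lookup,
    // so connect() rejects it without any network access.
    static final InetSocketAddress UNRESOLVED =
            InetSocketAddress.createUnresolved("broker.invalid", 9092);

    // Opens a channel and attempts to connect. The exception is swallowed
    // here only so the caller can inspect the channel afterwards.
    static SocketChannel openAndFailToConnect() throws IOException {
        SocketChannel channel = SocketChannel.open();
        try {
            channel.connect(UNRESOLVED);
        } catch (UnresolvedAddressException e) {
            // The address check failed before any connection attempt was made.
        }
        return channel;
    }

    // Cleanup guarded by isConnected(): the channel was never connected,
    // so the guard skips close() and the underlying socket stays open.
    static boolean channelOpenAfterGuardedCleanup() throws IOException {
        SocketChannel channel = openAndFailToConnect();
        if (channel.isConnected()) {
            channel.close();
        }
        boolean stillOpen = channel.isOpen();
        channel.close(); // release the socket so the demo itself does not leak
        return stillOpen;
    }

    // Unconditional cleanup: close() is safe on an open but unconnected channel.
    static boolean channelOpenAfterUnconditionalCleanup() throws IOException {
        SocketChannel channel = openAndFailToConnect();
        channel.close();
        return channel.isOpen();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("open after guarded cleanup: "
                + channelOpenAfterGuardedCleanup());
        System.out.println("open after unconditional cleanup: "
                + channelOpenAfterUnconditionalCleanup());
    }
}
```

In a long-running loop that recreates the consumer after each failure, every skipped close() of this kind would leave one more file descriptor behind, which matches the symptom described in the subject line.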

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

2015-01-27 Thread Guozhang Wang
Ewen, you are right, the patch was committed on Feb. 20th last year. I will leave a comment and close that ticket. On Tue, Jan 27, 2015 at 7:24 PM, Ewen Cheslack-Postava e...@confluent.io wrote: This was fixed in commit 6ab9b1ecd8 for KAFKA-1235 and it looks like that will only be included in

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

2015-01-27 Thread Guozhang Wang
Rajiv, which version of Kafka are you using? I just checked SimpleConsumer's code, and in its close() function, disconnect() is called, which will close the socket. Guozhang On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian ra...@signalfuse.com wrote: Meant to write a run loop. void run() {

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

2015-01-27 Thread Ewen Cheslack-Postava
This was fixed in commit 6ab9b1ecd8 for KAFKA-1235 and it looks like that will only be included in 0.8.2. Guozhang, it looks like you wrote the patch, Jun reviewed it, but the bug is still open and there's a comment that moved it to 0.9 after the commit was already made. Was the commit a mistake

SimpleConsumer leaks sockets on an UnresolvedAddressException

2015-01-26 Thread Rajiv Kurian
Here is my typical flow: void run() { if (simpleConsumer == null) { simpleConsumer = new SimpleConsumer(host, port, (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName); } try { // Do stuff with simpleConsumer. } catch (Exception e) { if (consumer != null) {

Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

2015-01-26 Thread Rajiv Kurian
Meant to write a run loop. void run() { while (running) { if (simpleConsumer == null) { simpleConsumer = new SimpleConsumer(host, port, (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName); } try { // Do stuff with simpleConsumer. } catch (Exception e) {