This was fixed in commit 6ab9b1ecd8 for KAFKA-1235 and it looks like that
will only be included in 0.8.2.

Guozhang, it looks like you wrote the patch, Jun reviewed it, but the bug
is still open and there's a comment that moved it to 0.9 after the commit
was already made. Was the commit a mistake or did we just forget to close
it?

On Tue, Jan 27, 2015 at 10:20 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Here is the relevant stack trace:
>
> java.nio.channels.UnresolvedAddressException: null
>
>         at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55]
>
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644)
> ~[na:1.7.0_55]
>
>         at kafka.network.BlockingChannel.connect(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.consumer.SimpleConsumer.connect(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.metrics.KafkaTimer.time(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.metrics.KafkaTimer.time(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
>         at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source)
> ~[kafka_2.10-0.8.0.jar:0.8.0]
>
> On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > I am using 0.8.1. The source is here:
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
> >
> > Here is the definition of disconnect():
> > private def disconnect() = {
> >     if(blockingChannel.isConnected) {
> >       debug("Disconnecting from " + host + ":" + port)
> >       blockingChannel.disconnect()
> >     }
> >   }
> > It checks if blockingChannel.isConnected before calling
> > blockingChannel.disconnect(). I think if there is an
> > UnresolvedAddressException, the isConnected is never set and the
> > blockingChannel.disconnect() is never called. But by this point we have
> > already created a socket and will leak it.
> >
> > The same problem might be present in the connect method of the
> > BlockingChannel at
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala
> .
> > Though its own disconnect method seems to check for both the connected:
> >
> > def disconnect() = lock synchronized {
> >     // My comment: connected will not be set if we get an
> > UnresolvedAddressException but channel should NOT  be null, so we will
> > probably still do the right thing.
> >     if(connected || channel != null) {
> >       // closing the main socket channel *should* close the read channel
> >       // but let's do it to be sure.
> >       swallow(channel.close())
> >       swallow(channel.socket.close())
> >       if(readChannel != null) swallow(readChannel.close())
> >       channel = null; readChannel = null; writeChannel = null
> >       connected = false
> >     }
> >   }
> >
> >
> >
> > On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Rajiv,
> >>
> >> Which version of Kafka are you using? I just checked SimpleConsumer's
> >> code,
> >> and in its close() function, disconnect() is called, which will close
> the
> >> socket.
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian <ra...@signalfuse.com>
> >> wrote:
> >>
> >> > Meant to write a run loop.
> >> >
> >> > void run() {
> >> >   while (running) {
> >> >     if (simpleConsumer == null) {
> >> >       simpleConsumer = new SimpleConsumer(host, port,
> >> > (int) kafkaSocketTimeout, kafkaRExeiveBufferSize, clientName);
> >> >     }
> >> >     try {
> >> >       // Do stuff with simpleConsumer.
> >> >     } catch (Exception e) {
> >> >       logger.error(e);  // Assume UnresolvedAddressException.
> >> >       if (consumer != null) {
> >> >         simpleConsumer.close();
> >> >         simpleConsumer = null;
> >> >       }
> >> >     }
> >> >   }
> >> > }
> >> >
> >> > On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian <ra...@signalfuse.com>
> >> > wrote:
> >> >
> >> > > Here is my typical flow:
> >> > > void run() {
> >> > >   if (simpleConsumer == null) {
> >> > >     simpleConsumer = new SimpleConsumer(host, port, (int)
> >> > kafkaSocketTimeout,
> >> > > kafkaRExeiveBufferSize, clientName);
> >> > >   }
> >> > >   try {
> >> > >     // Do stuff with simpleConsumer.
> >> > >    } catch (Exception e) {
> >> > >      if (consumer != null) {
> >> > >        simpleConsumer.close();
> >> > >        simpleConsumer = null;
> >> > >      }
> >> > >   }
> >> > > }
> >> > >
> >> > > If there is a problem with the host name, or some DNS issues, we get
> >> an
> >> > > UnresolvedAddressException as expected and attempt to close the
> >> > > simpleConsumer. However this does not really get rid of the
> underlying
> >> > > socket. So we end up leaking a FD every time this happens. Though
> >> this is
> >> > > not a common case I think there needs to be a way on the
> >> SimpleConsumer
> >> > to
> >> > > get rid of all OS resources that it is holding onto. Right now if
> this
> >> > > keeps happening the number of FDs being consumed by the process
> keeps
> >> > > increasing till we hit the OS limits. As a user I cannot do anything
> >> else
> >> > > but call simpleConsumer.close(). We need to be able to close the
> >> > underlying
> >> > > socketChannel/socket when this kind of an error happens.
> >> > >
> >> > > To reproduce, one can just run this code but just put in any garbage
> >> host
> >> > > name, running lsof -p while running this will show that the open FDs
> >> > > increases without limit.
> >> > >
> >> > > Thanks,
> >> > > Rajiv
> >> > >
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
Thanks,
Ewen

Reply via email to