This is fixed in https://issues.apache.org/jira/browse/KAFKA-1228 and will
be included in the 0.8.1 release.
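
For anyone stuck on 0.8.0 in the meantime, the essence of the fix is to release the socket on any failure in the send/receive path, not just on java.io.IOException. A minimal sketch of that pattern (the `Channel` interface and names here are illustrative stand-ins, not Kafka's actual classes):

```java
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.UnresolvedAddressException;

// Illustrative sketch only -- Channel is a stand-in, not Kafka's actual
// BlockingChannel. The point: close the socket on *any* exception, so a
// failed reconnect can never leak a file descriptor.
public class SafeRetry {
    interface Channel extends Closeable {
        void send(byte[] request) throws IOException;
    }

    static void sendOrDisconnect(Channel ch, byte[] request) throws IOException {
        try {
            ch.send(request);
        } catch (Exception first) {            // Exception, not just IOException
            try {
                ch.close();                    // always release the socket
            } catch (IOException ignored) { }
            if (first instanceof IOException) throw (IOException) first;
            throw new IOException(first);
        }
    }

    public static void main(String[] args) {
        final boolean[] closed = {false};
        Channel failing = new Channel() {
            public void send(byte[] r) {
                // the unchecked exception reported in this thread
                throw new UnresolvedAddressException();
            }
            public void close() { closed[0] = true; }
        };
        try {
            sendOrDisconnect(failing, new byte[0]);
        } catch (IOException expected) { }
        System.out.println("channel closed: " + closed[0]);
    }
}
```

Closing before rethrowing means a failed attempt can never strand a half-open SocketChannel, which is what was exhausting file descriptors here.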

Thanks,

Jun


On Wed, Feb 12, 2014 at 6:28 PM, Priya Matpadi
<priya.matp...@ecofactor.com> wrote:

> Hello,
> Is there any progress on this issue? We also experience a socket leak in
> the case of a network outage.
> Thanks,
> Priya
>
>
> On Fri, Jan 24, 2014 at 7:30 AM, Jun Rao <jun...@gmail.com> wrote:
>
> > Thanks for finding this out. We probably should disconnect on any
> > exception. Could you file a jira and perhaps attach a patch?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Jan 24, 2014 at 6:06 AM, Ahmy Yulrizka <a...@yulrizka.com>
> wrote:
> >
> > > Hi,
> > >
> > > I think I found the problem.
> > >
> > > This is part of the log. At first there is a connection problem, and
> > > when the connection is restored it gets new information from the
> > > zookeeper:
> > >
> > > [2014-01-23 23:24:55,391] INFO Opening socket connection to server
> > > host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:55,391] INFO Socket connection established to
> > > host2.provider.com/2.2.2.2:2181, initiating session
> > > (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:55,391] DEBUG Session establishment request sent on
> > > host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:57,393] INFO Client session timed out, have not heard
> > > from server in 2002ms for sessionid 0x0, closing socket connection and
> > > attempting reconnect (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:58,047] INFO Opening socket connection to server
> > > host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:59,048] INFO Socket connection established to
> > > host3.provider.com/3.3.3.3:2181, initiating session
> > > (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:59,049] DEBUG Session establishment request sent on
> > > host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:59,539] INFO Session establishment complete on server
> > > host3.provider.com/3.3.3.3:2181, sessionid = 0x343c13436e50001,
> > negotiated
> > > timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> > > [2014-01-23 23:24:59,539] DEBUG Received event: WatchedEvent
> > > state:SyncConnected type:None path:null (org.I0Itec.zkclient.ZkClient)
> > > [2014-01-23 23:24:59,539] INFO zookeeper state changed (SyncConnected)
> > > (org.I0Itec.zkclient.ZkClient)
> > > [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> > > SyncConnected sent to
> > > kafka.server.KafkaZooKeeper$SessionExpireListener@163e1f27]
> > > (org.I0Itec.zkclient.ZkEventThread)
> > > [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> > > SyncConnected sent to
> > > kafka.controller.KafkaController$SessionExpirationListener@486f44d9]
> > > (org.I0Itec.zkclient.ZkEventThread)
> > > [2014-01-23 23:24:59,539] DEBUG Leaving process event
> > > (org.I0Itec.zkclient.ZkClient)
> > > [2014-01-23 23:24:59,539] DEBUG State is SyncConnected
> > > (org.I0Itec.zkclient.ZkClient)
> > > [2014-01-23 23:24:59,540] DEBUG State is SyncConnected
> > > (org.I0Itec.zkclient.ZkClient)
> > >
> > > Then the ReplicaFetcherThread tries to reconnect. At this point it
> > > tries to connect to the other brokers, but it can't resolve their IP
> > > addresses and throws java.nio.channels.UnresolvedAddressException:
> > >
> > > [2014-01-23 23:25:01,067] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > > Name: FetchRequest; Version: 0; CorrelationId: 478411; ClientId:
> > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > > RequestInfo: [some-topic,0] -> PartitionFetchInfo(1247662,1048576),
> > > [some-topic,3] -> PartitionFetchInfo(839677,1048576)
> > > (kafka.server.ReplicaFetcherThread)
> > > java.nio.channels.UnresolvedAddressException
> > > at sun.nio.ch.Net.checkAddress(Net.java:89)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > [2014-01-23 23:25:01,077] INFO Reconnect due to socket error: null
> > > (kafka.consumer.SimpleConsumer)
> > > [2014-01-23 23:25:01,078] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > > Name: FetchRequest; Version: 0; CorrelationId: 478412; ClientId:
> > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > > RequestInfo: [some-topic,0] -> PartitionFetchInfo(1247662,1048576),
> > > [some-topic,3] -> PartitionFetchInfo(839677,1048576)
> > > (kafka.server.ReplicaFetcherThread)
> > > java.nio.channels.UnresolvedAddressException
> > > at sun.nio.ch.Net.checkAddress(Net.java:89)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > [2014-01-23 23:25:01,079] INFO Reconnect due to socket error: null
> > > (kafka.consumer.SimpleConsumer)
> > >
> > >
> > > It throws a bunch of these errors until it hits "too many open files"...
> > >
> > > [2014-01-23 23:25:03,756] INFO Reconnect due to socket error: null
> > > (kafka.consumer.SimpleConsumer)
> > > [2014-01-23 23:25:03,756] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > > Name: FetchRequest; Version: 0; CorrelationId: 482406; ClientId:
> > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > > RequestInfo: [some-topic,0] -> PartitionFetchInfo(1247662,1048576),
> > > [some-topic,2] -> PartitionFetchInfo(1135106,1048576),
> > > [some-topic,3] -> PartitionFetchInfo(839677,1048576),
> > > [some-topic,1] -> PartitionFetchInfo(1273826,1048576)
> > > (kafka.server.ReplicaFetcherThread)
> > > java.nio.channels.UnresolvedAddressException
> > > at sun.nio.ch.Net.checkAddress(Net.java:89)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> > > (kafka.consumer.SimpleConsumer)
> > > [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > > Name: FetchRequest; Version: 0; CorrelationId: 482407; ClientId:
> > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > > RequestInfo: [some-topic,0] -> PartitionFetchInfo(1247662,1048576),
> > > [some-topic,2] -> PartitionFetchInfo(1135106,1048576),
> > > [some-topic,3] -> PartitionFetchInfo(839677,1048576),
> > > [some-topic,1] -> PartitionFetchInfo(1273826,1048576)
> > > (kafka.server.ReplicaFetcherThread)
> > > java.net.SocketException: Too many open files
> > > at sun.nio.ch.Net.socket0(Native Method)
> > > at sun.nio.ch.Net.socket(Net.java:156)
> > > at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> > > at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> > > at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> > > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> > > (kafka.consumer.SimpleConsumer)
> > > [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > > Name: FetchRequest; Version: 0; CorrelationId: 482408; ClientId:
> > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > > RequestInfo: [some-topic,0] -> PartitionFetchInfo(1247662,1048576),
> > > [some-topic,2] -> PartitionFetchInfo(1135106,1048576),
> > > [some-topic,3] -> PartitionFetchInfo(839677,1048576),
> > > [some-topic,1] -> PartitionFetchInfo(1273826,1048576)
> > > (kafka.server.ReplicaFetcherThread)
> > > java.net.SocketException: Too many open files
> > > at sun.nio.ch.Net.socket0(Native Method)
> > > at sun.nio.ch.Net.socket(Net.java:156)
> > > at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> > > at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> > > at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> > > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > [2014-01-23 23:25:03,758] INFO Reconnect due to socket error: null
> > > (kafka.consumer.SimpleConsumer)
> > >
> > >
> > >
> > > I looked into the source code,
> > > core/src/main/scala/kafka/consumer/SimpleConsumer.scala line 79.
> > >
> > > It tries to reconnect(), but it only closes the connection on
> > > "java.io.IOException", which does not catch
> > > "java.nio.channels.UnresolvedAddressException":
> > >
> > >   private def sendRequest(request: RequestOrResponse): Receive = {
> > >     lock synchronized {
> > >       getOrMakeConnection()
> > >       var response: Receive = null
> > >       try {
> > >         blockingChannel.send(request)
> > >         response = blockingChannel.receive()
> > >       } catch {
> > >         case e : java.io.IOException =>
> > >           info("Reconnect due to socket error:
> %s".format(e.getMessage))
> > >           // retry once
> > >           try {
> > >             reconnect()
> > >             blockingChannel.send(request)
> > >             response = blockingChannel.receive()
> > >           } catch {
> > >             case ioe: java.io.IOException =>
> > >               disconnect()
> > >               throw ioe
> > >           }
> > >         case e: Throwable => throw e
> > >       }
> > >       response
> > >     }
> > >   }
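
The catch clause above misses the error in these logs because of where UnresolvedAddressException sits in the Java exception hierarchy; a tiny check (plain Java, for illustration only) confirms it:

```java
import java.io.IOException;
import java.nio.channels.UnresolvedAddressException;

// UnresolvedAddressException extends IllegalArgumentException, a
// RuntimeException -- it is NOT an IOException, so the
// `case e: java.io.IOException` branch never matches it and
// disconnect() is never called; the half-open socket leaks.
public class Hierarchy {
    public static void main(String[] args) {
        Throwable t = new UnresolvedAddressException();
        System.out.println(t instanceof IOException);       // false
        System.out.println(t instanceof RuntimeException);  // true
    }
}
```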
> > >
> > > This is my production setup:
> > >
> > > OS: Ubuntu 12.04
> > > kafka : kafka_2.8.0-0.8.0.jar
> > >
> > > java :
> > > java version "1.6.0_27"
> > > OpenJDK Runtime Environment (IcedTea6 1.12.6)
> > > (6b27-1.12.6-1ubuntu0.12.04.4)
> > > OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)
> > >
> > > An interesting fact: if I close one of the file descriptors of the
> > > process using
> > >
> > > $ gdb -p KAFKA_PID
> > > > call close(4567)
> > >
> > > it reuses that file descriptor and establishes a connection to a consumer
> > >
> > > java       9708      kafka 4087u     sock                0,7       0t0  3258461771 can't identify protocol
> > > java       9708      kafka 4088u     IPv4         3441430493       0t0  TCP host2.provider.com:9092->consumer.host.com:38208 (ESTABLISHED)
> > > java       9708      kafka 4089u     sock                0,7       0t0  3258461773 can't identify protocol
> > > java       9708      kafka 4090u     sock                0,7       0t0  3258461774 can't identify protocol
> > >
> > >
> > >
> > >
> > > --
> > > Ahmy Yulrizka
> > > http://ahmy.yulrizka.com
> > > @yulrizka
> > >
> > >
> > > On Wed, Jan 22, 2014 at 5:41 AM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > > > Hmm, without knowing the client ip, it's hard to tell whether those are
> > > > from replication fetcher threads or not. Are most of those connections
> > > > in established mode?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Tue, Jan 21, 2014 at 8:06 AM, Ahmy Yulrizka <a...@yulrizka.com>
> > > wrote:
> > > >
> > > > > This is the line I copied from lsof:
> > > > >
> > > > > ...
> > > > > java      11818      kafka   98u     sock                0,7       0t0  615628183 can't identify protocol
> > > > > java      11818      kafka   99u     IPv4          615077352       0t0  TCP somedomain.com:9092->121-123-123-123.someprovider.net:37547 (CLOSE_WAIT)
> > > > > java      11818      kafka  100u     IPv4          615077353       0t0  TCP somedomain.com:9092->121-123-123-123.someprovider.net:37553 (ESTABLISHED)
> > > > > java      11818      kafka  101u     sock                0,7       0t0  615628184 can't identify protocol
> > > > > java      11818      kafka  102u     sock                0,7       0t0  615628185 can't identify protocol
> > > > > java      11818      kafka  103u     sock                0,7       0t0  615628186 can't identify protocol
> > > > > ...
> > > > >
> > > > > As you can see from the output, I can see the connection state for
> > > > > some of the TCP sockets, but the plain "sock" entries only say "can't
> > > > > identify protocol", so I cannot tell where those sockets originate.
> > > > >
> > > > > I also cannot see those connections when I run netstat -nat.
> > > > >
> > > > > --
> > > > > Ahmy Yulrizka
> > > > > http://ahmy.yulrizka.com
> > > > > @yulrizka
> > > > >
> > > > >
> > > > > On Tue, Jan 21, 2014 at 4:42 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > >
> > > > > > What mode are those sockets in (established, closed, etc.)? Also,
> > > > > > from the ip, could you tell whether those sockets are from the
> > > > > > client or from the replica fetcher in the brokers?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Tue, Jan 21, 2014 at 3:29 AM, Ahmy Yulrizka <
> a...@yulrizka.com>
> > > > > wrote:
> > > > > >
> > > > > > > We are running 3 Kafka nodes, which serve 4 partitions.
> > > > > > > We have been experiencing weird behavior during network outages.
> > > > > > >
> > > > > > > It has happened twice in the last couple of days. The previous
> > > > > > > outage took down the whole cluster, while in this one only 2 out
> > > > > > > of 3 nodes survived: 1 node became the leader of all partitions,
> > > > > > > and the other node is only in the ISR of 1 partition (out of 4).
> > > > > > >
> > > > > > > My best guess is that when the network is down, the broker can't
> > > > > > > connect to the other brokers to do replication and keeps opening
> > > > > > > sockets without closing them. But I'm not entirely sure about this.
> > > > > > >
> > > > > > > Is there any way to mitigate the problem, or are there any
> > > > > > > configuration options to stop this from happening again?
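
Not a fix, but one way to see this building up before the limit is hit: on Linux the broker process's open-descriptor count can be polled under /proc (the same population lsof enumerates). A minimal illustrative snippet, counting its own process's descriptors via the Linux-only /proc/self/fd path:

```java
import java.io.File;

// Illustrative, Linux-only: a process's open file descriptors appear as
// entries under /proc/<pid>/fd; here we list our own via /proc/self/fd.
// Alerting when this count approaches the `ulimit -n` limit catches a
// descriptor leak before the broker fails with "Too many open files".
public class FdCount {
    public static void main(String[] args) {
        File[] fds = new File("/proc/self/fd").listFiles();
        System.out.println("open fds: " + (fds == null ? "unknown" : fds.length));
    }
}
```

Raising the descriptor limit for the broker process buys headroom but does not remove a leak; it only delays exhaustion.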
> > > > > > >
> > > > > > >
> > > > > > > The java/kafka process opens too many socket file descriptors.
> > > > > > > Running `lsof -a -p 11818` yields thousands of lines like this:
> > > > > > >
> > > > > > > ...
> > > > > > > java    11818 kafka 3059u  sock                0,7       0t0  615637305 can't identify protocol
> > > > > > > java    11818 kafka 3060u  sock                0,7       0t0  615637306 can't identify protocol
> > > > > > > java    11818 kafka 3061u  sock                0,7       0t0  615637307 can't identify protocol
> > > > > > > java    11818 kafka 3062u  sock                0,7       0t0  615637308 can't identify protocol
> > > > > > > java    11818 kafka 3063u  sock                0,7       0t0  615637309 can't identify protocol
> > > > > > > java    11818 kafka 3064u  sock                0,7       0t0  615637310 can't identify protocol
> > > > > > > java    11818 kafka 3065u  sock                0,7       0t0  615637311 can't identify protocol
> > > > > > > ...
> > > > > > >
> > > > > > > I verified that the open sockets did not close when I repeated
> > > > > > > the command after 2 minutes.
> > > > > > >
> > > > > > >
> > > > > > > And the kafka log on the broken node generates lots of errors
> > > > > > > like this:
> > > > > > >
> > > > > > > [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> > > > > > > kafka.network.Acceptor  - Error in acceptor
> > > > > > > java.io.IOException: Too many open files
> > > > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> > > > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
> > > > > > >         at kafka.network.Acceptor.accept(SocketServer.scala:200)
> > > > > > >         at kafka.network.Acceptor.run(SocketServer.scala:154)
> > > > > > >         at java.lang.Thread.run(Thread.java:701)
> > > > > > > [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> > > > > > > kafka.network.Acceptor  - Error in acceptor
> > > > > > > java.io.IOException: Too many open files
> > > > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> > > > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
> > > > > > >         at kafka.network.Acceptor.accept(SocketServer.scala:200)
> > > > > > >         at kafka.network.Acceptor.run(SocketServer.scala:154)
> > > > > > >         at java.lang.Thread.run(Thread.java:701)
> > > > > > > [2014-01-21 04:21:48,811]  64573917 [ReplicaFetcherThread-0-1] INFO
> > > > > > > kafka.consumer.SimpleConsumer  - Reconnect due to socket error: null
> > > > > > > [2014-01-21 04:21:48,819]  64573925 [ReplicaFetcherThread-0-1] WARN
> > > > > > > kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-0-1],
> > > > > > > Error in fetch Name: FetchRequest; Version: 0; CorrelationId:
> > > > > > > 74930218; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 2;
> > > > > > > MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [some-topic,0] ->
> > > > > > > PartitionFetchInfo(959825,1048576),[some-topic,3] ->
> > > > > > > PartitionFetchInfo(551546,1048576)
> > > > > > > java.net.SocketException: Too many open files
> > > > > > >         at sun.nio.ch.Net.socket0(Native Method)
> > > > > > >         at sun.nio.ch.Net.socket(Net.java:156)
> > > > > > >         at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> > > > > > >         at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> > > > > > >         at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> > > > > > >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> > > > > > >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > > > > >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > > > > >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > > > > >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > > > >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > > > > >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > > > > >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > > > > >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Ahmy Yulrizka
> > > > > > > http://ahmy.yulrizka.com
> > > > > > > @yulrizka
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
