Hello,
Is there any progress on this issue? We are also experiencing a socket leak
during network outages.
Thanks,
Priya


On Fri, Jan 24, 2014 at 7:30 AM, Jun Rao <jun...@gmail.com> wrote:

> Thanks for finding this out. We probably should disconnect on any exception.
> Could you file a JIRA and perhaps attach a patch?
>
> Thanks,
>
> Jun
>
>
> On Fri, Jan 24, 2014 at 6:06 AM, Ahmy Yulrizka <a...@yulrizka.com> wrote:
>
> > Hi,
> >
> > I think I found the problem.
> >
> > This is part of the stack trace. At first there is a connection problem;
> > when the connection is restored, the broker gets new information from
> > ZooKeeper:
> >
> > [2014-01-23 23:24:55,391] INFO Opening socket connection to server
> > host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:55,391] INFO Socket connection established to
> > host2.provider.com/2.2.2.2:2181, initiating session
> > (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:55,391] DEBUG Session establishment request sent on
> > host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:57,393] INFO Client session timed out, have not heard
> > from server in 2002ms for sessionid 0x0, closing socket connection and
> > attempting reconnect (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:58,047] INFO Opening socket connection to server
> > host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:59,048] INFO Socket connection established to
> > host3.provider.com/3.3.3.3:2181, initiating session
> > (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:59,049] DEBUG Session establishment request sent on
> > host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:59,539] INFO Session establishment complete on server
> > host3.provider.com/3.3.3.3:2181, sessionid = 0x343c13436e50001,
> negotiated
> > timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> > [2014-01-23 23:24:59,539] DEBUG Received event: WatchedEvent
> > state:SyncConnected type:None path:null (org.I0Itec.zkclient.ZkClient)
> > [2014-01-23 23:24:59,539] INFO zookeeper state changed (SyncConnected)
> > (org.I0Itec.zkclient.ZkClient)
> > [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> > SyncConnected sent to
> > kafka.server.KafkaZooKeeper$SessionExpireListener@163e1f27]
> > (org.I0Itec.zkclient.ZkEventThread)
> > [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> > SyncConnected sent to
> > kafka.controller.KafkaController$SessionExpirationListener@486f44d9]
> > (org.I0Itec.zkclient.ZkEventThread)
> > [2014-01-23 23:24:59,539] DEBUG Leaving process event
> > (org.I0Itec.zkclient.ZkClient)
> > [2014-01-23 23:24:59,539] DEBUG State is SyncConnected
> > (org.I0Itec.zkclient.ZkClient)
> > [2014-01-23 23:24:59,540] DEBUG State is SyncConnected
> > (org.I0Itec.zkclient.ZkClient)
> >
> > Then the ReplicaFetcherThread tries to reconnect.
> > At this point it tries to connect to the other brokers, but it cannot
> > resolve their IP addresses and throws
> > "java.nio.channels.UnresolvedAddressException":
> >
> > [2014-01-23 23:25:01,067] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > Name: FetchRequest; Version: 0; CorrelationId: 478411; ClientId:
> > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > RequestInfo: [some-topic,0] -> PartitionFetchInfo(1247662,1048576),
> > [some-topic,3] -> PartitionFetchInfo(839677,1048576) (kafka.server.ReplicaFetcherThread)
> > java.nio.channels.UnresolvedAddressException
> >         at sun.nio.ch.Net.checkAddress(Net.java:89)
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > [2014-01-23 23:25:01,077] INFO Reconnect due to socket error: null
> > (kafka.consumer.SimpleConsumer)
> > [2014-01-23 23:25:01,078] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > Name: FetchRequest; Version: 0; CorrelationId: 478412; ClientId:
> > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > RequestInfo: [some-topic,0] -> PartitionFetchInfo(1247662,1048576),
> > [some-topic,3] -> PartitionFetchInfo(839677,1048576) (kafka.server.ReplicaFetcherThread)
> > java.nio.channels.UnresolvedAddressException
> >         at sun.nio.ch.Net.checkAddress(Net.java:89)
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > [2014-01-23 23:25:01,079] INFO Reconnect due to socket error: null
> > (kafka.consumer.SimpleConsumer)
> >
> >
> > It throws a bunch of these errors until it hits "too many open files"...
> >
> > [2014-01-23 23:25:03,756] INFO Reconnect due to socket error: null
> > (kafka.consumer.SimpleConsumer)
> > [2014-01-23 23:25:03,756] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > Name: FetchRequest; Version: 0; CorrelationId: 482406; ClientId:
> > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > RequestInfo: [some-topic,0] -> PartitionFetchInfo(1247662,1048576),
> > [some-topic,2] -> PartitionFetchInfo(1135106,1048576),
> > [some-topic,3] -> PartitionFetchInfo(839677,1048576),
> > [some-topic,1] -> PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> > java.nio.channels.UnresolvedAddressException
> >         at sun.nio.ch.Net.checkAddress(Net.java:89)
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> > (kafka.consumer.SimpleConsumer)
> > [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > Name: FetchRequest; Version: 0; CorrelationId: 482407; ClientId:
> > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > RequestInfo: [some-topic,0] -> PartitionFetchInfo(1247662,1048576),
> > [some-topic,2] -> PartitionFetchInfo(1135106,1048576),
> > [some-topic,3] -> PartitionFetchInfo(839677,1048576),
> > [some-topic,1] -> PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> > java.net.SocketException: Too many open files
> >         at sun.nio.ch.Net.socket0(Native Method)
> >         at sun.nio.ch.Net.socket(Net.java:156)
> >         at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> >         at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> >         at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> > (kafka.consumer.SimpleConsumer)
> > [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in fetch
> > Name: FetchRequest; Version: 0; CorrelationId: 482408; ClientId:
> > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > RequestInfo: [some-topic,0] -> PartitionFetchInfo(1247662,1048576),
> > [some-topic,2] -> PartitionFetchInfo(1135106,1048576),
> > [some-topic,3] -> PartitionFetchInfo(839677,1048576),
> > [some-topic,1] -> PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> > java.net.SocketException: Too many open files
> >         at sun.nio.ch.Net.socket0(Native Method)
> >         at sun.nio.ch.Net.socket(Net.java:156)
> >         at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> >         at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> >         at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > [2014-01-23 23:25:03,758] INFO Reconnect due to socket error: null
> > (kafka.consumer.SimpleConsumer)
> >
> >
> >
> > I looked into the source code at
> > core/src/main/scala/kafka/consumer/SimpleConsumer.scala, line 79.
> >
> > It tries to reconnect(), but it only closes the connection on a
> > "java.io.IOException", which does not cover
> > "java.nio.channels.UnresolvedAddressException":
> >
> >   private def sendRequest(request: RequestOrResponse): Receive = {
> >     lock synchronized {
> >       getOrMakeConnection()
> >       var response: Receive = null
> >       try {
> >         blockingChannel.send(request)
> >         response = blockingChannel.receive()
> >       } catch {
> >         case e : java.io.IOException =>
> >           info("Reconnect due to socket error: %s".format(e.getMessage))
> >           // retry once
> >           try {
> >             reconnect()
> >             blockingChannel.send(request)
> >             response = blockingChannel.receive()
> >           } catch {
> >             case ioe: java.io.IOException =>
> >               disconnect()
> >               throw ioe
> >           }
> >         case e: Throwable => throw e
> >       }
> >       response
> >     }
> >   }
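The reason the IOException handler above never fires is the exception hierarchy: java.nio.channels.UnresolvedAddressException extends IllegalArgumentException, not java.io.IOException, so the retry-and-disconnect path is skipped and the channel (and its file descriptor) stays open. A minimal Java sketch of the same pattern (the host name is made up for illustration; this is not Kafka code):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

public class UnresolvedDemo {
    public static void main(String[] args) throws Exception {
        // createUnresolved skips DNS, mimicking a lookup failure during a network outage.
        InetSocketAddress addr = InetSocketAddress.createUnresolved("broker1.invalid", 9092);
        SocketChannel channel = SocketChannel.open();   // consumes a file descriptor
        try {
            channel.connect(addr);
        } catch (IOException e) {
            // Never reached for an unresolved address: UnresolvedAddressException
            // is not an IOException, so a handler like SimpleConsumer's leaks the channel.
            channel.close();
        } catch (UnresolvedAddressException e) {
            // This is where connect() actually lands.
            System.out.println("instanceof IllegalArgumentException: "
                    + (e instanceof IllegalArgumentException));   // prints "instanceof IllegalArgumentException: true"
            channel.close();   // disconnecting on any exception plugs the leak
        }
    }
}
```

Disconnecting in a catch-all (or a finally block) instead of only on IOException, as Jun suggests at the top of the thread, would release the descriptor on the DNS failure as well.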
> >
> > This is my production setup:
> >
> > OS: Ubuntu 12.04
> > kafka : kafka_2.8.0-0.8.0.jar
> >
> > java :
> > java version "1.6.0_27"
> > OpenJDK Runtime Environment (IcedTea6 1.12.6)
> > (6b27-1.12.6-1ubuntu0.12.04.4)
> > OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)
> >
> > An interesting fact: if I close one of the process's file descriptors
> > using
> >
> > $ gdb -p KAFKA_PID
> > > call close(4567)
> >
> > it reuses that file descriptor and establishes a connection to a consumer:
> >
> > java       9708      kafka 4087u     sock           0,7   0t0  3258461771 can't identify protocol
> > java       9708      kafka 4088u     IPv4    3441430493   0t0  TCP host2.provider.com:9092->consumer.host.com:38208 (ESTABLISHED)
> > java       9708      kafka 4089u     sock           0,7   0t0  3258461773 can't identify protocol
> > java       9708      kafka 4090u     sock           0,7   0t0  3258461774 can't identify protocol
> >
> >
> >
> >
> > --
> > Ahmy Yulrizka
> > http://ahmy.yulrizka.com
> > @yulrizka
> >
> >
> > On Wed, Jan 22, 2014 at 5:41 AM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > Hmm, without knowing the client IP, it's hard to tell whether those are
> > > from the replica fetcher threads or not. Are most of those connections in
> > > ESTABLISHED mode?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Tue, Jan 21, 2014 at 8:06 AM, Ahmy Yulrizka <a...@yulrizka.com>
> > wrote:
> > >
> > > > These are the lines I copied from the lsof output:
> > > >
> > > > ...
> > > > java      11818      kafka   98u     sock          0,7   0t0  615628183 can't identify protocol
> > > > java      11818      kafka   99u     IPv4    615077352   0t0  TCP somedomain.com:9092->121-123-123-123.someprovider.net:37547 (CLOSE_WAIT)
> > > > java      11818      kafka  100u     IPv4    615077353   0t0  TCP somedomain.com:9092->121-123-123-123.someprovider.net:37553 (ESTABLISHED)
> > > > java      11818      kafka  101u     sock          0,7   0t0  615628184 can't identify protocol
> > > > java      11818      kafka  102u     sock          0,7   0t0  615628185 can't identify protocol
> > > > java      11818      kafka  103u     sock          0,7   0t0  615628186 can't identify protocol
> > > > ...
> > > >
> > > > As you can see from the output, the connection state is visible for some
> > > > of the TCP entries, but for the plain "sock" entries lsof only reports
> > > > "can't identify protocol", so I cannot tell where those sockets
> > > > originate.
> > > >
> > > > Those connections also do not show up when I run netstat -nat.
> > > >
> > > > --
> > > > Ahmy Yulrizka
> > > > http://ahmy.yulrizka.com
> > > > @yulrizka
> > > >
> > > >
> > > > On Tue, Jan 21, 2014 at 4:42 PM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > What mode are those sockets in (ESTABLISHED, CLOSED, etc.)? Also, from
> > > > > the IP, can you tell whether those sockets are from the client or from
> > > > > the replica fetcher in the brokers?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Tue, Jan 21, 2014 at 3:29 AM, Ahmy Yulrizka <a...@yulrizka.com>
> > > > wrote:
> > > > >
> > > > > > We are running 3 Kafka nodes, which serve 4 partitions.
> > > > > > We have been experiencing weird behavior during network outages.
> > > > > >
> > > > > > It has happened twice in the last couple of days. The previous outage
> > > > > > took down the whole cluster, while in this one only 2 out of 3 nodes
> > > > > > survived: 1 node became the leader of all partitions, and the other
> > > > > > node is in the ISR of only 1 partition (out of 4).
> > > > > >
> > > > > > My best guess is that when the network goes down, the brokers cannot
> > > > > > connect to each other to do replication and keep opening sockets
> > > > > > without closing them, but I'm not entirely sure about this.
> > > > > >
> > > > > > Is there any way to mitigate the problem, or are there any
> > > > > > configuration options to stop this from happening again?
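As a stop-gap while the leak itself is unfixed, raising the broker's open-file limit at least delays descriptor exhaustion during an outage. A mitigation sketch for Ubuntu 12.04, assuming the broker runs as a `kafka` user (the values are illustrative, not from this thread):

```
# /etc/security/limits.conf -- raise the fd ceiling for the kafka user
kafka  soft  nofile  65536
kafka  hard  nofile  65536
```

This only postpones the "Too many open files" errors; the leaked sockets still accumulate until the process is restarted or the descriptors are closed.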
> > > > > >
> > > > > >
> > > > > > The java/kafka process opens too many socket file descriptors.
> > > > > > Running `lsof -a -p 11818` yields thousands of lines like this:
> > > > > >
> > > > > > ...
> > > > > > java    11818 kafka 3059u  sock  0,7  0t0  615637305 can't identify protocol
> > > > > > java    11818 kafka 3060u  sock  0,7  0t0  615637306 can't identify protocol
> > > > > > java    11818 kafka 3061u  sock  0,7  0t0  615637307 can't identify protocol
> > > > > > java    11818 kafka 3062u  sock  0,7  0t0  615637308 can't identify protocol
> > > > > > java    11818 kafka 3063u  sock  0,7  0t0  615637309 can't identify protocol
> > > > > > java    11818 kafka 3064u  sock  0,7  0t0  615637310 can't identify protocol
> > > > > > java    11818 kafka 3065u  sock  0,7  0t0  615637311 can't identify protocol
> > > > > > ...
> > > > > >
> > > > > > I verified that the open sockets still had not closed when I repeated
> > > > > > the command after 2 minutes.
> > > > > >
> > > > > >
> > > > > > The Kafka log on the broken node generates lots of errors like this:
> > > > > >
> > > > > > [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> > > > > > kafka.network.Acceptor  - Error in acceptor
> > > > > > java.io.IOException: Too many open files
> > > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> > > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
> > > > > >         at kafka.network.Acceptor.accept(SocketServer.scala:200)
> > > > > >         at kafka.network.Acceptor.run(SocketServer.scala:154)
> > > > > >         at java.lang.Thread.run(Thread.java:701)
> > > > > > [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> > > > > > kafka.network.Acceptor  - Error in acceptor
> > > > > > java.io.IOException: Too many open files
> > > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> > > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
> > > > > >         at kafka.network.Acceptor.accept(SocketServer.scala:200)
> > > > > >         at kafka.network.Acceptor.run(SocketServer.scala:154)
> > > > > >         at java.lang.Thread.run(Thread.java:701)
> > > > > > [2014-01-21 04:21:48,811]  64573917 [ReplicaFetcherThread-0-1] INFO
> > > > > > kafka.consumer.SimpleConsumer  - Reconnect due to socket error: null
> > > > > > [2014-01-21 04:21:48,819]  64573925 [ReplicaFetcherThread-0-1] WARN
> > > > > > kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-0-1], Error in
> > > > > > fetch Name: FetchRequest; Version: 0; CorrelationId: 74930218; ClientId:
> > > > > > ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> > > > > > RequestInfo: [some-topic,0] -> PartitionFetchInfo(959825,1048576),
> > > > > > [some-topic,3] -> PartitionFetchInfo(551546,1048576)
> > > > > > java.net.SocketException: Too many open files
> > > > > >         at sun.nio.ch.Net.socket0(Native Method)
> > > > > >         at sun.nio.ch.Net.socket(Net.java:156)
> > > > > >         at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> > > > > >         at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> > > > > >         at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> > > > > >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> > > > > >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > > > >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > > > >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > > > >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > > >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > > > >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > > > >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > > > >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > > >
> > > > > > --
> > > > > > Ahmy Yulrizka
> > > > > > http://ahmy.yulrizka.com
> > > > > > @yulrizka
> > > > > >
> > > > >
> > > >
> > >
> >
>
