Thanks for finding this out. We probably should disconnect on any exception.
Could you file a JIRA and perhaps attach a patch?
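For context, the catch clause misses this case because UnresolvedAddressException is a RuntimeException (via IllegalArgumentException), not an IOException, so a `catch (IOException)` never sees it. A quick check of the hierarchy, written in Java here for illustration:

```java
import java.io.IOException;
import java.nio.channels.UnresolvedAddressException;

// UnresolvedAddressException extends IllegalArgumentException (a
// RuntimeException), not IOException, so the reconnect/disconnect branch
// guarded by `catch (IOException)` is skipped and the socket is leaked.
public class CatchMiss {
    public static void main(String[] args) {
        Throwable t = new UnresolvedAddressException();
        System.out.println(t instanceof IOException);              // false
        System.out.println(t instanceof IllegalArgumentException); // true
        System.out.println(t instanceof RuntimeException);         // true
    }
}
```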

Thanks,

Jun


On Fri, Jan 24, 2014 at 6:06 AM, Ahmy Yulrizka <a...@yulrizka.com> wrote:

> Hi,
>
> I think I found the problem.
>
> This is part of the stack trace. At first there is a connection problem,
> and when the connection is restored the client gets new information from
> ZooKeeper:
>
> [2014-01-23 23:24:55,391] INFO Opening socket connection to server
> host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:55,391] INFO Socket connection established to
> host2.provider.com/2.2.2.2:2181, initiating session
> (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:55,391] DEBUG Session establishment request sent on
> host2.provider.com/2.2.2.2:2181 (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:57,393] INFO Client session timed out, have not heard
> from server in 2002ms for sessionid 0x0, closing socket connection and
> attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:58,047] INFO Opening socket connection to server
> host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:59,048] INFO Socket connection established to
> host3.provider.com/3.3.3.3:2181, initiating session
> (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:59,049] DEBUG Session establishment request sent on
> host3.provider.com/3.3.3.3:2181 (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:59,539] INFO Session establishment complete on server
> host3.provider.com/3.3.3.3:2181, sessionid = 0x343c13436e50001, negotiated
> timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2014-01-23 23:24:59,539] DEBUG Received event: WatchedEvent
> state:SyncConnected type:None path:null (org.I0Itec.zkclient.ZkClient)
> [2014-01-23 23:24:59,539] INFO zookeeper state changed (SyncConnected)
> (org.I0Itec.zkclient.ZkClient)
> [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> SyncConnected sent to
> kafka.server.KafkaZooKeeper$SessionExpireListener@163e1f27]
> (org.I0Itec.zkclient.ZkEventThread)
> [2014-01-23 23:24:59,539] DEBUG New event: ZkEvent[State changed to
> SyncConnected sent to
> kafka.controller.KafkaController$SessionExpirationListener@486f44d9]
> (org.I0Itec.zkclient.ZkEventThread)
> [2014-01-23 23:24:59,539] DEBUG Leaving process event
> (org.I0Itec.zkclient.ZkClient)
> [2014-01-23 23:24:59,539] DEBUG State is SyncConnected
> (org.I0Itec.zkclient.ZkClient)
> [2014-01-23 23:24:59,540] DEBUG State is SyncConnected
> (org.I0Itec.zkclient.ZkClient)
>
> Then the ReplicaFetcherThread tries to reconnect.
> At this point it tries to connect to the other brokers, but it can't
> resolve the IP address and throws
> "java.nio.channels.UnresolvedAddressException":
>
> [2014-01-23 23:25:01,067] WARN [ReplicaFetcherThread-0-1], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 478411; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [some-topic,0] ->
> PartitionFetchInfo(1247662,1048576),[some-topic,3] ->
> PartitionFetchInfo(839677,1048576) (kafka.server.ReplicaFetcherThread)
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:89)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-01-23 23:25:01,077] INFO Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
> [2014-01-23 23:25:01,078] WARN [ReplicaFetcherThread-0-1], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 478412; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [some-topic,0] ->
> PartitionFetchInfo(1247662,1048576),[some-topic,3] ->
> PartitionFetchInfo(839677,1048576) (kafka.server.ReplicaFetcherThread)
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:89)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-01-23 23:25:01,079] INFO Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
>
>
> It keeps throwing this error until it hits "too many open files"...
>
> [2014-01-23 23:25:03,756] INFO Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
> [2014-01-23 23:25:03,756] WARN [ReplicaFetcherThread-0-1], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 482406; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [some-topic,0] ->
> PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> java.nio.channels.UnresolvedAddressException
> at sun.nio.ch.Net.checkAddress(Net.java:89)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:510)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
> [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 482407; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [some-topic,0] ->
> PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> java.net.SocketException: Too many open files
> at sun.nio.ch.Net.socket0(Native Method)
> at sun.nio.ch.Net.socket(Net.java:156)
> at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-01-23 23:25:03,757] INFO Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
> [2014-01-23 23:25:03,757] WARN [ReplicaFetcherThread-0-1], Error in fetch
> Name: FetchRequest; Version: 0; CorrelationId: 482408; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [some-topic,0] ->
> PartitionFetchInfo(1247662,1048576),[some-topic,2] ->
> PartitionFetchInfo(1135106,1048576),[some-topic,3] ->
> PartitionFetchInfo(839677,1048576),[some-topic,1] ->
> PartitionFetchInfo(1273826,1048576) (kafka.server.ReplicaFetcherThread)
> java.net.SocketException: Too many open files
> at sun.nio.ch.Net.socket0(Native Method)
> at sun.nio.ch.Net.socket(Net.java:156)
> at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-01-23 23:25:03,758] INFO Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
>
>
>
> I looked into the source code at
> core/src/main/scala/kafka/consumer/SimpleConsumer.scala, line 79.
>
> It tries to reconnect(), but it only closes the connection when the
> exception is a "java.io.IOException", and that catch does not match
> "java.nio.channels.UnresolvedAddressException":
>
>   private def sendRequest(request: RequestOrResponse): Receive = {
>     lock synchronized {
>       getOrMakeConnection()
>       var response: Receive = null
>       try {
>         blockingChannel.send(request)
>         response = blockingChannel.receive()
>       } catch {
>         case e : java.io.IOException =>
>           info("Reconnect due to socket error: %s".format(e.getMessage))
>           // retry once
>           try {
>             reconnect()
>             blockingChannel.send(request)
>             response = blockingChannel.receive()
>           } catch {
>             case ioe: java.io.IOException =>
>               disconnect()
>               throw ioe
>           }
>         case e: Throwable => throw e
>       }
>       response
>     }
>   }
>
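A sketch of the fix proposed at the top of the thread (disconnect on any exception, not only IOException). This is only an illustration written in Java: Channel and RetrySender are hypothetical stand-ins for Kafka's BlockingChannel and SimpleConsumer.sendRequest, not the actual classes:

```java
// Illustrative only: Channel and RetrySender are stand-ins, not Kafka classes.
interface Channel {
    void send(String request) throws Exception;
    void disconnect();
}

public class RetrySender {
    // Try once, retry once after reconnecting; on ANY failure (IOException,
    // UnresolvedAddressException, ...) close the socket before rethrowing,
    // so a failed retry cannot leak a file descriptor.
    public static void sendWithRetry(Channel ch, Runnable reconnect, String req)
            throws Exception {
        try {
            ch.send(req);
        } catch (Exception first) {
            try {
                reconnect.run();
                ch.send(req);
            } catch (Exception second) {
                ch.disconnect(); // previously only done for IOException
                throw second;
            }
        }
    }
}
```

With this shape, an UnresolvedAddressException on the retry path still runs disconnect() instead of falling through to a bare rethrow.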
> These are my production settings:
>
> OS: Ubuntu 12.04
> kafka : kafka_2.8.0-0.8.0.jar
>
> java :
> java version "1.6.0_27"
> OpenJDK Runtime Environment (IcedTea6 1.12.6)
> (6b27-1.12.6-1ubuntu0.12.04.4)
> OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)
>
> An interesting fact: if I close one of the process's file descriptors
> using
>
> $ gdb -p KAFKA_PID
> (gdb) call close(4567)
>
> the process reuses that file descriptor and establishes a connection to a
> consumer:
>
> java       9708      kafka 4087u     sock                0,7         0t0
> 3258461771 can't identify protocol
> java       9708      kafka 4088u     IPv4         3441430493         0t0
>      TCP host2.provider.com:9092->consumer.host.com:38208 (ESTABLISHED)
> java       9708      kafka 4089u     sock                0,7         0t0
> 3258461773 can't identify protocol
> java       9708      kafka 4090u     sock                0,7         0t0
> 3258461774 can't identify protocol
>
>
>
>
> --
> Ahmy Yulrizka
> http://ahmy.yulrizka.com
> @yulrizka
>
>
> On Wed, Jan 22, 2014 at 5:41 AM, Jun Rao <jun...@gmail.com> wrote:
>
> > Hmm, without knowing the client IP, it's hard to tell whether those are
> > from the replica fetcher threads or not. Are most of those connections in
> > the ESTABLISHED state?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Jan 21, 2014 at 8:06 AM, Ahmy Yulrizka <a...@yulrizka.com>
> wrote:
> >
> > > These are the lines I copied from lsof:
> > >
> > > ...
> > > java      11818      kafka   98u     sock                0,7       0t0
> > >  615628183 can't identify protocol
> > > java      11818      kafka   99u     IPv4          615077352       0t0
> > >    TCP somedomain.com:9092->121-123-123-123.someprovider.net:37547
> > > (CLOSE_WAIT)
> > > java      11818      kafka  100u     IPv4          615077353       0t0
> > >    TCP somedomain.com:9092->121-123-123-123.someprovider.net:37553
> > > (ESTABLISHED)
> > > java      11818      kafka  101u     sock                0,7       0t0
> > >  615628184 can't identify protocol
> > > java      11818      kafka  102u     sock                0,7       0t0
> > >  615628185 can't identify protocol
> > > java      11818      kafka  103u     sock                0,7       0t0
> > >  615628186 can't identify protocol
> > > ...
> > >
> > > As you can see from the output, I can see the connection state of some
> > > of the TCP sockets, but the plain "sock" entries only say "can't
> > > identify protocol", so I cannot tell where these sockets originate.
> > >
> > > I also cannot see those connections when I run netstat -nat.
> > >
> > > --
> > > Ahmy Yulrizka
> > > http://ahmy.yulrizka.com
> > > @yulrizka
> > >
> > >
> > > On Tue, Jan 21, 2014 at 4:42 PM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > > > What state are those sockets in (established, closed, etc.)? Also,
> > > > from the IP, could you tell whether those sockets are from clients or
> > > > from the replica fetchers in the brokers?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Tue, Jan 21, 2014 at 3:29 AM, Ahmy Yulrizka <a...@yulrizka.com>
> > > wrote:
> > > >
> > > > > We are running 3 Kafka nodes, which serve 4 partitions.
> > > > > We have been experiencing weird behavior during network outages.
> > > > >
> > > > > It has happened twice in the last couple of days. The previous
> > > > > outage took down the whole cluster, while in this one only 2 out of
> > > > > 3 nodes survived: 1 node became the leader of all partitions, and
> > > > > the other node is in the ISR of only 1 partition (out of 4).
> > > > >
> > > > > My best guess is that when the network is down, the broker can't
> > > > > connect to the other brokers to do replication and keeps opening
> > > > > sockets without closing them. But I'm not entirely sure about this.
> > > > >
> > > > > Is there any way to mitigate the problem? Or are there any
> > > > > configuration options to stop this from happening again?
> > > > >
> > > > >
> > > > > The java/kafka process opens too many socket file descriptors;
> > > > > running `lsof -a -p 11818` yields thousands of lines like this:
> > > > >
> > > > > ...
> > > > > java    11818 kafka 3059u  sock                0,7       0t0
> > 615637305
> > > > > can't identify protocol
> > > > > java    11818 kafka 3060u  sock                0,7       0t0
> > 615637306
> > > > > can't identify protocol
> > > > > java    11818 kafka 3061u  sock                0,7       0t0
> > 615637307
> > > > > can't identify protocol
> > > > > java    11818 kafka 3062u  sock                0,7       0t0
> > 615637308
> > > > > can't identify protocol
> > > > > java    11818 kafka 3063u  sock                0,7       0t0
> > 615637309
> > > > > can't identify protocol
> > > > > java    11818 kafka 3064u  sock                0,7       0t0
> > 615637310
> > > > > can't identify protocol
> > > > > java    11818 kafka 3065u  sock                0,7       0t0
> > 615637311
> > > > > can't identify protocol
> > > > > ...
> > > > >
> > > > > I verified that the open sockets did not close when I repeated the
> > > > > command after 2 minutes.
> > > > > after 2 minutes.
> > > > >
> > > > >
> > > > > The Kafka log on the broken node generates lots of errors like this:
> > > > >
> > > > > [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> > > > > kafka.network.Acceptor  - Error in acceptor
> > > > > java.io.IOException: Too many open files
> > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
> > > > >         at kafka.network.Acceptor.accept(SocketServer.scala:200)
> > > > >         at kafka.network.Acceptor.run(SocketServer.scala:154)
> > > > >         at java.lang.Thread.run(Thread.java:701)
> > > > > [2014-01-21 04:21:48,819]  64573925 [kafka-acceptor] ERROR
> > > > > kafka.network.Acceptor  - Error in acceptor
> > > > > java.io.IOException: Too many open files
> > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> > > > >         at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:165)
> > > > >         at kafka.network.Acceptor.accept(SocketServer.scala:200)
> > > > >         at kafka.network.Acceptor.run(SocketServer.scala:154)
> > > > >         at java.lang.Thread.run(Thread.java:701)
> > > > > [2014-01-21 04:21:48,811]  64573917 [ReplicaFetcherThread-0-1] INFO
> > > > > kafka.consumer.SimpleConsumer  - Reconnect due to socket error: null
> > > > > [2014-01-21 04:21:48,819]  64573925 [ReplicaFetcherThread-0-1] WARN
> > > > > kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-0-1],
> > > > > Error in fetch Name: FetchRequest; Version: 0; CorrelationId:
> > > > > 74930218; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 2; MaxWait:
> > > > > 500 ms; MinBytes: 1 bytes; RequestInfo: [some-topic,0] ->
> > > > > PartitionFetchInfo(959825,1048576),[some-topic,3] ->
> > > > > PartitionFetchInfo(551546,1048576)
> > > > > java.net.SocketException: Too many open files
> > > > >         at sun.nio.ch.Net.socket0(Native Method)
> > > > >         at sun.nio.ch.Net.socket(Net.java:156)
> > > > >         at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
> > > > >         at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:55)
> > > > >         at java.nio.channels.SocketChannel.open(SocketChannel.java:122)
> > > > >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:48)
> > > > >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > > > >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > > > >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > > > >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > > >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
> > > > >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> > > > >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > >
> > > > >
> > > > > --
> > > > > Ahmy Yulrizka
> > > > > http://ahmy.yulrizka.com
> > > > > @yulrizka
> > > > >
> > > >
> > >
> >
>
