When I check the server log, it also flushes with exception:
[2014-11-02 10:00:32,284] ERROR Closing socket for /10.93.80.119 because of
error (kafka.network.Processor)
java.io.IOException: Broken pipe
at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
at
sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
at kafka.network.MultiSend.writeTo(Transmission.scala:102)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
at kafka.network.MultiSend.writeTo(Transmission.scala:102)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
at kafka.network.Processor.write(SocketServer.scala:405)
at kafka.network.Processor.run(SocketServer.scala:265)
at java.lang.Thread.run(Thread.java:744)
On Sat, Nov 1, 2014 at 9:46 PM, Chen Wang
wrote:
> Hello Folks,
> I am using Highlevel consumer, and it seems to drop connections
> intermittently:
>
> 2014-11-01 13:34:40 SimpleConsumer [INFO] Reconnect due to socket error:
> Received -1 when reading from channel, socket has likely been closed.
> 2014-11-01 13:34:40 ConsumerFetcherThread [WARN]
> [ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5],
> Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 20220;
> ClientId:
> campaign_open_consumer_targeting_20141031-ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5;
> ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo:
> [test_topic,18] -> PartitionFetchInfo(1681313989,4194304),[test_topic,21]
> -> PartitionFetchInfo(141266339,4194304)
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect(Native Method)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
> at kafka.network.BlockingChannel.connect(Unknown Source)
> at kafka.consumer.SimpleConsumer.connect(Unknown Source)
> at kafka.consumer.SimpleConsumer.reconnect(Unknown Source)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(Unknown Source)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown
> Source)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown
> Source)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source)
> at kafka.metrics.KafkaTimer.time(Unknown Source)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source)
> at kafka.metrics.KafkaTimer.time(Unknown Source)
> at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
> at kafka.server.AbstractFetcherThread.processFetchRequest(Unknown
> Source)
> at kafka.server.AbstractFetcherThread.doWork(Unknown Source)
> at kafka.utils.ShutdownableThread.run(Unknown Source)
> 2014-11-01 13:34:40 VerifiableProperties [INFO] Verifying properties
>
> or sometimes:
> 2014-11-01 13:34:40 SimpleConsumer [INFO] Reconnect due to socket error:
> null
> 2014-11-01 13:34:40 ConsumerFetcherThread [WARN]
> [ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5],
> Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 20222;
> ClientId:
> campaign_open_consumer_targeting_20141031-ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5;
> ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo:
> [test_topic,18] -> PartitionFetchInfo(1681313989,4194304)
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect(Native Method)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
> at kafka.network.BlockingChannel.connect(Unknown Source)
> at kafka.consumer.SimpleConsumer.connect(Unknown Source)
> at kafka.consumer.SimpleConsumer.reconnect(Unknown Source)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(Unknown Source)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown
> Source)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown
> Source)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun