It seems that the consumer can't connect to the broker for some reason. Any
other error on the broker? Any issue with the network?
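
One quick way to rule out the network is to probe the broker port from the consumer host. Below is a minimal sketch, assuming Python is available on that machine. The function name `can_connect` and the timeout value are illustrative choices, not part of Kafka. Use whatever host and port your brokers actually listen on (9092 is Kafka's default port).

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False
```

Calling `can_connect(broker_host, 9092)` in a loop from the consumer machine while the errors occur may show whether the broker port is refusing connections at those moments (for instance, because the broker process restarted), or whether the port stays reachable and the problem lies elsewhere.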

Thanks,

Jun

On Sat, Nov 1, 2014 at 9:46 PM, Chen Wang <chen.apache.s...@gmail.com>
wrote:

> Hello Folks,
> I am using the high-level consumer, and it seems to drop connections
> intermittently:
>
> 2014-11-01 13:34:40 SimpleConsumer [INFO] Reconnect due to socket error: Received -1 when reading from channel, socket has likely been closed.
> 2014-11-01 13:34:40 ConsumerFetcherThread [WARN] [ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 20220; ClientId: campaign_open_consumer_targeting_20141031-ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,18] -> PartitionFetchInfo(1681313989,4194304),[test_topic,21] -> PartitionFetchInfo(141266339,4194304)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
>         at kafka.network.BlockingChannel.connect(Unknown Source)
>         at kafka.consumer.SimpleConsumer.connect(Unknown Source)
>         at kafka.consumer.SimpleConsumer.reconnect(Unknown Source)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(Unknown Source)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source)
>         at kafka.metrics.KafkaTimer.time(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source)
>         at kafka.metrics.KafkaTimer.time(Unknown Source)
>         at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(Unknown Source)
>         at kafka.server.AbstractFetcherThread.doWork(Unknown Source)
>         at kafka.utils.ShutdownableThread.run(Unknown Source)
> 2014-11-01 13:34:40 VerifiableProperties [INFO] Verifying properties
>
> or sometimes:
> 2014-11-01 13:34:40 SimpleConsumer [INFO] Reconnect due to socket error: null
> 2014-11-01 13:34:40 ConsumerFetcherThread [WARN] [ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 20222; ClientId: campaign_open_consumer_targeting_20141031-ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,18] -> PartitionFetchInfo(1681313989,4194304)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
>         at kafka.network.BlockingChannel.connect(Unknown Source)
>         at kafka.consumer.SimpleConsumer.connect(Unknown Source)
>         at kafka.consumer.SimpleConsumer.reconnect(Unknown Source)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(Unknown Source)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source)
>         at kafka.metrics.KafkaTimer.time(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source)
>         at kafka.metrics.KafkaTimer.time(Unknown Source)
>         at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(Unknown Source)
>         at kafka.server.AbstractFetcherThread.doWork(Unknown Source)
>         at kafka.utils.ShutdownableThread.run(Unknown Source)
>
> The config I am using is:
> kafka.config.fetch.message.max.bytes          4194304
> kafka.config.group.id                         mygroupid
> kafka.config.rebalance.backoff.ms             6000
> kafka.config.rebalance.max.retries            6
> kafka.config.zookeeper.connect                brokerlist
> kafka.config.zookeeper.session.timeout.ms     60000
> There should not be any network connectivity issue, as all the ZooKeeper
> nodes, brokers, and consumers are in the same cluster.
>
> What could be the cause of the connection reset error? Is it because
> ZooKeeper cannot talk to the broker to get the partitionInfo?
> Thanks,
> Chen
>
