Hello:

I reran my test with a replication factor of 2 but encountered the same
issue. Any other suggestions?

Thanks
Milad

On Mon, Jan 26, 2015 at 1:06 PM, Manoj Jaiswal <[email protected]>
wrote:

> A three-node Kafka cluster with a replication factor of 3 is bad design.
>
> It should always be less than the cluster size.
>
> Please change it to 2 and try again.
>
> -Manoj
>
> On Mon, Jan 26, 2015 at 7:19 AM, Milad Fatenejad <[email protected]> wrote:
>
>> Hello:
>>
>> I have a three-node Kafka cluster with a single topic and a topic
>> replication factor of 3. I ran a test where I inserted a few hundred
>> messages into Kafka. While the topology was reading these messages, I
>> killed one of the brokers.
>>
>> My hope was that the Kafka spout would simply use one of the other
>> replicas. Instead, I saw the following exceptions and no attempt to retry
>> or fail over to one of the working Kafka replicas. Is this the expected
>> behavior? Is there a way to make storm-kafka read messages reliably in the
>> presence of broker failures?
>>
>> I should say that I checked with Kafka and, after the node failure, one of
>> the other brokers was promoted to leader for each partition, as expected.
>> So I don't understand why the Kafka spout wouldn't eventually retry using
>> the new leader. I also checked with the Kafka console consumer and verified
>> that all messages are available in Kafka.
>>
>> Thank you
>> Milad
>>
>> Exception stack trace is as follows:
>>
>> 2015-01-26 14:21:54,451 [Thread-10-event_spout]
>> [kafka.consumer.SimpleConsumer] [INFO]> Reconnect due to socket error:
>> java.nio.channels.ClosedChannelException
>> 2015-01-26 14:21:54,451 [Thread-10-event_spout] [storm.kafka.KafkaUtils]
>> [WARN]> Network error when fetching messages:
>> java.nio.channels.ClosedChannelException: null
>>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
>> ~[stormjar.jar:na]
>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
>> ~[stormjar.jar:na]
>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> ~[stormjar.jar:na]
>>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
>> ~[stormjar.jar:na]
>>         at
>> kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47)
>> ~[stormjar.jar:na]
>>         at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167)
>> ~[stormjar.jar:na]
>>         at storm.kafka.PartitionManager.fill(PartitionManager.java:162)
>> [stormjar.jar:na]
>>         at storm.kafka.PartitionManager.next(PartitionManager.java:124)
>> [stormjar.jar:na]
>>         at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141)
>> [stormjar.jar:na]
>>         at
>> backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565)
>> [storm-core-0.9.3.jar:0.9.3]
>>         at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463)
>> [storm-core-0.9.3.jar:0.9.3]
>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
>> 2015-01-26 14:21:54,452 [Thread-10-event_spout] [storm.kafka.KafkaSpout]
>> [WARN]> Fetch failed
>> storm.kafka.FailedFetchException: java.nio.channels.ClosedChannelException
>>         at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:175)
>> ~[stormjar.jar:na]
>>         at storm.kafka.PartitionManager.fill(PartitionManager.java:162)
>> ~[stormjar.jar:na]
>>         at storm.kafka.PartitionManager.next(PartitionManager.java:124)
>> ~[stormjar.jar:na]
>>         at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141)
>> ~[stormjar.jar:na]
>>         at
>> backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565)
>> [storm-core-0.9.3.jar:0.9.3]
>>         at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463)
>> [storm-core-0.9.3.jar:0.9.3]
>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
>> Caused by: java.nio.channels.ClosedChannelException: null
>>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
>> ~[stormjar.jar:na]
>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
>> ~[stormjar.jar:na]
>>         at
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
>> ~[stormjar.jar:na]
>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> ~[stormjar.jar:na]
>>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
>> ~[stormjar.jar:na]
>>         at
>> kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47)
>> ~[stormjar.jar:na]
>>         at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167)
>> ~[stormjar.jar:na]
>>         ... 7 common frames omitted
>>
>
>
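
For reference, the behavior asked about in the original message (after a
failed fetch, look up the newly elected leader and retry against it) can be
sketched in isolation. This is a minimal, self-contained Java illustration and
not storm-kafka code: Cluster, FailoverReader, and all of their methods are
hypothetical names made up for this example, the "promote the lowest-numbered
live broker" rule is an assumption chosen for simplicity, and IOException
stands in for the ClosedChannelException seen in the trace.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LeaderFailoverSketch {

    /** Hypothetical toy cluster: one partition, fully replicated on every broker. */
    static class Cluster {
        private final List<String> messages = new ArrayList<>();
        private final Set<Integer> liveBrokers = new HashSet<>();
        private int leader;

        Cluster(int brokerCount, int initialLeader) {
            for (int id = 0; id < brokerCount; id++) {
                liveBrokers.add(id);
            }
            leader = initialLeader;
        }

        void append(String message) {
            messages.add(message);
        }

        /** Kills a broker; if it led the partition, promotes the lowest-numbered live broker. */
        void kill(int brokerId) {
            liveBrokers.remove(brokerId);
            if (brokerId == leader && !liveBrokers.isEmpty()) {
                leader = Collections.min(liveBrokers);
            }
        }

        /** Stand-in for a metadata lookup that returns the current partition leader. */
        int currentLeader() {
            return leader;
        }

        /** Returns the message at the offset, or null if none; fails unless sent to the live leader. */
        String fetch(int brokerId, int offset) throws IOException {
            if (!liveBrokers.contains(brokerId)) {
                throw new IOException("broker " + brokerId + " is down");
            }
            if (brokerId != leader) {
                throw new IOException("broker " + brokerId + " is not the leader");
            }
            return offset < messages.size() ? messages.get(offset) : null;
        }
    }

    /** Hypothetical reader that refreshes its cached leader after a failed fetch. */
    static class FailoverReader {
        private final Cluster cluster;
        private int cachedLeader;
        private int offset = 0;

        FailoverReader(Cluster cluster) {
            this.cluster = cluster;
            this.cachedLeader = cluster.currentLeader();
        }

        String next(int maxAttempts) throws IOException {
            IOException last = null;
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                try {
                    String message = cluster.fetch(cachedLeader, offset);
                    if (message != null) {
                        offset++; // advance only after a successful read
                    }
                    return message;
                } catch (IOException e) {
                    last = e;
                    // The cached leader may be stale: look it up again before retrying.
                    cachedLeader = cluster.currentLeader();
                }
            }
            throw new IOException("fetch failed after " + maxAttempts + " attempts", last);
        }
    }

    public static void main(String[] args) throws IOException {
        Cluster cluster = new Cluster(3, 0);
        cluster.append("m0");
        cluster.append("m1");
        FailoverReader reader = new FailoverReader(cluster);
        System.out.println(reader.next(2)); // read from the original leader, broker 0
        cluster.kill(0);                    // leadership moves to broker 1
        System.out.println(reader.next(2)); // first attempt fails, retry hits broker 1
    }
}
```

With a single attempt per call, the read after the broker is killed would
surface the error to the caller, much like the "Fetch failed" warning in the
trace; a later call would then succeed only because the cached leader was
refreshed on failure. Whether and how often the real spout refreshes its
partition-to-leader mapping depends on how its broker hosts are configured,
which this sketch does not model.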
