Milad, can you share your KafkaSpout config? -Harsha
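
For anyone following along, a storm-kafka (0.9.x era) spout configuration typically looks roughly like the sketch below. This is a hedged illustration, not Milad's actual config: the ZooKeeper hosts, topic name, ZK root, and spout id are all made-up assumptions.

```java
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class EventSpoutConfig {
    public static KafkaSpout build() {
        // ZooKeeper ensemble used by the Kafka cluster (illustrative hosts).
        ZkHosts hosts = new ZkHosts("zk1:2181,zk2:2181,zk3:2181");
        // How often (in seconds) the spout re-reads broker/partition
        // leadership from ZooKeeper; relevant when a broker dies and
        // partition leaders move. 60 is the default.
        hosts.refreshFreqSecs = 60;
        // Topic name, ZK root for offset storage, and consumer id are
        // assumptions for the sketch.
        SpoutConfig config =
            new SpoutConfig(hosts, "event_topic", "/kafka_spout", "event_spout");
        config.scheme = new SchemeAsMultiScheme(new StringScheme());
        return new KafkaSpout(config);
    }
}
```

The `refreshFreqSecs` knob is the one most relevant to this thread, since it controls how quickly the spout notices that leadership has moved.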

On Mon, Jan 26, 2015, at 01:31 PM, Milad Fatenejad wrote:
> Hello:
>
> I reran my test with a replication factor of 2 but encountered the
> same issue. Any other suggestions?
>
> Thanks Milad
>
> On Mon, Jan 26, 2015 at 1:06 PM, Manoj Jaiswal
> <[email protected]> wrote:
>> A three-node Kafka cluster with a replication factor of 3 is a bad design.
>>
>> The replication factor should always be less than the cluster size.
>>
>> Please change it to 2 and try again.
>>
>> -Manoj
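
Manoj's suggested change can be sketched with the 0.8.x admin API, which is callable from Java even though it is a Scala object. A hedged illustration only: the ZooKeeper address and topic name are assumptions, and most people would use the `kafka-topics.sh` tool for this instead.

```java
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

public class RecreateTopic {
    public static void main(String[] args) {
        // ZooKeeper address is an illustrative assumption; the string
        // serializer is required so znode payloads match Kafka's format.
        ZkClient zk = new ZkClient("zk1:2181", 10000, 10000,
                ZKStringSerializer$.MODULE$);
        // Create the topic with 3 partitions and replication factor 2
        // (replication factor below the broker count, per the advice above).
        AdminUtils.createTopic(zk, "event_topic", 3, 2, new Properties());
        zk.close();
    }
}
```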
>>
>> On Mon, Jan 26, 2015 at 7:19 AM, Milad Fatenejad
>> <[email protected]> wrote:
>>> Hello:
>>>
>>> I have a three-node Kafka cluster with a single topic and a topic
>>> replication factor of 3. I ran a test where I inserted a few hundred
>>> messages into Kafka. While the topology was reading these messages,
>>> I killed one of the brokers.
>>>
>>> My hope was that the Kafka spout would simply use one of the other
>>> replicas. Instead, I saw the following exceptions and no attempt to
>>> retry or fail over to one of the working Kafka replicas. Is this the
>>> expected behavior? Is there a way to make storm-kafka read messages
>>> reliably in the presence of broker failures?
>>>
>>> I should add that I checked Kafka after the node failure, and one of
>>> the other brokers had been promoted to leader for each partition, as
>>> expected. So I don't understand why the Kafka spout wouldn't
>>> eventually retry using the new leader. I also checked with the Kafka
>>> console consumer and verified that all messages are available in
>>> Kafka.
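
For what it's worth, the leader check described above can also be done programmatically with the Kafka 0.8 javaapi metadata request. A hedged sketch; the broker host, port, and topic name are illustrative assumptions:

```java
import java.util.Collections;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LeaderCheck {
    public static void main(String[] args) {
        // Point at any surviving broker; host and port are assumptions.
        SimpleConsumer consumer =
            new SimpleConsumer("broker2", 9092, 100000, 64 * 1024, "leaderLookup");
        TopicMetadataResponse resp = consumer.send(
            new TopicMetadataRequest(Collections.singletonList("event_topic")));
        for (TopicMetadata topic : resp.topicsMetadata()) {
            for (PartitionMetadata part : topic.partitionsMetadata()) {
                // After a broker failure, leader() should report the newly
                // elected leader for each partition.
                System.out.println("partition " + part.partitionId()
                        + " leader " + part.leader());
            }
        }
        consumer.close();
    }
}
```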
>>>
>>> Thank you, Milad
>>>
>>> Exception stack trace is as follows:
>>>
>>> 2015-01-26 14:21:54,451 [Thread-10-event_spout] [kafka.consumer.SimpleConsumer] [INFO]> Reconnect due to socket error: java.nio.channels.ClosedChannelException
>>> 2015-01-26 14:21:54,451 [Thread-10-event_spout] [storm.kafka.KafkaUtils] [WARN]> Network error when fetching messages:
>>> java.nio.channels.ClosedChannelException: null
>>>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111) ~[stormjar.jar:na]
>>>     at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:na]
>>>     at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167) ~[stormjar.jar:na]
>>>     at storm.kafka.PartitionManager.fill(PartitionManager.java:162) [stormjar.jar:na]
>>>     at storm.kafka.PartitionManager.next(PartitionManager.java:124) [stormjar.jar:na]
>>>     at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) [stormjar.jar:na]
>>>     at backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565) [storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463) [storm-core-0.9.3.jar:0.9.3]
>>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
>>> 2015-01-26 14:21:54,452 [Thread-10-event_spout] [storm.kafka.KafkaSpout] [WARN]> Fetch failed
>>> storm.kafka.FailedFetchException: java.nio.channels.ClosedChannelException
>>>     at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:175) ~[stormjar.jar:na]
>>>     at storm.kafka.PartitionManager.fill(PartitionManager.java:162) ~[stormjar.jar:na]
>>>     at storm.kafka.PartitionManager.next(PartitionManager.java:124) ~[stormjar.jar:na]
>>>     at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) ~[stormjar.jar:na]
>>>     at backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565) [storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463) [storm-core-0.9.3.jar:0.9.3]
>>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
>>> Caused by: java.nio.channels.ClosedChannelException: null
>>>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111) ~[stormjar.jar:na]
>>>     at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:na]
>>>     at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167) ~[stormjar.jar:na]
>>>     ... 7 common frames omitted
>>
>
