Hello: I reran my test with a replication factor of 2 but encountered the same issue...any other suggestions?
Thanks Milad On Mon, Jan 26, 2015 at 1:06 PM, Manoj Jaiswal <[email protected]> wrote: > three node kafka cluster with replication factor of 3 is bad design. > > It should always be less than the cluster size. > > Please change it to 2 and try again. > > -Manoj > > On Mon, Jan 26, 2015 at 7:19 AM, Milad Fatenejad <[email protected]> wrote: > >> Hello: >> >> I have a three node kafka cluster with a single topic and a topic >> replication factor of 3. I ran a test where I inserted a few hundred >> messages into kafka. While the topology was reading these messages, I >> killed one of the brokers. >> >> My hope was that the kafka spout would simply use one of the other >> replicas. Instead, I saw the following exceptions and no attempt to retry >> or failover to one of the working kafka replicas...is this the expected >> behavior? Is there a way to make storm kafka read messages reliably in the >> presence of broker failures? >> >> I should say that I checked with kafka and after the node failure, one of >> the other brokers was promoted to leader for each partition, as expected. >> So I don't understand why the kafka spout wouldn't eventually retry using >> the new leader? I also checked with the kafka console consumer and verified >> that all messages are available in kafka. >> >> Thank you >> Milad >> >> Exception stack trace is as follows: >> >> 2015-01-26 14:21:54,451 [Thread-10-event_spout] >> [kafka.consumer.SimpleConsumer] [INFO]> Reconnect due to socket error: >> java.nio.channels.ClosedChannelException >> 2015-01-26 14:21:54,451 [Thread-10-event_spout] [storm.kafka.KafkaUtils] >> [WARN]> Network error when fetching messages: >> java.nio.channels.ClosedChannelException: null >> at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) >> ~[stormjar.jar:na] >> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) >> ~[stormjar.jar:na] >> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) >> ~[stormjar.jar:na] >> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111) >> ~[stormjar.jar:na] >> at >> kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) >> ~[stormjar.jar:na] >> at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167) >> ~[stormjar.jar:na] >> at storm.kafka.PartitionManager.fill(PartitionManager.java:162) >> [stormjar.jar:na] >> at storm.kafka.PartitionManager.next(PartitionManager.java:124) >> [stormjar.jar:na] >> at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) >> [stormjar.jar:na] >> at >> backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565) >> [storm-core-0.9.3.jar:0.9.3] >> at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463) >> [storm-core-0.9.3.jar:0.9.3] >> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] >> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20] >> 2015-01-26 14:21:54,452 [Thread-10-event_spout] [storm.kafka.KafkaSpout] >> [WARN]> Fetch failed >> storm.kafka.FailedFetchException: java.nio.channels.ClosedChannelException >> at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:175) >> ~[stormjar.jar:na] >> at storm.kafka.PartitionManager.fill(PartitionManager.java:162) >> ~[stormjar.jar:na] >> at storm.kafka.PartitionManager.next(PartitionManager.java:124) >> ~[stormjar.jar:na] >> at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) >> ~[stormjar.jar:na] >> at >> backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565) >> [storm-core-0.9.3.jar:0.9.3] >> at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463) >> [storm-core-0.9.3.jar:0.9.3] >> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] >> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20] >> Caused by: java.nio.channels.ClosedChannelException: null >> at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) >> ~[stormjar.jar:na] >> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) >> ~[stormjar.jar:na] >> at >> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) >> ~[stormjar.jar:na] >> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) >> ~[stormjar.jar:na] >> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111) >> ~[stormjar.jar:na] >> at >> kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) >> ~[stormjar.jar:na] >> at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167) >> ~[stormjar.jar:na] >> ... 7 common frames omitted >> > >
