Milad, can you share your KafkaSpout config? -Harsha
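For context on what Harsha is asking for: a typical storm-kafka 0.9.x spout configuration looks roughly like the sketch below. The ZooKeeper connect string, topic name, ZK root, and consumer id are placeholders, not values taken from this thread.

```java
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class EventSpoutConfig {
    public static KafkaSpout buildEventSpout() {
        // ZkHosts reads broker and partition-leader information from
        // ZooKeeper; the spout refreshes this periodically, which is how
        // it is supposed to pick up a newly elected leader.
        BrokerHosts hosts = new ZkHosts("zk1:2181,zk2:2181,zk3:2181");

        // Args: broker source, topic, ZK root for offset storage, consumer id.
        SpoutConfig spoutConfig =
            new SpoutConfig(hosts, "events", "/kafka-spout", "event_spout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        return new KafkaSpout(spoutConfig);
    }
}
```

This is only a sketch of the shape of such a config; the actual values in Milad's topology are what Harsha is asking to see.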
On Mon, Jan 26, 2015, at 01:31 PM, Milad Fatenejad wrote:
> Hello:
>
> I reran my test with a replication factor of 2 but encountered the
> same issue... any other suggestions?
>
> Thanks
> Milad
>
> On Mon, Jan 26, 2015 at 1:06 PM, Manoj Jaiswal <[email protected]> wrote:
>> A three-node Kafka cluster with a replication factor of 3 is bad design.
>>
>> It should always be less than the cluster size.
>>
>> Please change it to 2 and try again.
>>
>> -Manoj
>>
>> On Mon, Jan 26, 2015 at 7:19 AM, Milad Fatenejad <[email protected]> wrote:
>>> Hello:
>>>
>>> I have a three-node Kafka cluster with a single topic and a topic
>>> replication factor of 3. I ran a test where I inserted a few hundred
>>> messages into Kafka. While the topology was reading these messages,
>>> I killed one of the brokers.
>>>
>>> My hope was that the Kafka spout would simply use one of the other
>>> replicas. Instead, I saw the following exceptions and no attempt to
>>> retry or fail over to one of the working Kafka replicas... is this the
>>> expected behavior? Is there a way to make storm-kafka read messages
>>> reliably in the presence of broker failures?
>>>
>>> I should add that I checked with Kafka after the node failure, and
>>> one of the other brokers had been promoted to leader for each partition,
>>> as expected. So I don't understand why the Kafka spout wouldn't
>>> eventually retry using the new leader. I also checked with the Kafka
>>> console consumer and verified that all messages are available in
>>> Kafka.
>>> Thank you
>>> Milad
>>>
>>> Exception stack trace is as follows:
>>>
>>> 2015-01-26 14:21:54,451 [Thread-10-event_spout] [kafka.consumer.SimpleConsumer] [INFO]> Reconnect due to socket error: java.nio.channels.ClosedChannelException
>>> 2015-01-26 14:21:54,451 [Thread-10-event_spout] [storm.kafka.KafkaUtils] [WARN]> Network error when fetching messages:
>>> java.nio.channels.ClosedChannelException: null
>>>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111) ~[stormjar.jar:na]
>>>     at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:na]
>>>     at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167) ~[stormjar.jar:na]
>>>     at storm.kafka.PartitionManager.fill(PartitionManager.java:162) [stormjar.jar:na]
>>>     at storm.kafka.PartitionManager.next(PartitionManager.java:124) [stormjar.jar:na]
>>>     at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) [stormjar.jar:na]
>>>     at backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565) [storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463) [storm-core-0.9.3.jar:0.9.3]
>>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
>>> 2015-01-26 14:21:54,452 [Thread-10-event_spout] [storm.kafka.KafkaSpout] [WARN]> Fetch failed
>>> storm.kafka.FailedFetchException: java.nio.channels.ClosedChannelException
>>>     at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:175) ~[stormjar.jar:na]
>>>     at storm.kafka.PartitionManager.fill(PartitionManager.java:162) ~[stormjar.jar:na]
>>>     at storm.kafka.PartitionManager.next(PartitionManager.java:124) ~[stormjar.jar:na]
>>>     at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) ~[stormjar.jar:na]
>>>     at backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565) [storm-core-0.9.3.jar:0.9.3]
>>>     at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463) [storm-core-0.9.3.jar:0.9.3]
>>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
>>> Caused by: java.nio.channels.ClosedChannelException: null
>>>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>>>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111) ~[stormjar.jar:na]
>>>     at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:na]
>>>     at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167) ~[stormjar.jar:na]
>>>     ... 7 common frames omitted
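Manoj's suggestion earlier in the thread amounts to recreating the topic with a replication factor of 2. With the Kafka 0.8-era tooling in use here, that would look roughly like the following; the topic name, partition count, and ZooKeeper address are placeholders, not values from this thread.

```shell
# Recreate the topic with replication factor 2 instead of 3
# (topic name, partition count, and ZooKeeper address are placeholders).
kafka-topics.sh --create --zookeeper zk1:2181 \
  --topic events --partitions 3 --replication-factor 2
```

As Milad reports above, he reran the test with replication factor 2 and hit the same ClosedChannelException, so the replication factor does not appear to be the cause here.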
