Hello:

I have a three-node Kafka cluster with a single topic and a topic
replication factor of 3. I ran a test where I inserted a few hundred
messages into Kafka. While the topology was reading these messages, I
killed one of the brokers.

My hope was that the Kafka spout would simply fail over to one of the
other replicas. Instead, I saw the exceptions below, and no attempt to
retry or fail over to one of the working brokers. Is this the expected
behavior? Is there a way to make storm-kafka read messages reliably in
the presence of broker failures?

I should add that I checked Kafka after the node failure, and one of
the other brokers had been promoted to leader for each partition, as
expected. So I don't understand why the Kafka spout wouldn't eventually
retry using the new leader. I also verified with the Kafka console
consumer that all messages are still available in Kafka.
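For reference, my spout wiring looks roughly like the sketch below (host
names, topic, and ZooKeeper paths are placeholders, not my actual
values). I'm wondering in particular whether ZkHosts.refreshFreqSecs
(how often the spout re-reads partition leadership from ZooKeeper,
default 60 seconds) is what governs how quickly a new leader is picked
up:

```java
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.topology.TopologyBuilder;

public class EventTopology {
    public static TopologyBuilder build() {
        // Placeholder ZooKeeper connect string -- not my real hosts.
        ZkHosts hosts = new ZkHosts("zk1:2181,zk2:2181,zk3:2181");
        // Re-read broker/partition leadership from ZooKeeper more often,
        // to shorten the window before a newly elected leader is used
        // (the default is 60 seconds).
        hosts.refreshFreqSecs = 10;

        // topic, zkRoot, and consumer id are placeholders.
        SpoutConfig spoutConfig =
                new SpoutConfig(hosts, "events", "/kafka-spout", "event_spout");

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("event_spout", new KafkaSpout(spoutConfig), 1);
        return builder;
    }
}
```

(This is just a configuration fragment to show the shape of my setup;
it requires the storm-core and storm-kafka jars to compile.)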

Thank you
Milad

Exception stack trace is as follows:

2015-01-26 14:21:54,451 [Thread-10-event_spout] [kafka.consumer.SimpleConsumer] [INFO]> Reconnect due to socket error: java.nio.channels.ClosedChannelException
2015-01-26 14:21:54,451 [Thread-10-event_spout] [storm.kafka.KafkaUtils] [WARN]> Network error when fetching messages:
java.nio.channels.ClosedChannelException: null
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111) ~[stormjar.jar:na]
        at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:na]
        at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167) ~[stormjar.jar:na]
        at storm.kafka.PartitionManager.fill(PartitionManager.java:162) [stormjar.jar:na]
        at storm.kafka.PartitionManager.next(PartitionManager.java:124) [stormjar.jar:na]
        at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) [stormjar.jar:na]
        at backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565) [storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
2015-01-26 14:21:54,452 [Thread-10-event_spout] [storm.kafka.KafkaSpout] [WARN]> Fetch failed
storm.kafka.FailedFetchException: java.nio.channels.ClosedChannelException
        at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:175) ~[stormjar.jar:na]
        at storm.kafka.PartitionManager.fill(PartitionManager.java:162) ~[stormjar.jar:na]
        at storm.kafka.PartitionManager.next(PartitionManager.java:124) ~[stormjar.jar:na]
        at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) ~[stormjar.jar:na]
        at backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565) [storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
Caused by: java.nio.channels.ClosedChannelException: null
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111) ~[stormjar.jar:na]
        at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:na]
        at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167) ~[stormjar.jar:na]
        ... 7 common frames omitted
