Hi,
This may be a common ZooKeeper error with Kafka-Storm integration.
When I run my Kafka topology in local cluster mode, it runs fine.
In distributed mode, there are 2 supervisors and 1 nimbus. ZooKeeper and
Kafka are running on the nimbus node. When I submit the topology from the
nimbus node, the following ZooKeeper error appears in the worker logs:
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
    at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:62)
    at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:24)
    at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:35)
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:70)
    at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864.invoke(executor.clj:519)
    at backtype.storm.util$async_loop$fn__384.invoke(util.clj:431)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
    at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:75)
    at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:48)
    ... 7 more
Caused by: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
    at com.netflix.curator.ConnectionState.getZooKeeper(ConnectionState.java:72)
    at com.netflix.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:74)
    at com.netflix.curator.framework.imps.CuratorFrameworkImpl.getZooKeeper(CuratorFrameworkImpl.java:353)
    at com.netflix.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:184)
    at com.netflix.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:173)
    at com.netflix.curator.RetryLoop.callWithRetry(RetryLoop.java:85)
    at com.netflix.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:169)
    at com.netflix.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:161)
    at com.netflix.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:36)
    at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:72)
    ... 8 more
Here is the code for the KafkaSpout in the topology:
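// kafkaZookeeper holds the ZooKeeper connect string used to look up the Kafka brokers
brokerHosts = new ZkHosts(kafkaZookeeper);
// arguments: broker hosts, topic, zkRoot (empty), consumer id
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, kafkaTopic, "", "storm");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = true;
spoutConf.zkPort = 2181;
// one spout executor
builder.setSpout("kafka-spout", new KafkaSpout(spoutConf), 1);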
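One check that may help narrow this down is whether the address in kafkaZookeeper is reachable from the supervisor nodes, since in distributed mode the spout opens its ZooKeeper connection from a worker there rather than from the nimbus node. Below is a minimal sketch of such a check using only the JDK. The class ZkConnectCheck, its method names, and the host names in the usage line are hypothetical and not part of storm-kafka; it assumes the usual host:port,host:port/chroot connect-string form and does not handle IPv6 literals.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of storm-kafka: checks whether each server in a
// ZooKeeper connect string accepts TCP connections from the machine it runs on.
class ZkConnectCheck {

    // Splits "host1:2181,host2:2181/chroot" into host/port pairs.
    // A missing port falls back to defaultPort; a chroot suffix is ignored.
    static List<InetSocketAddress> parse(String connectString, int defaultPort) {
        String servers = connectString.trim();
        int slash = servers.indexOf('/');
        if (slash >= 0) {
            servers = servers.substring(0, slash);
        }
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : servers.split(",")) {
            String server = entry.trim();
            if (server.isEmpty()) {
                continue;
            }
            int colon = server.lastIndexOf(':');
            String host = colon >= 0 ? server.substring(0, colon) : server;
            int port = colon >= 0 ? Integer.parseInt(server.substring(colon + 1)) : defaultPort;
            // createUnresolved avoids a DNS lookup while parsing
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // True when the name refers to the local machine. On a supervisor node such
    // an address points at the supervisor itself, not at the nimbus node.
    static boolean isLoopbackName(String host) {
        return host.equalsIgnoreCase("localhost") || host.startsWith("127.");
    }

    // Attempts a plain TCP connection; resolution failures and timeouts count as unreachable.
    static boolean canConnect(InetSocketAddress server, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(server.getHostString(), server.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("usage: java ZkConnectCheck host1:2181,host2:2181");
            return;
        }
        for (InetSocketAddress server : parse(args[0], 2181)) {
            String host = server.getHostString();
            String note = isLoopbackName(host) ? " (loopback: refers to this machine)" : "";
            boolean ok = canConnect(server, 3000);
            System.out.println(host + ":" + server.getPort() + " reachable=" + ok + note);
        }
    }
}
```

After compiling with javac, running `java ZkConnectCheck <connect string>` on each supervisor node with the same value as kafkaZookeeper prints one line per server. A successful TCP connection only shows that the port is open, not that ZooKeeper is healthy, so this is a first check rather than a diagnosis.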
I am using kafka_2.9.2-0.8.1, ZooKeeper 3.4.6, and Storm 0.9.1. How can I
resolve this error?
-Thanks
Nishu Tayal