Hi,

For the past two days, I’ve been trying to implement a KafkaSpout within our topology. Here is some relevant information.

All three services are running on the same instance. The Kafka broker listens on the default port 9092, with advertised.listeners set to PLAINTEXT://localhost:9092. ZooKeeper uses the default client port 2181, and the Storm Nimbus host name has been set to localhost as well.
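Concretely, the configuration I have in place looks roughly like the following (file paths are the defaults; the exact contents here are a sketch of my setup, not verbatim copies):

```
# Kafka: config/server.properties
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
zookeeper.connect=localhost:2181

# ZooKeeper: conf/zoo.cfg
clientPort=2181

# Storm: conf/storm.yaml
storm.zookeeper.servers:
    - "localhost"
nimbus.seeds: ["localhost"]
```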

A custom Kafka producer creates log messages successfully, and by inspecting ZooKeeper with the zkCli script I’ve verified that under the /brokers path the partitions and other relevant information are stored correctly.
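For example, the zkCli check looked roughly like this (the partition listing shown is what I would expect for a single-partition topic named "bytes"; treat the exact output as illustrative):

```
$ bin/zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics/bytes/partitions
[0]
```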

However, I keep getting an error when activating and afterwards monitoring the topology. Here is the source code of the Storm topology I’ve implemented:

BrokerHosts hosts = new ZkHosts("127.0.0.1:2181");

SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "/kafkastorm/",
        "bytes" + UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.zkServers = Arrays.asList("127.0.0.1");
spoutConfig.zkPort = 2181;

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("bytes", kafkaSpout);
builder.setBolt("byteSize", new KafkaByteProcessingBolt()).shuffleGrouping("bytes");

StormTopology topology = builder.createTopology();

Config config = new Config();

StormSubmitter.submitTopology("topology", config, topology);
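For completeness, the topology jar is built with Maven dependencies along these lines (the versions match the jars that appear in the worker log below; the exact pom shown here is a sketch, not a verbatim copy):

```
<!-- sketch of the relevant dependencies, versions taken from the worker log -->
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>1.0.2</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-kafka</artifactId>
  <version>1.0.2</version>
</dependency>
```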

The error message I keep getting when executing bin/storm monitor <topology_name> -m bytes is the following:

Exception in thread "main" java.lang.IllegalArgumentException: stream: default not found
at org.apache.storm.utils.Monitor.metrics(Monitor.java:223)
at org.apache.storm.utils.Monitor.metrics(Monitor.java:159)
at org.apache.storm.command.monitor$_main.doInvoke(monitor.clj:36)
at clojure.lang.RestFn.applyTo(RestFn.java:137)
at org.apache.storm.command.monitor.main(Unknown Source)

By inspecting the logs of the workers (the worker.log file), I’ve concluded that the KafkaSpout fails in the open() method:

java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[storm-kafka-1.0.2.jar:1.0.2]
at org.apache.storm.daemon.executor$fn__7990$fn__8005.invoke(executor.clj:604) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:482) [storm-core-1.0.2.jar:1.0.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_101]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_101]
... 5 more

Could someone explain what might cause the KafkaSpout to fail in the open() method?

I would really appreciate your help!

Thanks in advance,
Dominik
