Hi,
For the past two days, I’ve been trying to implement a KafkaSpout within our
topology. Here
is some relevant information.
All three services (Kafka, Zookeeper and Storm) are running on the same
instance. The Kafka broker uses the default port 9092, with
advertised.listeners set to PLAINTEXT://localhost:9092. Zookeeper uses the
default client port 2181, and the Storm Nimbus host name has been set to
localhost as well.
A custom Kafka Producer creates log messages successfully. Using the zkCli
Zookeeper script, I’ve also checked the /brokers path and seen that the
partitions and other relevant information are stored correctly.
However, I keep getting an error when activating, and afterwards monitoring,
the topology.
Here is the source code of the Storm topology I’ve implemented:
BrokerHosts hosts = new ZkHosts("127.0.0.1:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "/kafkastorm/",
        "bytes" + UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.zkServers = Arrays.asList("127.0.0.1");
spoutConfig.zkPort = 2181;
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("bytes", kafkaSpout);
builder.setBolt("byteSize", new KafkaByteProcessingBolt())
        .shuffleGrouping("bytes");
StormTopology topology = builder.createTopology();
Config config = new Config();
StormSubmitter.submitTopology("topology", config, topology);
When I run bin/storm monitor <topology_name> -m bytes, I keep getting the
following error:
Exception in thread "main" java.lang.IllegalArgumentException: stream: default not found
at org.apache.storm.utils.Monitor.metrics(Monitor.java:223)
at org.apache.storm.utils.Monitor.metrics(Monitor.java:159)
at org.apache.storm.command.monitor$_main.doInvoke(monitor.clj:36)
at clojure.lang.RestFn.applyTo(RestFn.java:137)
at org.apache.storm.command.monitor.main(Unknown Source)
By inspecting the worker logs (the worker.log file), I’ve found that
the KafkaSpout fails in its open() method:
java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:75) ~[storm-kafka-1.0.2.jar:1.0.2]
at org.apache.storm.daemon.executor$fn__7990$fn__8005.invoke(executor.clj:604) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:482) [storm-core-1.0.2.jar:1.0.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_101]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[?:1.8.0_101]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_101]
... 5 more
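The trace above says the class org.apache.curator.RetryPolicy could not be
loaded at runtime. A small check like the one below could help confirm whether
that class is visible on a given classpath (for example, the classpath built
from the topology jar). This is only a sketch: ClasspathCheck and isLoadable
are hypothetical names chosen for illustration and are not part of Storm or
Curator.

```java
// Hypothetical helper: reports whether a class can be found on the current classpath.
public class ClasspathCheck {

    // Returns true if the named class can be located without initializing it.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // The class (or something it links against) is not on the classpath.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the worker.log stack trace.
        String name = args.length > 0 ? args[0] : "org.apache.curator.RetryPolicy";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

Running it with the same classpath as the topology jar (an assumption about how
the jar is packaged) should show whether the Curator classes were bundled with it.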
Could someone explain why the KafkaSpout might be failing in the
open() method?
I would really appreciate your help!
Thanks in advance,
Dominik