Okay - perhaps something is totally messed up:
2015-01-23T11:40:45.364+0200 s.k.ZkCoordinator [INFO] Task [1/1] New partition managers: [Partition{host=bigdata14.webmedia.int:9092, partition=0}]
2015-01-23T11:40:45.571+0200 s.k.PartitionManager [WARN] Error reading and/or parsing at ZkNode: //storm/partition_0
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Invalid path string "//storm/partition_0" caused by empty node name specified @1
    at storm.kafka.ZkState.readJSON(ZkState.java:96) ~[stormjar.jar:na]
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:73) ~[stormjar.jar:na]
    at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) [stormjar.jar:na]
    at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) [stormjar.jar:na]
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [stormjar.jar:na]
    at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) [storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) [storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Invalid path string "//storm/partition_0" caused by empty node name specified @1
    at storm.kafka.ZkState.readBytes(ZkState.java:108) ~[stormjar.jar:na]
    at storm.kafka.ZkState.readJSON(ZkState.java:90) ~[stormjar.jar:na]
    ... 8 common frames omitted
Caused by: java.lang.IllegalArgumentException: Invalid path string "//storm/partition_0" caused by empty node name specified @1
    at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:99) ~[stormjar.jar:na]
    at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1024) ~[stormjar.jar:na]
    at org.apache.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172) ~[stormjar.jar:na]
    at org.apache.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161) ~[stormjar.jar:na]
    at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[stormjar.jar:na]
    at org.apache.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157) ~[stormjar.jar:na]
    at org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148) ~[stormjar.jar:na]
    at org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36) ~[stormjar.jar:na]
    at storm.kafka.ZkState.readBytes(ZkState.java:102) ~[stormjar.jar:na]
    ... 9 common frames omitted
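
My guess at how a path like "//storm/partition_0" can come about: if the spout's ZooKeeper root is "/" and the path is built as root + "/" + id + "/" + partition, the result starts with two slashes, which leaves an empty node name at index 1 (the "@1" in the message). The sketch below only demonstrates that string arithmetic. The class and both helpers are hypothetical, the check is a simplified stand-in for ZooKeeper's real validation, and "/kafkaspout" is just an example root, not a required value. It uses the id "storm" as it appears in the log.

```java
public class ZkPathSketch {

    // Hypothetical helper: joins root, consumer id and partition id in the
    // way the path in the log appears to be built (an assumption).
    static String committedPath(String zkRoot, String id, String partitionId) {
        return zkRoot + "/" + id + "/" + partitionId;
    }

    // Simplified check (not ZooKeeper's full validation): returns the index
    // of the first empty node name, or -1 if there is none.
    static int firstEmptyNode(String path) {
        for (int i = 1; i < path.length(); i++) {
            if (path.charAt(i) == '/' && path.charAt(i - 1) == '/') {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Root "/" produces a double slash at the start of the path.
        String bad = committedPath("/", "storm", "partition_0");
        // A non-root example value avoids the empty node name.
        String good = committedPath("/kafkaspout", "storm", "partition_0");
        System.out.println(bad + " -> empty node at " + firstEmptyNode(bad));
        System.out.println(good + " -> empty node at " + firstEmptyNode(good));
    }
}
```

If that reading is right, a root other than the bare "/" in the SpoutConfig should make this particular warning go away; I have not verified that it also fixes the missing consumer.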
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480
On 23/01/15 11:34, Margus Roo wrote:
One more thing - I cannot see any consumer in kafkaMonitor. So the
main question is: why is this code not consuming anything from the
demo topic on bigdata14:9092?
On 23/01/15 11:29, Margus Roo wrote:
Hi
I am sure that I am doing something wrong. Until now I have worked with
Storm and Kafka separately, but never put them together.
Now I'd like to consume from Kafka using Storm.
My topology code is:
    Config conf = new Config();
    conf.setDebug(true);
    BrokerHosts host = new ZkHosts("nn1:2181");
    SpoutConfig spoutConfig = new SpoutConfig(host, "demo", "/", "stormgroup");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", kafkaSpout);
    builder.setBolt("bolt1", new DemoBolt()).shuffleGrouping("spout");
    // if you wish to run your job on a remote cluster
    conf.setNumWorkers(4);
    conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
    conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
    conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
    conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 512);
    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
I can compile it and upload it to the Storm server, but nothing is emitted.
As I understand it, nextTuple() will be called automatically?