Okay - perhaps something is totally messed up:

2015-01-23T11:40:45.364+0200 s.k.ZkCoordinator [INFO] Task [1/1] New partition managers: [Partition{host=bigdata14.webmedia.int:9092, partition=0}]
2015-01-23T11:40:45.571+0200 s.k.PartitionManager [WARN] Error reading and/or parsing at ZkNode: //storm/partition_0
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Invalid path string "//storm/partition_0" caused by empty node name specified @1
        at storm.kafka.ZkState.readJSON(ZkState.java:96) ~[stormjar.jar:na]
        at storm.kafka.PartitionManager.<init>(PartitionManager.java:73) ~[stormjar.jar:na]
        at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) [stormjar.jar:na]
        at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) [stormjar.jar:na]
        at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [stormjar.jar:na]
        at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) [storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Invalid path string "//storm/partition_0" caused by empty node name specified @1
        at storm.kafka.ZkState.readBytes(ZkState.java:108) ~[stormjar.jar:na]
        at storm.kafka.ZkState.readJSON(ZkState.java:90) ~[stormjar.jar:na]
        ... 8 common frames omitted
Caused by: java.lang.IllegalArgumentException: Invalid path string "//storm/partition_0" caused by empty node name specified @1
        at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:99) ~[stormjar.jar:na]
        at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1024) ~[stormjar.jar:na]
        at org.apache.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172) ~[stormjar.jar:na]
        at org.apache.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161) ~[stormjar.jar:na]
        at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[stormjar.jar:na]
        at org.apache.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157) ~[stormjar.jar:na]
        at org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148) ~[stormjar.jar:na]
        at org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36) ~[stormjar.jar:na]
        at storm.kafka.ZkState.readBytes(ZkState.java:102) ~[stormjar.jar:na]
        ... 9 common frames omitted
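The telling detail in the trace is the doubled slash in "//storm/partition_0": it looks like a zkRoot of "/" being concatenated with another "/"-prefixed node name, which ZooKeeper's PathUtils.validatePath rejects as an "empty node name". A minimal sketch of that concatenation (plain Java with a hypothetical helper name, not the actual storm.kafka code):

```java
public class ZkPathDemo {
    // Hypothetical reconstruction of the path building: a zkRoot of "/"
    // plus an appended "/<node>" yields a leading "//", which ZooKeeper
    // rejects because the node name between the two slashes is empty.
    static String childPath(String zkRoot, String node) {
        return zkRoot + "/" + node;
    }

    public static void main(String[] args) {
        System.out.println(childPath("/", "storm/partition_0"));  // //storm/partition_0 -> invalid
        System.out.println(childPath("/storm", "partition_0"));   // /storm/partition_0  -> valid
    }
}
```

So a non-root zkRoot (e.g. "/storm") avoids the invalid path, while "/" does not.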

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 23/01/15 11:34, Margus Roo wrote:
One more thing - I cannot see any consumer in KafkaMonitor. So the main question is: why does this code not consume anything from the "demo" topic on bigdata14:9092?


On 23/01/15 11:29, Margus Roo wrote:
Hi

I am sure that I am doing something wrong. Until now I have worked with Storm and with Kafka, but never put them together.
Now I would like to consume from Kafka using Storm.

My topology code is:

           Config conf = new Config();
           conf.setDebug(true);

           BrokerHosts host = new ZkHosts("nn1:2181");
           SpoutConfig spoutConfig = new SpoutConfig(host, "demo", "/", "stormgroup");
           spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
           KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);


           TopologyBuilder builder = new TopologyBuilder();
           builder.setSpout("spout", kafkaSpout);
           builder.setBolt("bolt1", new DemoBolt()).shuffleGrouping("spout");

           // if you wish to run your job on a remote cluster
           conf.setNumWorkers(4);
           conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
           conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
           conf.put(Config.WORKER_CHILDOPTS , "-Xmx4096m");
           conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS , "-Xmx4096m");
           conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING  , 512);

           StormSubmitter.submitTopology(args[0], conf, builder.createTopology());


I can compile it and upload it to the Storm server, but there are no emits. As I understand it, nextTuple() will be called automatically?
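On the last question: yes, once the topology is submitted, a Storm executor thread repeatedly calls nextTuple() on the spout - the executor.clj frames above KafkaSpout.nextTuple in the stack trace are exactly that loop. A toy sketch of the call pattern (plain Java, hypothetical names, not the real Storm API), just to illustrate that the framework drives the spout rather than the other way around:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class SpoutLoopDemo {
    // Toy stand-in for a spout: nextTuple() is called BY the framework;
    // topology code never calls it directly.
    interface Spout { String nextTuple(); }

    // Simplified executor loop, analogous to the backtype.storm.daemon.executor
    // frames in the stack trace: keep polling the spout for tuples.
    static List<String> drain(Spout spout) {
        List<String> emitted = new ArrayList<>();
        String tuple;
        while ((tuple = spout.nextTuple()) != null) {
            emitted.add(tuple);
        }
        return emitted;
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>();
        pending.add("msg-1");
        pending.add("msg-2");
        Spout spout = pending::poll; // returns null once nothing is pending
        System.out.println(drain(spout)); // [msg-1, msg-2]
    }
}
```

So the lack of emits is not about nextTuple() going uncalled - the stack trace shows it being called; it is failing inside with the invalid ZooKeeper path, so no tuples ever come out.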


