Hi

I am sure that I am doing something wrong. Since now I am worked with storm and kafka but newer put them together.
Now I'd like to consume from kafka using storm

My topology code is:

           Config conf = new Config();
           conf.setDebug(true);

           BrokerHosts host = new ZkHosts("nn1:2181");
SpoutConfig spoutConfig = new SpoutConfig(host, "demo", "/", "stormgroup");
           spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
           KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);


           TopologyBuilder builder = new TopologyBuilder();
           builder.setSpout("spout", kafkaSpout);
builder.setBolt("bolt1", new DemoBolt()).shuffleGrouping("spout");

           // if you wish to run your job on a remote cluster
           conf.setNumWorkers(4);
           conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
           conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
           conf.put(Config.WORKER_CHILDOPTS , "-Xmx4096m");
           conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS , "-Xmx4096m");
           conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING  , 512);

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());


I can compile it and upload to storm server. But there are no emits. As I understand nextTuple() will called automatically?

--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

Reply via email to