Hi
I am sure I am doing something wrong. Until now I have worked with
Storm and Kafka separately, but I have never put them together.
Now I would like to consume from Kafka using Storm.
My topology code is:
Config conf = new Config();
conf.setDebug(true);
BrokerHosts host = new ZkHosts("nn1:2181");
SpoutConfig spoutConfig = new SpoutConfig(host, "demo", "/", "stormgroup");
// deserialize each Kafka message as a String
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", kafkaSpout);
builder.setBolt("bolt1", new DemoBolt()).shuffleGrouping("spout");
// if you wish to run your job on a remote cluster
conf.setNumWorkers(4);
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 512);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
It compiles and I can submit it to the Storm cluster, but the spout emits
no tuples. As I understand it, nextTuple() should be called automatically?
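To make that assumption concrete, here is a rough plain-Java sketch of how I picture it: the worker calls nextTuple() over and over, and each call may or may not emit something. It uses no Storm classes at all; SimpleSpout, runLoop and SpoutLoopSketch are hypothetical names I made up for illustration, not Storm's API.

```java
import java.util.ArrayList;
import java.util.List;

public class SpoutLoopSketch {

    /** Hypothetical stand-in for a spout: the framework, not my code, calls nextTuple(). */
    public interface SimpleSpout {
        void nextTuple(List<String> collector);
    }

    /** Hypothetical stand-in for the worker loop that drives the spout. */
    public static List<String> runLoop(SimpleSpout spout, int iterations) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {
            spout.nextTuple(emitted); // a call may emit nothing at all
        }
        return emitted;
    }

    public static void main(String[] args) {
        // A spout that never finds any data: it is called, but emits nothing.
        SimpleSpout empty = collector -> { };

        // A spout that emits one message per call.
        SimpleSpout counting = new SimpleSpout() {
            private int n = 0;
            @Override
            public void nextTuple(List<String> collector) {
                collector.add("msg-" + n++);
            }
        };

        System.out.println(runLoop(empty, 3).size()); // prints 0
        System.out.println(runLoop(counting, 3));     // prints [msg-0, msg-1, msg-2]
    }
}
```

If this picture is right, then "no emits" would not mean nextTuple() is never called, only that the spout finds nothing to read on each call.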
--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480