Hi everyone,
What does a topology need to do to let the KafkaSpout/ZooKeeper know that
messages were successfully consumed and processed, and should not be sent to
that topology again?
Is this handled automatically, or does it have to be done manually within the
Bolt? Or is it set on the SpoutConfig?
I am new to Storm.
The topologies I have built so far all do the work expected, but each one
keeps re-consuming every message that was ever sent to the topic.
In each topology, SpoutConfig.zkRoot and SpoutConfig.id are set as follows:
SpoutConfig.zkRoot = "/" + <topic name>, SpoutConfig.id = <the TopologyName>
The Bolt implements IRichBolt, and the only code in it is some "saveToDb" work
within the execute(Tuple tuple) method.
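To make the shape of the bolt concrete, here is a rough, stand-alone sketch of
what it does. It is only an illustration: TupleStub, CollectorStub and
MyBoltSketch are hypothetical stand-ins I made up so the snippet compiles
without Storm on the classpath (they are not Storm's Tuple, OutputCollector or
IRichBolt), and the "database" is just an in-memory list. The point it shows is
that execute() saves the message and does nothing else with the tuple.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone sketch: the types below are hypothetical stand-ins, NOT Storm's classes.
public class BoltSketch {

    /** Stand-in for a Storm tuple: carries just the message string. */
    static final class TupleStub {
        final String message;
        TupleStub(String message) { this.message = message; }
    }

    /** Stand-in for the output collector; it only counts ack calls. */
    static final class CollectorStub {
        int acked = 0;
        void ack(TupleStub tuple) { acked++; }
    }

    /** Mirrors the bolt described above: execute() only saves the message. */
    static final class MyBoltSketch {
        final List<String> saved = new ArrayList<>(); // placeholder for the database
        private CollectorStub collector;

        void prepare(CollectorStub collector) { this.collector = collector; }

        void execute(TupleStub tuple) {
            saveToDb(tuple.message);
            // Nothing else happens here: the collector is never told the tuple is done.
        }

        private void saveToDb(String message) { saved.add(message); }
    }

    public static void main(String[] args) {
        CollectorStub collector = new CollectorStub();
        MyBoltSketch bolt = new MyBoltSketch();
        bolt.prepare(collector);
        bolt.execute(new TupleStub("hello"));
        System.out.println("saved=" + bolt.saved + " acked=" + collector.acked);
    }
}
```

Running main prints saved=[hello] acked=0, i.e. the message is stored but the
collector is never called. Whether that is what keeps the messages coming back
in the real topology is exactly what I am unsure about.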
MyTopology.java - main(String[] args) method:
-----
.....
BrokerHosts hosts = new ZkHosts("<ip>:<port>");
SpoutConfig spoutConfig = new SpoutConfig(hosts, <topic name>,
    "/" + <topic name>, <TopologyName>);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaspout_" + <topic name>, kafkaSpout);
builder.setBolt(<bolt name>, new MyBolt())
    .shuffleGrouping("kafkaspout_" + <topic name>);
Config conf = new Config();
conf.setDebug(true);
StormSubmitter.submitTopology(<topology name>, conf, builder.createTopology());
.....
-----
Thanks
--
IPVP