Hi everyone,

What does a Topology need to do to let the KafkaSpout/ZooKeeper know that
messages were successfully consumed and processed, and should not be sent to
that Topology again?

Is it something that is controlled automatically, or manually within the Bolt?
Or is it set on the SpoutConfig?

I am new to Storm.

The topologies I have built so far all perform the expected work, but they
keep consuming every message that was ever sent to the Topic.

On each Topology, SpoutConfig.zkRoot and SpoutConfig.id are set as follows:
SpoutConfig.zkRoot = "/" + <topic name>, SpoutConfig.id = <the TopologyName>

The Bolt implements IRichBolt, and the only code in it is some "saveToDb" work
within the execute(Tuple tuple) method.

MyTopology.java - main(String[] args) method:
-----
.....
BrokerHosts hosts = new ZkHosts("<ip>:<port>");
SpoutConfig spoutConfig = new SpoutConfig(hosts, <topic name>, "/" + <topic name>, <TopologyName>);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaspout_" + <topic name>, kafkaSpout);
builder.setBolt(<bolt name>, new MyBolt()).shuffleGrouping("kafkaspout_" + <topic name>);
Config conf = new Config();
conf.setDebug(true);
StormSubmitter.submitTopology(<topology name>, conf, builder.createTopology());
.....
-----

Thanks

--
IPVP
