Hi,
First, it depends on your use case: do you want at-least-once, at-most-once, or exactly-once processing?
This mostly depends on the way you anchor and ack tuples. The "exactly once" case is a bit trickier. See the documentation:
http://storm.apache.org/releases/0.10.0/Guaranteeing-message-processing.html
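For example, with a plain IRichBolt (or BaseRichBolt) you have to ack explicitly; the KafkaSpout only commits an offset once the tuple emitted for that message has been acked. A minimal sketch, assuming Storm 0.10.x package names (backtype.storm); saveToDb here is just a placeholder for your own persistence code, not taken from your topology:
-----
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class MyBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            saveToDb(tuple);       // placeholder for your own "saveToDb" work
            collector.ack(tuple);  // lets the KafkaSpout commit the offset for this message
        } catch (Exception e) {
            collector.fail(tuple); // asks the spout to replay the tuple
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt, no output stream
    }

    private void saveToDb(Tuple tuple) {
        // hypothetical stand-in for the persistence work
    }
}
-----
(If you extend BaseBasicBolt instead, the ack is done for you when execute returns without throwing.)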
As for the KafkaSpout, it restarts from the last committed offset. The thing is, if you allow more than one pending tuple in your topology, the KafkaSpout keeps consuming until that many tuples are in flight.
Say you allow 10 tuples in flight: the spout consumes 10 tuples, tuples 1 and 2 ack, tuple 3 fails, tuples 4 and 5 ack, and the others are still in flight when the failure of tuple 3 comes back. The KafkaSpout then rewinds to tuple 3 and replays from there, so it replays 3, 4, 5, ... leading to potential duplicates.
You can use the "exactly once" formalism to handle the issue, or strictly limit the number of tuples in flight to 1. You'll process tuples one at a time though, so far from full speed I guess.
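If you go for the second option, the limit is set on the topology Config. A small sketch, reusing the Config already created in your main() below (1 is the strictest possible value):
-----
Config conf = new Config();
conf.setDebug(true);
// At most one spout tuple pending (emitted but not yet acked/failed) at a time:
// no duplicate window on failure, but processing is fully serialized.
conf.setMaxSpoutPending(1);
-----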
On 30/03/2016 04:10, I PVP wrote:
Hi everyone,
What needs to be done for a Topology to let the KafkaSpout/ZooKeeper know that messages were successfully consumed/processed and should not be sent to that Topology again?
Is it something to be controlled automatically or manually within the Bolt, or is it set on the SpoutConfig?
I am new to Storm.
The topologies I have built so far all perform the expected work, but they continuously re-consume every message that was ever sent to the Topic.
On each Topology, SpoutConfig.zkRoot and SpoutConfig.id are: SpoutConfig.zkRoot = "/" + <topic name>, SpoutConfig.id = <the TopologyName>.
The Bolt implements IRichBolt and the only code in it is some "saveToDb" work within the execute(Tuple tuple) method.
MyTopology.java - main(String[] args) method:
-----
.....
BrokerHosts hosts = new ZkHosts("<ip>:<port>");
SpoutConfig spoutConfig = new SpoutConfig(hosts, <topic name>, "/" + <topic name>, <TopologyName>);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaspout_" + <topic name>, kafkaSpout);
builder.setBolt(<bolt name>, new MyBolt()).shuffleGrouping("kafkaspout_" + <topic name>);
Config conf = new Config();
conf.setDebug(true);
StormSubmitter.submitTopology(<topology name>, conf, builder.createTopology());
.....
——
Thanks
--
IPVP
--
BR,
Aurelien Violette