If I am reading your code correctly, you are emitting from the spout without a message id, so your acking has no effect: Storm only tracks tuples that the spout emits with an id. You need to do something like:

    Object id = <anything you like>;
    _collector.emit(tuple, id);

Regards,
Javier

On Sep 8, 2015 3:19 PM, "Nick R. Katsipoulakis" <[email protected]> wrote:

> Hello all,
>
> I am running a topology for benchmarking my cluster. In it, I anchor
> tuples and I acknowledge them for exactly-once processing and in order to
> see the complete latency metric on the Storm UI. However, the "Complete
> Latency" and the "Acked" metric values for my spouts remain 0 and I guess
> that this translates to not being reported properly.
>
> My topology's code is really simple and consists of the following three
> classes:
>
> public static class TestWordSpout extends BaseRichSpout {
>
>     SpoutOutputCollector _collector;
>
>     public void open(@SuppressWarnings("rawtypes") Map conf,
>             TopologyContext context, SpoutOutputCollector collector) {
>         _collector = collector;
>     }
>     public void nextTuple() {
>         final String[] words = new String[] {"nathan", "mike", "jackson",
>                 "golda", "bertels"};
>         final Random rand = new Random();
>         final String word = words[rand.nextInt(words.length)];
>         Values tuple = new Values();
>         tuple.add((new Long(System.currentTimeMillis())).toString());
>         tuple.add(word);
>         _collector.emit(tuple);
>     }
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         String[] schema = { "timestamp", "word" };
>         declarer.declare(new Fields(schema));
>     }
> }
>
> My intermediate bolt's code is the following:
>
> public static class IntermediateBolt extends BaseRichBolt {
>
>     OutputCollector _collector;
>
>     @Override
>     public void prepare(@SuppressWarnings("rawtypes") Map conf,
>             TopologyContext context, OutputCollector collector) {
>         _collector = collector;
>     }
>     @Override
>     public void execute(Tuple tuple) {
>         Values v = new Values();
>         v.add(tuple.getString(0));
>         v.add(tuple.getString(1));
>         _collector.emit(tuple, v);
>     }
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         String[] schema = { "timestamp", "word" };
>         declarer.declare(new Fields(schema));
>     }
> }
>
> And finally, my sink bolts (the last bolts in my topology) are the
> following:
>
> public static class SinkBolt extends BaseRichBolt {
>
>     OutputCollector _collector;
>
>     @Override
>     public void prepare(@SuppressWarnings("rawtypes") Map conf,
>             TopologyContext context, OutputCollector collector) {
>         _collector = collector;
>     }
>     @Override
>     public void execute(Tuple tuple) {
>         _collector.ack(tuple);
>     }
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
>         String[] schema = {"timestamp", "word"};
>         outputFieldsDeclarer.declare(new Fields(schema));
>     }
> }
>
> So, I just have a 3-level topology (spout, intermediate-bolt, sink-bolt)
> just to measure my cluster. However, as I mentioned above, in the UI the
> "Complete latency" and the "Acked" metrics are not updated for my spouts.
> Am I doing something wrong? Please note that I ack a tuple only
> at the SinkBolt. Is this the reason that my metrics are not updated?
>
> Thanks,
> Nick
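
To illustrate why the message id matters, here is a small stand-alone sketch. It does not use Storm at all: ToyCollector is a hypothetical stand-in that only mirrors the argument order of SpoutOutputCollector.emit(tuple, messageId), and its bookkeeping is a simplified assumption about what "tracked" means, not Storm's actual implementation. It shows that a tuple emitted without an id can never be counted as acked, which would match the "Acked" value staying at 0.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Simplified, stand-alone model of spout-side tuple tracking.
 * NOT Storm's real classes: ToyCollector is a hypothetical stand-in.
 */
public class AckTrackingSketch {

    /** Hypothetical stand-in for a spout output collector. */
    static class ToyCollector {
        // Ids of tuples that were emitted with an id and are not acked yet.
        private final Set<Object> pending = new HashSet<>();
        private int emitted = 0;
        private int acked = 0;

        /** Emit without an id: the tuple is sent but never tracked. */
        void emit(List<Object> tuple) {
            emitted++;
        }

        /** Emit with an id: the tuple is tracked until it is acked. */
        void emit(List<Object> tuple, Object messageId) {
            emitted++;
            pending.add(messageId);
        }

        /** Ack by id; returns false if no tuple with that id was tracked. */
        boolean ack(Object messageId) {
            if (!pending.remove(messageId)) {
                return false;
            }
            acked++;
            return true;
        }

        int emitted() { return emitted; }
        int acked() { return acked; }
        int pendingCount() { return pending.size(); }
    }

    public static void main(String[] args) {
        ToyCollector collector = new ToyCollector();
        List<Object> tuple = Arrays.<Object>asList("1441740000000", "nathan");

        collector.emit(tuple);           // no id: cannot be acked later
        collector.emit(tuple, "msg-1");  // with id: tracked
        collector.ack("msg-1");

        // prints "emitted=2 acked=1 pending=0"
        System.out.println("emitted=" + collector.emitted()
                + " acked=" + collector.acked()
                + " pending=" + collector.pendingCount());
    }
}
```

In the real topology, as far as I know, every bolt that extends BaseRichBolt also has to ack the tuples it receives, so the intermediate bolt would likely need a _collector.ack(tuple) after its emit for the tuple tree to complete.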
