Hello Javier, and thank you for your reply. I have a question about the tuple ids: do they have to be unique? I am asking because I have many spouts, and they might emit identical tuples in the topology.
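The sketch below models the spout side of this question. It assumes a reliable spout that keeps its own map of in-flight tuples, keyed by message id. That is a common pattern, but `PendingTracker`, `emitted` and `freshId` are hypothetical names, not Storm API. As far as I know, Storm tracks each spout tuple under its own internally generated id. It passes your message id back unchanged to `ack`/`fail` on the spout task that emitted it. So ids mainly need to be distinguishable within one spout task's bookkeeping, and identical tuples coming from different spouts should not clash. Problems start when a single task reuses an id, for example one derived from the tuple contents. Two in-flight tuples then collapse into one map entry, and a replay on `fail` can silently lose one of them.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical per-task bookkeeping a reliable spout might keep (not Storm API).
 * The message id handed back to ack()/fail() is used as the key to find the
 * tuple to forget or to replay.
 */
class PendingTracker {
    private final Map<Object, List<?>> pending = new HashMap<>();

    /** Remember an emitted tuple under msgId; a reused id overwrites the older entry. */
    void emitted(Object msgId, List<?> values) {
        pending.put(msgId, values);
    }

    /** Tuple tree completed: the tuple no longer needs to be kept. */
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    /** Tuple failed or timed out: return the values to re-emit, or null if unknown. */
    List<?> fail(Object msgId) {
        return pending.remove(msgId);
    }

    int pendingCount() {
        return pending.size();
    }

    /** One way to get an id that is unique within this spout task. */
    static Object freshId() {
        return UUID.randomUUID().toString();
    }
}
```

A per-task counter would work just as well as `UUID.randomUUID()`. The only property that matters in this model is that two tuples in flight from the same task never share an id.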
Also, do I have to ack a tuple only in the last bolt that processes it for the tuple tree to be complete? Or do I have to ack each received tuple in every bolt?

Thanks,
Nick

On Wed, Sep 9, 2015 at 3:29 AM, Javier Gonzalez wrote:
> If I am reading your code correctly, it seems you're emitting from the
> spout without a message id; therefore, your acking efforts are not being
> used. You need to do something like:
>
> Object id = <anything you like>;
> _collector.emit(tuple, id);
>
> Regards,
> Javier
> On Sep 8, 2015 3:19 PM, "Nick R. Katsipoulakis" wrote:
>
>> Hello all,
>>
>> I am running a topology for benchmarking my cluster. In it, I anchor
>> tuples and I acknowledge them for exactly-once processing and in order to
>> see the complete latency metric on the Storm UI. However, the "Complete
>> Latency" and the "Acked" metric values for my spouts remain 0, and I guess
>> this means they are not being reported properly.
>>
>> My topology's code is really simple and consists of the following three
>> classes:
>>
>> public static class TestWordSpout extends BaseRichSpout {
>>
>>     SpoutOutputCollector _collector;
>>
>>     public void open(@SuppressWarnings("rawtypes") Map conf,
>>             TopologyContext context, SpoutOutputCollector collector) {
>>         _collector = collector;
>>     }
>>
>>     public void nextTuple() {
>>         final String[] words = new String[] {"nathan", "mike", "jackson",
>>                 "golda", "bertels"};
>>         final Random rand = new Random();
>>         final String word = words[rand.nextInt(words.length)];
>>         Values tuple = new Values();
>>         tuple.add((new Long(System.currentTimeMillis())).toString());
>>         tuple.add(word);
>>         _collector.emit(tuple);
>>     }
>>
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         String[] schema = { "timestamp", "word" };
>>         declarer.declare(new Fields(schema));
>>     }
>> }
>>
>> My intermediate bolt's code is the following:
>>
>> public static class IntermediateBolt extends BaseRichBolt {
>>
>>     OutputCollector _collector;
>>
>>     @Override
>>     public void prepare(@SuppressWarnings("rawtypes") Map conf,
>>             TopologyContext context, OutputCollector collector) {
>>         _collector = collector;
>>     }
>>
>>     @Override
>>     public void execute(Tuple tuple) {
>>         Values v = new Values();
>>         v.add(tuple.getString(0));
>>         v.add(tuple.getString(1));
>>         _collector.emit(tuple, v);
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         String[] schema = { "timestamp", "word" };
>>         declarer.declare(new Fields(schema));
>>     }
>> }
>>
>> And finally, my sink bolts (the last bolts in my topology) are the
>> following:
>>
>> public static class SinkBolt extends BaseRichBolt {
>>
>>     OutputCollector _collector;
>>
>>     @Override
>>     public void prepare(@SuppressWarnings("rawtypes") Map conf,
>>             TopologyContext context, OutputCollector collector) {
>>         _collector = collector;
>>     }
>>
>>     @Override
>>     public void execute(Tuple tuple) {
>>         _collector.ack(tuple);
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
>>         String[] schema = {"timestamp", "word"};
>>         outputFieldsDeclarer.declare(new Fields(schema));
>>     }
>> }
>>
>> So, I have a 3-level topology (spout, intermediate bolt, sink bolt)
>> just to measure my cluster. However, as I mentioned above, the
>> "Complete latency" and "Acked" metrics in the UI are not updated for my
>> spouts. Am I doing something wrong? Please note that I ack a tuple only
>> at the SinkBolt. Is this the reason my metrics are not updated?
>>
>> Thanks,
>> Nick
>>
>> --
>> Nikolaos Romanos Katsipoulakis,
>> University of Pittsburgh, PhD candidate
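For the second question, a simplified model of the XOR bookkeeping described in Storm's "Guaranteeing Message Processing" documentation may help. It assumes the spout already emits with a message id, as Javier suggests. Without one, nothing is tracked and "Acked" stays at 0 regardless of what the bolts do. `AckerModel` and its methods are hypothetical names, not Storm classes, and the real acker differs in detail. The model keeps one 64-bit "ack val" per spout tuple. Each newly created tuple id is XORed into it, and each acked tuple id is XORed back out. The tree counts as complete only when the value returns to 0.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * Simplified, hypothetical model of Storm's acker bookkeeping (not Storm code).
 * For each spout tuple (root) it keeps one 64-bit "ack val": the XOR of every
 * tuple id created in the tree and every tuple id acked so far.
 */
class AckerModel {
    private final Map<Long, Long> ackVal = new HashMap<>();
    private final Random rand = new Random(42);  // fixed seed only to keep runs repeatable

    /** Spout emits a tracked tuple (i.e. with a message id); returns its tuple id. */
    long spoutEmit(long rootId) {
        long tupleId = rand.nextLong();
        ackVal.put(rootId, tupleId);
        return tupleId;
    }

    /** A bolt emits a new tuple anchored into the tree of rootId; returns its id. */
    long anchoredEmit(long rootId) {
        long tupleId = rand.nextLong();
        ackVal.merge(rootId, tupleId, (a, b) -> a ^ b);  // created: XOR in
        return tupleId;
    }

    /** A bolt acks a tuple belonging to the tree of rootId. */
    void ack(long rootId, long tupleId) {
        ackVal.merge(rootId, tupleId, (a, b) -> a ^ b);  // acked: XOR out
    }

    /** The tree is complete once every created id has also been acked. */
    boolean isComplete(long rootId) {
        Long val = ackVal.get(rootId);
        return val != null && val == 0L;
    }
}
```

Consider the topology above in this model. IntermediateBolt emits anchored to its input but never acks that input, so the spout tuple's own id is never XORed out. The tree therefore never completes, and the tuple is eventually failed after `topology.message.timeout.secs` instead of being acked. In other words, every bolt that receives a tuple should ack it, after emitting anything anchored to it. Extending `BaseBasicBolt` instead of `BaseRichBolt` does that anchoring and acking automatically.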
