Thank you, Matthias. I get it now. By the way, I am measuring end-to-end latency for a topology with an input rate of 4500 tuples per second. Each tuple is about 64 bytes long, and my cluster consists of 4x m4.xlarge supervisor nodes and 3x m4.xlarge nodes for ZooKeeper and Nimbus.
My topology consists of about 9 to 36 bolts and 4 spouts, and the deepest pipeline a tuple has to go through is 5 bolts long. By pipeline I mean something like spout-a -> bolt-1 -> bolt-2 -> ... -> bolt-out. I am measuring a complete latency (through the spouts' ack() function) of around 800 msec. Is this normal?

Thanks,
Nick

On Wed, Sep 9, 2015 at 11:53 AM, Matthias J. Sax <[email protected]> wrote:

> There is no such thing for Bolts.
>
> The call to Spout.ack(...) happens after Storm retrieved all acks of all
> (transitively) anchored tuples.
>
> Let's say you have spout -> bolt1 -> bolt2
>
> Spout emits t1 which is processed by bolt1.
> bolt1 emits t2 (with anchor t1) and acks t1.
> => there will be no call to Spout.ack(...) yet, because t2 is pending
> (ie, was not acked).
> After bolt2 processed and acked t2, the callback to Spout.ack(t1) happens.
>
>
> -Matthias
>
>
> On 09/09/2015 03:55 PM, Nick R. Katsipoulakis wrote:
> > Thank you Derek for your reply.
> >
> > I figured out the error and it was in my code (I was not acking all
> > tuples properly). I have another question:
> >
> > I see that the BaseRichSpout has a callback function called ack(Object
> > msgId) which is called when a tuple gets acknowledged. Is there similar
> > functionality for Bolts? I see that the BaseRichBolt does not have one.
> >
> > Thanks,
> > Nick
> >
> > On Wed, Sep 9, 2015 at 9:45 AM, Derek Dagit <[email protected]> wrote:
> >
> > The metrics used on the UI are aggregated in chunks.
> >
> > It could very well be that your code is working perfectly fine, and
> > there is a threshold of emits/acks/fails that needs to be met before
> > the numbers show up on the UI.
> >
> > Often I will see 0 on the UI until, for example, the number of emits
> > reaches 20. And very often the numbers will increment by 20s too.
> > --
> > Derek
> >
> > ________________________________
> > From: Nick R. Katsipoulakis <[email protected]>
> > To: [email protected]
> > Sent: Wednesday, September 9, 2015 7:52 AM
> > Subject: Re: UIs ack statistics are not updated
> >
> > Hello Javier and thank you for your reply.
> >
> > I have a question about the Tuple ids. Do they have to be unique? I
> > am asking because I have many spouts and they might emit identical
> > tuples in the topology.
> >
> > Also, do I have to ack a tuple only in the last bolt that processes
> > it, so that the tuple tree created is complete? Or, do I have to ack
> > each received tuple on every bolt?
> >
> > Thanks,
> > Nick
> >
> > On Wed, Sep 9, 2015 at 3:29 AM, Javier Gonzalez <[email protected]> wrote:
> >
> > If I am reading your code correctly, it seems you're emitting from
> > the spout without id - therefore, your acking efforts are not being
> > used. You need to do something like:
> > >Object id= <anything you like>;
> > >_collector.emit(tuple, id);
> > >Regards,
> > >Javier
> > >On Sep 8, 2015 3:19 PM, "Nick R. Katsipoulakis" <[email protected]> wrote:
> > >
> > >Hello all,
> > >>
> > >>I am running a topology for benchmarking my cluster. In it, I
> > >>anchor tuples and I acknowledge them for exactly-once processing and
> > >>in order to see the complete latency metric on the Storm UI.
> > >>However, the "Complete Latency" and the "Acked" metric values for my
> > >>spouts remain 0 and I guess that this translates to not being
> > >>reported properly.
> > >>
> > >>My Topology's code is really simple and consists of the following
> > >>three classes:
> > >>
> > >>public static class TestWordSpout extends BaseRichSpout {
> > >>
> > >>    SpoutOutputCollector _collector;
> > >>
> > >>    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) {
> > >>        _collector = collector;
> > >>    }
> > >>    public void nextTuple() {
> > >>        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
> > >>        final Random rand = new Random();
> > >>        final String word = words[rand.nextInt(words.length)];
> > >>        Values tuple = new Values();
> > >>        tuple.add((new Long(System.currentTimeMillis())).toString());
> > >>        tuple.add(word);
> > >>        _collector.emit(tuple);
> > >>    }
> > >>    public void declareOutputFields(OutputFieldsDeclarer declarer) {
> > >>        String[] schema = { "timestamp", "word" };
> > >>        declarer.declare(new Fields(schema));
> > >>    }
> > >>}
> > >>
> > >>My intermediate bolts code is the following:
> > >>
> > >>public static class IntermediateBolt extends BaseRichBolt {
> > >>
> > >>    OutputCollector _collector;
> > >>
> > >>    @Override
> > >>    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
> > >>        _collector = collector;
> > >>    }
> > >>    @Override
> > >>    public void execute(Tuple tuple) {
> > >>        Values v = new Values();
> > >>        v.add(tuple.getString(0));
> > >>        v.add(tuple.getString(1));
> > >>        _collector.emit(tuple, v);
> > >>    }
> > >>    @Override
> > >>    public void declareOutputFields(OutputFieldsDeclarer declarer) {
> > >>        String[] schema = { "timestamp", "word" };
> > >>        declarer.declare(new Fields(schema));
> > >>    }
> > >>}
> > >>
> > >>And finally, my sink bolts (the last bolts in my topology) are the
> > >>following:
> > >>
> > >>public static class SinkBolt extends BaseRichBolt {
> > >>
> > >>    OutputCollector _collector;
> > >>
> > >>    @Override
> > >>    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
> > >>        _collector = collector;
> > >>    }
> > >>    @Override
> > >>    public void execute(Tuple tuple) {
> > >>        _collector.ack(tuple);
> > >>    }
> > >>    @Override
> > >>    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
> > >>        String[] schema = {"timestamp", "word"};
> > >>        outputFieldsDeclarer.declare(new Fields(schema));
> > >>    }
> > >>}
> > >>
> > >>So, I just have a 3-level topology (spout, intermediate-bolt,
> > >>sink-bolt) just to measure my cluster. However, as I mentioned
> > >>above, in the UI the "Complete latency" and the "Acked" metrics are
> > >>not updated for my spouts. Am I doing something wrong? Please, pay
> > >>attention that I ack a tuple only at the SinkBolt. Is this the
> > >>reason that my metrics are not updated?
> > >>
> > >>Thanks,
> > >>Nick
> > >>
> >
> > --
> > Nikolaos Romanos Katsipoulakis,
> > University of Pittsburgh, PhD candidate
> >
> > --
> > Nikolaos Romanos Katsipoulakis,
> > University of Pittsburgh, PhD candidate

--
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate
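
For anyone reading this thread later, the acking rule Matthias describes (the spout's ack callback fires only once every transitively anchored tuple has been acked) can be sketched as a toy model in plain Java. This is an illustration under simplifying assumptions and not Storm's actual acker: the class AckTreeModel and its methods spoutEmit, boltEmit and boltAck are hypothetical names invented for this sketch, and tuples are identified by plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of tuple-tree tracking. Hypothetical; NOT Storm's real acker. */
class AckTreeModel {
    // root message id -> tuples of that tree that were emitted but not yet acked
    private final Map<String, Set<String>> pending = new HashMap<>();
    // tuple id -> root message id of the tree it belongs to
    private final Map<String, String> rootOf = new HashMap<>();
    // message ids for which the (modelled) spout ack callback has fired
    final List<String> spoutAcks = new ArrayList<>();

    /** Spout emits a tuple. A null msgId models an emit without id: nothing is tracked. */
    void spoutEmit(String msgId, String tupleId) {
        if (msgId == null) {
            return;
        }
        rootOf.put(tupleId, msgId);
        pending.computeIfAbsent(msgId, k -> new HashSet<>()).add(tupleId);
    }

    /** Bolt emits newTupleId anchored to anchorId, which extends the anchor's tree. */
    void boltEmit(String anchorId, String newTupleId) {
        String root = rootOf.get(anchorId);
        if (root == null) {
            return; // the anchor is not tracked, so the new tuple is not tracked either
        }
        rootOf.put(newTupleId, root);
        pending.get(root).add(newTupleId);
    }

    /** Bolt acks a tuple. The spout callback fires once the whole tree is acked. */
    void boltAck(String tupleId) {
        String root = rootOf.remove(tupleId);
        if (root == null) {
            return;
        }
        Set<String> open = pending.get(root);
        open.remove(tupleId);
        if (open.isEmpty()) {
            pending.remove(root);
            spoutAcks.add(root);
        }
    }

    public static void main(String[] args) {
        AckTreeModel m = new AckTreeModel();
        m.spoutEmit("msg-1", "t1"); // spout emits t1 with a message id
        m.boltEmit("t1", "t2");     // bolt1 emits t2 anchored to t1
        m.boltAck("t1");            // bolt1 acks t1; t2 is still pending
        System.out.println(m.spoutAcks); // prints []
        m.boltAck("t2");            // bolt2 acks t2; the tree is complete
        System.out.println(m.spoutAcks); // prints [msg-1]
    }
}
```

In this model, an emit without a message id never produces a spout ack, and a bolt that emits an anchored tuple but never acks its input leaves the tree open. Those are the two situations discussed earlier in the thread.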
