Thank you Matthias. I got it now.

By the way, I am measuring end-to-end latency for a topology with an input
rate of 4500 tuples per second. Each tuple is about 64 bytes long, and my
cluster consists of 4x m4.xlarge supervisor nodes and 3x m4.xlarge nodes
for ZooKeeper and Nimbus.

My topology consists of about 9 to 36 bolts and 4 spouts, and the deepest
pipeline a tuple has to go through is 5 bolts long. By pipeline I mean
something like spout-a -> bolt-1 -> bolt-2 -> ... -> bolt-out .
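
To make the relationship between such a pipeline and the spout's ack()
callback concrete, here is a toy model in plain Java. It is not Storm code:
the class AckTreeSketch and its methods are made up for illustration, and it
only mimics the general idea of XOR-ing tuple ids until the checksum for a
spout message returns to zero. It assumes a linear pipeline of five bolts in
which every bolt anchors its output to its input and then acks the input.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of ack tracking for spout tuples. Hypothetical; not Storm's implementation. */
public class AckTreeSketch {
    // Spout message id -> XOR of the ids of all tuples emitted but not yet acked.
    private final Map<String, Long> checksums = new HashMap<>();
    // Message ids for which the spout's ack() callback would have fired, in order.
    private final List<String> completed = new ArrayList<>();

    /** The spout emits a root tuple under the given message id. */
    public void spoutEmit(String msgId, long rootTupleId) {
        checksums.put(msgId, rootTupleId);
    }

    /** A bolt acks inputTupleId after emitting the given tuples anchored to it. */
    public void boltAck(String msgId, long inputTupleId, long... anchoredTupleIds) {
        Long current = checksums.get(msgId);
        if (current == null) {
            return; // unknown message id, or the tree already completed
        }
        long updated = current ^ inputTupleId;
        for (long id : anchoredTupleIds) {
            updated ^= id;
        }
        if (updated == 0L) {
            checksums.remove(msgId);
            completed.add(msgId); // this is where the spout's ack(msgId) would be called
        } else {
            checksums.put(msgId, updated);
        }
    }

    public List<String> completed() {
        return completed;
    }

    public static void main(String[] args) {
        AckTreeSketch acker = new AckTreeSketch();
        long[] ids = {101L, 102L, 103L, 104L, 105L}; // one made-up tuple id per hop
        acker.spoutEmit("msg-1", ids[0]);
        for (int bolt = 1; bolt <= 5; bolt++) {
            long input = ids[bolt - 1];
            if (bolt < 5) {
                acker.boltAck("msg-1", input, ids[bolt]); // emit anchored tuple, ack input
            } else {
                acker.boltAck("msg-1", input); // the last bolt only acks
            }
            System.out.println("after bolt-" + bolt + ": spout ack fired = "
                    + acker.completed().contains("msg-1"));
        }
    }
}
```

In this model the spout-side callback fires only once the last bolt has
acked, so the complete latency of a tuple includes the time spent in every
bolt and queue along the deepest path.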

I am measuring a complete latency (through the spouts' ack() function) of
around 800 msec. Is this normal?
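
One rough way to put the 800 msec figure in context is Little's law (average
number of items in a system = arrival rate x average time in the system).
The sketch below assumes the topology is in steady state and that 4500
tuples per second is the total rate across all spouts. The class and method
names are made up for this example. It only does the arithmetic and says
nothing about whether 800 msec is expected for this cluster.

```java
/** Back-of-the-envelope estimate of pending tuples; names are hypothetical. */
public class InFlightEstimate {

    /** Little's law: pending tuples = rate (tuples/s) * latency (s). */
    public static double pendingTuples(double tuplesPerSecond, double completeLatencyMillis) {
        return tuplesPerSecond * completeLatencyMillis / 1000.0;
    }

    public static void main(String[] args) {
        double pending = pendingTuples(4500.0, 800.0); // figures from this message
        double payloadBytes = pending * 64.0;          // about 64 bytes per tuple
        System.out.printf("pending tuples: %.0f%n", pending);
        System.out.printf("payload in flight: %.0f bytes%n", payloadBytes);
    }
}
```

With these inputs the estimate is about 3600 un-acked tuples at any moment.
If topology.max.spout.pending is set, comparing it (per spout task) with
this number may help judge whether tuples are waiting at the spouts.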

Thanks,
Nick

On Wed, Sep 9, 2015 at 11:53 AM, Matthias J. Sax <[email protected]> wrote:

> There is no such thing for Bolts.
>
> The call to Spout.ack(...) happens after Storm has received the acks of all
> (transitively) anchored tuples.
>
> Let's say you have spout -> bolt1 -> bolt2
>
> Spout emits t1, which is processed by bolt1.
> bolt1 emits t2 (with anchor t1) and acks t1.
> => there will be no call to Spout.ack(...) yet, because t2 is pending
> (i.e., has not been acked).
> After bolt2 has processed and acked t2, the callback to Spout.ack(t1) happens.
>
>
> -Matthias
>
>
>
> On 09/09/2015 03:55 PM, Nick R. Katsipoulakis wrote:
> > Thank you Derek for your reply.
> >
> > I figured out the error and it was in my code (I was not acking all
> > tuples properly). I have another question:
> >
> > I see that the BaseRichSpout has a callback function called ack(Object
> > msgId) which is called when a tuple gets acknowledged. Is there similar
> > functionality for Bolts? I see that the BaseRichBolt does not have one.
> >
> > Thanks,
> > Nick
> >
> > On Wed, Sep 9, 2015 at 9:45 AM, Derek Dagit <[email protected]
> > <mailto:[email protected]>> wrote:
> >
> >     The metrics used on the UI are aggregated in chunks.
> >
> >     It could very well be that your code is working perfectly fine, and
> >     there is a threshold of emits/acks/fails that needs to be met before
> >     the numbers show up on the UI.
> >
> >     Often I will see 0 on the UI until, for example, the number of emits
> >     reaches 20.  And very often the numbers will increment by 20s too.
> >
> >     --
> >     Derek
> >
> >
> >     ________________________________
> >     From: Nick R. Katsipoulakis <[email protected]
> >     <mailto:[email protected]>>
> >     To: [email protected] <mailto:[email protected]>
> >     Sent: Wednesday, September 9, 2015 7:52 AM
> >     Subject: Re: UIs ack statistics are not updated
> >
> >
> >
> >     Hello Javier and thank you for your reply.
> >
> >     I have a question about the Tuple ids. Do they have to be unique? I
> >     am asking because I have many spouts and they might emit identical
> >     tuples in the topology.
> >
> >     Also, do I have to ack a tuple only in the last bolt that processes
> >     it, so that the tuple tree created is complete? Or, do I have to ack
> >     each received tuple on every bolt?
> >
> >     Thanks,
> >     Nick
> >
> >
> >
> >
> >     On Wed, Sep 9, 2015 at 3:29 AM, Javier Gonzalez <[email protected]
> >     <mailto:[email protected]>> wrote:
> >
> >     If I am reading your code correctly, it seems you're emitting from
> >     the spout without a message id - therefore, your acking efforts are
> >     not being used. You need to do something like:
> >
> >         Object id = <anything you like>;
> >         _collector.emit(tuple, id);
> >
> >     Regards,
> >     Javier
> >
> >     >On Sep 8, 2015 3:19 PM, "Nick R. Katsipoulakis"
> >     <[email protected] <mailto:[email protected]>> wrote:
> >     >
> >     >Hello all,
> >     >>
> >     >>
> >     >>I am running a topology for benchmarking my cluster. In it, I
> >     anchor tuples and I acknowledge them for exactly-once processing and
> >     in order to see the complete latency metric on the Storm UI.
> >     However, the "Complete Latency" and the "Acked" metric values for my
> >     spouts remain 0, and I guess that this means they are not being
> >     reported properly.
> >     >>
> >     >>
> >     >>My Topology's code is really simple and consists of the following
> >     three classes:
> >     >>
> >     >>
> >     >>public static class TestWordSpout extends BaseRichSpout {
> >     >>
> >     >>    SpoutOutputCollector _collector;
> >     >>
> >     >>    public void open(@SuppressWarnings("rawtypes") Map conf,
> >     >>            TopologyContext context, SpoutOutputCollector collector) {
> >     >>        _collector = collector;
> >     >>    }
> >     >>
> >     >>    public void nextTuple() {
> >     >>        final String[] words = new String[] {"nathan", "mike", "jackson",
> >     >>                "golda", "bertels"};
> >     >>        final Random rand = new Random();
> >     >>        final String word = words[rand.nextInt(words.length)];
> >     >>        Values tuple = new Values();
> >     >>        tuple.add((new Long(System.currentTimeMillis())).toString());
> >     >>        tuple.add(word);
> >     >>        _collector.emit(tuple);
> >     >>    }
> >     >>
> >     >>    public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >     >>        String[] schema = { "timestamp", "word" };
> >     >>        declarer.declare(new Fields(schema));
> >     >>    }
> >     >>}
> >     >>
> >     >>My intermediate bolt's code is the following:
> >     >>
> >     >>public static class IntermediateBolt extends BaseRichBolt {
> >     >>
> >     >>    OutputCollector _collector;
> >     >>
> >     >>    @Override
> >     >>    public void prepare(@SuppressWarnings("rawtypes") Map conf,
> >     >>            TopologyContext context, OutputCollector collector) {
> >     >>        _collector = collector;
> >     >>    }
> >     >>
> >     >>    @Override
> >     >>    public void execute(Tuple tuple) {
> >     >>        Values v = new Values();
> >     >>        v.add(tuple.getString(0));
> >     >>        v.add(tuple.getString(1));
> >     >>        _collector.emit(tuple, v);
> >     >>    }
> >     >>
> >     >>    @Override
> >     >>    public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >     >>        String[] schema = { "timestamp", "word" };
> >     >>        declarer.declare(new Fields(schema));
> >     >>    }
> >     >>}
> >     >>
> >     >>And finally, my sink bolts (the last bolts in my topology) are the
> >     following:
> >     >>
> >     >>public static class SinkBolt extends BaseRichBolt {
> >     >>
> >     >>    OutputCollector _collector;
> >     >>
> >     >>    @Override
> >     >>    public void prepare(@SuppressWarnings("rawtypes") Map conf,
> >     >>            TopologyContext context, OutputCollector collector) {
> >     >>        _collector = collector;
> >     >>    }
> >     >>
> >     >>    @Override
> >     >>    public void execute(Tuple tuple) {
> >     >>        _collector.ack(tuple);
> >     >>    }
> >     >>
> >     >>    @Override
> >     >>    public void declareOutputFields(OutputFieldsDeclarer
> >     >>            outputFieldsDeclarer) {
> >     >>        String[] schema = {"timestamp", "word"};
> >     >>        outputFieldsDeclarer.declare(new Fields(schema));
> >     >>    }
> >     >>}
> >     >>
> >     >>So, I just have a 3-level topology (spout, intermediate-bolt,
> >     sink-bolt) to measure my cluster. However, as I mentioned
> >     above, in the UI the "Complete latency" and the "Acked" metrics are
> >     not updated for my spouts. Am I doing something wrong? Please note
> >     that I ack a tuple only at the SinkBolt. Is this the
> >     reason that my metrics are not updated?
> >     >>
> >     >>
> >     >>Thanks,
> >     >>Nick
> >     >>
> >     >>
> >
> >
> >     --
> >
> >     Nikolaos Romanos Katsipoulakis,
> >     University of Pittsburgh, PhD candidate
> >
> >
> >
> >
> > --
> > Nikolaos Romanos Katsipoulakis,
> > University of Pittsburgh, PhD candidate
>
>


-- 
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate
