Hello Javier and thank you for your reply.

I have a question about the Tuple ids. Do they have to be unique? I am
asking because I have many spouts and they might emit identical tuples in
the topology.

Also, do I have to ack a tuple only in the last bolt that processes it, so
that the tuple tree created is complete? Or, do I have to ack each received
tuple on every bolt?

Thanks,
Nick

On Wed, Sep 9, 2015 at 3:29 AM, Javier Gonzalez <[email protected]> wrote:

> If I am reading your code correctly, it seems you're emitting from the
> spout without id - therefore, your acking efforts are not being used. You
> need to do something like:
>
> Object id= <anything you like>;
> _collector.emit(id,tuple);
>
> Regards,
> Javier
> On Sep 8, 2015 3:19 PM, "Nick R. Katsipoulakis" <[email protected]>
> wrote:
>
>> Hello all,
>>
>> I am running a topology for bench marking my cluster. In it, I anchor
>> tuples and I acknowledge them for exactly-once processing and in order to
>> see the complete latency metric on the Storm UI. However, the "Complete
>> Latency" and the "Acked" metric values for my spouts remain 0 and I guess
>> that this translates to not being reported properly.
>>
>> My Topology's code is really simple and consists of the following three
>> classes:
>>
>> public static class TestWordSpout extends BaseRichSpout {
>>
>>    SpoutOutputCollector _collector;
>>
>>    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext 
>> context, SpoutOutputCollector collector) {
>>       _collector = collector;
>>    }
>>    public void nextTuple() {
>>       final String[] words = new String[] {"nathan", "mike", "jackson", 
>> "golda", "bertels"};
>>       final Random rand = new Random();
>>       final String word = words[rand.nextInt(words.length)];
>>       Values tuple = new Values();
>>       tuple.add((new Long(System.currentTimeMillis())).toString());
>>       tuple.add(word);
>>       _collector.emit(tuple);
>>    }
>>    public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>       String[] schema = { "timestamp", "word" };
>>       declarer.declare(new Fields(schema));
>>    }
>> }
>>
>> My intermediate bolts code is the following:
>>
>> public static class IntermediateBolt extends BaseRichBolt {
>>
>>       OutputCollector _collector;
>>
>>       @Override
>>       public void prepare(@SuppressWarnings("rawtypes") Map conf, 
>> TopologyContext context, OutputCollector collector) {
>>          _collector = collector;
>>       }
>>       @Override
>>       public void execute(Tuple tuple) {
>>          Values v = new Values();
>>          v.add(tuple.getString(0));
>>          v.add(tuple.getString(1));
>>          _collector.emit(tuple, v);
>>       }
>>       @Override
>>       public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>          String[] schema = { "timestamp", "word" };
>>          declarer.declare(new Fields(schema));
>>       }
>>    }
>>
>> And finally, my sink bolts (the last bolts in my topology) are the
>> following:
>>
>> public static class SinkBolt extends BaseRichBolt {
>>
>>    OutputCollector _collector;
>>
>>    @Override
>>    public void prepare(@SuppressWarnings("rawtypes") Map conf, 
>> TopologyContext context, OutputCollector collector) {
>>       _collector = collector;
>>    }
>>    @Override
>>    public void execute(Tuple tuple) {
>>          _collector.ack(tuple);
>>    }
>>    @Override
>>    public void declareOutputFields(OutputFieldsDeclarer 
>> outputFieldsDeclarer) {
>>       String[] schema = {"timestamp", "word"};
>>       outputFieldsDeclarer.declare(new Fields(schema));
>>    }
>> }
>>
>> So, I just have a 3-level topology (spout, intermediate-bolt, sink-bolt)
>> just to measure my cluster. However, as I mentioned above, in the UI the
>> "Complete latency" and the "Acked" metrics are not updated for my spouts.
>> Am I doing something wrong? Please, pay attention that I ack a tuple only
>> at the SinkBolt. Is this the reason that I my metrics are not updated?
>>
>> Thanks,
>> Nick
>>
>>


-- 
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate

Reply via email to