If I am reading your code correctly, it seems you're emitting from the
spout without id - therefore, your acking efforts are not being used. You
need to do something like:

Object id= <anything you like>;
_collector.emit(id,tuple);

Regards,
Javier
On Sep 8, 2015 3:19 PM, "Nick R. Katsipoulakis" <[email protected]>
wrote:

> Hello all,
>
> I am running a topology for bench marking my cluster. In it, I anchor
> tuples and I acknowledge them for exactly-once processing and in order to
> see the complete latency metric on the Storm UI. However, the "Complete
> Latency" and the "Acked" metric values for my spouts remain 0 and I guess
> that this translates to not being reported properly.
>
> My Topology's code is really simple and consists of the following three
> classes:
>
> public static class TestWordSpout extends BaseRichSpout {
>
>    SpoutOutputCollector _collector;
>
>    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext 
> context, SpoutOutputCollector collector) {
>       _collector = collector;
>    }
>    public void nextTuple() {
>       final String[] words = new String[] {"nathan", "mike", "jackson", 
> "golda", "bertels"};
>       final Random rand = new Random();
>       final String word = words[rand.nextInt(words.length)];
>       Values tuple = new Values();
>       tuple.add((new Long(System.currentTimeMillis())).toString());
>       tuple.add(word);
>       _collector.emit(tuple);
>    }
>    public void declareOutputFields(OutputFieldsDeclarer declarer) {
>       String[] schema = { "timestamp", "word" };
>       declarer.declare(new Fields(schema));
>    }
> }
>
> My intermediate bolts code is the following:
>
> public static class IntermediateBolt extends BaseRichBolt {
>
>       OutputCollector _collector;
>
>       @Override
>       public void prepare(@SuppressWarnings("rawtypes") Map conf, 
> TopologyContext context, OutputCollector collector) {
>          _collector = collector;
>       }
>       @Override
>       public void execute(Tuple tuple) {
>          Values v = new Values();
>          v.add(tuple.getString(0));
>          v.add(tuple.getString(1));
>          _collector.emit(tuple, v);
>       }
>       @Override
>       public void declareOutputFields(OutputFieldsDeclarer declarer) {
>          String[] schema = { "timestamp", "word" };
>          declarer.declare(new Fields(schema));
>       }
>    }
>
> And finally, my sink bolts (the last bolts in my topology) are the
> following:
>
> public static class SinkBolt extends BaseRichBolt {
>
>    OutputCollector _collector;
>
>    @Override
>    public void prepare(@SuppressWarnings("rawtypes") Map conf, 
> TopologyContext context, OutputCollector collector) {
>       _collector = collector;
>    }
>    @Override
>    public void execute(Tuple tuple) {
>          _collector.ack(tuple);
>    }
>    @Override
>    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) 
> {
>       String[] schema = {"timestamp", "word"};
>       outputFieldsDeclarer.declare(new Fields(schema));
>    }
> }
>
> So, I just have a 3-level topology (spout, intermediate-bolt, sink-bolt)
> just to measure my cluster. However, as I mentioned above, in the UI the
> "Complete latency" and the "Acked" metrics are not updated for my spouts.
> Am I doing something wrong? Please, pay attention that I ack a tuple only
> at the SinkBolt. Is this the reason that I my metrics are not updated?
>
> Thanks,
> Nick
>
>

Reply via email to