Hello all,

I am running a topology to benchmark my cluster. In it, I anchor tuples
and acknowledge them for guaranteed (at-least-once) processing and so that
I can see the complete latency metric in the Storm UI. However, the
"Complete latency" and "Acked" values for my spouts remain 0, and I guess
this means that acks are not being reported properly.

My topology's code is really simple and consists of the following three
classes. First, the spout:

public static class TestWordSpout extends BaseRichSpout {

   SpoutOutputCollector _collector;

   public void open(@SuppressWarnings("rawtypes") Map conf,
         TopologyContext context, SpoutOutputCollector collector) {
      _collector = collector;
   }
   public void nextTuple() {
      final String[] words = new String[] {"nathan", "mike",
            "jackson", "golda", "bertels"};
      final Random rand = new Random();
      final String word = words[rand.nextInt(words.length)];
      Values tuple = new Values();
      tuple.add(Long.toString(System.currentTimeMillis()));
      tuple.add(word);
      _collector.emit(tuple);
   }
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      String[] schema = { "timestamp", "word" };
      declarer.declare(new Fields(schema));
   }
}

My intermediate bolt's code is the following:

public static class IntermediateBolt extends BaseRichBolt {

   OutputCollector _collector;

   @Override
   public void prepare(@SuppressWarnings("rawtypes") Map conf,
         TopologyContext context, OutputCollector collector) {
      _collector = collector;
   }
   @Override
   public void execute(Tuple tuple) {
      Values v = new Values();
      v.add(tuple.getString(0));
      v.add(tuple.getString(1));
      _collector.emit(tuple, v);
   }
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      String[] schema = { "timestamp", "word" };
      declarer.declare(new Fields(schema));
   }
}

And finally, my sink bolts (the last bolts in my topology) are the
following:

public static class SinkBolt extends BaseRichBolt {

   OutputCollector _collector;

   @Override
   public void prepare(@SuppressWarnings("rawtypes") Map conf,
         TopologyContext context, OutputCollector collector) {
      _collector = collector;
   }
   @Override
   public void execute(Tuple tuple) {
      _collector.ack(tuple);
   }
   @Override
   public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
      String[] schema = {"timestamp", "word"};
      outputFieldsDeclarer.declare(new Fields(schema));
   }
}

So, I just have a 3-level topology (spout, intermediate bolt, sink bolt)
to measure my cluster. However, as I mentioned above, the "Complete
latency" and "Acked" metrics in the UI are not updated for my spouts.
Am I doing something wrong? Please note that I ack a tuple only in the
SinkBolt. Is this the reason that my metrics are not updated?
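
To make the question concrete, here is a toy model of how I understand
tuple tracking to work. It is only a sketch under my own assumptions, not
Storm's actual acker code: the class AckTrackingSketch, the type T and all
method names are made up for illustration. The assumptions are that a spout
tuple is tracked only when it is emitted with a message ID, and that the
spout is acked only once every tuple in the tree has been acked.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of tuple-tree tracking (hypothetical; NOT Storm's implementation).
class AckTrackingSketch {

   // A tuple knows its own id and the spout tuple (root) it descends from.
   // root is null when the spout emitted without a message ID.
   static final class T {
      final long id;
      final Long root;
      T(long id, Long root) { this.id = id; this.root = root; }
   }

   // root id -> ids of tuples in that tree that have not been acked yet
   private final Map<Long, Set<Long>> pending = new HashMap<>();
   private long nextId = 0;
   private int spoutAcks = 0;

   // Spout emit: the tuple is tracked only if a message ID is supplied.
   T spoutEmit(boolean withMessageId) {
      long id = nextId++;
      if (!withMessageId) {
         return new T(id, null);
      }
      Set<Long> tree = new HashSet<>();
      tree.add(id);
      pending.put(id, tree);
      return new T(id, id);
   }

   // Bolt emit anchored on an input tuple: the new tuple joins the same tree.
   T emitAnchored(T anchor) {
      long id = nextId++;
      if (anchor.root != null && pending.containsKey(anchor.root)) {
         pending.get(anchor.root).add(id);
      }
      return new T(id, anchor.root);
   }

   // Bolt ack: the spout is acked once the whole tree has been acked.
   void ack(T t) {
      if (t.root == null) {
         return; // untracked tuple, nothing to report
      }
      Set<Long> tree = pending.get(t.root);
      if (tree == null) {
         return;
      }
      tree.remove(t.id);
      if (tree.isEmpty()) {
         pending.remove(t.root);
         spoutAcks++;
      }
   }

   // Pushes one tuple through spout -> intermediate bolt -> sink bolt and
   // returns how many acks reach the spout.
   static int run(boolean withMessageId, boolean intermediateAcks) {
      AckTrackingSketch acker = new AckTrackingSketch();
      T fromSpout = acker.spoutEmit(withMessageId);
      T fromIntermediate = acker.emitAnchored(fromSpout);
      if (intermediateAcks) {
         acker.ack(fromSpout);        // intermediate bolt acks its input
      }
      acker.ack(fromIntermediate);    // sink bolt acks its input
      return acker.spoutAcks;
   }
}
```

In this model, run(false, false) corresponds to my code above and never
acks the spout; only run(true, true) does. In the real API that would
correspond to calling _collector.emit(tuple, msgId) in the spout and
_collector.ack(tuple) in IntermediateBolt.execute, neither of which my
code currently does.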

Thanks,
Nick
