Hello all,
I am running a topology for bench marking my cluster. In it, I anchor
tuples and I acknowledge them for exactly-once processing and in order to
see the complete latency metric on the Storm UI. However, the "Complete
Latency" and the "Acked" metric values for my spouts remain 0 and I guess
that this translates to not being reported properly.
My Topology's code is really simple and consists of the following three
classes:
public static class TestWordSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
public void open(@SuppressWarnings("rawtypes") Map conf,
TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
public void nextTuple() {
final String[] words = new String[] {"nathan", "mike",
"jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
Values tuple = new Values();
tuple.add((new Long(System.currentTimeMillis())).toString());
tuple.add(word);
_collector.emit(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
String[] schema = { "timestamp", "word" };
declarer.declare(new Fields(schema));
}
}
My intermediate bolts code is the following:
public static class IntermediateBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(@SuppressWarnings("rawtypes") Map conf,
TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
Values v = new Values();
v.add(tuple.getString(0));
v.add(tuple.getString(1));
_collector.emit(tuple, v);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
String[] schema = { "timestamp", "word" };
declarer.declare(new Fields(schema));
}
}
And finally, my sink bolts (the last bolts in my topology) are the
following:
public static class SinkBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(@SuppressWarnings("rawtypes") Map conf,
TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
String[] schema = {"timestamp", "word"};
outputFieldsDeclarer.declare(new Fields(schema));
}
}
So, I just have a 3-level topology (spout, intermediate-bolt, sink-bolt)
just to measure my cluster. However, as I mentioned above, in the UI the
"Complete latency" and the "Acked" metrics are not updated for my spouts.
Am I doing something wrong? Please, pay attention that I ack a tuple only
at the SinkBolt. Is this the reason that I my metrics are not updated?
Thanks,
Nick