Hi everybody,
I tried to use the "new metrics reporting api" as shown here:
http://storm.apache.org/releases/1.2.2/metrics_v2.html, however I had no luck.
The following is an example that should work according to my understanding of
the documentation. It constructs a topology consisting of a single spout that
emits a 1 every second.
This 1 is received by a bolt which just prints some string to stdout and
increments a counter as described in the metrics reporting page above.
In the main method I add a config entry that should wire up the reporter.
However, when starting the program, I only see the printlns from the bolt, and
nothing from the reporter.
I tested this on a cluster with a CSVReporter, but also nothing.
What am I doing wrong?
```
public class MetricTest {
public static void main(String... args) {
System.out.println("Building Topology...");
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("s", new DummySpout());
builder.setBolt("b", new DummyBolt())
.shuffleGrouping("s");
Config config = new Config();
List<Map<String, Object>> reportersConfig = new ArrayList<>();
Map<String, Object> consoleReporterConfig = new HashMap<>();
consoleReporterConfig.put("class",
"org.apache.storm.metrics2.reporters.ConsoleStormReporter");
List<String> daemonList = new ArrayList<>();
daemonList.add("worker");
daemonList.add("nimbus");
daemonList.add("supervisor");
consoleReporterConfig.put("daemons", daemonList);
consoleReporterConfig.put("report.period", 5);
consoleReporterConfig.put("report.period.units", "SECONDS");
reportersConfig.add(consoleReporterConfig);
config.put("storm.metrics.reporters", reportersConfig);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("t", config, builder.createTopology());
Utils.sleep(60_000);
cluster.killTopology("t");
cluster.shutdown();
}
}
class DummySpout extends BaseRichSpout {
SpoutOutputCollector collector;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector
collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
Utils.sleep(1000);
collector.emit(new Values(1));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("x"));
}
}
class DummyBolt extends BaseRichBolt {
private Counter tupleCounter;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector
collector) {
this.tupleCounter = context.registerCounter("tupleCount");
}
@Override
public void execute(Tuple input) {
System.out.println("Received tuple with x=" + input.getValue(0));
this.tupleCounter.inc();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}
```
Viele Grüße
Best Regards
Manuel Dossinger
[email protected]