Hi everybody,

I tried to use the new metrics reporting API described at
http://storm.apache.org/releases/1.2.2/metrics_v2.html, but had no luck.

The following example should work according to my understanding of the
documentation. It builds a topology with a single spout that emits the
value 1 once per second.
A bolt receives this value, prints a line to stdout, and increments a
counter as described on the metrics reporting page above.
In the main method I add a config entry that should wire up the reporter.
However, when I start the program, I only see the println output from the
bolt and nothing from the reporter.

I also tested this on a cluster with a CSV reporter, but got no output there either.

What am I doing wrong?


```
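import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.codahale.metrics.Counter;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class MetricTest {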
    public static void main(String... args) {
        System.out.println("Building Topology...");
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("s", new DummySpout());
        builder.setBolt("b", new DummyBolt())
                .shuffleGrouping("s");

        Config config = new Config();
        List<Map<String, Object>> reportersConfig = new ArrayList<>();
        Map<String, Object> consoleReporterConfig = new HashMap<>();
        consoleReporterConfig.put("class",
                "org.apache.storm.metrics2.reporters.ConsoleStormReporter");
        List<String> daemonList = new ArrayList<>();
        daemonList.add("worker");
        daemonList.add("nimbus");
        daemonList.add("supervisor");
        consoleReporterConfig.put("daemons", daemonList);
        consoleReporterConfig.put("report.period", 5);
        consoleReporterConfig.put("report.period.units", "SECONDS");
        reportersConfig.add(consoleReporterConfig);

        config.put("storm.metrics.reporters", reportersConfig);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("t", config, builder.createTopology());
        Utils.sleep(60_000);
        cluster.killTopology("t");
        cluster.shutdown();
    }
}

class DummySpout extends BaseRichSpout {
    SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context,
                     SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        collector.emit(new Values(1));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("x"));
    }
}

class DummyBolt extends BaseRichBolt {
    private Counter tupleCounter;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.tupleCounter = context.registerCounter("tupleCount");
    }

    @Override
    public void execute(Tuple input) {
        System.out.println("Received tuple with x=" + input.getValue(0));
        this.tupleCounter.inc();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}
```
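
In case the shape of the config entry matters: below is a minimal, Storm-free sketch of the structure that I believe the main method above stores under "storm.metrics.reporters". The class name ReporterConfigSketch and the method buildReportersConfig are made up for illustration only; the keys and values are copied from the code above. I am assuming this mirrors the YAML layout shown in the documentation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Storm-free sketch (hypothetical helper, not part of Storm): builds the same
// nested structure that main() above stores under "storm.metrics.reporters".
class ReporterConfigSketch {

    // Returns a list with a single reporter entry (the console reporter).
    static List<Map<String, Object>> buildReportersConfig() {
        Map<String, Object> consoleReporter = new HashMap<>();
        consoleReporter.put("class",
                "org.apache.storm.metrics2.reporters.ConsoleStormReporter");
        consoleReporter.put("daemons",
                new ArrayList<>(Arrays.asList("worker", "nimbus", "supervisor")));
        consoleReporter.put("report.period", 5);
        consoleReporter.put("report.period.units", "SECONDS");

        List<Map<String, Object>> reporters = new ArrayList<>();
        reporters.add(consoleReporter);
        return reporters;
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("storm.metrics.reporters", buildReportersConfig());
        // Print the structure for inspection; HashMap key order is not guaranteed.
        System.out.println(config);
    }
}
```

Running this prints a single map entry whose value is a one-element list, which is the structure I expected the reporter setup to read.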


Best regards

Manuel Dossinger