I may be completely off base with this, but I think metrics reporters are configured in the cluster configuration, and not on a per-topology basis. Note how the documentation you linked sets the config in storm.yaml, rather than in the topology configuration.
Could you try putting the metrics configuration in the cluster configuration instead? On 1.x you would do this by making your LocalCluster via the Testing.withLocalCluster method. That method takes a MkClusterParam, which takes the cluster config map. Example here https://github.com/xumingming/storm-lib/blob/50394ff463da0d92ac4e07dee78a664a3a227e9c/src/jvm/storm/TestingApiDemo.java#L88 In 2.x you can configure this easier from Java using the LocalCluster.Builder. Den tir. 14. maj 2019 kl. 10.49 skrev Manuel Dossinger < [email protected]>: > Hi everybody, > > I tried to use the "new metrics reporting api" as shown here: > http://storm.apache.org/releases/1.2.2/metrics_v2.html, however I had no > luck. > > The following is an example that should work according to my understanding > of the documentation. It constructs a topology consisting of a single spout > that emits a 1 every second. > This 1 is received by a bolt which just prints some string to stdout and > increments a counter as described in the metrics reporting page above. > In the main method I add a config entry that should wire up the reporter. > However, when starting the program, I only see the printlns from the bolt, > and nothing from the reporter. > > I tested this on a cluster with a CSVReporter, but also nothing. > > What am I doing wrong? > > > ``` > public class MetricTest { > public static void main(String... args) { > System.out.println("Building Topology..."); > TopologyBuilder builder = new TopologyBuilder(); > builder.setSpout("s", new DummySpout()); > builder.setBolt("b", new DummyBolt()) > .shuffleGrouping("s"); > > Config config = new Config(); > List<Map<String, Object>> reportersConfig = new ArrayList<>(); > Map<String, Object> consoleReporterConfig = new HashMap<>(); > consoleReporterConfig.put("class", > "org.apache.storm.metrics2.reporters.ConsoleStormReporter"); > List<String> daemonList = new ArrayList<>(); > daemonList.add("worker"); > daemonList.add("nimbus"); > daemonList.add("supervisor"); > consoleReporterConfig.put("daemons", daemonList); > consoleReporterConfig.put("report.period", 5); > consoleReporterConfig.put("report.period.units", "SECONDS"); > reportersConfig.add(consoleReporterConfig); > > config.put("storm.metrics.reporters", reportersConfig); > > LocalCluster cluster = new LocalCluster(); > cluster.submitTopology("t", config, builder.createTopology()); > Utils.sleep(60_000); > cluster.killTopology("t"); > cluster.shutdown(); > } > } > > class DummySpout extends BaseRichSpout { > SpoutOutputCollector collector; > > @Override > public void open(Map conf, TopologyContext context, > SpoutOutputCollector collector) { > this.collector = collector; > } > > @Override > public void nextTuple() { > Utils.sleep(1000); > collector.emit(new Values(1)); > } > > @Override > public void declareOutputFields(OutputFieldsDeclarer declarer) { > declarer.declare(new Fields("x")); > } > } > > class DummyBolt extends BaseRichBolt { > private Counter tupleCounter; > > @Override > public void prepare(Map stormConf, TopologyContext context, > OutputCollector collector) { > this.tupleCounter = context.registerCounter("tupleCount"); > } > > @Override > public void execute(Tuple input) { > System.out.println("Received tuple with x=" + input.getValue(0)); > this.tupleCounter.inc(); > } > > @Override > public void declareOutputFields(OutputFieldsDeclarer declarer) { } > } > ``` > > > Viele Grüße > Best Regards > > Manuel Dossinger > [email protected] > >
