Hey Aljoscha,
Thanks for replying! Yes, I have three TextIO sources and two HBaseIO sources :) I believe duplicate naming also occurs for the intermediate transforms, though. I believe the node names assigned to the "user-level" transforms never make it to Beam's primitive transforms (I think they are called that?), and consequently never make it to the task and operator names in Flink.

Since I posted this problem, I have replaced Flink metrics with a single, manual reporting step after the pipeline runs, so I currently can't verify this hunch if you need more information.

Reinier

________________________________
From: Aljoscha Krettek <[email protected]>
Sent: 09 October 2017 16:56:26
To: [email protected]
Subject: Re: Duplicate metric names when using Flink runner + Graphite reporter

Hi Reinier,

Do you have several sources in your pipeline? I think it's a problem of the Beam Flink runner: it does not assign unique names that could be used to deduplicate the operator names used in the metric names.

Best,
Aljoscha

On 29. Sep 2017, at 20:17, Reinier Kip <[email protected]> wrote:

Hi all,

I'm running a Beam pipeline on Flink and sending metrics via the Graphite reporter. I get repeated exceptions on the slaves, which try to register the same metric multiple times. These duplicates all concern task and operator metrics. I have given all pipeline nodes unique names. I am using Beam 2.1.0, and am thus running Flink 1.3.0.

Below you'll find the error and stack trace, Flink's metrics configuration, and several examples of duplicate metric names concerning both tasks and operators.

Is this a Beam problem? Should Beam give Flink tasks and operators unique names so they'll have unique metric names?
Is this a Flink problem? Should Flink or Flink's Graphite reporter support duplicate metric names?

Reinier

##################

Log message:
[ERROR] Error while registering metric.
Stack trace:
java.lang.IllegalArgumentException: A metric named bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numSplitsProcessed already exists
	at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
	at org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:151)
	at org.apache.flink.runtime.metrics.MetricRegistry.register(MetricRegistry.java:294)
	at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:370)
	at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.meter(AbstractMetricGroup.java:336)
	at org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:42)
	at org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:45)
	at org.apache.flink.runtime.metrics.groups.TaskMetricGroup.addOperator(TaskMetricGroup.java:133)
	at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72)
	at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1299)
	at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1015)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:256)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)

##################

metrics.reporters: graphite
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.graphite.host: something
metrics.reporter.graphite.port: 2003
metrics.reporter.graphite.protocol: TCP
metrics.reporter.graphite.interval: 1 SECONDS
metrics.scope.jm: bla.<host>.jobmanager
metrics.scope.jm.job: bla.<host>.jobmanager.<job_name>
metrics.scope.tm: bla.<host>.taskmanager.<tm_id>
metrics.scope.tm.job: bla.<host>.taskmanager.<tm_id>.<job_name>
metrics.scope.task: bla.<host>.taskmanager.<tm_id>.<job_name>.task.<task_name>.<subtask_index>
metrics.scope.operator: bla.<host>.taskmanager.<tm_id>.<job_name>.operator.<operator_name>.<subtask_index>

##################

bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numSplitsProcessed
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOutPerSecond
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOut
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsInPerSecond
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsIn
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOutPerSecond
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOut
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsInPerSecond
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsIn
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numBytesOutPerSecond
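To make the collision concrete, here is a minimal sketch (not from the thread) of how the operator scope format above expands. The scope contains only host, tm_id, job_name, operator_name, and subtask_index. So if two distinct Beam sources are translated into Flink operators with the same name, as the "DataSource (at Read(CompressedSource) ...)" names above suggest, and both run subtask 5 on the same TaskManager, they expand to the same metric name. `NameRegistry` and `expandScope` are hypothetical stand-ins: the registry only mimics the duplicate check that surfaces as the IllegalArgumentException in the stack trace. Neither is Flink or Dropwizard code. The operator names "ReadInputA"/"ReadInputB" are also hypothetical and illustrate the unique names Aljoscha suggests the runner should assign.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class DuplicateMetricNameDemo {

    // Hypothetical stand-in for a metric registry: like the registry in the stack trace,
    // it rejects a second registration under a name that already exists.
    static final class NameRegistry {
        private final Map<String, Object> metrics = new HashMap<>();

        void register(String name, Object metric) {
            if (metrics.putIfAbsent(name, metric) != null) {
                throw new IllegalArgumentException("A metric named " + name + " already exists");
            }
        }

        int size() {
            return metrics.size();
        }
    }

    // Operator scope format copied from the configuration in this thread.
    static final String OPERATOR_SCOPE =
        "bla.<host>.taskmanager.<tm_id>.<job_name>.operator.<operator_name>.<subtask_index>";

    // Operator name as it appears in the reported duplicate metric names.
    static final String SOURCE_NAME =
        "DataSource (at Read(CompressedSource) "
            + "(org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat))";

    // Hypothetical helper: replaces each <variable> in the scope format with its value.
    static String expandScope(String format, Map<String, String> variables) {
        String result = format;
        for (Map.Entry<String, String> e : variables.entrySet()) {
            result = result.replace("<" + e.getKey() + ">", e.getValue());
        }
        return result;
    }

    // Scope variables for one operator instance; host, tm_id, and job_name are taken
    // from the metric names reported above.
    static Map<String, String> operatorVars(String operatorName, int subtaskIndex) {
        Map<String, String> vars = new LinkedHashMap<>();
        vars.put("host", "hdp-slave-019");
        vars.put("tm_id", "3067db835689cdd8b6087dae79ec088f");
        vars.put("job_name", "bla-0929174053-aa405356");
        vars.put("operator_name", operatorName);
        vars.put("subtask_index", Integer.toString(subtaskIndex));
        return vars;
    }

    public static void main(String[] args) {
        NameRegistry registry = new NameRegistry();

        // Two different sources whose Flink operators share a name, both running subtask 5
        // on the same TaskManager: the expanded metric names are identical.
        String first = expandScope(OPERATOR_SCOPE, operatorVars(SOURCE_NAME, 5)) + ".numRecordsOut";
        String second = expandScope(OPERATOR_SCOPE, operatorVars(SOURCE_NAME, 5)) + ".numRecordsOut";
        registry.register(first, new Object());
        try {
            registry.register(second, new Object());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }

        // For contrast: unique operator names (hypothetical) expand to distinct metric names,
        // so both registrations succeed.
        registry.register(expandScope(OPERATOR_SCOPE, operatorVars("ReadInputA", 5)) + ".numRecordsOut", new Object());
        registry.register(expandScope(OPERATOR_SCOPE, operatorVars("ReadInputB", 5)) + ".numRecordsOut", new Object());
        System.out.println("registered: " + registry.size());
    }
}
```

The sketch only reasons about name expansion. It does not show where in the Beam Flink runner the operator names are chosen.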
