Hi Reinier,

Do you have several sources in your pipeline? I think this is a problem in the Beam Flink runner: it does not assign unique names that could be used to deduplicate the operator names that go into the metric names.

Best,
Aljoscha

> On 29. Sep 2017, at 20:17, Reinier Kip <[email protected]> wrote:
>
> Hi all,
>
> I'm running a Beam pipeline on Flink and sending metrics via the Graphite
> reporter. I get repeated exceptions on the slaves, which try to register the
> same metric multiple times. These duplicates all concern task and operator
> metrics. I have given all pipeline nodes unique names.
>
> I am using Beam 2.1.0, and am thus running Flink 1.3.0.
>
> Below you'll find the error+stacktrace, Flink's metrics configuration, and
> several examples of duplicate metric names concerning both tasks and
> operators.
>
> Is this a Beam problem? Should Beam give Flink tasks and operators unique
> names so they'll have unique metric names? Is this a Flink problem? Should
> Flink or Flink's Graphite reporter support duplicate metric names?
>
> Reinier
>
> ##################
>
> Log message: [ERROR] Error while registering metric.
> Stack trace:
>
> java.lang.IllegalArgumentException: A metric named bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numSplitsProcessed already exists
>     at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
>     at org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:151)
>     at org.apache.flink.runtime.metrics.MetricRegistry.register(MetricRegistry.java:294)
>     at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:370)
>     at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.meter(AbstractMetricGroup.java:336)
>     at org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:42)
>     at org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:45)
>     at org.apache.flink.runtime.metrics.groups.TaskMetricGroup.addOperator(TaskMetricGroup.java:133)
>     at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72)
>     at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1299)
>     at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1015)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:256)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
>
> ##################
>
> metrics.reporters: graphite
> metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
> metrics.reporter.graphite.host: something
> metrics.reporter.graphite.port: 2003
> metrics.reporter.graphite.protocol: TCP
> metrics.reporter.graphite.interval: 1 SECONDS
> metrics.scope.jm: bla.<host>.jobmanager
> metrics.scope.jm.job: bla.<host>.jobmanager.<job_name>
> metrics.scope.tm: bla.<host>.taskmanager.<tm_id>
> metrics.scope.tm.job: bla.<host>.taskmanager.<tm_id>.<job_name>
> metrics.scope.task: bla.<host>.taskmanager.<tm_id>.<job_name>.task.<task_name>.<subtask_index>
> metrics.scope.operator: bla.<host>.taskmanager.<tm_id>.<job_name>.operator.<operator_name>.<subtask_index>
>
> ##################
>
> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numSplitsProcessed
> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOutPerSecond
> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOut
> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsInPerSecond
> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsIn
> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOutPerSecond
> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOut
> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsInPerSecond
> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsIn
> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numBytesOutPerSecond
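
For illustration, here is a rough Python sketch of how the quoted metrics.scope.operator format would produce the same metric name twice if two sources share one operator name, which is the situation the reply suspects. It is a toy model, not Flink or Dropwizard code: ToyRegistry, expand_scope, metric_name, register_all and deduplicate are all hypothetical names, and the counter suffix in deduplicate is only one possible way a runner might make names unique. The host, task manager id, job name and subtask index are copied from the log above.

```python
import re

# Scope format copied from the metrics.scope.operator setting quoted above.
OPERATOR_SCOPE = (
    "bla.<host>.taskmanager.<tm_id>.<job_name>"
    ".operator.<operator_name>.<subtask_index>"
)

# Operator name as it appears in the reported metric names.
SOURCE = (
    "DataSource (at Read(CompressedSource) "
    "(org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat))"
)


def expand_scope(scope_format, variables):
    """Replace each <placeholder> in the scope format with its value."""
    return re.sub(r"<(\w+)>", lambda m: str(variables[m.group(1)]), scope_format)


def metric_name(operator_name, metric, subtask_index=5):
    """Build a full metric name using the values seen in the log."""
    variables = {
        "host": "hdp-slave-019",
        "tm_id": "3067db835689cdd8b6087dae79ec088f",
        "job_name": "bla-0929174053-aa405356",
        "operator_name": operator_name,
        "subtask_index": subtask_index,
    }
    return expand_scope(OPERATOR_SCOPE, variables) + "." + metric


class ToyRegistry:
    """Hypothetical stand-in for a registry that rejects duplicate names."""

    def __init__(self):
        self.metrics = {}

    def register(self, name, metric):
        if name in self.metrics:
            raise ValueError("A metric named " + name + " already exists")
        self.metrics[name] = metric


def register_all(operator_names):
    """Register one numSplitsProcessed metric per operator name."""
    registry = ToyRegistry()
    for operator_name in operator_names:
        registry.register(metric_name(operator_name, "numSplitsProcessed"), object())
    return registry


def deduplicate(names):
    """Hypothetical fix: suffix repeated operator names with a counter.

    Assumes no input name already ends in such a suffix.
    """
    seen = {}
    unique = []
    for name in names:
        count = seen.get(name, 0)
        seen[name] = count + 1
        unique.append(name if count == 0 else f"{name}-{count}")
    return unique
```

In this model, register_all([SOURCE, SOURCE]) raises on the second registration, while register_all(deduplicate([SOURCE, SOURCE])) registers two distinct names. Whether the real pipeline has two identically named sources is exactly the open question in the reply.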
