Hi all,

I'm running a Beam pipeline on Flink and sending metrics via the Graphite 
reporter. I get repeated exceptions on the slaves, each caused by an attempt 
to register the same metric name more than once. The duplicates all concern 
task and operator metrics, even though I have given all pipeline nodes unique 
names.

I am using Beam 2.1.0 and am therefore running Flink 1.3.0.

Below you'll find the error+stacktrace, Flink's metrics configuration, and 
several examples of duplicate metric names concerning both tasks and operators.

Is this a Beam problem? Should Beam give Flink tasks and operators unique names 
so they'll have unique metric names? Is this a Flink problem? Should Flink or 
Flink's Graphite reporter support duplicate metric names?

Reinier

##################

Log message: [ERROR] Error while registering metric.
Stack trace:

java.lang.IllegalArgumentException: A metric named bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numSplitsProcessed already exists
at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
at org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:151)
at org.apache.flink.runtime.metrics.MetricRegistry.register(MetricRegistry.java:294)
at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:370)
at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.meter(AbstractMetricGroup.java:336)
at org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:42)
at org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:45)
at org.apache.flink.runtime.metrics.groups.TaskMetricGroup.addOperator(TaskMetricGroup.java:133)
at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1299)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1015)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

##################

metrics.reporters: graphite
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.graphite.host: something
metrics.reporter.graphite.port: 2003
metrics.reporter.graphite.protocol: TCP
metrics.reporter.graphite.interval: 1 SECONDS
metrics.scope.jm: bla.<host>.jobmanager
metrics.scope.jm.job: bla.<host>.jobmanager.<job_name>
metrics.scope.tm: bla.<host>.taskmanager.<tm_id>
metrics.scope.tm.job: bla.<host>.taskmanager.<tm_id>.<job_name>
metrics.scope.task: bla.<host>.taskmanager.<tm_id>.<job_name>.task.<task_name>.<subtask_index>
metrics.scope.operator: bla.<host>.taskmanager.<tm_id>.<job_name>.operator.<operator_name>.<subtask_index>
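
As a rough illustration of how a scope format like the operator one above can 
yield identical names, here is a minimal Python sketch. It is not Flink's or 
Dropwizard's code: expand_scope, metric_name, and TinyRegistry are hypothetical 
names, and the sketch assumes plain placeholder substitution with no character 
filtering. The host, task manager ID, and job name are taken from the error 
message above.

```python
import re

# Operator scope format, copied from the configuration above.
OPERATOR_SCOPE = (
    "bla.<host>.taskmanager.<tm_id>.<job_name>"
    ".operator.<operator_name>.<subtask_index>"
)


def expand_scope(scope_format, variables):
    """Replace each <variable> placeholder with its value (hypothetical helper)."""
    return re.sub(r"<(\w+)>", lambda m: str(variables[m.group(1)]), scope_format)


def metric_name(operator_name, subtask_index, metric):
    """Build a full metric name for one operator subtask (assumed layout)."""
    variables = {
        "host": "hdp-slave-019",
        "tm_id": "3067db835689cdd8b6087dae79ec088f",
        "job_name": "bla-0929174053-aa405356",
        "operator_name": operator_name,
        "subtask_index": subtask_index,
    }
    return expand_scope(OPERATOR_SCOPE, variables) + "." + metric


class TinyRegistry:
    """Stand-in for a registry that rejects duplicate names (not Dropwizard's class)."""

    def __init__(self):
        self.metrics = {}

    def register(self, name, metric):
        if name in self.metrics:
            raise ValueError("A metric named " + name + " already exists")
        self.metrics[name] = metric


if __name__ == "__main__":
    op = ("DataSource (at Read(CompressedSource) "
          "(org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat))")
    registry = TinyRegistry()
    # Two operators with the same name and subtask index map to the same key.
    for _ in range(2):
        try:
            registry.register(metric_name(op, 5, "numSplitsProcessed"), 0)
        except ValueError as err:
            print(err)
```

Nothing in this format distinguishes two operators that share a name and a 
subtask index within one job on one task manager, so under these assumptions 
the second registration is rejected.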


##################


bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numSplitsProcessed
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOutPerSecond
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOut
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsInPerSecond
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsIn
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOutPerSecond
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOut
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsInPerSecond
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsIn
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numBytesOutPerSecond
