Hey Aljoscha,

Thanks for replying! Yes, I have three TextIO sources and two HBaseIO sources 
:) I believe the duplicate naming also occurs for the intermediate transforms, 
though. As far as I can tell, the node names assigned to the "user-level" 
transforms never reach Beam's primitive transforms (I think that is what they 
are called?) and consequently never reach the task and operator names in Flink.


Since posting this problem I have replaced Flink metrics with a single, manual 
reporting step after the pipeline runs, so I can't currently verify this hunch 
should you need more information.


Reinier

________________________________
From: Aljoscha Krettek <[email protected]>
Sent: 09 October 2017 16:56:26
To: [email protected]
Subject: Re: Duplicate metric names when using Flink runner + Graphite reporter

Hi Reinier,

Do you have several sources in your pipeline? I think the problem is that the 
Beam Flink runner does not assign unique names, which could otherwise be used 
to deduplicate the operator names that end up in the metric names.
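
For illustration, a minimal sketch of what assigning unique names could look like. This is plain Java and not code from the Beam Flink runner; the class UniqueNamesSketch and the method makeUnique are hypothetical names, and the suffix scheme is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: not part of Beam or Flink. */
final class UniqueNamesSketch {

    /** Returns the names in their original order, appending "-2", "-3", ... to repeats. */
    static List<String> makeUnique(List<String> names) {
        Map<String, Integer> seen = new HashMap<>();
        List<String> unique = new ArrayList<>();
        for (String name : names) {
            // merge() stores and returns how often this name has been seen so far
            int count = seen.merge(name, 1, Integer::sum);
            unique.add(count == 1 ? name : name + "-" + count);
        }
        return unique;
    }

    public static void main(String[] args) {
        List<String> operatorNames = Arrays.asList("Read", "Read", "ParDo", "Read");
        System.out.println(makeUnique(operatorNames)); // [Read, Read-2, ParDo, Read-3]
    }
}
```

A real implementation would also have to handle an input that already contains a generated name such as "Read-2"; the sketch ignores that case.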

Best,
Aljoscha

On 29. Sep 2017, at 20:17, Reinier Kip <[email protected]> wrote:


Hi all,


I'm running a Beam pipeline on Flink and sending metrics via the Graphite 
reporter. I get repeated exceptions on the slaves, which try to register the 
same metric multiple times. These duplicates all concern task and operator 
metrics. I have given all pipeline nodes unique names.

I am using Beam 2.1.0, and am thus running Flink 1.3.0.

Below you'll find the error+stacktrace, Flink's metrics configuration, and 
several examples of duplicate metric names concerning both tasks and operators.

Is this a Beam problem? Should Beam give Flink tasks and operators unique names 
so they'll have unique metric names? Is this a Flink problem? Should Flink or 
Flink's Graphite reporter support duplicate metric names?

Reinier

##################

Log message: [ERROR] Error while registering metric.
Stack trace:

java.lang.IllegalArgumentException: A metric named bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numSplitsProcessed already exists
at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
at org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:151)
at org.apache.flink.runtime.metrics.MetricRegistry.register(MetricRegistry.java:294)
at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:370)
at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.meter(AbstractMetricGroup.java:336)
at org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:42)
at org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:45)
at org.apache.flink.runtime.metrics.groups.TaskMetricGroup.addOperator(TaskMetricGroup.java:133)
at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1299)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1015)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

##################

metrics.reporters: graphite
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.graphite.host: something
metrics.reporter.graphite.port: 2003
metrics.reporter.graphite.protocol: TCP
metrics.reporter.graphite.interval: 1 SECONDS
metrics.scope.jm: bla.<host>.jobmanager
metrics.scope.jm.job: bla.<host>.jobmanager.<job_name>
metrics.scope.tm: bla.<host>.taskmanager.<tm_id>
metrics.scope.tm.job: bla.<host>.taskmanager.<tm_id>.<job_name>
metrics.scope.task: bla.<host>.taskmanager.<tm_id>.<job_name>.task.<task_name>.<subtask_index>
metrics.scope.operator: bla.<host>.taskmanager.<tm_id>.<job_name>.operator.<operator_name>.<subtask_index>
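
To make the suspected cause concrete, here is a minimal sketch of how the operator scope format above produces the same identifier twice when two operators share a name and subtask index. It is plain Java and does not call Flink or Dropwizard; the class ScopeCollisionSketch and the helpers expand and register are hypothetical stand-ins, and the Map-based registry only imitates a registry that rejects duplicate names.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: none of this is Flink or Dropwizard code. */
final class ScopeCollisionSketch {

    /** Replaces each <variable> in the scope format with its value. */
    static String expand(String format, Map<String, String> vars) {
        String result = format;
        for (Map.Entry<String, String> e : vars.entrySet()) {
            result = result.replace("<" + e.getKey() + ">", e.getValue());
        }
        return result;
    }

    /** Stand-in registry: returns false if the name was already registered. */
    static boolean register(Map<String, Object> registry, String name) {
        return registry.putIfAbsent(name, new Object()) == null;
    }

    public static void main(String[] args) {
        String format =
            "bla.<host>.taskmanager.<tm_id>.<job_name>.operator.<operator_name>.<subtask_index>";

        // Values taken from the duplicate metric names reported in this thread
        Map<String, String> vars = new HashMap<>();
        vars.put("host", "hdp-slave-019");
        vars.put("tm_id", "3067db835689cdd8b6087dae79ec088f");
        vars.put("job_name", "bla-0929174053-aa405356");
        vars.put("operator_name", "DataSource (at Read(CompressedSource) "
            + "(org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat))");
        vars.put("subtask_index", "5");

        Map<String, Object> registry = new HashMap<>();
        // Two distinct sources that end up with the same operator name
        String first = expand(format, vars) + ".numRecordsOut";
        String second = expand(format, vars) + ".numRecordsOut";

        System.out.println(register(registry, first));  // true
        System.out.println(register(registry, second)); // false: name already taken
    }
}
```

Nothing in the scope format distinguishes the two operators, so the second registration is rejected unless the operator names themselves differ.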


##################


bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numSplitsProcessed
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOutPerSecond
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOut
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsInPerSecond
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsIn
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOutPerSecond
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOut
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsInPerSecond
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsIn
bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numBytesOutPerSecond
