FYI, I created a Jira issue for this: https://issues.apache.org/jira/browse/BEAM-3043
> On 9. Oct 2017, at 18:00, Reinier Kip <[email protected]> wrote:
>
> Hey Aljoscha,
>
> Thanks for replying! Yes, I have three TextIO sources and two HBaseIO sources
> :) I believe duplicate naming also occurs for the intermediate transforms,
> though. The node names that are assigned to the "user-level" transforms, I
> believe, never make it to Beam's primitive transforms (I think they are
> called that?) and consequently never make it to the task and operator names
> in Flink.
>
> Since I posted this problem I have replaced Flink metrics with a single,
> manual reporting step after the pipeline runs, so I can't verify this hunch
> currently if you need some more information.
>
> Reinier
>
> From: Aljoscha Krettek <[email protected]>
> Sent: 09 October 2017 16:56:26
> To: [email protected]
> Subject: Re: Duplicate metric names when using Flink runner + Graphite reporter
>
> Hi Reinier,
>
> Do you have several sources in your pipeline? I think it's a problem of the
> Beam Flink runner that does not assign unique names which could be used to
> deduplicate the operators names that are used in the metrics name.
>
> Best,
> Aljoscha
>
>> On 29. Sep 2017, at 20:17, Reinier Kip <[email protected]> wrote:
>>
>> Hi all,
>>
>> I'm running a Beam pipeline on Flink and sending metrics via the Graphite
>> reporter. I get repeated exceptions on the slaves, which try to register the
>> same metric multiple times. These duplicates all concern task and operator
>> metrics. I have given all pipeline nodes unique names.
>>
>> I am using Beam 2.1.0, and am thus running Flink 1.3.0.
>>
>> Below you'll find the error+stacktrace, Flink's metrics configuration, and
>> several examples of duplicate metric names concerning both tasks and
>> operators.
>>
>> Is this a Beam problem? Should Beam give Flink tasks and operators unique
>> names so they'll have unique metric names? Is this a Flink problem? Should
>> Flink or Flink's Graphite reporter support duplicate metric names?
>>
>> Reinier
>>
>> ##################
>>
>> Log message: [ERROR] Error while registering metric.
>> Stack trace:
>>
>> java.lang.IllegalArgumentException: A metric named bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numSplitsProcessed already exists
>>     at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
>>     at org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:151)
>>     at org.apache.flink.runtime.metrics.MetricRegistry.register(MetricRegistry.java:294)
>>     at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:370)
>>     at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.meter(AbstractMetricGroup.java:336)
>>     at org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:42)
>>     at org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:45)
>>     at org.apache.flink.runtime.metrics.groups.TaskMetricGroup.addOperator(TaskMetricGroup.java:133)
>>     at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72)
>>     at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1299)
>>     at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1015)
>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:256)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> ##################
>>
>> metrics.reporters: graphite
>> metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
>> metrics.reporter.graphite.host: something
>> metrics.reporter.graphite.port: 2003
>> metrics.reporter.graphite.protocol: TCP
>> metrics.reporter.graphite.interval: 1 SECONDS
>> metrics.scope.jm: bla.<host>.jobmanager
>> metrics.scope.jm.job: bla.<host>.jobmanager.<job_name>
>> metrics.scope.tm: bla.<host>.taskmanager.<tm_id>
>> metrics.scope.tm.job: bla.<host>.taskmanager.<tm_id>.<job_name>
>> metrics.scope.task: bla.<host>.taskmanager.<tm_id>.<job_name>.task.<task_name>.<subtask_index>
>> metrics.scope.operator: bla.<host>.taskmanager.<tm_id>.<job_name>.operator.<operator_name>.<subtask_index>
>>
>> ##################
>>
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numSplitsProcessed
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOutPerSecond
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOut
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsInPerSecond
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsIn
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOutPerSecond
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOut
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsInPerSecond
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsIn
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numBytesOutPerSecond
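
For anyone reading this thread later, here is a rough sketch of how the collision in the quoted report can arise. It is plain Python, not Flink or Beam code: it only imitates the placeholder substitution implied by the quoted metrics.scope.operator setting. The helper names (metric_identifier, find_duplicates), the "#0/#1/#2" suffix, and the assumption that three reads land on the same task manager with the same subtask index are all hypothetical and only serve as illustration.

```python
import re
from collections import Counter

# Operator scope format copied from the quoted Flink configuration.
OPERATOR_SCOPE = (
    "bla.<host>.taskmanager.<tm_id>.<job_name>"
    ".operator.<operator_name>.<subtask_index>"
)

# Generic operator name as it appears in the quoted error message.
GENERIC_NAME = (
    "DataSource (at Read(CompressedSource) "
    "(org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat))"
)


def metric_identifier(scope_format, variables, metric_name):
    """Hypothetical helper: fill <placeholders> and append the metric name."""
    scope = re.sub(r"<(\w+)>", lambda m: str(variables[m.group(1)]), scope_format)
    return f"{scope}.{metric_name}"


def find_duplicates(identifiers):
    """Return the identifiers that occur more than once, sorted."""
    counts = Counter(identifiers)
    return sorted(name for name, n in counts.items() if n > 1)


# Values taken from the quoted metric names.
base = {
    "host": "hdp-slave-019",
    "tm_id": "3067db835689cdd8b6087dae79ec088f",
    "job_name": "bla-0929174053-aa405356",
    "subtask_index": 5,
}

# Assumption: three sources all end up with the same generic operator name.
same_names = [
    metric_identifier(OPERATOR_SCOPE, {**base, "operator_name": GENERIC_NAME}, "numRecordsOut")
    for _ in range(3)
]

# Assumption: each source gets a distinguishing suffix (made up for this sketch).
unique_names = [
    metric_identifier(
        OPERATOR_SCOPE, {**base, "operator_name": f"{GENERIC_NAME} #{i}"}, "numRecordsOut"
    )
    for i in range(3)
]

if __name__ == "__main__":
    print(find_duplicates(same_names))
    print(find_duplicates(unique_names))
```

With identical operator names the three identifiers collapse into one string, which is the situation in which a registry that rejects duplicate names fails as in the stack trace. With distinct operator names no duplicates remain.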
