FYI, I created a Jira issue for this: 
https://issues.apache.org/jira/browse/BEAM-3043
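
Aljoscha's diagnosis below is that the runner gives several Flink operators the same name. The following is a hypothetical sketch of the kind of deduplication a runner could apply. It assumes a simple numeric suffix, and the class and method names (`UniqueOperatorNames`, `uniquify`) are made up for illustration. It is not necessarily what was done for BEAM-3043.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: NOT part of the Beam Flink runner. It illustrates one
// way a runner could make operator names unique before handing them to Flink,
// so that metric identifiers built from <operator_name> no longer collide.
public class UniqueOperatorNames {

    // Appends " (2)", " (3)", ... to every repeat of a name and keeps the first
    // occurrence unchanged. A suffixed name that happens to equal a later
    // original name is not handled here; a real fix would need to cover that.
    static List<String> uniquify(List<String> names) {
        Map<String, Integer> seen = new HashMap<>();
        List<String> result = new ArrayList<>();
        for (String name : names) {
            int count = seen.merge(name, 1, Integer::sum);
            result.add(count == 1 ? name : name + " (" + count + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        // Three TextIO sources that all end up with the same Flink operator name.
        String source = "DataSource (at Read(CompressedSource) "
            + "(org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat))";
        for (String name : uniquify(List.of(source, source, source, "Map"))) {
            System.out.println(name);
        }
    }
}
```

The suffix keeps the names human-readable in Graphite. A stable identifier derived from the transform's position in the pipeline graph would be an alternative if the names must not change between runs.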

> On 9. Oct 2017, at 18:00, Reinier Kip <[email protected]> wrote:
> 
> Hey Aljoscha,
> 
> Thanks for replying! Yes, I have three TextIO sources and two HBaseIO sources 
> :) I believe duplicate naming also occurs for the intermediate transforms, 
> though. The node names that are assigned to the "user-level" transforms, I 
> believe, never make it to Beam's primitive transforms (I think they are 
> called that?) and consequently never make it to the task and operator names 
> in Flink.
> 
> Since posting this problem, I have replaced Flink metrics with a single 
> manual reporting step that runs after the pipeline finishes, so I currently 
> can't verify this hunch, in case you need more information.
> 
> Reinier
> From: Aljoscha Krettek <[email protected]>
> Sent: 09 October 2017 16:56:26
> To: [email protected]
> Subject: Re: Duplicate metric names when using Flink runner + Graphite 
> reporter
>  
> Hi Reinier,
> 
> Do you have several sources in your pipeline? I think it's a problem of the 
> Beam Flink runner: it does not assign unique names that could be used to 
> deduplicate the operator names that end up in the metric names.
> 
> Best,
> Aljoscha
> 
>> On 29. Sep 2017, at 20:17, Reinier Kip <[email protected]> wrote:
>> 
>> Hi all,
>> 
>> I'm running a Beam pipeline on Flink and sending metrics via the Graphite 
>> reporter. I get repeated exceptions on the slaves, which try to register the 
>> same metric multiple times. These duplicates all concern task and operator 
>> metrics. I have given all pipeline nodes unique names.
>> 
>> I am using Beam 2.1.0, and am thus running Flink 1.3.0.
>> 
>> Below you'll find the error+stacktrace, Flink's metrics configuration, and 
>> several examples of duplicate metric names concerning both tasks and 
>> operators.
>> 
>> Is this a Beam problem? Should Beam give Flink tasks and operators unique 
>> names so they'll have unique metric names? Is this a Flink problem? Should 
>> Flink or Flink's Graphite reporter support duplicate metric names?
>> 
>> Reinier
>> 
>> ##################
>> 
>> Log message: [ERROR] Error while registering metric.
>> Stack trace:
>> 
>> java.lang.IllegalArgumentException: A metric named bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numSplitsProcessed already exists
>>     at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
>>     at org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:151)
>>     at org.apache.flink.runtime.metrics.MetricRegistry.register(MetricRegistry.java:294)
>>     at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:370)
>>     at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.meter(AbstractMetricGroup.java:336)
>>     at org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:42)
>>     at org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:45)
>>     at org.apache.flink.runtime.metrics.groups.TaskMetricGroup.addOperator(TaskMetricGroup.java:133)
>>     at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72)
>>     at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1299)
>>     at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1015)
>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:256)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>     at java.lang.Thread.run(Thread.java:748)
>> 
>> ##################
>> 
>> metrics.reporters: graphite
>> metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
>> metrics.reporter.graphite.host: something
>> metrics.reporter.graphite.port: 2003
>> metrics.reporter.graphite.protocol: TCP
>> metrics.reporter.graphite.interval: 1 SECONDS
>> metrics.scope.jm: bla.<host>.jobmanager
>> metrics.scope.jm.job: bla.<host>.jobmanager.<job_name>
>> metrics.scope.tm: bla.<host>.taskmanager.<tm_id>
>> metrics.scope.tm.job: bla.<host>.taskmanager.<tm_id>.<job_name>
>> metrics.scope.task: bla.<host>.taskmanager.<tm_id>.<job_name>.task.<task_name>.<subtask_index>
>> metrics.scope.operator: bla.<host>.taskmanager.<tm_id>.<job_name>.operator.<operator_name>.<subtask_index>
>> 
>> ##################
>> 
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numSplitsProcessed
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOutPerSecond
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOut
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsInPerSecond
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.operator.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsIn
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOutPerSecond
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsOut
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsInPerSecond
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numRecordsIn
>> bla.hdp-slave-019.taskmanager.3067db835689cdd8b6087dae79ec088f.bla-0929174053-aa405356.task.DataSource (at Read(CompressedSource) (org-apache-beam-runners-flink-translation-wrappers-SourceInputFormat)).5.numBytesOutPerSecond
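
The duplicate names quoted above are all produced by expanding the `metrics.scope.operator` and `metrics.scope.task` templates from the configuration. The following is a minimal sketch of how expanding the operator template gives two sources the same identifier when they share an operator name and subtask index. It also shows how a registry that rejects duplicates then fails, as in the stack trace. The class names, the plain string substitution and the '.'-to-'-' filtering of the operator name are assumptions for illustration, not Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch, not Flink code. Assumptions: placeholders in
// metrics.scope.operator are substituted by plain string replacement, the
// reporter turns '.' inside a single component into '-' (as the dashed class
// name in the quoted metric names suggests), and the registry rejects
// duplicates the way MetricRegistry.register does in the stack trace.
public class ScopeCollision {

    static final String OPERATOR_SCOPE =
        "bla.<host>.taskmanager.<tm_id>.<job_name>.operator.<operator_name>.<subtask_index>";

    static String identifier(String host, String tmId, String job,
                             String operator, int subtask, String metric) {
        return OPERATOR_SCOPE
            .replace("<host>", host)
            .replace("<tm_id>", tmId)
            .replace("<job_name>", job)
            // Dots inside one component would otherwise read as extra levels.
            .replace("<operator_name>", operator.replace('.', '-'))
            .replace("<subtask_index>", Integer.toString(subtask))
            + "." + metric;
    }

    // Minimal stand-in for a registry keyed on the full metric name.
    static final class Registry {
        private final Map<String, Object> metrics = new HashMap<>();

        void register(String name, Object metric) {
            if (metrics.putIfAbsent(name, metric) != null) {
                throw new IllegalArgumentException("A metric named " + name + " already exists");
            }
        }
    }

    public static void main(String[] args) {
        String operator = "DataSource (at Read(CompressedSource) "
            + "(org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))";
        Registry registry = new Registry();
        // Two distinct TextIO sources: same operator name, same subtask index 5.
        for (int source = 0; source < 2; source++) {
            String name = identifier("hdp-slave-019", "3067db835689cdd8b6087dae79ec088f",
                "bla-0929174053-aa405356", operator, 5, "numSplitsProcessed");
            try {
                registry.register(name, new Object());
                System.out.println("source " + source + " registered " + name);
            } catch (IllegalArgumentException e) {
                System.out.println("source " + source + ": " + e.getMessage());
            }
        }
    }
}
```

Running `main` prints one "registered" line followed by the "already exists" message. Only a component that differs between the two sources, such as a unique operator name coming from the runner, would make the identifiers distinct.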
