Which metrics specifically do you mean? Beam metrics (e.g. backlogBytes)
or Flink metrics (e.g. numRecordsOut)?
A quick look at the code (ReaderInvocationUtil) suggests that the Beam
metrics should be reported correctly. The Flink native metrics are
always reported, regardless of the operator type.
Have you seen the metrics being reported without the custom metric reporter?
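One way to check this, independent of any reporter, might be to ask the Flink REST API for the source vertex's metrics directly. The sketch below is only an illustration: the base URL, job ID, vertex ID, and the subtask-prefixed metric IDs such as `0.numRecordsOut` are placeholders you would need to replace with values from your own job (on YARN the REST endpoint is usually reached through the YARN proxy URL).

```python
import json
import urllib.parse
import urllib.request


def vertex_metrics_url(base_url, job_id, vertex_id, metric_names):
    """Build the REST URL that requests selected metrics of one job vertex."""
    # Keep commas unescaped so the 'get' parameter stays a readable list.
    query = urllib.parse.urlencode({"get": ",".join(metric_names)}, safe=",")
    return f"{base_url.rstrip('/')}/jobs/{job_id}/vertices/{vertex_id}/metrics?{query}"


def fetch_vertex_metrics(base_url, job_id, vertex_id, metric_names):
    """Return {metric id: value} as reported by the JobManager (needs network access)."""
    url = vertex_metrics_url(base_url, job_id, vertex_id, metric_names)
    with urllib.request.urlopen(url, timeout=10) as response:
        # The endpoint answers with a JSON list of {"id": ..., "value": ...} objects.
        return {entry["id"]: entry.get("value") for entry in json.load(response)}


if __name__ == "__main__":
    # Placeholder IDs; this only prints the URL and does not contact a cluster.
    print(vertex_metrics_url(
        "http://localhost:8081", "<job-id>", "<vertex-id>",
        ["0.numRecordsOut", "1.numRecordsOut"],
    ))
```

If the values come back non-empty here but stay empty in Graphite, the problem is more likely on the reporter side than in the source operator.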
-Max
On 09.09.20 06:47, Binh Nguyen Van wrote:
Hi,
I have a streaming pipeline that reads messages from Kafka
(KafkaIO.read), transforms, and then writes to Kafka (KafkaIO.write). It
is using Apache Beam 2.23.0 and Flink 1.10.1.
The pipeline is working as expected and I am trying to collect some
metrics through Graphite, so I took the following steps:
* Copy `opt/flink-metrics-graphite.jar` to `lib`
* Submit the application to run on YARN with the following command
```
bin/flink run \
-m yarn-cluster \
-e yarn-per-job \
-p 5 \
-yd \
-Dtaskmanager.memory.process.size=4G \
-Dsecurity.kerberos.login.keytab=<keytab> \
-Dsecurity.kerberos.login.principal=<principal> \
-Dsecurity.kerberos.login.contexts=Client,HDFS \
-Dstate.backend=rocksdb \
-Dstate.checkpoints.dir=<checkpoint-dir> \
<path-to-jar> \
--runner=FlinkRunner \
--appName=<app-name> \
--jobName=<app-name> \
--metricsGraphiteHost=<graphite-host> \
--metricsPushPeriod=30 \
--metricsGraphitePort=2003 \
--checkpointingInterval=60000 \
--externalizedCheckpointsEnabled=true \
--retainExternalizedCheckpointsOnCancellation=true \
--autoWatermarkInterval=5000 \
--savepointPath=<savepoint-path> \
<extra-params>
```
I also put the following config in `flink-conf.yaml` to prefix metrics
with `flink`
```
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: graphite
metrics.reporter.grph.interval: 30 SECONDS
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
metrics.scope.jm: flink.<host>.jobmanager
metrics.scope.jm.job: flink.<host>.jobmanager.<job_name>
metrics.scope.operator: flink.<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
metrics.scope.task: flink.<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
metrics.scope.tm: flink.<host>.taskmanager.<tm_id>
metrics.scope.tm.job: flink.<host>.taskmanager.<tm_id>.<job_name>
```
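To make clear which Graphite paths I expect from these scope formats, here is a rough sketch of the expansion. It is not Flink's actual code: the variable values are made up, and the replacement of dots inside values with `-` is my assumption about how the reporter keeps them from adding extra path levels.

```python
import re


def expand_scope(scope_format, variables):
    """Replace each <name> placeholder in a scope format with its value."""
    def substitute(match):
        # Assumption: dots inside a value are replaced so they do not
        # create additional levels in the Graphite path.
        return variables[match.group(1)].replace(".", "-")
    return re.sub(r"<(\w+)>", substitute, scope_format)


def graphite_path(scope_format, variables, metric_name):
    """Join the expanded scope and the metric name with a dot."""
    return expand_scope(scope_format, variables) + "." + metric_name


OPERATOR_SCOPE = (
    "flink.<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"
)

# Hypothetical values, for illustration only.
EXAMPLE_VARIABLES = {
    "host": "worker-1.example.com",
    "tm_id": "container_01",
    "job_name": "my-app",
    "operator_name": "Source",
    "subtask_index": "0",
}

if __name__ == "__main__":
    print(graphite_path(OPERATOR_SCOPE, EXAMPLE_VARIABLES, "numRecordsOut"))
```

With these made-up values the operator-level path would be `flink.worker-1-example-com.taskmanager.container_01.my-app.Source.0.numRecordsOut`.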
When I run the pipeline, I can see metric data in Graphite for the other
operators but not for the source operator: its metrics are listed in
Graphite, but they contain no data.
[Attachment: Screen Shot 2020-09-08 at 9.03.45 PM.png]
Am I doing anything wrong, or am I missing some configuration?
Thanks
-Binh