I'm using Flink 1.4.2 and deploying the job on a YARN cluster. I have a streaming job that flattens the data and outputs it: it takes one input record and produces n output records. I'm using a Table Function for this, and the flattening logic is implemented in a UDF. The UDF has a counter that counts the number of records produced, registered with `this.context.getMetricGroup().counter("output_records_counter")`.
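To make the setup described above concrete, here is a minimal plain-Java sketch of the counting logic. It does not use the Flink API: `FlattenSketch`, `SimpleCounter`, and the comma-splitting rule are hypothetical stand-ins chosen for illustration. In the real job the counter would come from `context.getMetricGroup().counter("output_records_counter")`, and the records would be emitted by the Table Function rather than returned in a list.

```java
import java.util.ArrayList;
import java.util.List;

public class FlattenSketch {

    /** Hypothetical stand-in for a metrics counter; NOT the Flink Counter class. */
    public static final class SimpleCounter {
        private long count;
        public void inc() { count++; }
        public long getCount() { return count; }
    }

    // One counter per function instance, held only in memory.
    private final SimpleCounter outputRecords = new SimpleCounter();

    /** Flattens one input record into n output records (assumed rule: split on commas). */
    public List<String> flatten(String record) {
        List<String> out = new ArrayList<>();
        for (String part : record.split(",")) {
            out.add(part);
            outputRecords.inc(); // count every emitted record
        }
        return out;
    }

    public long outputCount() {
        return outputRecords.getCount();
    }

    public static void main(String[] args) {
        FlattenSketch udf = new FlattenSketch();
        udf.flatten("a,b,c");
        udf.flatten("d,e");
        System.out.println(udf.outputCount()); // prints 5

        // A freshly created instance has its own counter, starting at zero.
        FlattenSketch fresh = new FlattenSketch();
        System.out.println(fresh.outputCount()); // prints 0
    }
}
```

The sketch only shows that a counter kept in a function instance counts what that instance emitted since it was created. It does not model how Flink computes `numRecordsOut` or how either metric behaves across a restart.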
I know Flink provides the `numRecordsOut` metric, which should give me essentially the same number. When the job is first started, the output record counts reported by `output_records_counter` and `numRecordsOut` are exactly the same. When a task manager is lost and the job is restarted, there is a huge difference between the two counts. As seen in the graph, the two counts overlap when the job is started; after a task manager is lost and re-deployed, they differ. I'm not sure why the numbers vary so much. Can someone please shed some light on how this counter is implemented, or direct me to the source code or any reference material? For `numRecordsOut`, each task manager emits its own count of records. Is the same not true for `output_records_counter`? <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1468/Screen_Shot_2018-11-24_at_9.png>