Thanks for the response Alexey,

I have it working now. It looks like the order of my calls was the problem: I was 
querying the metrics container too early, before the pipeline had finished. This 
is working for me now:

PipelineResult result = p.run();
result.waitUntilFinish();

for (MetricResult r :
        result.metrics().queryMetrics(MetricsFilter.builder().build()).getCounters()) {
    LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted().toString());
}
for (MetricResult r :
        result.metrics().queryMetrics(MetricsFilter.builder().build()).getDistributions()) {
    LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted().toString());
}

Just to call out: I guess this will only work when running in batch mode (which 
is our target mode for now).
The implementation in the previous mail did not produce counters in streaming 
mode.
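
To illustrate why the ordering matters, here is a minimal sketch that uses only the JDK and not Beam's API. FakeJob, queryCounter and run are hypothetical names made up for this example, and the latch is a modelling device that holds the work back until waitUntilFinish() is called so that the early read is deterministic. It shows only the general pattern: a counter read before the job has finished can be empty, while the same read after waiting sees the final value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

/** Toy stand-in for an asynchronously running pipeline; this is NOT Beam's API. */
final class MetricsTimingSketch {

    /** Hypothetical job handle whose counter is only filled in by the worker thread. */
    static final class FakeJob {
        private final AtomicLong counter = new AtomicLong();
        private final CountDownLatch start = new CountDownLatch(1);
        private final Thread worker;

        FakeJob(int elements) {
            worker = new Thread(() -> {
                try {
                    // Modelling device: hold the work back until waitUntilFinish() is called.
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < elements; i++) {
                    counter.incrementAndGet();
                }
            });
            worker.start();
        }

        /** Reads whatever the counter holds right now. */
        long queryCounter() {
            return counter.get();
        }

        /** Releases the worker and blocks until it has processed every element. */
        void waitUntilFinish() {
            start.countDown();
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the job", e);
            }
        }
    }

    /** Returns {value read before waiting, value read after waiting}. */
    static long[] run(int elements) {
        FakeJob job = new FakeJob(elements);
        long early = job.queryCounter();  // query issued before the job has finished
        job.waitUntilFinish();
        long late = job.queryCounter();   // query issued after the job has finished
        return new long[] {early, late};
    }

    public static void main(String[] args) {
        long[] values = run(1000);
        System.out.println("before wait: " + values[0] + ", after wait: " + values[1]);
    }
}
```

In this toy model the first read returns 0 and the second returns the number of processed elements, which matches the behaviour I saw once the query was moved after result.waitUntilFinish().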



From: Alexey Romanenko <[email protected]>
Sent: Friday, July 3, 2020 5:17 PM
To: [email protected]
Subject: Re: Querying Metrics when using Spark Runner


Hmmm, the only thing that comes to my mind is that we started to use the Spark 
AccumulatorV2 API [1], but that change was released in Beam 2.13.0 itself.
There is an issue connected to this upgrade [2], but it seems to be unrelated.

[1] https://issues.apache.org/jira/browse/BEAM-4552
[2] https://issues.apache.org/jira/browse/BEAM-10294


On 2 Jul 2020, at 22:07, Truebody, Kyle <[email protected]> wrote:

Hi,

We recently upgraded to Apache Beam 2.22.0, the latest version. We were 
previously using version 2.13.0.

We are using the SparkRunner.


I noticed that after the upgrade the metrics query has stopped producing 
values. Through debugging I can see that the metrics and distributions are 
still being incremented as expected. Has the metrics interface changed at all? 
This is how we are currently querying the metrics:


    PipelineResult result = p.run();
    for (MetricResult r :
            result.metrics().queryMetrics(MetricsFilter.builder().build()).getCounters()) {
        LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted().toString());
    }
    for (MetricResult r :
            result.metrics().queryMetrics(MetricsFilter.builder().build()).getDistributions()) {
        LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted().toString());
    }
    result.waitUntilFinish();



Thanks,
Kyle
