For streaming jobs (and even for long-running batch jobs), I think you need to 
query the metrics from PipelineResult periodically in a separate thread, or 
use MetricsPusher [1], which does this for you and pushes the metrics to a 
supported sink.
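
To make the "separate thread" option concrete, here is a minimal sketch of the 
polling pattern using only the JDK. MetricsPoller, query, sink and 
periodMillis are hypothetical names for illustration, not Beam API. The 
Supplier stands in for a call such as 
result.metrics().queryMetrics(MetricsFilter.builder().build()), flattened into 
a name -> attempted-value map, which is an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper: polls a metrics snapshot on a background thread. */
class MetricsPoller implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "metrics-poller");
            t.setDaemon(true); // do not keep the JVM alive just for polling
            return t;
        });

    /**
     * @param query        stand-in for the PipelineResult metrics query
     *                     (assumed to return name -> attempted value)
     * @param sink         receives each snapshot, e.g. a logger
     * @param periodMillis delay between the start of two consecutive polls
     */
    MetricsPoller(Supplier<Map<String, Long>> query,
                  Consumer<Map<String, Long>> sink,
                  long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                sink.accept(query.get());
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all later runs,
                // so report it and keep polling.
                System.err.println("metrics query failed: " + e);
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops polling; call this once the pipeline has finished. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

In a Beam job you would presumably create the poller right after p.run() and 
close it once result.waitUntilFinish() returns. If the metrics need to go to 
an external sink rather than a log, MetricsPusher [1] is probably the better 
fit.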

[1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsPusher.java

> On 7 Jul 2020, at 12:14, Truebody, Kyle <[email protected]> wrote:
> 
> Thanks for the response Alexey,
>  
> I have it working now. It looks like the order of my calls was the problem: 
> the metrics container was being queried too early, before the pipeline had 
> finished. This is working for me now:
> 
> PipelineResult result = p.run();
> // Block until the pipeline finishes so the metrics are populated.
> result.waitUntilFinish();
>  
> // Query once and reuse the result for counters and distributions.
> MetricQueryResults metrics =
>     result.metrics().queryMetrics(MetricsFilter.builder().build());
> for (MetricResult<Long> r : metrics.getCounters()) {
>     LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted());
> }
> for (MetricResult<DistributionResult> r : metrics.getDistributions()) {
>     LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted());
> }
>  
> Just to call out: I guess this will only work in batch mode (which is our 
> target mode for now).
> The implementation in the previous mail did not produce counters in streaming 
> mode.
> 
> 
>  
> From: Alexey Romanenko <[email protected]>
> Sent: Friday, July 3, 2020 5:17 PM
> To: [email protected]
> Subject: Re: Querying Metrics when using Spark Runner
>  
> Hmm, the only thing that comes to mind is that we started using the Spark 
> AccumulatorV2 API [1], but that change already shipped in Beam 2.13.0.
> There is an issue connected to this upgrade [2], but it doesn’t seem to be 
> relevant here.
>  
> [1] https://issues.apache.org/jira/browse/BEAM-4552
> [2] https://issues.apache.org/jira/browse/BEAM-10294
> 
> 
> On 2 Jul 2020, at 22:07, Truebody, Kyle <[email protected]> wrote:
>  
> Hi, 
>  
> We recently upgraded to the latest version of Apache Beam, 2.22.0. We were 
> previously using version 2.13.0.
>  
> We are using the SparkRunner.
> 
> 
> I noticed that after the upgrade the metrics query has stopped producing 
> values. Through debugging I can see that the metrics and distributions are 
> still being incremented as expected. Has the metrics interface changed at 
> all? This is how we are currently querying the metrics:
> 
> 
>     PipelineResult result = p.run();
>     for (MetricResult r : result.metrics()
>             .queryMetrics(MetricsFilter.builder().build()).getCounters()) {
>         LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted().toString());
>     }
>     for (MetricResult r : result.metrics()
>             .queryMetrics(MetricsFilter.builder().build()).getDistributions()) {
>         LOGGER.info("n=" + r.getName() + " v=" + r.getAttempted().toString());
>     }
>     result.waitUntilFinish();
> 
> 
>  
> Thanks,
> Kyle
