Well, I think for streaming jobs (and even for long running batch jobs) you need to query metrics from time to time from PipelineResult in separate thread or use MetricsPusher [1, that does this job and pushes metrics into supported sink.
[1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsPusher.java <https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsPusher.java> > On 7 Jul 2020, at 12:14, Truebody, Kyle <[email protected]> wrote: > > Thanks for the response Alexey, > > I have it working now. Looks like the order of how I implemented it is > causing early call to Metrics Container. This is working for me now : > > PipelineResult result = p.run(); > result.waitUntilFinish(); > > for(MetricResult r: > result.metrics().queryMetrics(MetricsFilter.builder().build()).getCounters()) > { > LOGGER.info > <https://nam03.safelinks.protection.outlook.com/?url=http%3A%2F%2Flogger.info%2F&data=02%7C01%7CTruebodyK%40dnb.com%7Cacd9d73a96464d750a2108d81f6c7b4a%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637293898066276886&sdata=zbuPd62lK16pBQUiTWF47NPVBbIryx1gnQZmyBz9ZVE%3D&reserved=0>("n=" > + r.getName() + " v=" + r.getAttempted().toString()); > } > for(MetricResult r: > result.metrics().queryMetrics(MetricsFilter.builder().build()).getDistributions()) > { > LOGGER.info > <https://nam03.safelinks.protection.outlook.com/?url=http%3A%2F%2Flogger.info%2F&data=02%7C01%7CTruebodyK%40dnb.com%7Cacd9d73a96464d750a2108d81f6c7b4a%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637293898066276886&sdata=zbuPd62lK16pBQUiTWF47NPVBbIryx1gnQZmyBz9ZVE%3D&reserved=0>("n=" > + r.getName() + " v=" + r.getAttempted().toString()); > } > > Just to call out, I guess this would only work as batch mode running ( which > is currently what our target mode is for now ). > The implementation in the previous mail did not produce counters in streaming > mode. > > > > From: Alexey Romanenko <[email protected] > <mailto:[email protected]>> > Sent: Friday, July 3, 2020 5:17 PM > To: [email protected] <mailto:[email protected]> > Subject: Re: Querying Metrics when using Spark Runner > > CAUTION: This email originated from outside of D&B. Please do not click links > or open attachments unless you recognize the sender and know the content is > safe. > > Hmmm, the only thing that comes to my mind is that we started to use Spark > AccumulatorsV2 API [1] but it was released just right in Beam 2.13.0 > There is a related issue with this upgrade [2] but seems it’s not related. > > [1] https://issues.apache.org/jira/browse/BEAM-4552 > <https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-4552&data=02%7C01%7CTruebodyK%40dnb.com%7Cacd9d73a96464d750a2108d81f6c7b4a%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637293898066266890&sdata=T6JGD8Y9OBfc063UBt4Izj7PsXUyuSpep80M1NWV0cs%3D&reserved=0> > [2] https://issues.apache.org/jira/browse/BEAM-10294 > <https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-10294&data=02%7C01%7CTruebodyK%40dnb.com%7Cacd9d73a96464d750a2108d81f6c7b4a%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637293898066266890&sdata=ekCLW6PmvlxeOhc4uFMLgAzWqQv4bfNfHmCaRhaHiQQ%3D&reserved=0> > > > On 2 Jul 2020, at 22:07, Truebody, Kyle <[email protected] > <mailto:[email protected]>> wrote: > > Hi, > > We have recently upgraded to the latest version of Apache Beam 2.22.0. We > were previously using version 2.13.0 . > > We are using the SparkRunner. > > > I noticed that after the upgrade that the Metrics query has stop producing > values. Through debugging I can see that the metrics and distribution are > still being incremented as expected. Has the metrics interface changed at > all? This is currently how we querying the metrics. > > > PipelineResult result = p.run(); > for(MetricResult r: > result.metrics().queryMetrics(MetricsFilter.builder().build()).getCounters()) > { > LOGGER.info > <https://nam03.safelinks.protection.outlook.com/?url=http%3A%2F%2Flogger.info%2F&data=02%7C01%7CTruebodyK%40dnb.com%7Cacd9d73a96464d750a2108d81f6c7b4a%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637293898066276886&sdata=zbuPd62lK16pBQUiTWF47NPVBbIryx1gnQZmyBz9ZVE%3D&reserved=0>("n=" > + r.getName() + " v=" + r.getAttempted().toString()); > } > for(MetricResult r: > result.metrics().queryMetrics(MetricsFilter.builder().build()).getDistributions()) > { > LOGGER.info > <https://nam03.safelinks.protection.outlook.com/?url=http%3A%2F%2Flogger.info%2F&data=02%7C01%7CTruebodyK%40dnb.com%7Cacd9d73a96464d750a2108d81f6c7b4a%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637293898066276886&sdata=zbuPd62lK16pBQUiTWF47NPVBbIryx1gnQZmyBz9ZVE%3D&reserved=0>("n=" > + r.getName() + " v=" + r.getAttempted().toString()); > } > result.waitUntilFinish(); > > > > Thanks, > Kyle
