This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ac0f4f0 MetricFilter matching by Metric objects. new 407b014 Merge pull request #12123 from rainwoodman/query-by-metric: MetricFilter matching by Metric objects. ac0f4f0 is described below commit ac0f4f05d6a0f2975c564c3723b38dac4eb89c89 Author: Yu Feng <f...@google.com> AuthorDate: Mon Jun 29 13:59:10 2020 -0700 MetricFilter matching by Metric objects. This change would encourage reusing predefined metric objects in metric queries. Querying with the metric object may result in more maintainable code than duplicating the name and namespace strings, or storing the names elsewhere as CONSTANT_NAME. --- sdks/python/apache_beam/metrics/metric.py | 5 +++++ sdks/python/apache_beam/metrics/metric_test.py | 21 ++++++++++++++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index bb8215e..4b5a2a5 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -220,6 +220,11 @@ class MetricsFilter(object): def namespaces(self): return frozenset(self._namespaces) + def with_metric(self, metric): + return ( + self.with_name(metric.metric_name.name).with_namespace( + metric.metric_name.namespace)) + def with_name(self, name): return self.with_names([name]) diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py index 7185385..09f1902 100644 --- a/sdks/python/apache_beam/metrics/metric_test.py +++ b/sdks/python/apache_beam/metrics/metric_test.py @@ -141,11 +141,15 @@ class MetricsTest(unittest.TestCase): def test_user_counter_using_pardo(self): class SomeDoFn(beam.DoFn): """A custom dummy DoFn using yield.""" + static_counter_elements = metrics.Metrics.counter( + "SomeDoFn", 'metrics_static_counter_element') + def __init__(self): self.user_counter_elements = metrics.Metrics.counter( self.__class__, 
'metrics_user_counter_element') def process(self, element): + self.static_counter_elements.inc(2) self.user_counter_elements.inc() distro = Metrics.distribution(self.__class__, 'element_dist') distro.update(element) @@ -159,15 +163,26 @@ class MetricsTest(unittest.TestCase): res = pipeline.run() res.wait_until_finish() + # Verify static counter. + metric_results = ( + res.metrics().query( + MetricsFilter().with_metric(SomeDoFn.static_counter_elements))) + outputs_static_counter = metric_results['counters'][0] + + self.assertEqual( + outputs_static_counter.key.metric.name, + 'metrics_static_counter_element') + self.assertEqual(outputs_static_counter.committed, 8) + # Verify user counter. metric_results = ( res.metrics().query( MetricsFilter().with_name('metrics_user_counter_element'))) - outputs_counter = metric_results['counters'][0] + outputs_user_counter = metric_results['counters'][0] self.assertEqual( - outputs_counter.key.metric.name, 'metrics_user_counter_element') - self.assertEqual(outputs_counter.committed, 4) + outputs_user_counter.key.metric.name, 'metrics_user_counter_element') + self.assertEqual(outputs_user_counter.committed, 4) # Verify user distribution counter. metric_results = res.metrics().query()