Repository: beam
Updated Branches:
  refs/heads/master 04f5bc6f8 -> 2e1731143
Preparing support for Structured Names in Dataflow counters

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6aadf24f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6aadf24f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6aadf24f

Branch: refs/heads/master
Commit: 6aadf24f595acee5c0fe4de8b224c31fa1977a33
Parents: 04f5bc6
Author: Pablo <pabl...@google.com>
Authored: Thu Aug 3 17:46:09 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Aug 9 16:04:56 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/internal/apiclient.py | 51 +++++++----
 sdks/python/apache_beam/utils/counters.py  | 92 +++++++++-----------
 2 files changed, 77 insertions(+), 66 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/6aadf24f/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index dcaf74e..a1f9301 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -710,10 +710,6 @@ def translate_value(value, metric_update_proto):
   metric_update_proto.integer = to_split_int(value)
 
 
-def translate_scalar(accumulator, metric_update):
-  metric_update.scalar = to_json_value(accumulator.value, with_type=True)
-
-
 def translate_mean(accumulator, metric_update):
   if accumulator.count:
     metric_update.meanSum = to_json_value(accumulator.sum, with_type=True)
@@ -733,20 +729,43 @@ def _use_fnapi(pipeline_options):
 
 
 # To enable a counter on the service, add it to this dictionary.
-metric_translations = {
-    cy_combiners.CountCombineFn: ('sum', translate_scalar),
-    cy_combiners.SumInt64Fn: ('sum', translate_scalar),
-    cy_combiners.MinInt64Fn: ('min', translate_scalar),
-    cy_combiners.MaxInt64Fn: ('max', translate_scalar),
-    cy_combiners.MeanInt64Fn: ('mean', translate_mean),
-    cy_combiners.SumFloatFn: ('sum', translate_scalar),
-    cy_combiners.MinFloatFn: ('min', translate_scalar),
-    cy_combiners.MaxFloatFn: ('max', translate_scalar),
-    cy_combiners.MeanFloatFn: ('mean', translate_mean),
-    cy_combiners.AllCombineFn: ('and', translate_scalar),
-    cy_combiners.AnyCombineFn: ('or', translate_scalar),
+structured_counter_translations = {
+    cy_combiners.CountCombineFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.SumInt64Fn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MinInt64Fn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.MIN,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MaxInt64Fn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.MAX,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MeanInt64Fn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.MEAN,
+        MetricUpdateTranslators.translate_scalar_mean_int),
+    cy_combiners.SumFloatFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MinFloatFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.MIN,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MaxFloatFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.MAX,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MeanFloatFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.MEAN,
+        MetricUpdateTranslators.translate_scalar_mean_float),
+    cy_combiners.AllCombineFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.AND,
+        MetricUpdateTranslators.translate_boolean),
+    cy_combiners.AnyCombineFn: (
+        dataflow.CounterMetadata.KindValueValuesEnum.OR,
+        MetricUpdateTranslators.translate_boolean),
 }
 
+
 counter_translations = {
     cy_combiners.CountCombineFn: (
         dataflow.NameAndKind.KindValueValuesEnum.SUM,

http://git-wip-us.apache.org/repos/asf/beam/blob/6aadf24f/sdks/python/apache_beam/utils/counters.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/counters.py b/sdks/python/apache_beam/utils/counters.py
index b379461..5d029dc 100644
--- a/sdks/python/apache_beam/utils/counters.py
+++ b/sdks/python/apache_beam/utils/counters.py
@@ -27,6 +27,46 @@ import threading
 from apache_beam.transforms import cy_combiners
 
 
+class CounterName(object):
+  """Naming information for a counter."""
+  SYSTEM = object()
+  USER = object()
+
+  def __init__(self, name, stage_name=None, step_name=None,
+               system_name=None, namespace=None,
+               origin=None, output_index=None):
+    self.name = name
+    self.origin = origin or CounterName.SYSTEM
+    self.namespace = namespace
+    self.stage_name = stage_name
+    self.step_name = step_name
+    self.system_name = system_name
+    self.output_index = output_index
+
+  def __hash__(self):
+    return hash((self.name,
+                 self.origin,
+                 self.namespace,
+                 self.stage_name,
+                 self.step_name,
+                 self.system_name,
+                 self.output_index))
+
+  def __str__(self):
+    return '%s' % self._str_internal()
+
+  def __repr__(self):
+    return '<%s at %s>' % (self._str_internal(), hex(id(self)))
+
+  def _str_internal(self):
+    if self.origin == CounterName.USER:
+      return 'user-%s-%s' % (self.step_name, self.name)
+    elif self.origin == CounterName.SYSTEM and self.output_index:
+      return '%s-out%s-%s' % (self.step_name, self.output_index, self.name)
+    else:
+      return '%s-%s-%s' % (self.stage_name, self.step_name, self.name)
+
+
+
 class Counter(object):
   """A counter aggregates a series of values.
 
@@ -52,8 +92,8 @@ class Counter(object):
     """Creates a Counter object.
 
     Args:
-      name: the name of this counter. Typically has three parts:
-        "step-output-counter".
+      name: the name of this counter. It may be a string,
+            or a CounterName object.
       combine_fn: the CombineFn to use for aggregation
     """
     self.name = name
@@ -90,10 +130,6 @@ class AccumulatorCombineFnCounter(Counter):
     self._fast_add_input(value)
 
 
-# Counters that represent Accumulators have names starting with this
-USER_COUNTER_PREFIX = 'user-'
-
-
 class CounterFactory(object):
   """Keeps track of unique counters."""
 
@@ -128,21 +164,6 @@ class CounterFactory(object):
         self.counters[name] = counter
       return counter
 
-  def get_aggregator_counter(self, step_name, aggregator):
-    """Returns an AggregationCounter for this step's aggregator.
-
-    Passing in the same values will return the same counter.
-
-    Args:
-      step_name: the name of this step.
-      aggregator: an Aggregator object.
-    Returns:
-      A new or existing counter.
-    """
-    return self.get_counter(
-        '%s%s-%s' % (USER_COUNTER_PREFIX, step_name, aggregator.name),
-        aggregator.combine_fn)
-
   def get_counters(self):
     """Returns the current set of counters.
 
@@ -154,32 +175,3 @@ class CounterFactory(object):
     """
     with self._lock:
       return self.counters.values()
-
-  def get_aggregator_values(self, aggregator_or_name):
-    """Returns dict of step names to values of the aggregator."""
-    with self._lock:
-      return get_aggregator_values(
-          aggregator_or_name, self.counters, lambda counter: counter.value())
-
-
-def get_aggregator_values(aggregator_or_name, counter_dict,
-                          value_extractor=None):
-  """Extracts the named aggregator value from a set of counters.
-
-  Args:
-    aggregator_or_name: an Aggregator object or the name of one.
-    counter_dict: a dict object of {name: value_wrapper}
-    value_extractor: a function to convert the value_wrapper into a value.
-      If None, no extraction is done and the value is return unchanged.
-
-  Returns:
-    dict of step names to values of the aggregator.
-  """
-  name = aggregator_or_name
-  if value_extractor is None:
-    value_extractor = lambda x: x
-  if not isinstance(aggregator_or_name, basestring):
-    name = aggregator_or_name.name
-  return {n: value_extractor(c) for n, c in counter_dict.iteritems()
-          if n.startswith(USER_COUNTER_PREFIX)
-          and n.endswith('-%s' % name)}