Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk e834fa82b -> 65152cab8

Implement add_input for all CombineFns.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3ebf28c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3ebf28c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3ebf28c6

Branch: refs/heads/python-sdk
Commit: 3ebf28c6e0d17af3720076e33f88a0f126a89059
Parents: e834fa8
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Tue Jul 26 01:15:55 2016 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Tue Aug 2 15:52:28 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/combiners.py | 16 ++++++++--------
 sdks/python/apache_beam/transforms/core.py      |  6 +++---
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ebf28c6/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 155dcc6..c3f0da1 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -132,6 +132,9 @@ class CountCombineFn(core.CombineFn):
   def create_accumulator(self):
     return 0
 
+  def add_input(self, accumulator, element):
+    return accumulator + 1
+
   def add_inputs(self, accumulator, elements):
     return accumulator + len(elements)
 
@@ -425,9 +428,9 @@ class _TupleCombineFnBase(core.CombineFn):
 
 class TupleCombineFn(_TupleCombineFnBase):
 
-  def add_inputs(self, accumulator, elements):
-    return [c.add_inputs(a, e)
-            for c, a, e in zip(self._combiners, accumulator, zip(*elements))]
+  def add_input(self, accumulator, element):
+    return [c.add_input(a, e)
+            for c, a, e in zip(self._combiners, accumulator, element)]
 
   def with_common_input(self):
     return SingleInputTupleCombineFn(*self._combiners)
@@ -435,8 +438,8 @@ class TupleCombineFn(_TupleCombineFnBase):
 
 class SingleInputTupleCombineFn(_TupleCombineFnBase):
 
-  def add_inputs(self, accumulator, elements):
-    return [c.add_inputs(a, elements)
+  def add_input(self, accumulator, element):
+    return [c.add_input(a, element)
             for c, a in zip(self._combiners, accumulator)]
 
 
@@ -522,9 +525,6 @@ def curry_combine_fn(fn, args, kwargs):
     def add_input(self, accumulator, element):
       return fn.add_input(accumulator, element, *args, **kwargs)
 
-    def add_inputs(self, accumulator, elements):
-      return fn.add_inputs(accumulator, elements, *args, **kwargs)
-
     def merge_accumulators(self, accumulators):
       return fn.merge_accumulators(accumulators, *args, **kwargs)
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ebf28c6/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 38b9cd2..da26205 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -270,7 +270,7 @@ class CombineFn(WithTypeHints):
   1. Input values are partitioned into one or more batches.
   2. For each batch, the create_accumulator method is invoked to create a fresh
      initial "accumulator" value representing the combination of zero values.
-  3. For each input value in the batch, the add_inputs method is invoked to
+  3. For each input value in the batch, the add_input method is invoked to
      combine more values with the accumulator for that batch.
   4. The merge_accumulators method is invoked to combine accumulators from
      separate batches into a single combined output accumulator value, once all
@@ -296,7 +296,7 @@ class CombineFn(WithTypeHints):
   def add_input(self, accumulator, element, *args, **kwargs):
     """Return result of folding element into accumulator.
 
-    CombineFn implementors must override either add_input or add_inputs.
+    CombineFn implementors must override add_input.
 
     Args:
       accumulator: the current accumulator
@@ -420,7 +420,7 @@ class CallableWrapperCombineFn(CombineFn):
     if accumulator is self._EMPTY:
       return self._fn(elements, *args, **kwargs)
     elif isinstance(elements, (list, tuple)):
-      return self._fn([accumulator] + elements, *args, **kwargs)
+      return self._fn([accumulator] + list(elements), *args, **kwargs)
     else:
       def union():
         yield accumulator