Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk e834fa82b -> 65152cab8


Implement add_input for all CombineFns.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3ebf28c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3ebf28c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3ebf28c6

Branch: refs/heads/python-sdk
Commit: 3ebf28c6e0d17af3720076e33f88a0f126a89059
Parents: e834fa8
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Tue Jul 26 01:15:55 2016 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Tue Aug 2 15:52:28 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/combiners.py | 16 ++++++++--------
 sdks/python/apache_beam/transforms/core.py      |  6 +++---
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ebf28c6/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 155dcc6..c3f0da1 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -132,6 +132,9 @@ class CountCombineFn(core.CombineFn):
   def create_accumulator(self):
     return 0
 
+  def add_input(self, accumulator, element):
+    return accumulator + 1
+
   def add_inputs(self, accumulator, elements):
     return accumulator + len(elements)
 
@@ -425,9 +428,9 @@ class _TupleCombineFnBase(core.CombineFn):
 
 class TupleCombineFn(_TupleCombineFnBase):
 
-  def add_inputs(self, accumulator, elements):
-    return [c.add_inputs(a, e)
-            for c, a, e in zip(self._combiners, accumulator, zip(*elements))]
+  def add_input(self, accumulator, element):
+    return [c.add_input(a, e)
+            for c, a, e in zip(self._combiners, accumulator, element)]
 
   def with_common_input(self):
     return SingleInputTupleCombineFn(*self._combiners)
@@ -435,8 +438,8 @@ class TupleCombineFn(_TupleCombineFnBase):
 
 class SingleInputTupleCombineFn(_TupleCombineFnBase):
 
-  def add_inputs(self, accumulator, elements):
-    return [c.add_inputs(a, elements)
+  def add_input(self, accumulator, element):
+    return [c.add_input(a, element)
             for c, a in zip(self._combiners, accumulator)]
 
 
@@ -522,9 +525,6 @@ def curry_combine_fn(fn, args, kwargs):
       def add_input(self, accumulator, element):
         return fn.add_input(accumulator, element, *args, **kwargs)
 
-      def add_inputs(self, accumulator, elements):
-        return fn.add_inputs(accumulator, elements, *args, **kwargs)
-
       def merge_accumulators(self, accumulators):
         return fn.merge_accumulators(accumulators, *args, **kwargs)
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ebf28c6/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 38b9cd2..da26205 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -270,7 +270,7 @@ class CombineFn(WithTypeHints):
   1. Input values are partitioned into one or more batches.
   2. For each batch, the create_accumulator method is invoked to create a fresh
      initial "accumulator" value representing the combination of zero values.
-  3. For each input value in the batch, the add_inputs method is invoked to
+  3. For each input value in the batch, the add_input method is invoked to
      combine more values with the accumulator for that batch.
   4. The merge_accumulators method is invoked to combine accumulators from
      separate batches into a single combined output accumulator value, once all
@@ -296,7 +296,7 @@ class CombineFn(WithTypeHints):
   def add_input(self, accumulator, element, *args, **kwargs):
     """Return result of folding element into accumulator.
 
-    CombineFn implementors must override either add_input or add_inputs.
+    CombineFn implementors must override add_input.
 
     Args:
       accumulator: the current accumulator
@@ -420,7 +420,7 @@ class CallableWrapperCombineFn(CombineFn):
     if accumulator is self._EMPTY:
       return self._fn(elements, *args, **kwargs)
     elif isinstance(elements, (list, tuple)):
-      return self._fn([accumulator] + elements, *args, **kwargs)
+      return self._fn([accumulator] + list(elements), *args, **kwargs)
     else:
       def union():
         yield accumulator

Reply via email to