This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ea40c6d Add compacting to TypeCheckCombineFn. (#7821) ea40c6d is described below commit ea40c6dc7800c95b4b25921166c263cee204dc11 Author: Robert Bradshaw <rober...@gmail.com> AuthorDate: Thu Feb 14 01:45:41 2019 +0100 Add compacting to TypeCheckCombineFn. (#7821) * [BEAM-4030] Add compact() to various helper CombineFns. --- sdks/python/apache_beam/runners/direct/helper_transforms.py | 2 +- sdks/python/apache_beam/transforms/combiners.py | 12 +++++++++--- sdks/python/apache_beam/typehints/typecheck.py | 3 +++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py index 6d894fb..60b3ad3 100644 --- a/sdks/python/apache_beam/runners/direct/helper_transforms.py +++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py @@ -69,7 +69,7 @@ class PartialGroupByKeyCombiningValues(beam.DoFn): def finish_bundle(self): for (k, w), va in self._cache.items(): - yield WindowedValue((k, va), w.end, (w,)) + yield WindowedValue((k, self._combine_fn.compact(va)), w.end, (w,)) def default_type_hints(self): hints = self._combine_fn.get_type_hints().copy() diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index 8aa6241..65e098e 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -595,6 +595,9 @@ class SampleCombineFn(core.CombineFn): def merge_accumulators(self, heaps): return self._top_combiner.merge_accumulators(heaps) + def compact(self, heap): + return self._top_combiner.compact(heap) + def extract_output(self, heap): # Here we strip off the random number keys we added in add_input. return [e for _, e in self._top_combiner.extract_output(heap)] @@ -618,6 +621,9 @@ class _TupleCombineFnBase(core.CombineFn): return [c.merge_accumulators(a) for c, a in zip(self._combiners, zip(*accumulators))] + def compact(self, accumulator): + return [c.compact(a) for c, a in zip(self._combiners, accumulator)] + def extract_output(self, accumulator): return tuple([c.extract_output(a) for c, a in zip(self._combiners, accumulator)]) @@ -736,12 +742,12 @@ class _CurriedFn(core.CombineFn): def merge_accumulators(self, accumulators): return self.fn.merge_accumulators(accumulators, *self.args, **self.kwargs) - def extract_output(self, accumulator): - return self.fn.extract_output(accumulator, *self.args, **self.kwargs) - def compact(self, accumulator): return self.fn.compact(accumulator, *self.args, **self.kwargs) + def extract_output(self, accumulator): + return self.fn.extract_output(accumulator, *self.args, **self.kwargs) + def apply(self, elements): return self.fn.apply(elements, *self.args, **self.kwargs) diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py index 72106a5..b69abae 100644 --- a/sdks/python/apache_beam/typehints/typecheck.py +++ b/sdks/python/apache_beam/typehints/typecheck.py @@ -213,6 +213,9 @@ class TypeCheckCombineFn(core.CombineFn): def merge_accumulators(self, accumulators, *args, **kwargs): return self._combinefn.merge_accumulators(accumulators, *args, **kwargs) + def compact(self, accumulator, *args, **kwargs): + return self._combinefn.compact(accumulator, *args, **kwargs) + def extract_output(self, accumulator, *args, **kwargs): result = self._combinefn.extract_output(accumulator, *args, **kwargs) if self._output_type_hint: