Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk b4716d9dc -> 351c3831d
Optimize Map and Flatmap when there are no side inputs. varargs and kwargs are expensive, even when they're empty. This is especially true for otherwise one-argument Python calls which are special cased in CPython. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7d2fb1f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7d2fb1f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7d2fb1f8 Branch: refs/heads/python-sdk Commit: 7d2fb1f88d1a2370dd4053f3a1738cbb9838cc2f Parents: b4716d9 Author: Robert Bradshaw <rober...@google.com> Authored: Wed Jul 27 18:29:59 2016 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Thu Jul 28 11:09:48 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/transforms/core.py | 40 ++++++++++++++++++------- 1 file changed, 30 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7d2fb1f8/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 5e6aafc..38b9cd2 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -20,6 +20,8 @@ from __future__ import absolute_import import copy +import inspect +import types from apache_beam import pvalue from apache_beam import typehints @@ -194,6 +196,16 @@ class DoFn(WithTypeHints): return type_hint +def _fn_takes_side_inputs(fn): + try: + argspec = inspect.getargspec(fn) + except TypeError: + # We can't tell; maybe it does. 
+ return True + is_bound = isinstance(fn, types.MethodType) and fn.im_self is not None + return len(argspec.args) > 1 + is_bound or argspec.varargs or argspec.keywords + + class CallableWrapperDoFn(DoFn): """A DoFn (function) object wrapping a callable object. @@ -214,6 +226,11 @@ class CallableWrapperDoFn(DoFn): raise TypeError('Expected a callable object instead of: %r' % fn) self._fn = fn + if _fn_takes_side_inputs(fn): + self.process = lambda context, *args, **kwargs: fn( + context.element, *args, **kwargs) + else: + self.process = lambda context: fn(context.element) super(CallableWrapperDoFn, self).__init__() @@ -237,9 +254,6 @@ class CallableWrapperDoFn(DoFn): return self._strip_output_annotations( trivial_inference.infer_return_type(self._fn, [input_type])) - def process(self, context, *args, **kwargs): - return self._fn(context.element, *args, **kwargs) - def process_argspec_fn(self): return getattr(self._fn, '_argspec_fn', self._fn) @@ -676,7 +690,10 @@ def Map(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name 'Map can be used only with callable objects. ' 'Received %r instead for %s argument.' % (fn, 'first' if label is None else 'second')) - wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)] + if _fn_takes_side_inputs(fn): + wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)] + else: + wrapper = lambda x: [fn(x)] # Proxy the type-hint information from the original function to this new # wrapped function. 
@@ -1008,21 +1025,24 @@ class GroupByKey(PTransform): value_type = windowed_value_iter_type.inner_type.inner_type return Iterable[KV[key_type, Iterable[value_type]]] - def process(self, context): - k, vs = context.element + def start_bundle(self, context): # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.transforms.trigger import InMemoryUnmergedState from apache_beam.transforms.trigger import create_trigger_driver # pylint: enable=wrong-import-order, wrong-import-position - driver = create_trigger_driver(self.windowing, True) - state = InMemoryUnmergedState() + self.driver = create_trigger_driver(self.windowing, True) + self.state_type = InMemoryUnmergedState + + def process(self, context): + k, vs = context.element + state = self.state_type() # TODO(robertwb): Conditionally process in smaller chunks. - for wvalue in driver.process_elements(state, vs, MIN_TIMESTAMP): + for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP): yield wvalue.with_value((k, wvalue.value)) while state.timers: fired = state.get_and_clear_timers() for timer_window, (name, time_domain, fire_time) in fired: - for wvalue in driver.process_timer( + for wvalue in self.driver.process_timer( timer_window, name, time_domain, fire_time, state): yield wvalue.with_value((k, wvalue.value))