Implement windowed side inputs for direct runner.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/29a73789
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/29a73789
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/29a73789

Branch: refs/heads/python-sdk
Commit: 29a73789c787588dd35edc7964b398961c627cdf
Parents: 9007376
Author: Robert Bradshaw <rober...@google.com>
Authored: Tue Oct 11 15:27:58 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Tue Oct 18 12:17:15 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pvalue.py               | 44 ++++++++++---
 sdks/python/apache_beam/runners/common.pxd      |  1 +
 sdks/python/apache_beam/runners/common.py       | 42 ++++++++----
 .../python/apache_beam/runners/direct_runner.py | 25 +-------
 .../inprocess/inprocess_evaluation_context.py   |  8 +--
 .../python/apache_beam/transforms/sideinputs.py | 67 +++++++++++++++++---
 .../apache_beam/transforms/sideinputs_test.py   |  8 ---
 sdks/python/apache_beam/transforms/timeutil.py  |  4 ++
 sdks/python/apache_beam/transforms/window.py    |  3 +
 9 files changed, 139 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 063d0b5..f8b7feb 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -27,6 +27,7 @@ produced when the pipeline gets executed.
 from __future__ import absolute_import
 
 import collections
+import itertools
 
 
 class PValue(object):
@@ -227,9 +228,10 @@ class SideOutputValue(object):
 class PCollectionView(PValue):
   """An immutable view of a PCollection that can be used as a side input."""
 
-  def __init__(self, pipeline):
+  def __init__(self, pipeline, window_mapping_fn):
     """Initializes a PCollectionView. Do not call directly."""
     super(PCollectionView, self).__init__(pipeline)
+    self._window_mapping_fn = window_mapping_fn
 
   @property
   def windowing(self):
@@ -246,34 +248,60 @@ class PCollectionView(PValue):
     Returns:
       Tuple of options for the given view.
     """
-    return ()
+    return {'window_mapping_fn': self._window_mapping_fn}
 
 
 class SingletonPCollectionView(PCollectionView):
   """A PCollectionView that contains a single object."""
 
-  def __init__(self, pipeline, has_default, default_value):
-    super(SingletonPCollectionView, self).__init__(pipeline)
+  def __init__(self, pipeline, has_default, default_value,
+               window_mapping_fn):
+    super(SingletonPCollectionView, self).__init__(pipeline, window_mapping_fn)
     self.has_default = has_default
     self.default_value = default_value
 
   def _view_options(self):
-    return (self.has_default, self.default_value)
+    base = super(SingletonPCollectionView, self)._view_options()
+    if self.has_default:
+      return dict(base, default=self.default_value)
+    else:
+      return base
+
+  @staticmethod
+  def from_iterable(it, options):
+    head = list(itertools.islice(it, 2))
+    if len(head) == 0:
+      return options.get('default', EmptySideInput())
+    elif len(head) == 1:
+      return head[0]
+    else:
+      raise ValueError(
+          'PCollection with more than one element accessed as '
+          'a singleton view.')
 
 
 class IterablePCollectionView(PCollectionView):
   """A PCollectionView that can be treated as an iterable."""
-  pass
+
+  @staticmethod
+  def from_iterable(it, options):
+    return it
 
 
 class ListPCollectionView(PCollectionView):
   """A PCollectionView that can be treated as a list."""
-  pass
+
+  @staticmethod
+  def from_iterable(it, options):
+    return list(it)
 
 
 class DictPCollectionView(PCollectionView):
   """A PCollectionView that can be treated as a dict."""
-  pass
+
+  @staticmethod
+  def from_iterable(it, options):
+    return dict(it)
 
 
 def _get_cached_view(pipeline, key):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 5cd4cf8..085fd11 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -36,6 +36,7 @@ cdef class DoFnRunner(Receiver):
   cdef object tagged_receivers
   cdef LoggingContext logging_context
   cdef object step_name
+  cdef bint has_windowed_side_inputs
 
   cdef Receiver main_receivers
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 67277c3..86fd819 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -24,6 +24,8 @@ import sys
 from apache_beam.internal import util
 from apache_beam.pvalue import SideOutputValue
 from apache_beam.transforms import core
+from apache_beam.transforms import sideinputs
+from apache_beam.transforms import window
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.transforms.window import WindowFn
 from apache_beam.utils.windowed_value import WindowedValue
@@ -69,24 +71,36 @@ class DoFnRunner(Receiver):
                # Preferred alternative to context
                # TODO(robertwb): Remove once all runners are updated.
                state=None):
+    self.has_windowed_side_inputs = False  # Set to True in one case below.
     if not args and not kwargs:
       self.dofn = fn
       self.dofn_process = fn.process
     else:
-      args, kwargs = util.insert_values_in_args(args, kwargs, side_inputs)
+      # TODO(robertwb): Remove when all runners pass side input maps.
+      # TODO(robertwb): Optimize for global windows case.
+      side_inputs = [side_input if isinstance(side_input, sideinputs.SideInputMap)
+                     else {window.GlobalWindow(): side_input}
+                     for side_input in side_inputs]
+      if side_inputs:
+        self.has_windowed_side_inputs = True
+        def process(context):
+          w = context.windows[0]
+          cur_args, cur_kwargs = util.insert_values_in_args(
+              args, kwargs, [side_input[w] for side_input in side_inputs])
+          return fn.process(context, *cur_args, **cur_kwargs)
+        self.dofn_process = process
+      elif kwargs:
+        self.dofn_process = lambda context: fn.process(context, *args, **kwargs)
+      else:
+        self.dofn_process = lambda context: fn.process(context, *args)
 
       class CurriedFn(core.DoFn):
 
-        def start_bundle(self, context):
-          return fn.start_bundle(context)
-
-        def process(self, context):
-          return fn.process(context, *args, **kwargs)
+        start_bundle = staticmethod(fn.start_bundle)
+        process = staticmethod(self.dofn_process)
+        finish_bundle = staticmethod(fn.finish_bundle)
 
-        def finish_bundle(self, context):
-          return fn.finish_bundle(context)
       self.dofn = CurriedFn()
-      self.dofn_process = lambda context: fn.process(context, *args, **kwargs)
 
     self.window_fn = windowing.windowfn
     self.tagged_receivers = tagged_receivers
@@ -133,8 +147,14 @@ class DoFnRunner(Receiver):
   def process(self, element):
     try:
       self.logging_context.enter()
-      self.context.set_element(element)
-      self._process_outputs(element, self.dofn_process(self.context))
+      if self.has_windowed_side_inputs and len(element.windows) > 1:
+        for w in element.windows:
+          self.context.set_element(
+              WindowedValue(element.value, element.timestamp, w))
+          self._process_outputs(element, self.dofn_process(self.context))
+      else:
+        self.context.set_element(element)
+        self._process_outputs(element, self.dofn_process(self.context))
     except BaseException as exn:
       self.reraise_augmented(exn)
     finally:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/runners/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct_runner.py b/sdks/python/apache_beam/runners/direct_runner.py
index 656cc91..936043c 100644
--- a/sdks/python/apache_beam/runners/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct_runner.py
@@ -41,6 +41,7 @@ from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PValueCache
+from apache_beam.transforms import sideinputs
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
 from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
@@ -107,29 +108,7 @@ class DirectPipelineRunner(PipelineRunner):
     transform = transform_node.transform
     view = transform.view
     values = self._cache.get_pvalue(transform_node.inputs[0])
-    if isinstance(view, SingletonPCollectionView):
-      has_default, default_value = view._view_options()  # pylint: disable=protected-access
-      if len(values) == 0:
-        if has_default:
-          result = default_value
-        else:
-          result = EmptySideInput()
-      elif len(values) == 1:
-        # TODO(ccy): Figure out whether side inputs should ever be given as
-        # windowed values
-        result = values[0].value
-      else:
-        raise ValueError(('PCollection with more than one element accessed as '
-                          'a singleton view: %s.') % view)
-    elif isinstance(view, IterablePCollectionView):
-      result = [v.value for v in values]
-    elif isinstance(view, ListPCollectionView):
-      result = [v.value for v in values]
-    elif isinstance(view, DictPCollectionView):
-      result = dict(v.value for v in values)
-    else:
-      raise NotImplementedError
-
+    result = sideinputs.SideInputMap(type(view), view._view_options(), values)
     self._cache.cache_output(transform_node, result)
 
   @skip_if_cached

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
index 9c8b695..883be99 100644
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
@@ -27,6 +27,7 @@ from apache_beam.pvalue import EmptySideInput
 from apache_beam.pvalue import IterablePCollectionView
 from apache_beam.pvalue import ListPCollectionView
 from apache_beam.pvalue import SingletonPCollectionView
+from apache_beam.transforms import sideinputs
 from apache_beam.runners.inprocess.clock import Clock
 from apache_beam.runners.inprocess.inprocess_watermark_manager import InProcessWatermarkManager
 from apache_beam.runners.inprocess.inprocess_executor import TransformExecutor
@@ -106,12 +107,9 @@ class _InProcessSideInputsContainer(object):
       ValueError: If values cannot be converted into the requested form.
     """
     if isinstance(view, SingletonPCollectionView):
-      has_default, default_value = view._view_options()  # pylint: disable=protected-access
       if len(values) == 0:
-        if has_default:
-          result = default_value
-        else:
-          result = EmptySideInput()
+        # pylint: disable=protected-access
+        result = view._view_options().get('default', EmptySideInput())
       elif len(values) == 1:
         result = values[0].value
       else:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 6c698da..00c2852 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -27,6 +27,7 @@ from __future__ import absolute_import
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.transforms.ptransform import PTransform
+from apache_beam.transforms import window
 
 # Type variables
 K = typehints.TypeVariable('K')
@@ -50,10 +51,6 @@ class CreatePCollectionView(PTransform):
     return input_type
 
   def apply(self, pcoll):
-    if not pcoll.windowing.is_default():
-      raise ValueError(
-          "Side inputs only supported for global windows, default triggering. "
-          "Found %s" % pcoll.windowing)
     return self.view
 
 
@@ -78,7 +75,8 @@ class ViewAsSingleton(PTransform):
     return (pcoll
             | CreatePCollectionView(
                 pvalue.SingletonPCollectionView(
-                    pcoll.pipeline, self.has_default, self.default_value))
+                    pcoll.pipeline, self.has_default, self.default_value,
+                    default_window_mapping_fn(pcoll.windowing.windowfn)))
             .with_input_types(input_type)
             .with_output_types(output_type))
 
@@ -101,7 +99,9 @@ class ViewAsIterable(PTransform):
     output_type = typehints.Iterable[input_type]
     return (pcoll
             | CreatePCollectionView(
-                pvalue.IterablePCollectionView(pcoll.pipeline))
+                pvalue.IterablePCollectionView(
+                    pcoll.pipeline,
+                    default_window_mapping_fn(pcoll.windowing.windowfn)))
             .with_input_types(input_type)
             .with_output_types(output_type))
 
@@ -123,7 +123,9 @@ class ViewAsList(PTransform):
     input_type = pcoll.element_type
     output_type = typehints.List[input_type]
     return (pcoll
-            | CreatePCollectionView(pvalue.ListPCollectionView(pcoll.pipeline))
+            | CreatePCollectionView(pvalue.ListPCollectionView(
+                pcoll.pipeline,
+                default_window_mapping_fn(pcoll.windowing.windowfn)))
             .with_input_types(input_type)
             .with_output_types(output_type))
 
@@ -150,6 +152,55 @@ class ViewAsDict(PTransform):
     output_type = typehints.Dict[key_type, value_type]
     return (pcoll
             | CreatePCollectionView(
-                pvalue.DictPCollectionView(pcoll.pipeline))
+                pvalue.DictPCollectionView(
+                    pcoll.pipeline,
+                    default_window_mapping_fn(pcoll.windowing.windowfn)))
             .with_input_types(input_type)
             .with_output_types(output_type))
+
+
+# Top-level function so we can identify it later.
+def _global_window_mapping_fn(w, global_window=window.GlobalWindow()):
+  return global_window
+
+
+def default_window_mapping_fn(target_window_fn):
+  if target_window_fn == window.GlobalWindows():
+    return _global_window_mapping_fn
+  else:
+    def map_via_end(source_window):
+      return list(target_window_fn.assign(
+          window.WindowFn.AssignContext(source_window.max_timestamp())))[0]
+    return map_via_end
+
+
+class SideInputMap(object):
+  """Represents a mapping of windows to side input values."""
+
+  def __init__(self, view_class, view_options, iterable):
+    self._window_mapping_fn = view_options['window_mapping_fn']
+    self._view_class = view_class
+    self._view_options = view_options
+    self._iterable = iterable
+    self._cache = {}
+
+  def __getitem__(self, window):
+    if window not in self._cache:
+      target_window = self._window_mapping_fn(window)
+      self._cache[window] = self._view_class.from_iterable(
+          _FilteringIterable(self._iterable, target_window), self._view_options)
+    return self._cache[window]
+
+
+class _FilteringIterable(object):
+  """An iterable containing only those values in the given window.
+  """
+
+  def __init__(self, iterable, target_window):
+    self._iterable = iterable
+    self._target_window = target_window
+
+  def __iter__(self):
+    for wv in self._iterable:
+      if self._target_window in wv.windows:
+        yield wv.value

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 68deba8..8b5b4e0 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -27,14 +27,6 @@ from apache_beam.transforms.util import assert_that, equal_to
 
 class SideInputsTest(unittest.TestCase):
 
-  # TODO(BEAM-733): Actually support this.
-  def test_no_sideinput_windowing(self):
-    p = beam.Pipeline('DirectPipelineRunner')
-    pc = p | beam.Create([0, 1]) | beam.WindowInto(window.FixedWindows(10))
-    with self.assertRaises(ValueError):
-      # pylint: disable=expression-not-assigned
-      pc | beam.Map(lambda x, side: None, side=beam.pvalue.AsIter(pc))
-
   def run_windowed_side_inputs(self, elements, main_window_fn,
                                side_window_fn=None,
                                side_input_type=beam.pvalue.AsList,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/transforms/timeutil.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 4092b60..f026d83 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -58,6 +58,10 @@ class Timestamp(object):
       return seconds
     return Timestamp(seconds)
 
+  def predecessor(self):
+    """Returns the largest timestamp smaller than self."""
+    return Timestamp(micros=self.micros - 1)
+
   def __repr__(self):
     micros = self.micros
     sign = ''

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29a73789/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index e07814d..9485032 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -143,6 +143,9 @@ class BoundedWindow(object):
   def __init__(self, end):
     self.end = Timestamp.of(end)
 
+  def max_timestamp(self):
+    return self.end.predecessor()
+
   def __cmp__(self, other):
     # Order first by endpoint, then arbitrarily.
     return cmp(self.end, other.end) or cmp(hash(self), hash(other))