Minor fixups for better testing
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a50efccc Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a50efccc Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a50efccc Branch: refs/heads/python-sdk Commit: a50efccc4b79e95cedf49d2680df67dd2b587927 Parents: 82f553e Author: Robert Bradshaw <rober...@google.com> Authored: Mon Oct 17 17:50:32 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Tue Oct 18 12:17:16 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/common.py | 5 +++-- sdks/python/apache_beam/transforms/sideinputs.py | 7 ++++++- sdks/python/apache_beam/transforms/sideinputs_test.py | 5 ++++- 3 files changed, 13 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a50efccc/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index e500fd8..cc834ba 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -82,8 +82,9 @@ class DoFnRunner(Receiver): if isinstance(side_input, sideinputs.SideInputMap) else {global_window: side_input} for side_input in side_inputs] - if side_inputs and all(side_input.is_globally_windowed() - for side_input in side_inputs): + if side_inputs and all( + isinstance(side_input, dict) or side_input.is_globally_windowed() + for side_input in side_inputs): args, kwargs = util.insert_values_in_args( args, kwargs, [side_input[global_window] for side_input in side_inputs]) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a50efccc/sdks/python/apache_beam/transforms/sideinputs.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index 182179a..f3a7178 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -178,7 +178,8 @@ class SideInputMap(object): """Represents a mapping of windows to side input values.""" def __init__(self, view_class, view_options, iterable): - self._window_mapping_fn = view_options['window_mapping_fn'] + self._window_mapping_fn = view_options.get( + 'window_mapping_fn', _global_window_mapping_fn) self._view_class = view_class self._view_options = view_options self._iterable = iterable @@ -207,3 +208,7 @@ class _FilteringIterable(object): for wv in self._iterable: if self._target_window in wv.windows: yield wv.value + + def __reduce__(self): + # Pickle self as an already filtered list. + return list, (list(self),) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a50efccc/sdks/python/apache_beam/transforms/sideinputs_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 28e324a..caf3652 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -27,12 +27,15 @@ from apache_beam.transforms.util import assert_that, equal_to class SideInputsTest(unittest.TestCase): + def create_pipeline(self): + return beam.Pipeline('DirectPipelineRunner') + def run_windowed_side_inputs(self, elements, main_window_fn, side_window_fn=None, side_input_type=beam.pvalue.AsList, combine_fn=None, expected=None): - with beam.Pipeline('DirectPipelineRunner') as p: + with self.create_pipeline() as p: pcoll = p | beam.Create(elements) | beam.Map( lambda t: window.TimestampedValue(t, t)) main = pcoll | 'WindowMain' >> beam.WindowInto(main_window_fn)