Add Cython DoFnContext and Receiver stubs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8efc231c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8efc231c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8efc231c Branch: refs/heads/python-sdk Commit: 8efc231c867fcf5267ba21a8a33ee306b87d9945 Parents: dc42467 Author: Robert Bradshaw <rober...@google.com> Authored: Tue Jul 19 12:22:08 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Wed Jul 20 13:06:21 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/common.pxd | 20 ++++++++++++--- sdks/python/apache_beam/runners/common.py | 34 ++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8efc231c/sdks/python/apache_beam/runners/common.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 480c056..e855376 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -15,17 +15,31 @@ # limitations under the License. # -cdef type SideOutputValue, TimestampedValue, WindowedValue +from apache_beam.utils.windowed_value cimport WindowedValue + +cdef type SideOutputValue, TimestampedValue + cdef class DoFnRunner(object): cdef object dofn cdef object window_fn - cdef object context + cdef object context # TODO(robertwb): Make this a DoFnContext cdef object tagged_receivers cdef object logger cdef object step_name - cdef object main_receivers + cdef object main_receivers # TODO(robertwb): Make this a Receiver cpdef _process_outputs(self, element, results) + + +cdef class DoFnContext(object): + cdef object label + cdef object state + cdef WindowedValue windowed_value + cdef set_element(self, WindowedValue windowed_value) + + +cdef class Receiver(object): + cdef receive(self, WindowedValue windowed_value) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8efc231c/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 3c0c3f6..059359c 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -25,8 +25,8 @@ from apache_beam.internal import util from apache_beam.pvalue import SideOutputValue from apache_beam.transforms import core from apache_beam.transforms.window import TimestampedValue -from apache_beam.transforms.window import WindowedValue from apache_beam.transforms.window import WindowFn +from apache_beam.utils.windowed_value import WindowedValue class FakeLogger(object): @@ -188,3 +188,35 @@ class DoFnState(object): """Looks up the counter for this aggregator, creating one if necessary.""" return self._counter_factory.get_aggregator_counter( self.step_name, aggregator) + + +class DoFnContext(object): + + def __init__(self, label, element=None, state=None): + self.label = label + self.state = state + if element is not None: + self.set_element(element) + + def set_element(self, windowed_value): + self.windowed_value = windowed_value + + @property + def element(self): + return self.windowed_value.value + + @property + def timestamp(self): + return self.windowed_value.timestamp + + @property + def windows(self): + return self.windowed_value.windows + + def aggregate_to(self, aggregator, input_value): + self.state.counter_for(aggregator).update(input_value) + + +class Receiver(object): + def receive(self, windowed_value): + raise NotImplementedError