Runner API encoding of common coders.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9cc004fb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9cc004fb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9cc004fb Branch: refs/heads/master Commit: 9cc004fb0c32234b541cd622a0d0ab4c5c3d2389 Parents: ef4239a Author: Robert Bradshaw <rober...@gmail.com> Authored: Tue Aug 22 10:54:21 2017 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Thu Aug 24 15:41:46 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/coders/coders.py | 42 ++++++++++++++++++-- .../apache_beam/coders/coders_test_common.py | 4 +- sdks/python/apache_beam/utils/urns.py | 11 ++++- 3 files changed, 52 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9cc004fb/sdks/python/apache_beam/coders/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 0ea5f7c..e204369 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -206,9 +206,9 @@ class Coder(object): @classmethod def register_urn(cls, urn, parameter_type, fn=None): - """Registeres a urn with a constructor. + """Registers a urn with a constructor. - For example, if 'beam:fn:foo' had paramter type FooPayload, one could + For example, if 'beam:fn:foo' had parameter type FooPayload, one could write `RunnerApiFn.register_urn('bean:fn:foo', FooPayload, foo_from_proto)` where foo_from_proto took as arguments a FooPayload and a PipelineContext. This function can also be used as a decorator rather than passing the @@ -228,7 +228,6 @@ class Coder(object): return register def to_runner_api(self, context): - from apache_beam.portability.api import beam_runner_api_pb2 urn, typed_param, components = self.to_runner_api_parameter(context) return beam_runner_api_pb2.Coder( spec=beam_runner_api_pb2.SdkFunctionSpec( @@ -257,6 +256,22 @@ class Coder(object): google.protobuf.wrappers_pb2.BytesValue(value=serialize_coder(self)), ()) + @staticmethod + def register_structured_urn(urn, cls): + """Register a coder that's completely defined by its urn and its + component(s), if any, which are passed to construct the instance. + """ + cls.to_runner_api_parameter = ( + lambda self, unused_context: (urn, None, self._get_component_coders())) + + # pylint: disable=unused-variable + @Coder.register_urn(urn, None) + def from_runner_api_parameter(unused_payload, components, unused_context): + if components: + return cls(*components) + else: + return cls() + @Coder.register_urn(urns.PICKLED_CODER, google.protobuf.wrappers_pb2.BytesValue) def _pickle_from_runner_api_parameter(payload, components, context): @@ -337,6 +352,9 @@ class BytesCoder(FastCoder): return hash(type(self)) +Coder.register_structured_urn(urns.BYTES_CODER, BytesCoder) + + class VarIntCoder(FastCoder): """Variable-length integer coder.""" @@ -353,6 +371,9 @@ class VarIntCoder(FastCoder): return hash(type(self)) +Coder.register_structured_urn(urns.VAR_INT_CODER, VarIntCoder) + + class FloatCoder(FastCoder): """A coder used for floating-point values.""" @@ -757,6 +778,9 @@ class IterableCoder(FastCoder): return hash((type(self), self._elem_coder)) +Coder.register_structured_urn(urns.ITERABLE_CODER, IterableCoder) + + class GlobalWindowCoder(SingletonCoder): """Coder for global windows.""" @@ -770,6 +794,9 @@ class GlobalWindowCoder(SingletonCoder): } +Coder.register_structured_urn(urns.GLOBAL_WINDOW_CODER, GlobalWindowCoder) + + class IntervalWindowCoder(FastCoder): """Coder for an window defined by a start timestamp and a duration.""" @@ -791,6 +818,9 @@ class IntervalWindowCoder(FastCoder): return hash(type(self)) +Coder.register_structured_urn(urns.INTERVAL_WINDOW_CODER, IntervalWindowCoder) + + class WindowedValueCoder(FastCoder): """Coder for windowed values.""" @@ -847,6 +877,9 @@ class WindowedValueCoder(FastCoder): (self.wrapped_value_coder, self.timestamp_coder, self.window_coder)) +Coder.register_structured_urn(urns.WINDOWED_VALUE_CODER, WindowedValueCoder) + + class LengthPrefixCoder(FastCoder): """For internal use only; no backwards-compatibility guarantees. @@ -886,3 +919,6 @@ class LengthPrefixCoder(FastCoder): def __hash__(self): return hash((type(self), self._value_coder)) + + +Coder.register_structured_urn(urns.LENGTH_PREFIX_CODER, LengthPrefixCoder) http://git-wip-us.apache.org/repos/asf/beam/blob/9cc004fb/sdks/python/apache_beam/coders/coders_test_common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 577c53a..8b0353d 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -26,6 +26,7 @@ import dill from apache_beam.transforms.window import GlobalWindow from apache_beam.utils.timestamp import MIN_TIMESTAMP import observable +from apache_beam.runners import pipeline_context from apache_beam.transforms import window from apache_beam.utils import timestamp from apache_beam.utils import windowed_value @@ -90,7 +91,8 @@ class CodersTest(unittest.TestCase): self.assertEqual(coder.get_impl().get_estimated_size_and_observables(v), (coder.get_impl().estimate_size(v), [])) copy1 = dill.loads(dill.dumps(coder)) - copy2 = dill.loads(dill.dumps(coder)) + context = pipeline_context.PipelineContext() + copy2 = coders.Coder.from_runner_api(coder.to_runner_api(context), context) for v in values: self.assertEqual(v, copy1.decode(copy2.encode(v))) if coder.is_deterministic(): http://git-wip-us.apache.org/repos/asf/beam/blob/9cc004fb/sdks/python/apache_beam/utils/urns.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py index acf729f..18959be 100644 --- a/sdks/python/apache_beam/utils/urns.py +++ b/sdks/python/apache_beam/utils/urns.py @@ -35,7 +35,6 @@ SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1" PICKLED_DO_FN = "beam:dofn:pickled_python:v0.1" PICKLED_DO_FN_INFO = "beam:dofn:pickled_python_info:v0.1" PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v0.1" -PICKLED_CODER = "beam:coder:pickled_python:v0.1" PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1" PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1" @@ -50,6 +49,16 @@ WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1" PICKLED_SOURCE = "beam:source:pickled_python:v0.1" +PICKLED_CODER = "beam:coder:pickled_python:v0.1" +BYTES_CODER = "urn:beam:coders:bytes:0.1" +VAR_INT_CODER = "urn:beam:coders:varint:0.1" +INTERVAL_WINDOW_CODER = "urn:beam:coders:interval_window:0.1" +ITERABLE_CODER = "urn:beam:coders:stream:0.1" +KV_CODER = "urn:beam:coders:kv:0.1" +LENGTH_PREFIX_CODER = "urn:beam:coders:length_prefix:0.1" +GLOBAL_WINDOW_CODER = "urn:beam:coders:urn:beam:coders:global_window:0.1" +WINDOWED_VALUE_CODER = "urn:beam:coders:windowed_value:0.1" + class RunnerApiFn(object): """Abstract base class that provides urn registration utilities.