Move pipeline context and add more tests.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/deff128f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/deff128f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/deff128f Branch: refs/heads/master Commit: deff128ff07ccc67956e1d5a94a64f2a31b224c8 Parents: b2da21e Author: Robert Bradshaw <rober...@gmail.com> Authored: Thu Mar 9 09:21:33 2017 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Thu Mar 9 20:29:02 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/coders/coders.py | 93 ++++++++++++++++++++ sdks/python/apache_beam/pipeline.py | 62 ------------- .../apache_beam/runners/pipeline_context.py | 88 ++++++++++++++++++ .../runners/pipeline_context_test.py | 49 +++++++++++ sdks/python/apache_beam/transforms/core.py | 1 + .../apache_beam/transforms/trigger_test.py | 18 +--- sdks/python/apache_beam/transforms/window.py | 2 +- .../apache_beam/transforms/window_test.py | 6 +- sdks/python/apache_beam/utils/urns.py | 2 +- 9 files changed, 238 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/coders/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index fd72af8..9f5a97a 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -266,6 +266,12 @@ class BytesCoder(FastCoder): def is_deterministic(self): return True + def __eq__(self, other): + return type(self) == type(other) + + def __hash__(self): + return hash(type(self)) + class VarIntCoder(FastCoder): """Variable-length integer coder.""" @@ -276,6 +282,12 @@ class VarIntCoder(FastCoder): def is_deterministic(self): return True + def __eq__(self, other): + return 
type(self) == type(other) + + def __hash__(self): + return hash(type(self)) + class FloatCoder(FastCoder): """A coder used for floating-point values.""" @@ -286,6 +298,12 @@ class FloatCoder(FastCoder): def is_deterministic(self): return True + def __eq__(self, other): + return type(self) == type(other) + + def __hash__(self): + return hash(type(self)) + class TimestampCoder(FastCoder): """A coder used for timeutil.Timestamp values.""" @@ -296,6 +314,12 @@ class TimestampCoder(FastCoder): def is_deterministic(self): return True + def __eq__(self, other): + return type(self) == type(other) + + def __hash__(self): + return hash(type(self)) + class SingletonCoder(FastCoder): """A coder that always encodes exactly one value.""" @@ -309,6 +333,12 @@ class SingletonCoder(FastCoder): def is_deterministic(self): return True + def __eq__(self, other): + return type(self) == type(other) and self._value == other._value + + def __hash__(self): + return hash(self._value) + def maybe_dill_dumps(o): """Pickle using cPickle or the Dill pickler as a fallback.""" @@ -365,6 +395,12 @@ class _PickleCoderBase(FastCoder): def value_coder(self): return self + def __eq__(self, other): + return type(self) == type(other) + + def __hash__(self): + return hash(type(self)) + class PickleCoder(_PickleCoderBase): """Coder using Python's pickle functionality.""" @@ -446,6 +482,12 @@ class FastPrimitivesCoder(FastCoder): def value_coder(self): return self + def __eq__(self, other): + return type(self) == type(other) + + def __hash__(self): + return hash(type(self)) + class Base64PickleCoder(Coder): """Coder of objects by Python pickle, then base64 encoding.""" @@ -503,6 +545,13 @@ class ProtoCoder(FastCoder): # a Map. 
return False + def __eq__(self, other): + return (type(self) == type(other) + and self.proto_message_type == other.proto_message_type) + + def __hash__(self): + return hash(self.proto_message_type) + + @staticmethod def from_type_hint(typehint, unused_registry): if issubclass(typehint, google.protobuf.message.Message): @@ -563,6 +612,13 @@ class TupleCoder(FastCoder): def __repr__(self): return 'TupleCoder[%s]' % ', '.join(str(c) for c in self._coders) + def __eq__(self, other): + return (type(self) == type(other) + and self._coders == other._coders) + + def __hash__(self): + return hash(self._coders) + class TupleSequenceCoder(FastCoder): """Coder of homogeneous tuple objects.""" @@ -586,6 +642,13 @@ class TupleSequenceCoder(FastCoder): def __repr__(self): return 'TupleSequenceCoder[%r]' % self._elem_coder + def __eq__(self, other): + return (type(self) == type(other) + and self._elem_coder == other._elem_coder) + + def __hash__(self): + return hash((type(self), self._elem_coder)) + class IterableCoder(FastCoder): """Coder of iterables of homogeneous objects.""" @@ -619,6 +682,13 @@ class IterableCoder(FastCoder): def __repr__(self): return 'IterableCoder[%r]' % self._elem_coder + def __eq__(self, other): + return (type(self) == type(other) + and self._elem_coder == other._elem_coder) + + def __hash__(self): + return hash((type(self), self._elem_coder)) + class WindowCoder(PickleCoder): """Coder for windows in windowed values.""" @@ -663,6 +733,12 @@ class IntervalWindowCoder(FastCoder): '@type': 'kind:interval_window', } + def __eq__(self, other): + return type(self) == type(other) + + def __hash__(self): + return hash(type(self)) + class WindowedValueCoder(FastCoder): """Coder for windowed values.""" @@ -709,6 +785,16 @@ class WindowedValueCoder(FastCoder): def __repr__(self): return 'WindowedValueCoder[%s]' % self.wrapped_value_coder + def __eq__(self, other): + return (type(self) == type(other) + and self.wrapped_value_coder == other.wrapped_value_coder + and 
self.timestamp_coder == other.timestamp_coder + and self.window_coder == other.window_coder) + + def __hash__(self): + return hash( + (self.wrapped_value_coder, self.timestamp_coder, self.window_coder)) + class LengthPrefixCoder(FastCoder): """Coder which prefixes the length of the encoded object in the stream.""" @@ -740,3 +826,10 @@ class LengthPrefixCoder(FastCoder): def __repr__(self): return 'LengthPrefixCoder[%r]' % self._value_coder + + def __eq__(self, other): + return (type(self) == type(other) + and self._value_coder == other._value_coder) + + def __hash__(self): + return hash((type(self), self._value_coder)) http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 9edcf9b..7db39a9 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -52,14 +52,11 @@ import os import shutil import tempfile -from apache_beam import coders from apache_beam import pvalue from apache_beam import typehints from apache_beam.internal import pickler from apache_beam.runners import create_runner from apache_beam.runners import PipelineRunner -from apache_beam.runners.api import beam_runner_api_pb2 -from apache_beam.transforms import core from apache_beam.transforms import ptransform from apache_beam.typehints import TypeCheckError from apache_beam.utils.pipeline_options import PipelineOptions @@ -443,62 +440,3 @@ class AppliedPTransform(object): if v not in visited: visited.add(v) visitor.visit_value(v, self) - - -class PipelineContextMap(object): - """This is a bi-directional map between objects and ids. - - Under the hood it encodes and decodes these objects into runner API - representations. 
- """ - def __init__(self, context, obj_type, proto_map=None): - self._pipeline_context = context - self._obj_type = obj_type - self._obj_to_id = {} - self._id_to_obj = {} - self._id_to_proto = proto_map if proto_map else {} - self._counter = 0 - - def _unique_ref(self): - self._counter += 1 - return "ref_%s_%s" % (self._obj_type.__name__, self._counter) - - def populate_map(self, proto_map): - for id, obj in self._id_to_obj: - proto_map[id] = self._id_to_proto[id] - - def get_id(self, obj): - if obj not in self._obj_to_id: - id = self._unique_ref() - self._id_to_obj[id] = obj - self._obj_to_id[obj] = id - self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context) - return self._obj_to_id[obj] - - def get_by_id(self, id): - if id not in self._id_to_obj: - self._id_to_obj[id] = self._obj_type.from_runner_api( - self._id_to_proto[id], self._pipeline_context) - return self._id_to_obj[id] - - -class PipelineContext(object): - - _COMPONENT_TYPES = { - 'transforms': AppliedPTransform, - 'pcollections': pvalue.PCollection, - 'coders': coders.Coder, - 'windowing_strategies': core.Windowing, - # TODO: environment - } - - def __init__(self, context_proto=None): - for name, cls in self._COMPONENT_TYPES.items(): - setattr(self, name, - PipelineContextMap(self, cls, getattr(context_proto, name, None))) - - def to_runner_api(self): - context_proto = beam_runner_api_pb2.Components() - for name, cls in self._COMPONENT_TYPES: - getattr(self, name).populate_map(getattr(context_proto, name)) - return context_proto http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/runners/pipeline_context.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py new file mode 100644 index 0000000..4f82774 --- /dev/null +++ b/sdks/python/apache_beam/runners/pipeline_context.py @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software 
Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from apache_beam import pipeline +from apache_beam import pvalue +from apache_beam import coders +from apache_beam.runners.api import beam_runner_api_pb2 +from apache_beam.transforms import core + + +class _PipelineContextMap(object): + """This is a bi-directional map between objects and ids. + + Under the hood it encodes and decodes these objects into runner API + representations. 
+ """ + def __init__(self, context, obj_type, proto_map=None): + self._pipeline_context = context + self._obj_type = obj_type + self._obj_to_id = {} + self._id_to_obj = {} + self._id_to_proto = proto_map if proto_map else {} + self._counter = 0 + + def _unique_ref(self): + self._counter += 1 + return "ref_%s_%s" % (self._obj_type.__name__, self._counter) + + def populate_map(self, proto_map): + for id, proto in self._id_to_proto.items(): + proto_map[id].CopyFrom(proto) + + def get_id(self, obj): + if obj not in self._obj_to_id: + id = self._unique_ref() + self._id_to_obj[id] = obj + self._obj_to_id[obj] = id + self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context) + return self._obj_to_id[obj] + + def get_by_id(self, id): + if id not in self._id_to_obj: + self._id_to_obj[id] = self._obj_type.from_runner_api( + self._id_to_proto[id], self._pipeline_context) + return self._id_to_obj[id] + + +class PipelineContext(object): + """Used for accessing and constructing the referenced objects of a Pipeline. 
+ """ + + _COMPONENT_TYPES = { + 'transforms': pipeline.AppliedPTransform, + 'pcollections': pvalue.PCollection, + 'coders': coders.Coder, + 'windowing_strategies': core.Windowing, + # TODO: environment + } + + def __init__(self, context_proto=None): + for name, cls in self._COMPONENT_TYPES.items(): + setattr( + self, name, _PipelineContextMap( + self, cls, getattr(context_proto, name, None))) + + @staticmethod + def from_runner_api(proto): + return PipelineContext(proto) + + def to_runner_api(self): + context_proto = beam_runner_api_pb2.Components() + for name in self._COMPONENT_TYPES: + getattr(self, name).populate_map(getattr(context_proto, name)) + return context_proto http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/runners/pipeline_context_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/pipeline_context_test.py b/sdks/python/apache_beam/runners/pipeline_context_test.py new file mode 100644 index 0000000..6091ed8 --- /dev/null +++ b/sdks/python/apache_beam/runners/pipeline_context_test.py @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Unit tests for the PipelineContext.""" + +import unittest + +from apache_beam import coders +from apache_beam.runners import pipeline_context + + +class PipelineContextTest(unittest.TestCase): + + def test_deduplication(self): + context = pipeline_context.PipelineContext() + bytes_coder_ref = context.coders.get_id(coders.BytesCoder()) + bytes_coder_ref2 = context.coders.get_id(coders.BytesCoder()) + self.assertEqual(bytes_coder_ref, bytes_coder_ref2) + + def test_serialization(self): + context = pipeline_context.PipelineContext() + float_coder_ref = context.coders.get_id(coders.FloatCoder()) + bytes_coder_ref = context.coders.get_id(coders.BytesCoder()) + proto = context.to_runner_api() + context2 = pipeline_context.PipelineContext.from_runner_api(proto) + self.assertEqual( + coders.FloatCoder(), + context2.coders.get_by_id(float_coder_ref)) + self.assertEqual( + coders.BytesCoder(), + context2.coders.get_by_id(bytes_coder_ref)) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 1fc63b2..3251671 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1235,6 +1235,7 @@ class Windowing(object): trigger=self.triggerfn.to_runner_api(context), accumulation_mode=self.accumulation_mode, output_time=self.output_time_fn, + # TODO(robertwb): Support EMIT_IF_NONEMPTY closing_behavior=beam_runner_api_pb2.EMIT_ALWAYS, allowed_lateness=0) http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/trigger_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index 
cc9e0f5..827aa33 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -25,6 +25,7 @@ import unittest import yaml import apache_beam as beam +from apache_beam.runners import pipeline_context from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms import trigger from apache_beam.transforms.core import Windowing @@ -392,22 +393,7 @@ class RunnerApiTest(unittest.TestCase): AfterWatermark(early=AfterCount(1000), late=AfterCount(1)), Repeatedly(AfterCount(100)), trigger.OrFinally(AfterCount(3), AfterCount(10))): - context = beam.pipeline.PipelineContext() - self.assertEqual( - trigger_fn, - TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context)) - - def test_windowing_strategy_encoding(self): - for trigger_fn in ( - DefaultTrigger(), - AfterAll(AfterCount(1), AfterCount(10)), - AfterFirst(AfterCount(10), AfterCount(100)), - AfterEach(AfterCount(100), AfterCount(1000)), - AfterWatermark(early=AfterCount(1000)), - AfterWatermark(early=AfterCount(1000), late=AfterCount(1)), - Repeatedly(AfterCount(100)), - trigger.OrFinally(AfterCount(3), AfterCount(10))): - context = beam.pipeline.PipelineContext() + context = pipeline_context.PipelineContext() self.assertEqual( trigger_fn, TriggerFn.from_runner_api(trigger_fn.to_runner_api(context), context)) http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index c763a96..3878dff 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -73,6 +73,7 @@ class OutputTimeFn(object): OUTPUT_AT_EOW = beam_runner_api_pb2.END_OF_WINDOW OUTPUT_AT_EARLIEST = beam_runner_api_pb2.EARLIEST_IN_PANE OUTPUT_AT_LATEST = beam_runner_api_pb2.LATEST_IN_PANE + # 
TODO(robertwb): Add this to the runner API or remove it. OUTPUT_AT_EARLIEST_TRANSFORMED = 'OUTPUT_AT_EARLIEST_TRANSFORMED' @staticmethod @@ -167,7 +168,6 @@ class WindowFn(object): return pickler.loads(fn_parameter.value) def to_runner_api_parameter(self, context): - raise TypeError(self) return (urns.PICKLED_WINDOW_FN, wrappers_pb2.BytesValue(value=pickler.dumps(self))) http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/transforms/window_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index c79739a..99be02c 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -19,7 +19,7 @@ import unittest -from apache_beam import pipeline +from apache_beam.runners import pipeline_context from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms import CombinePerKey from apache_beam.transforms import combiners @@ -238,7 +238,7 @@ class RunnerApiTest(unittest.TestCase): FixedWindows(37), SlidingWindows(2, 389), Sessions(5077)): - context = pipeline.PipelineContext() + context = pipeline_context.PipelineContext() self.assertEqual( window_fn, WindowFn.from_runner_api(window_fn.to_runner_api(context), context)) @@ -251,7 +251,7 @@ class RunnerApiTest(unittest.TestCase): Windowing(SlidingWindows(10, 15, 21), AfterCount(28), output_time_fn=OutputTimeFn.OUTPUT_AT_LATEST, accumulation_mode=AccumulationMode.DISCARDING)): - context = pipeline.PipelineContext() + context = pipeline_context.PipelineContext() self.assertEqual( windowing, Windowing.from_runner_api(windowing.to_runner_api(context), context)) http://git-wip-us.apache.org/repos/asf/beam/blob/deff128f/sdks/python/apache_beam/utils/urns.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/urns.py 
b/sdks/python/apache_beam/utils/urns.py index 186c99c..936e2cb 100644 --- a/sdks/python/apache_beam/utils/urns.py +++ b/sdks/python/apache_beam/utils/urns.py @@ -21,4 +21,4 @@ FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1" SLIDING_WINDOWS_FN = "beam:window_fn:sliding_windows:v0.1" SESSION_WINDOWS_FN = "beam:window_fn:session_windows:v0.1" -PICKLED_CODER = "dataflow:coder:pickled_python:v0.1" +PICKLED_CODER = "beam:coder:pickled_python:v0.1"