Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk ea642428f -> ec00c530c
Optimize WindowedValueCoder Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0c90fb80 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0c90fb80 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0c90fb80 Branch: refs/heads/python-sdk Commit: 0c90fb80f3961848aa82667e8891cfebf4dbc351 Parents: ea64242 Author: Robert Bradshaw <rober...@google.com> Authored: Tue Nov 1 16:12:19 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Wed Nov 9 13:26:44 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/coders/coder_impl.pxd | 4 ++++ sdks/python/apache_beam/coders/coder_impl.py | 21 ++++++++++++++------ .../apache_beam/coders/coders_test_common.py | 8 ++------ 3 files changed, 21 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coder_impl.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index e021c2e..74444ff 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -26,6 +26,7 @@ cimport libc.stdlib cimport libc.string from .stream cimport InputStream, OutputStream +from apache_beam.utils cimport windowed_value cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size @@ -137,3 +138,6 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl): @cython.locals(c=CoderImpl) cpdef get_estimated_size_and_observables(self, value, bint nested=?) 
+ + @cython.locals(wv=windowed_value.WindowedValue) + cpdef encode_to_stream(self, value, OutputStream stream, bint nested) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coder_impl.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index d075814..47a837f 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -29,7 +29,7 @@ from types import NoneType from apache_beam.coders import observable from apache_beam.utils.timestamp import Timestamp -from apache_beam.utils.windowed_value import WindowedValue +from apache_beam.utils import windowed_value # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: @@ -535,19 +535,28 @@ class WindowedValueCoderImpl(StreamCoderImpl): """A coder for windowed values.""" def __init__(self, value_coder, timestamp_coder, window_coder): + # TODO(robertwb): Do we need the ability to customize timestamp_coder? self._value_coder = value_coder self._timestamp_coder = timestamp_coder self._windows_coder = TupleSequenceCoderImpl(window_coder) def encode_to_stream(self, value, out, nested): - self._value_coder.encode_to_stream(value.value, out, True) - self._timestamp_coder.encode_to_stream(value.timestamp, out, True) - self._windows_coder.encode_to_stream(value.windows, out, True) + wv = value # type cast + self._value_coder.encode_to_stream(wv.value, out, True) + if isinstance(self._timestamp_coder, TimestampCoderImpl): + # Avoid creation of Timestamp object. 
+ out.write_bigendian_int64(wv.timestamp_micros) + else: + self._timestamp_coder.encode_to_stream(wv.timestamp, out, True) + self._windows_coder.encode_to_stream(wv.windows, out, True) def decode_from_stream(self, in_stream, nested): - return WindowedValue( + return windowed_value.create( self._value_coder.decode_from_stream(in_stream, True), - self._timestamp_coder.decode_from_stream(in_stream, True), + # Avoid creation of Timestamp object. + in_stream.read_bigendian_int64() + if isinstance(self._timestamp_coder, TimestampCoderImpl) + else self._timestamp_coder.decode_from_stream(in_stream, True).micros, self._windows_coder.decode_from_stream(in_stream, True)) def get_estimated_size_and_observables(self, value, nested=False): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coders_test_common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 1af8347..adeb6a5 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -26,6 +26,7 @@ import dill import coders import observable from apache_beam.utils import timestamp +from apache_beam.utils import windowed_value from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message @@ -237,12 +238,7 @@ class CodersTest(unittest.TestCase): # Test nested WindowedValue observable. coder = coders.WindowedValueCoder(iter_coder) observ = FakeObservableIterator() - try: - value = coders.coder_impl.WindowedValue(observ) - except TypeError: - # We are running tests with a fake WindowedValue implementation so as to - # not pull in the rest of the SDK. 
- value = coders.coder_impl.WindowedValue(observ, 0, []) + value = windowed_value.WindowedValue(observ, 0, ()) self.assertEqual( coder.get_impl().get_estimated_size_and_observables(value)[1], [(observ, elem_coder.get_impl())])