Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 26ff65795 -> 53ab635c7
Make TextFileReader observable

This allows future implementation of size tracking for elements in side
input sources.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f4054ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f4054ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f4054ba

Branch: refs/heads/python-sdk
Commit: 2f4054ba37da1c1100f45a572d96e7a6e2e60152
Parents: 26ff657
Author: Charles Chen <c...@google.com>
Authored: Mon Jul 25 11:44:22 2016 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Jul 28 11:05:55 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2f4054ba/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 3afaae8..b1e091b 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -745,10 +745,12 @@ class NativeTextFileSink(iobase.NativeSink):
 # TextFileReader, TextMultiFileReader.
 
 
-class TextFileReader(iobase.NativeSourceReader):
+class TextFileReader(iobase.NativeSourceReader,
+                     coders.observable.ObservableMixin):
   """A reader for a text file source."""
 
   def __init__(self, source):
+    super(TextFileReader, self).__init__()
     self.source = source
     self.start_offset = self.source.start_offset or 0
     self.end_offset = self.source.end_offset
@@ -778,6 +780,7 @@ class TextFileReader(iobase.NativeSourceReader):
       self._file.seek(self.start_offset - 1)
       self.current_offset -= 1
       line = self._file.readline()
+      self.notify_observers(line, is_encoded=True)
       self.current_offset += len(line)
     else:
       self._file.seek(self.start_offset)
@@ -801,6 +804,7 @@ class TextFileReader(iobase.NativeSourceReader):
         # a dynamic split request from the service.
         return
       line = self._file.readline()
+      self.notify_observers(line, is_encoded=True)
       self.current_offset += len(line)
       if self.source.strip_trailing_newlines:
         line = line.rstrip('\n')