A few improvements to Apache Beam Python's FileIO.
- Ensuring that AUTO compression works properly for FileSinks.
- Introducing __enter__ and __exit__ in _CompressedFile to allow use of "with", and updating textio accordingly.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e85f67a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e85f67a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e85f67a1 Branch: refs/heads/python-sdk Commit: e85f67a1a467a26259a849bd20c42e89f165828e Parents: c1440f7 Author: Gus Katsiapis <katsia...@katsiapis-linux.mtv.corp.google.com> Authored: Fri Nov 18 18:31:20 2016 -0800 Committer: Luke Cwik <lc...@google.com> Committed: Mon Nov 21 11:29:07 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/io/fileio.py | 10 +++++- sdks/python/apache_beam/io/fileio_test.py | 48 +++++++++++++++++++++++--- sdks/python/apache_beam/io/textio.py | 6 +--- sdks/python/apache_beam/io/textio_test.py | 26 ++++++++++++++ 4 files changed, 80 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 4d0eea6..1dcd622 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -749,6 +749,12 @@ class _CompressedFile(object): def seekable(self): return False + def __enter__(self): + return self + + def __exit__(self, exception_type, exception_value, traceback): + self.close() + class FileSink(iobase.Sink): """A sink to a GCS or local files. @@ -855,7 +861,9 @@ class FileSink(iobase.Sink): return tmp_dir def open_writer(self, init_result, uid): - return FileSinkWriter(self, os.path.join(init_result, uid)) + # A proper suffix is needed for AUTO compression detection. 
+ suffix = os.path.basename(self.file_path_prefix) + self.file_name_suffix + return FileSinkWriter(self, os.path.join(init_result, uid) + suffix) def finalize_write(self, init_result, writer_results): writer_results = sorted(writer_results) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 9d1e424..098ace1 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -38,10 +38,7 @@ from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher # TODO: Add tests for file patterns (ie not just individual files) for both -# uncompressed - -# TODO: Update code to not use NamedTemporaryFile (or to use it in a way that -# doesn't violate its assumptions). +# compressed and uncompressed files. 
class TestTextFileSource(unittest.TestCase): @@ -721,6 +718,49 @@ class TestNativeTextFileSink(unittest.TestCase): with bz2.BZ2File(self.path, 'r') as f: self.assertEqual(f.read().splitlines(), []) + def test_write_dataflow(self): + pipeline = beam.Pipeline('DirectPipelineRunner') + pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned + pipeline.run() + + read_result = [] + for file_name in glob.glob(self.path + '*'): + with open(file_name, 'r') as f: + read_result.extend(f.read().splitlines()) + + self.assertEqual(read_result, self.lines) + + def test_write_dataflow_auto_compression(self): + pipeline = beam.Pipeline('DirectPipelineRunner') + pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned + fileio.NativeTextFileSink( + self.path, file_name_suffix='.gz')) + pipeline.run() + + read_result = [] + for file_name in glob.glob(self.path + '*'): + with gzip.GzipFile(file_name, 'r') as f: + read_result.extend(f.read().splitlines()) + + self.assertEqual(read_result, self.lines) + + def test_write_dataflow_auto_compression_unsharded(self): + pipeline = beam.Pipeline('DirectPipelineRunner') + pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned + fileio.NativeTextFileSink( + self.path + '.gz', shard_name_template='')) + pipeline.run() + + read_result = [] + for file_name in glob.glob(self.path + '*'): + with gzip.GzipFile(file_name, 'r') as f: + read_result.extend(f.read().splitlines()) + + self.assertEqual(read_result, self.lines) + class MyFileSink(fileio.FileSink): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/textio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio.py 
b/sdks/python/apache_beam/io/textio.py index 9c89b68..bb664d1 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -85,11 +85,9 @@ class _TextSource(filebasedsource.FileBasedSource): def read_records(self, file_name, range_tracker): start_offset = range_tracker.start_position() - read_buffer = _TextSource.ReadBuffer('', 0) - file_to_read = self.open_file(file_name) - try: + with self.open_file(file_name) as file_to_read: if start_offset > 0: # Seeking to one position before the start index and ignoring the # current line. If start_position is at beginning if the line, that line @@ -116,8 +114,6 @@ class _TextSource(filebasedsource.FileBasedSource): if num_bytes_to_next_record < 0: break next_record_start_position += num_bytes_to_next_record - finally: - file_to_read.close() def _find_separator_bounds(self, file_to_read, read_buffer): # Determines the start and end positions within 'read_buffer.data' of the http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/textio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index acdac47..b1d3fb0 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -491,6 +491,32 @@ class TextSinkTest(unittest.TestCase): self.assertEqual(read_result, self.lines) + def test_write_dataflow_auto_compression(self): + pipeline = beam.Pipeline('DirectPipelineRunner') + pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz') # pylint: disable=expression-not-assigned + pipeline.run() + + read_result = [] + for file_name in glob.glob(self.path + '*'): + with gzip.GzipFile(file_name, 'r') as f: + read_result.extend(f.read().splitlines()) + + self.assertEqual(read_result, self.lines) + + def 
test_write_dataflow_auto_compression_unsharded(self): + pipeline = beam.Pipeline('DirectPipelineRunner') + pcoll = pipeline | beam.core.Create('Create', self.lines) + pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='') # pylint: disable=expression-not-assigned + pipeline.run() + + read_result = [] + for file_name in glob.glob(self.path + '*'): + with gzip.GzipFile(file_name, 'r') as f: + read_result.extend(f.read().splitlines()) + + self.assertEqual(read_result, self.lines) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO)