Fix and add test for ReadFromAvro transform.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8bc965b6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8bc965b6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8bc965b6 Branch: refs/heads/python-sdk Commit: 8bc965b61becf5f92bcd3ff4468fa53fde5b6e6b Parents: f9c565b Author: Robert Bradshaw <rober...@gmail.com> Authored: Sat Sep 24 01:26:31 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Mon Sep 26 12:17:35 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/avroio.py | 5 ++--- sdks/python/apache_beam/io/avroio_test.py | 8 ++++++++ 2 files changed, 10 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc965b6/sdks/python/apache_beam/io/avroio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 3415d22..82b30be 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -34,8 +34,7 @@ __all__ = ['ReadFromAvro'] class ReadFromAvro(PTransform): """A ``PTransform`` for reading avro files.""" - def __init__(self, label=None, file_pattern=None, min_bundle_size=0, - **kwargs): + def __init__(self, file_pattern=None, min_bundle_size=0): """Initializes ``ReadFromAvro``. Uses source '_AvroSource' to read a set of Avro files defined by a given @@ -70,7 +69,7 @@ class ReadFromAvro(PTransform): splitting the input into bundles. **kwargs: Additional keyword arguments to be passed to the base class. """ - super(ReadFromAvro, self).__init__(label, **kwargs) + super(ReadFromAvro, self).__init__() self._file_pattern = file_pattern self._min_bundle_size = min_bundle_size http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc965b6/sdks/python/apache_beam/io/avroio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index c21ed57..e0c211f 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -20,8 +20,12 @@ import os import tempfile import unittest +import apache_beam as beam +from apache_beam.io import avroio from apache_beam.io import filebasedsource from apache_beam.io import source_test_utils +from apache_beam.transforms.util import assert_that +from apache_beam.transforms.util import equal_to # Importing following private class for testing purposes. from apache_beam.io.avroio import _AvroSource as AvroSource @@ -223,6 +227,10 @@ class TestAvro(unittest.TestCase): source_test_utils.readFromSource(source, None, None) self.assertEqual(0, exn.exception.message.find('Unexpected sync marker')) + def test_pipeline(self): + path = self._write_data() + with beam.Pipeline('DirectPipelineRunner') as p: + assert_that(p | avroio.ReadFromAvro(path), equal_to(self.RECORDS)) if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO)