Remove direct references to iobase.Native*
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/60e271b5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/60e271b5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/60e271b5 Branch: refs/heads/python-sdk Commit: 60e271b5fe6e42f241b20554ddafd410e87735eb Parents: 807013a Author: Robert Bradshaw <rober...@google.com> Authored: Thu Oct 6 17:50:41 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Mon Oct 10 10:30:00 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/bigquery.py | 9 ++-- sdks/python/apache_beam/io/fileio.py | 17 +++--- sdks/python/apache_beam/io/fileio_test.py | 73 ++++++++++++++------------ sdks/python/apache_beam/io/iobase.py | 8 +-- sdks/python/apache_beam/io/pubsub.py | 5 +- sdks/python/apache_beam/pipeline_test.py | 2 +- 6 files changed, 60 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60e271b5/sdks/python/apache_beam/io/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 5508eaa..60d85df 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -117,6 +117,7 @@ from apache_beam.internal import auth from apache_beam.internal.json_value import from_json_value from apache_beam.internal.json_value import to_json_value from apache_beam.io import iobase +from apache_beam.runners.dataflow.native_io import iobase as dataflow_io from apache_beam.utils import retry from apache_beam.utils.options import GoogleCloudOptions @@ -280,7 +281,7 @@ def _parse_table_reference(table, dataset=None, project=None): # BigQuerySource, BigQuerySink. -class BigQuerySource(iobase.NativeSource): +class BigQuerySource(dataflow_io.NativeSource): """A source based on a BigQuery table.""" def __init__(self, table=None, dataset=None, project=None, query=None, @@ -345,7 +346,7 @@ class BigQuerySource(iobase.NativeSource): source=self, test_bigquery_client=test_bigquery_client) -class BigQuerySink(iobase.NativeSink): +class BigQuerySink(dataflow_io.NativeSink): """A sink based on a BigQuery table.""" def __init__(self, table, dataset=None, project=None, schema=None, @@ -459,7 +460,7 @@ class BigQuerySink(iobase.NativeSink): # BigQueryReader, BigQueryWriter. -class BigQueryReader(iobase.NativeSourceReader): +class BigQueryReader(dataflow_io.NativeSourceReader): """A reader for a BigQuery source.""" def __init__(self, source, test_bigquery_client=None): @@ -516,7 +517,7 @@ class BigQueryReader(iobase.NativeSourceReader): yield row -class BigQueryWriter(iobase.NativeSinkWriter): +class BigQueryWriter(dataflow_io.NativeSinkWriter): """The sink writer for a BigQuerySink.""" def __init__(self, sink, test_bigquery_client=None, buffer_size=None): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60e271b5/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 54ca891..c248f12 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -32,6 +32,7 @@ import weakref from apache_beam import coders from apache_beam.io import iobase from apache_beam.io import range_trackers +from apache_beam.runners.dataflow.native_io import iobase as dataflow_io __all__ = ['TextFileSource', 'TextFileSink'] @@ -106,7 +107,7 @@ class CompressionTypes(object): return cls.UNCOMPRESSED -class NativeFileSource(iobase.NativeSource): +class NativeFileSource(dataflow_io.NativeSource): """A source implemented by Dataflow service from a GCS or local file or files. This class is to be only inherited by sources natively implemented by Cloud @@ -185,7 +186,7 @@ class NativeFileSource(iobase.NativeSource): return NativeFileSourceReader(self) -class NativeFileSourceReader(iobase.NativeSourceReader, +class NativeFileSourceReader(dataflow_io.NativeSourceReader, coders.observable.ObservableMixin): """The source reader for a NativeFileSource. @@ -302,7 +303,7 @@ class NativeFileSourceReader(iobase.NativeSourceReader, raise NotImplementedError def get_progress(self): - return iobase.ReaderProgress(position=iobase.ReaderPosition( + return dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition( byte_offset=self.range_tracker.last_record_start)) def request_dynamic_split(self, dynamic_split_request): @@ -328,7 +329,7 @@ class NativeFileSourceReader(iobase.NativeSourceReader, 'of work to be completed is out of the valid range (0, ' '1). Requested: %r', dynamic_split_request) return - split_position = iobase.ReaderPosition() + split_position = dataflow_io.ReaderPosition() split_position.byte_offset = ( self.range_tracker.position_at_fraction(percent_complete)) else: @@ -339,7 +340,7 @@ class NativeFileSourceReader(iobase.NativeSourceReader, return if self.range_tracker.try_split(split_position.byte_offset): - return iobase.DynamicSplitResultWithPosition(split_position) + return dataflow_io.DynamicSplitResultWithPosition(split_position) else: return @@ -964,7 +965,7 @@ class TextFileSink(FileSink): file_handle.write('\n') -class NativeFileSink(iobase.NativeSink): +class NativeFileSink(dataflow_io.NativeSink): """A sink implemented by Dataflow service to a GCS or local file or files. This class is to be only inherited by sinks natively implemented by Cloud @@ -1021,7 +1022,7 @@ class NativeFileSink(iobase.NativeSink): self.compression_type == other.compression_type) -class NativeFileSinkWriter(iobase.NativeSinkWriter): +class NativeFileSinkWriter(dataflow_io.NativeSinkWriter): """The sink writer for a NativeFileSink. This class is to be only inherited by sink writers natively implemented by @@ -1127,7 +1128,7 @@ class TextFileReader(NativeFileSourceReader): yield False, self.source.coder.decode(line), delta_offset -class TextMultiFileReader(iobase.NativeSourceReader): +class TextMultiFileReader(dataflow_io.NativeSourceReader): """A reader for a multi-file text source.""" def __init__(self, source): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60e271b5/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 7da0149..77d6c45 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -30,6 +30,7 @@ import apache_beam as beam from apache_beam import coders from apache_beam.io import fileio from apache_beam.io import iobase +from apache_beam.runners.dataflow.native_io import iobase as dataflow_io # TODO: Add tests for file patterns (ie not just individual files) for both # uncompressed @@ -316,7 +317,7 @@ class TestTextFileSource(unittest.TestCase): else: self.assertIsNotNone(actual_response.stop_position) self.assertIsInstance(actual_response.stop_position, - iobase.ReaderPosition) + dataflow_io.ReaderPosition) self.assertIsNotNone(actual_response.stop_position.byte_offset) self.assertEqual(expected_response.stop_position.byte_offset, actual_response.stop_position.byte_offset) @@ -338,8 +339,8 @@ class TestTextFileSource(unittest.TestCase): for percent_complete in percents_complete: self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest( - iobase.ReaderProgress(percent_complete=percent_complete)), + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(percent_complete=percent_complete)), None) # Cursor passed beginning of file. @@ -349,8 +350,8 @@ class TestTextFileSource(unittest.TestCase): for percent_complete in percents_complete: self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest( - iobase.ReaderProgress(percent_complete=percent_complete)), + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(percent_complete=percent_complete)), None) def test_zlib_file_unsplittable(self): @@ -368,8 +369,8 @@ class TestTextFileSource(unittest.TestCase): for percent_complete in percents_complete: self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest( - iobase.ReaderProgress(percent_complete=percent_complete)), + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(percent_complete=percent_complete)), None) # Cursor passed beginning of file. @@ -379,8 +380,8 @@ class TestTextFileSource(unittest.TestCase): for percent_complete in percents_complete: self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest( - iobase.ReaderProgress(percent_complete=percent_complete)), + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(percent_complete=percent_complete)), None) def test_update_stop_position_for_percent_complete(self): @@ -397,33 +398,35 @@ class TestTextFileSource(unittest.TestCase): # Splitting at end of the range should be unsuccessful self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest(iobase.ReaderProgress(percent_complete=0)), + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(percent_complete=0)), None) self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest(iobase.ReaderProgress(percent_complete=1)), + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(percent_complete=1)), None) # Splitting at positions on or before start offset of the last record self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest( - iobase.ReaderProgress(percent_complete=0.2)), + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(percent_complete=0.2)), None) self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest( - iobase.ReaderProgress(percent_complete=0.4)), + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(percent_complete=0.4)), None) # Splitting at a position after the start offset of the last record should # be successful self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest( - iobase.ReaderProgress(percent_complete=0.6)), - iobase.DynamicSplitResultWithPosition( - iobase.ReaderPosition(byte_offset=15))) + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(percent_complete=0.6)), + dataflow_io.DynamicSplitResultWithPosition( + dataflow_io.ReaderPosition(byte_offset=15))) def test_update_stop_position_percent_complete_for_position(self): lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee'] @@ -439,28 +442,28 @@ class TestTextFileSource(unittest.TestCase): # Splitting at end of the range should be unsuccessful self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest( - iobase.ReaderProgress(position=iobase.ReaderPosition( + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition( byte_offset=0))), None) self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest( - iobase.ReaderProgress(position=iobase.ReaderPosition( + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition( byte_offset=25))), None) # Splitting at positions on or before start offset of the last record self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest( - iobase.ReaderProgress(position=iobase.ReaderPosition( + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition( byte_offset=5))), None) self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest( - iobase.ReaderProgress(position=iobase.ReaderPosition( + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition( byte_offset=10))), None) @@ -468,11 +471,11 @@ class TestTextFileSource(unittest.TestCase): # be successful self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest( - iobase.ReaderProgress(position=iobase.ReaderPosition( + dataflow_io.DynamicSplitRequest( + dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition( byte_offset=15))), - iobase.DynamicSplitResultWithPosition( - iobase.ReaderPosition(byte_offset=15))) + dataflow_io.DynamicSplitResultWithPosition( + dataflow_io.ReaderPosition(byte_offset=15))) def run_update_stop_position_exhaustive(self, lines, newline): """An exhaustive test for dynamic splitting. @@ -551,13 +554,13 @@ class TestTextFileSource(unittest.TestCase): elif records_to_read == 0: expected_split_response = None # unstarted else: - expected_split_response = iobase.DynamicSplitResultWithPosition( - stop_position=iobase.ReaderPosition(byte_offset=stop_offset)) + expected_split_response = dataflow_io.DynamicSplitResultWithPosition( + stop_position=dataflow_io.ReaderPosition(byte_offset=stop_offset)) split_response = self.try_splitting_reader_at( reader, - iobase.DynamicSplitRequest(progress=iobase.ReaderProgress( - iobase.ReaderPosition(byte_offset=stop_offset))), + dataflow_io.DynamicSplitRequest(progress=dataflow_io.ReaderProgress( + dataflow_io.ReaderPosition(byte_offset=stop_offset))), expected_split_response) # Reading remaining records from the updated reader. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60e271b5/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index f070b39..ac20732 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -43,7 +43,7 @@ from apache_beam.transforms import core from apache_beam.transforms import ptransform from apache_beam.transforms import window -from apache_beam.runners.dataflow.native_io.iobase import * +#from apache_beam.runners.dataflow.native_io.iobase import * # Encapsulates information about a bundle of a source generated when method @@ -704,10 +704,10 @@ class Write(ptransform.PTransform): self.sink = sink def apply(self, pcoll): - from apache_beam.runners.dataflow.native_io import iobase as native_iobase - if isinstance(self.sink, native_iobase.NativeSink): + from apache_beam.runners.dataflow.native_io import iobase as dataflow_io + if isinstance(self.sink, dataflow_io.NativeSink): # A native sink - return pcoll | 'native_write' >> native_iobase._NativeWrite(self.sink) + return pcoll | 'native_write' >> dataflow_io._NativeWrite(self.sink) elif isinstance(self.sink, Sink): # A custom sink return pcoll | WriteImpl(self.sink) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60e271b5/sdks/python/apache_beam/io/pubsub.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/pubsub.py b/sdks/python/apache_beam/io/pubsub.py index a55e7cd..1f5989a 100644 --- a/sdks/python/apache_beam/io/pubsub.py +++ b/sdks/python/apache_beam/io/pubsub.py @@ -24,9 +24,10 @@ from __future__ import absolute_import from apache_beam import coders from apache_beam.io import iobase +from apache_beam.runners.dataflow.native_io import iobase as dataflow_io -class PubSubSource(iobase.NativeSource): +class PubSubSource(dataflow_io.NativeSource): """Source for reading from a given Cloud Pub/Sub topic. Attributes: @@ -59,7 +60,7 @@ class PubSubSource(iobase.NativeSource): 'PubSubSource is not supported in local execution.') -class PubSubSink(iobase.NativeSink): +class PubSubSink(dataflow_io.NativeSink): """Sink for writing to a given Cloud Pub/Sub topic.""" def __init__(self, topic, coder=coders.StrUtf8Coder()): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60e271b5/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 8a0d246..8c1b3ba 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -21,12 +21,12 @@ import gc import logging import unittest -from apache_beam.io.iobase import NativeSource from apache_beam.pipeline import Pipeline from apache_beam.pipeline import PipelineOptions from apache_beam.pipeline import PipelineVisitor from apache_beam.pvalue import AsIter from apache_beam.pvalue import SideOutputValue +from apache_beam.runners.dataflow.native_io.iobase import NativeSource from apache_beam.transforms import CombinePerKey from apache_beam.transforms import Create from apache_beam.transforms import FlatMap